Module data_request_api.content.consolidate_export

Functions

def map_data(data, mapping_table, version, **kwargs)
Expand source code
def _passes_internal_filters(record, internal_filters):
    """
    Evaluate all "internal_filters" of a mapping-table entry for one record.

    Parameters
    ----------
    record : dict
        A single record (attribute name -> value).
    internal_filters : dict
        Maps a filter key to a dict with the entries "aliases" (alternative
        attribute names), "operator" ("nonempty", "in" or "not in") and,
        for "in" / "not in", "values".

    Returns
    -------
    bool
        True if every filter is satisfied (the record is kept), else False.

    Note
    ----
        A filter whose key (and all aliases) is absent from the record fails.
        Unknown operators contribute no result and are thereby ignored.
    """
    results = []
    for filter_key, filter_val in internal_filters.items():
        # Attribute names under which the filtered field exists in this record,
        # in priority order (the filter key first, then its aliases)
        present = [fk for fk in [filter_key] + filter_val["aliases"] if fk in record]
        if not present:
            results.append(False)
        elif filter_val["operator"] == "nonempty":
            results.append(any(bool(record[fk]) for fk in present))
        elif filter_val["operator"] == "in":
            # Evaluate the first name that is present; reading it via that
            # name (not via the filter key) avoids a KeyError for alias-only records
            value = record[present[0]]
            if isinstance(value, list):
                results.append(any(fj in filter_val["values"] for fj in value))
            else:
                results.append(value in filter_val["values"])
        elif filter_val["operator"] == "not in":
            value = record[present[0]]
            if isinstance(value, list):
                results.append(any(fj not in filter_val["values"] for fj in value))
            else:
                # Scalar values have to be tested as well
                results.append(value not in filter_val["values"])
    return all(results)


def map_data(data, mapping_table, version, **kwargs):
    """
    Maps the data to the one-base structure using the mapping table.

    Parameters
    ----------
    data : dict
        Three-base or one-base Airtable export. A dict with exactly one
        top-level key is treated as one-base, a dict with three or four
        top-level keys as a multi-base export that requires mapping.
    mapping_table : dict
        The mapping table to apply to map to one base.
    version : str
        The version tag of the exported Data Request Content dictionary.
    **kwargs
        Accepted but not used by this function.

    Returns
    -------
    dict
        Mapped data with one-base structure (single key "Data Request").

    Raises
    ------
    ValueError
        If the number of top-level keys is not 1, 3 or 4, if none of the
        source tables of an internally mapped table exists in the data, or
        if the mapping table specifies an unknown "operation" / "entry_type"
        or an inconsistent "record_id" mapping.
    KeyError
        If a base or table required by the mapping table is missing.

    Note
    ----
        If the data is already one-base, no mapping is performed: consistency
        and string fixes are applied to the content, its "version" entry is
        set and it is returned under the key "Data Request". The "version"
        entry is set on the object returned by ``_apply_consistency_fixes``,
        which may be the input content itself (TODO confirm).

        For multi-base data the module-level list ``filtered_records`` is
        reset and refilled with the ids of all records removed by the
        "internal_filters" of the mapping table.
    """
    global filtered_records

    logger = get_logger()
    n_bases = len(data.keys())

    # One-base export: nothing to map, only apply fixes and set the version
    if n_bases == 1:
        l_version = next(iter(data.keys())).replace("Data Request", "").strip()
        if l_version != version and version != "dev":
            logger.warning(
                "The Data Request version inferred from the content dictionary"
                f" ({l_version}) is different than the requested version ({version})."
            )
        # Consistency fixes
        mapped_data = next(iter(data.values()))
        mapped_data = _apply_consistency_fixes(mapped_data)
        # String fixes
        logger.debug("Consolidation: Removing / Adding (un)necessary whitespace to strings.")
        _fix_str_nested(mapped_data)
        mapped_data["version"] = version
        return {"Data Request": mapped_data}

    if n_bases not in [3, 4]:
        errmsg = "The loaded Data Request has an unexpected data structure."
        logger.error(errmsg)
        raise ValueError(errmsg)

    # Multi-base export: consolidate into the one-base structure
    missing_bases = []
    missing_tables = []
    mapped_data = {"Data Request": {"version": version}}

    # Reset filtered records
    if filtered_records:
        filtered_records = []
    filtered_records_dict = dict()

    # Get filtered records
    for table, mapinfo in mapping_table.items():
        if mapinfo["source_base"] not in data:
            continue
        available = [st for st in mapinfo["source_table"] if st in data[mapinfo["source_base"]]]
        if not available or "internal_filters" not in mapinfo:
            continue
        source_table = available[0]
        for record_id, record in data[mapinfo["source_base"]][source_table]["records"].items():
            if _passes_internal_filters(record, mapinfo["internal_filters"]):
                continue
            logger.debug(
                f"Filtered record '{record_id}'"
                f" {'(' + record['name'] + ')' if 'name' in record else ''}"
                f" from '{table}'."
            )
            filtered_records.append(record_id)
            filtered_records_dict.setdefault(table, []).append(record_id)
    for key in filtered_records_dict:
        logger.debug(f"Filtered {len(filtered_records_dict[key])} records for '{key}'.")
    logger.debug(f"Filtered {len(filtered_records)} records in total.")

    # Perform mapping in case of three-base structure
    for table, mapinfo in mapping_table.items():
        intm = mapinfo["internal_mapping"]
        if mapinfo["source_base"] not in data:
            missing_bases.append(mapinfo["source_base"])
            continue
        available = [st for st in mapinfo["source_table"] if st in data[mapinfo["source_base"]]]
        if not available:
            missing_tables.append(mapinfo["source_table"][0])
            continue

        # Copy the selected data to the one-base structure
        # - skip filtered records
        # - rename record attributes according to
        #   "internal_consistency" settings
        # - filter references to records for fields that are not
        #   internally mapped below
        source_table = available[0]
        logger.debug(f"Mapping '{mapinfo['source_base']}' : '{source_table}' -> '{table}'")
        mapped_data["Data Request"][table] = {
            **data[mapinfo["source_base"]][source_table],
            "records": {
                record_id: {
                    mapinfo["internal_consistency"].get(reckey, reckey): _filter_references(
                        recvalue,
                        reckey,
                        table,
                        record_id,
                        mapinfo["field_dtypes"].get(mapinfo["internal_consistency"].get(reckey, reckey), None),
                    )
                    for reckey, recvalue in record.items()
                    if reckey not in mapinfo["drop_keys"]
                }
                for record_id, record in data[mapinfo["source_base"]][source_table]["records"].items()
                if record_id not in filtered_records
            },
        }

        # For each attribute that requires mapping (no-op if there is none)
        for attr, attr_map in intm.items():
            intm_table = [
                tn
                for tn in mapping_table.keys()
                if tn in mapping_table[tn]["source_table"] and tn == attr_map["table"]
            ][0]
            # Name under which the target table exists in its base of origin
            intm_table_aliases = [
                tn for tn in mapping_table[intm_table]["source_table"] if tn in data[attr_map["base"]]
            ]
            if not intm_table_aliases:
                errmsg = f"None of the following tables exist in the data: {mapping_table[attr_map['table']]['source_table']}."
                logger.error(errmsg)
                raise ValueError(errmsg)
            intm_table_alias = intm_table_aliases[0]

            for record_id, record in data[mapinfo["source_base"]][source_table]["records"].items():
                if record_id in filtered_records:
                    continue
                elif attr not in record or record[attr] is None or record[attr] == "" or record[attr] == []:
                    # Attribute name not found for record, but might have a different name
                    #  in another export type or release version
                    logger.debug(f"{table}: Attribute '{attr}' not found for record '{record_id}'.")
                    attr_aliases = [
                        a
                        for a in mapinfo["internal_consistency"].keys()
                        if mapinfo["internal_consistency"][a] == attr
                    ]
                    attr_alias_found = False
                    for a in attr_aliases:
                        if a in record:
                            attr_vals = record[a]
                            attr_alias_found = True
                            logger.debug(f"{table}: Using attribute '{a}' instead for record '{record_id}'.")
                            break
                    if not attr_alias_found:
                        continue
                else:
                    attr_vals = record[attr]

                # Get list of record-keys of the attribute (eg. "Variables")
                #   that is connected to the current record of the "source_table"
                #   (eg. "Variable Groups") by the specified "operation"
                if attr_map["operation"] == "split":
                    if isinstance(attr_vals, list):
                        errmsg = (
                            f"Consolidation of {table}@{attr}: Selected 'split' operation"
                            f" for a list {record_id}: {attr_vals}"
                        )
                        # Deliberately only logged; the record is skipped
                        logger.error(f"TypeError: {errmsg}")
                        continue
                    else:
                        # Split at commas that are not enclosed in double quotes
                        attr_vals = list(
                            map(lambda x: x.strip('"'), re.split(r',\s*(?=(?:[^"]|"[^"]*")*$)', attr_vals))
                        )
                elif attr_map["operation"] == "":
                    if isinstance(attr_vals, str):
                        attr_vals = [attr_vals]
                else:
                    errmsg = (
                        f"Unknown internal mapping operation for attribute '{attr}'"
                        f" ('{source_table}'): '{attr_map['operation']}'"
                    )
                    logger.error(f"ValueError: {errmsg}")
                    raise ValueError(errmsg)

                # Get mapped record_ids for this list of record-keys
                # entry_type - single record_id or list of record_ids
                # - map by record_id
                if attr_map["entry_type"] == "record_id":
                    if not attr_map["base_copy_of_table"]:
                        errmsg = (
                            "A copy of the table in the same base is required if 'entry_type'"
                            " is set to 'record_id', but 'base_copy_of_table' is set to"
                            f" False: '{source_table}' - '{attr}'"
                        )
                        logger.error(f"ValueError: {errmsg}")
                        raise ValueError(errmsg)
                    elif not attr_map["base"] in data:
                        errmsg = f"Base '{attr_map['base']}' not found in data."
                        logger.error(f"KeyError: {errmsg}")
                        raise KeyError(errmsg)
                    elif attr_map["base_copy_of_table"] not in data[mapinfo["source_base"]]:
                        errmsg = f"Table '{attr_map['base_copy_of_table']}' not found in base '{mapinfo['source_base']}'."
                        logger.error(f"KeyError: {errmsg}")
                        raise KeyError(errmsg)

                    recordIDs_new = []
                    for attr_val in attr_vals:
                        # The record copy in the current base
                        record_copy = data[mapinfo["source_base"]][attr_map["base_copy_of_table"]]["records"][
                            attr_val
                        ]
                        # The entire list of records in the base of origin
                        recordlist = data[attr_map["base"]][intm_table_alias]["records"]
                        recordID_new = _map_record_id(
                            record_copy,
                            recordlist,
                            attr_map["map_by_key"],
                        )
                        recordID_filtered = [r for r in recordID_new if r not in filtered_records]
                        if len(recordID_filtered) == 0:
                            if len(recordID_new) == 0:
                                logger.debug(
                                    f"Consolidation of {table}@{intm_table_alias}: No matching"
                                    f" record found for attribute '{attr}' with value '{attr_val}'."
                                )
                        elif len(recordID_filtered) > 1:
                            logger.warning(
                                f"Consolidation of {table}@{intm_table_alias}:"
                                f" Multiple matching records found for attribute '{attr}' with"
                                f" value '{attr_val}': {recordID_new}. Using first match."
                            )
                            recordIDs_new.append(recordID_filtered[0])
                        else:
                            recordIDs_new.append(recordID_filtered[0])

                # entry_type - name (eg. unique label or similar)
                # - map by attribute value
                elif attr_map["entry_type"] == "name":
                    recordIDs_new = []
                    for attr_val in attr_vals:
                        recordID_new = _map_attribute(
                            attr_val,
                            data[attr_map["base"]][intm_table_alias]["records"],
                            (
                                [attr_map["map_by_key"]]
                                if isinstance(attr_map["map_by_key"], str)
                                else attr_map["map_by_key"]
                            ),
                        )
                        recordID_filtered = [r for r in recordID_new if r not in filtered_records]
                        if len(recordID_filtered) == 0:
                            if len(recordID_new) == 0:
                                logger.debug(
                                    f"Consolidation of {table}@{intm_table_alias}: No matching"
                                    f" record found for attribute '{attr}' with value '{attr_val}'."
                                )
                        elif len(recordID_filtered) > 1:
                            logger.debug(
                                "Consolidation of"
                                f" {table}@{intm_table_alias}: Multiple matching records found"
                                f" for attribute '{attr}' with value '{attr_val}': {recordID_new}"
                            )
                            recordIDs_new.append(recordID_filtered[0])
                        else:
                            recordIDs_new.append(recordID_filtered[0])
                else:
                    errmsg = (
                        f"Unknown 'entry_type' specified for attribute '{attr}'"
                        f" ('{source_table}'): '{attr_map['entry_type']}'"
                    )
                    logger.error(f"ValueError: {errmsg}")
                    raise ValueError(errmsg)

                if not recordIDs_new:
                    errmsg = f"{table} (record '{record_id}'): For attribute" f" '{attr}' no records could be mapped."
                    # Only logged, not raised: this case can actually happen
                    # for the 'Coordinate and Dimension' table
                    logger.error(errmsg)
                try:
                    # De-duplicate while keeping the order of first occurrence,
                    # so that the result is reproducible between runs
                    mapped_data["Data Request"][table]["records"][record_id][
                        mapinfo["internal_consistency"].get(attr, attr)
                    ] = list(dict.fromkeys(recordIDs_new))
                except KeyError:
                    logger.debug(
                        f"Consolidation of {table}@{intm_table_alias}:"
                        f" '{record_id}' not found when adding"
                        f" Attribute '{attr}': {recordIDs_new}"
                    )

    if len(missing_bases) > 0:
        errmsg = "Encountered missing bases when consolidating the data:" f" {set(missing_bases)}"
        logger.critical(errmsg)
        raise KeyError(errmsg)
    if len(missing_tables) > 0:
        logger.warning(
            "Encountered missing tables when consolidating the data (not"
            f" necessarily problematic): {missing_tables}"
        )
    return _apply_hard_fixes(mapped_data)

Maps the data to the one-base structure using the mapping table.

Parameters

data : dict
Three-base or one-base Airtable export.
mapping_table : dict
The mapping table to apply to map to one base.
version : str
The version tag of the exported Data Request Content dictionary.

Returns

dict
Mapped data with one-base structure.

Note

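If the data is already one-base, no mapping is performed: consistency and string fixes are applied, the version is set, and the content is returned under the key "Data Request".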