Module data_request_api.content.dump_transformation

Script to change the basic airtable export into readable files.

Functions

def add_useful_keys(content)
Expand source code
def add_useful_keys(content):
    logger = get_logger()
    record_to_uid_index = defaultdict(lambda: dict())
    default_count = 0
    default_template = "default_{:d}"
    list_entries = sorted(list(content))
    for subelt in list_entries:
        list_record_ids = sorted(list(content[subelt]),
                                 key=lambda record_id: "|".join([content[subelt][record_id].get("name", "undef"),
                                                                 content[subelt][record_id].get("uid", "undef"),
                                                                 record_id]))
        for record_id in list_record_ids:
            if "name" not in content[subelt][record_id]:
                content[subelt][record_id]["name"] = "undef"
            if "uid" not in content[subelt][record_id]:
                uid = default_template.format(default_count)
                content[subelt][record_id]["uid"] = uid
                default_count += 1
                logger.debug(f"Undefined uid for element {os.sep.join([subelt, 'records', record_id])}, set {uid}")
            uid = content[subelt][record_id].pop("uid")
            if uid.endswith(os.linesep):
                logger.debug(f"uid of element type {subelt} and record id {record_id} endswith '\\n'.")
                uid = uid.rstrip(os.linesep)
            record_to_uid_index[subelt][record_id] = uid
            content[subelt][uid] = content[subelt].pop(record_id)
    return content, record_to_uid_index
def correct_dictionaries(input_dict, is_record_ids=False)
Expand source code
def correct_dictionaries(input_dict, is_record_ids=False):
    """
    Correct the input_dict to correct the strings except the record ids.
    :param dict input_dict: the input dictionary to be corrected
    :param bool is_record_ids: a boolean to indicate whether the keys of input_dict contain record ids or not
    :return dict: the corrected dictionary
    """
    logger = get_logger()
    if isinstance(input_dict, dict):
        rep = dict()
        for (key, value) in input_dict.items():
            if not is_record_ids:
                new_key = correct_key_string(key)
            else:
                new_key = key
            if isinstance(value, dict):
                rep[new_key] = correct_dictionaries(value, is_record_ids=key in ["records", "fields"])
            else:
                rep[new_key] = copy.deepcopy(value)
        return rep
    else:
        logger.error(f"Deal with dict types, not {type(input_dict).__name__}")
        raise TypeError(f"Deal with dict types, not {type(input_dict).__name__}")

Correct the input_dict to correct the strings except the record ids. :param dict input_dict: the input dictionary to be corrected :param bool is_record_ids: a boolean to indicate whether the keys of input_dict contain record ids or not :return dict: the corrected dictionary

def correct_key_string(input_string, *to_remove_strings)
Expand source code
def correct_key_string(input_string, *to_remove_strings):
    """
    Change the input string by replacing '&' by 'and' and spaces by underscores.
    It also removes others specified strings.
    :param str input_string: the input string to be changed
    :param list of str to_remove_strings: the list of strings to be removed from input_string
    :return str: the changed string
    """
    logger = get_logger()
    if isinstance(input_string, str):
        input_string = input_string.lower()
        for to_remove_string in to_remove_strings:
            input_string = input_string.replace(to_remove_string.lower(), "")
        input_string = input_string.strip()
        input_string = input_string.replace("&", "and").replace(" ", "_")
    else:
        logger.error(f"Deal with string types, not {type(input_string).__name__}")
        raise TypeError(f"Deal with string types, not {type(input_string).__name__}")
    return input_string

Change the input string by replacing '&' by 'and' and spaces by underscores. It also removes others specified strings. :param str input_string: the input string to be changed :param list of str to_remove_strings: the list of strings to be removed from input_string :return str: the changed string

def distribute_on_entry(func)
Expand source code
def distribute_on_entry(func):
    def distribute(content, per_entry_input, **common_inputs):
        for (key, value) in content.items():
            list_args = [value, ]
            if key in per_entry_input:
                list_args.append(per_entry_input[key])
            content[key] = func(*list_args, **copy.deepcopy(common_inputs))
        return content

    return distribute
def filter_content(content)
Expand source code
def filter_content(content):
    variable_groups = set()
    experiment_groups = set()
    variables = set()
    experiments = set()
    subelt = "opportunities"
    for record_id in sorted(list(content[subelt])):
        if content[subelt][record_id].get("status") not in ["Accepted", "Under review", None]:
            del content[subelt][record_id]
        else:
            variable_groups = variable_groups | set(content[subelt][record_id].get("variable_groups", list()))
            experiment_groups = experiment_groups | set(content[subelt][record_id].get("experiment_groups", list()))
    subelt = "variable_groups"
    for record_id in sorted(list(content[subelt])):
        if record_id not in variable_groups:
            del content[subelt][record_id]
        else:
            variables = variables | set(content[subelt][record_id].get("variables", list()))
    subelt = "experiment_groups"
    for record_id in sorted(list(content[subelt])):
        if record_id not in experiment_groups:
            del content[subelt][record_id]
        elif content[subelt][record_id].get("status") in ["Junk", ]:
            del content[subelt][record_id]
            for op in list(content["opportunities"]):
                if record_id in content["opportunities"][op]["experiment_groups"]:
                    content["opportunities"][op]["experiment_groups"].remove(record_id)
        else:
            experiments = experiments | set(content[subelt][record_id].get("experiments", list()))
    subelt = "variables"
    for record_id in sorted(list(set(content[subelt]) - variables)):
        del content[subelt][record_id]
    subelt = "experiments"
    for record_id in sorted(list(set(content[subelt]) - experiments)):
        del content[subelt][record_id]
    for subelt in list(content):
        for record_id in list(content[subelt]):
            for key in [key for key in list(content[subelt][record_id])
                        if re.compile(r".*status.*").match(key) is not None]:
                del content[subelt][record_id][key]
    return content
def get_transform_settings(version)
Expand source code
def get_transform_settings(version):
    def update_dict(elt_1, elt_2):
        rep = copy.deepcopy(elt_1)
        for (elt, value) in elt_2.items():
            if isinstance(value, dict):
                val = rep.get(elt, dict())
                for (subelt, subvalue) in value.items():
                    if isinstance(subvalue, dict):
                        val[subelt] = val.get(subelt, dict())
                        val[subelt].update(subvalue)
                    else:
                        val[subelt] = subvalue
                rep[elt] = val
            elif isinstance(value, list):
                rep[elt] = rep.get(elt, list()) + value
            else:
                rep[elt] = value
        return rep

    transform = read_json_input_file_content(os.sep.join([os.path.dirname(os.path.abspath(__file__)), "transform.json"]))
    common = transform.pop("common", dict())
    if version not in ["default", ]:
        common = update_dict(common["default"], common.get(version, dict()))
    else:
        common = common["default"]
    for (elt, content) in transform.items():
        default_content = update_dict(common, content["default"])
        if version not in ["default", ]:
            default_content = update_dict(default_content, content.get(version, dict()))
        transform[elt] = default_content
    return transform
def get_transformed_content(version='latest_stable',
export='release',
consolidate=False,
force_retrieve=False,
output_dir=None,
default_transformed_content_pattern='{kind}_{export_version}_content.json',
**kwargs)
Expand source code
@append_kwargs_from_config
def get_transformed_content(version="latest_stable", export="release", consolidate=False,
                            force_retrieve=False, output_dir=None,
                            default_transformed_content_pattern="{kind}_{export_version}_content.json", **kwargs):
    # Download specified version of data request content (if not locally cached)
    versions = dc.retrieve(version, export=export, consolidate=consolidate, **kwargs)

    # Check that there is only one version associated
    if len(versions) > 1:
        raise ValueError("Could only deal with one version.")
    elif len(versions) == 0:
        raise ValueError("No version found.")
    else:
        version = list(versions)[0]
        content = versions[version]
        if output_dir is None:
            output_dir = os.path.dirname(content)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        DR_content = default_transformed_content_pattern.format(kind="DR", export_version=export)
        VS_content = default_transformed_content_pattern.format(kind="VS", export_version=export)
        DR_content = os.sep.join([output_dir, DR_content])
        VS_content = os.sep.join([output_dir, VS_content])
        if force_retrieve or not (all(os.path.exists(filepath) for filepath in [DR_content, VS_content])):
            if os.path.exists(DR_content):
                os.remove(DR_content)
            if os.path.exists(VS_content):
                os.remove(VS_content)
        if not (all(os.path.exists(filepath) for filepath in [DR_content, VS_content])):
            content = dc.load(version, export=export, consolidate=consolidate)
            data_request, vocabulary_server = transform_content(content, version)
            write_json_output_file_content(DR_content, data_request)
            write_json_output_file_content(VS_content, vocabulary_server)
        return dict(DR_input=DR_content, VS_input=VS_content)
def merge_useful_keys(content, per_entry_input, **common_inputs)
Expand source code
def distribute(content, per_entry_input, **common_inputs):
    for (key, value) in content.items():
        list_args = [value, ]
        if key in per_entry_input:
            list_args.append(per_entry_input[key])
        content[key] = func(*list_args, **copy.deepcopy(common_inputs))
    return content
def remove_unused_keys(content, per_entry_input, **common_inputs)
Expand source code
def distribute(content, per_entry_input, **common_inputs):
    for (key, value) in content.items():
        list_args = [value, ]
        if key in per_entry_input:
            list_args.append(per_entry_input[key])
        content[key] = func(*list_args, **copy.deepcopy(common_inputs))
    return content
def rename_useful_keys(content, per_entry_input, **common_inputs)
Expand source code
def distribute(content, per_entry_input, **common_inputs):
    for (key, value) in content.items():
        list_args = [value, ]
        if key in per_entry_input:
            list_args.append(per_entry_input[key])
        content[key] = func(*list_args, **copy.deepcopy(common_inputs))
    return content
def reshape_useful_keys(content, per_entry_input, **common_inputs)
Expand source code
def distribute(content, per_entry_input, **common_inputs):
    for (key, value) in content.items():
        list_args = [value, ]
        if key in per_entry_input:
            list_args.append(per_entry_input[key])
        content[key] = func(*list_args, **copy.deepcopy(common_inputs))
    return content
def sort_useful_keys(content, per_entry_input, **common_inputs)
Expand source code
def distribute(content, per_entry_input, **common_inputs):
    for (key, value) in content.items():
        list_args = [value, ]
        if key in per_entry_input:
            list_args.append(per_entry_input[key])
        content[key] = func(*list_args, **copy.deepcopy(common_inputs))
    return content
def split_content_one_base(content)
Expand source code
def split_content_one_base(content):
    """
    Split the one base content into two dictionaries:
    - the DR (structure)
    - the VS (vocabulary server with all information)
    :param dict content: dictionary containing the one base content
    :return dict, dict: two dictionaries containing respectively the DR and VS
    """
    logger = get_logger()
    data_request = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: dict)))
    keys_to_dr_dict = {
        "opportunities": [("experiment_groups", list, list()),
                          ("variable_groups", list, list()),
                          ("data_request_themes", list, list()),
                          ("time_subsets", list, [None, ]),
                          ("mips", list, list())],
        "variable_groups": [("variables", list, list()),
                            ("mips", list, list()),
                            ("priority_level", (str, type(None)), None)],
        "experiment_groups": [("experiments", list, list()), ]
    }
    if isinstance(content, dict):
        logger.debug("Build DR and VS")
        for subelt in sorted(list(content)):
            if subelt in keys_to_dr_dict:
                for uid in content[subelt]:
                    for (key, target_type, default) in keys_to_dr_dict[subelt]:
                        value = content[subelt][uid].pop(key, default)
                        if not isinstance(value, target_type):
                            if target_type in [list, ] and isinstance(value, (str, int, type(None))):
                                value = [value, ]
                            elif str in target_type and isinstance(value, list):
                                value = value[0]
                            else:
                                raise TypeError(f"Could not deal with target type {type(target_type)}")
                        data_request[subelt][uid][key] = value
        return data_request, content
    else:
        logger.error(f"Deal with dict types, not {type(content).__name__}")
        raise TypeError(f"Deal with dict types, not {type(content).__name__}")

Split the one base content into two dictionaries: - the DR (structure) - the VS (vocabulary server with all information) :param dict content: dictionary containing the one base content :return dict, dict: two dictionaries containing respectively the DR and VS

def tidy_content(content, record_to_uid_index)
Expand source code
def tidy_content(content, record_to_uid_index):
    logger = get_logger()
    # Replace record_id by uid
    logger.debug("Replace record ids by uids")
    to_remove_entries = defaultdict(lambda: defaultdict(lambda: 0))
    list_content = list(content)
    len_list_content = len(list_content)
    for content_subelt in list_content:
        content_string = json.dumps(content[content_subelt], indent=0)
        for subelt in record_to_uid_index:
            for (record_id, uid) in record_to_uid_index[subelt].items():
                tmp_content_string = content_string.replace(f'"{record_id}"', f'"link::{uid}"')
                if content_string == tmp_content_string:
                    to_remove_entries[subelt][(record_id, uid)] += 1
                content_string = tmp_content_string
        content[content_subelt] = json.loads(content_string)
    for content_subelt in ["opportunities", "coordinates_and_dimensions"]:
        if content_subelt in to_remove_entries:
            to_remove = [elt for (elt, nb) in to_remove_entries[content_subelt].items() if nb == len_list_content]
            for record_id, _ in to_remove:
                del record_to_uid_index[content_subelt][record_id]
            del to_remove_entries[content_subelt]
    for (subelt, to_remove) in to_remove_entries.items():
        to_remove = [elt for (elt, nb) in to_remove.items() if nb == len_list_content]
        for (record_id, uid) in to_remove:
            del content[subelt][uid]
            del record_to_uid_index[subelt][record_id]
    # Tidy the content once again
    content_str = json.dumps(content)
    for subelt in record_to_uid_index:
        for uid in [uid for uid in record_to_uid_index[subelt].values() if content_str.count(uid) < 2]:
            del content[subelt][uid]
    return content
def transform_content(content, version)
Expand source code
def transform_content(content, version):
    """
    Function to transform the export content (single or several base-s- export) to VS and DR dictionaries.
    The key "version" is added to the DR and VS dictionaries.
    :param dict content: input export content (either single base or several bases)
    :param str version: string containing the version of the export content
    :return dict, dict: DR and VS dictionaries containing respectively the structure (DR) and the vocabulary (VS)
    """
    logger = get_logger()
    if "Data Request" in content:
        content["Data Request"].pop("version", None)
    transform_settings = get_transform_settings(version)
    if isinstance(content, dict):
        # Correct dictionaries
        content = correct_dictionaries(content)
        # Get back to one database case if needed
        if len(content) == 1:
            logger.info("Single database case - no structure transformation needed")
            content = transform_content_inner(content, transform_settings["one_to_transform"])
        elif len(content) in [3, 4]:
            logger.info("Several databases case - structure transformation needed")
            content = transform_content_inner(content, transform_settings["several_to_transform"], change_tables=True)
        else:
            raise ValueError(f"Could not manage the {len(content):d} bases export file.")
        # Separate DR and VS files
        data_request, vocabulary_server = split_content_one_base(content)
        data_request["version"] = version
        vocabulary_server["version"] = version
        return data_request, vocabulary_server
    else:
        logger.error(f"Deal with dict types, not {type(content).__name__}")
        raise TypeError(f"Deal with dict types, not {type(content).__name__}")

Function to transform the export content (single or several base-s- export) to VS and DR dictionaries. The key "version" is added to the DR and VS dictionaries. :param dict content: input export content (either single base or several bases) :param str version: string containing the version of the export content :return dict, dict: DR and VS dictionaries containing respectively the structure (DR) and the vocabulary (VS)

def transform_content_inner(content, settings, change_tables=False)
Expand source code
def transform_content_inner(content, settings, change_tables=False):
    """
    Transform a one base export content to:
    - remove unused keys which could create circle import later
    - harmonise some entries
    - reshape entries if needed
    - remove elements which are not used
    - filter content on status
    :param dict content: one base content export (direct export or created from `transform_content_three_bases`
    :return dict: the transform content
    """
    logger = get_logger()
    if isinstance(content, dict) and len(content) == 1 and change_tables:
        logger.error("For one base dict, change_tables must be False.")
        raise ValueError("For one base dict, change_tables must be False.")
    elif isinstance(content, dict) and len(content) > 1 and not change_tables:
        logger.error("For several bases dict, changes_tables must be True.")
        raise ValueError("For several bases dict, changes_tables must be True.")
    elif isinstance(content, dict):
        # If needed, deal with one base creation
        if change_tables:
            new_content = dict()
            for (elt, (base, table)) in settings["tables_provenance"].items():
                new_content[elt] = content[settings["several_bases_name"][base]][table]
            logger.info("Harmonise bases content record ids")
            content_str = json.dumps(new_content)
            for ((base_old, table_old, key_old), (base_new, table_new, key_new)) in settings["several_bases_link"].values():
                old_table = content[settings["several_bases_name"][base_old]][table_old]["records"]
                new_table = content[settings["several_bases_name"][base_new]][table_new]["records"]
                old_dict = {record_id: value[key_old] for (record_id, value) in old_table.items()}
                new_dict = {value[key_new]: record_id for (record_id, value) in new_table.items()}
                for (id, val) in old_dict.items():
                    content_str = content_str.replace(f'"{id}"', f'"{new_dict[val]}"')
            content = json.loads(content_str)
        else:
            content = content[list(content)[0]]
        # Rename some elements
        for (patt, repl) in settings["tables_to_rename"].items():
            for key in [key for key in content if re.compile(patt).match(key) is not None]:
                content[re.sub(patt, repl, key)] = content.pop(key)
        for elt in [elt for elt in list(content) if any(re.compile(patt).match(elt) for patt in settings["tables_to_delete"])]:
            del content[elt]
        # Tidy the content of the export file
        default_patterns_to_remove = settings["default_keys_to_delete"]
        to_remove_keys_patterns = settings["keys_to_delete"]
        to_rename_keys_patterns = settings["keys_to_rename"]
        to_merge_keys_patterns = settings["keys_to_merge"]
        to_sort_keys_content = settings["keys_to_sort"]
        content = remove_unused_keys(content=content, per_entry_input=to_remove_keys_patterns,
                                     default_patterns_to_remove=default_patterns_to_remove)
        content = rename_useful_keys(content=content, per_entry_input=to_rename_keys_patterns)
        content = merge_useful_keys(content=content, per_entry_input=to_merge_keys_patterns)
        # Filter on status if needed then remove linked keys
        content = filter_content(content)
        # Add name and uid if needed, build equivalence dict between record_id and uid
        content, record_to_uid_index = add_useful_keys(content)
        # Tidy the content of the dictionary by removing unused entries
        content = tidy_content(content, record_to_uid_index)
        # Sort content of needed keys
        content = sort_useful_keys(content, per_entry_input=to_sort_keys_content)
        for (reshape_style, from_list_to_string_keys_content) in settings["keys_to_format"].items():
            content = reshape_useful_keys(content, per_entry_input=from_list_to_string_keys_content,
                                          reshape_style=reshape_style)
        return content
    else:
        logger.error(f"Deal with dict types, not {type(content).__name__}")
        raise TypeError(f"Deal with dict types, not {type(content).__name__}")

Transform a one base export content to: - remove unused keys which could create circle import later - harmonise some entries - reshape entries if needed - remove elements which are not used - filter content on status :param dict content: one base content export (direct export or created from transform_content_three_bases :return dict: the transform content