Module data_request_api.query.data_request

Data request.

Classes

class DRObjects (id, dr, DR_type='undef', structure={}, **attributes)
Expand source code
class DRObjects(object):
    """
    Base object to build the ones used within the DR API.
    Use to define basic information needed.
    """

    def __init__(self, id, dr, DR_type="undef", structure=dict(), **attributes):
        """
        Initialisation of the object.
        :param str id: id of the object
        :param DataRequest dr: reference data request object
        :param str DR_type: type of DR object (for reference in vocabulary server)
        :param dict structure: if needed, elements linked by structure to the current object
        :param dict attributes: attributes of the object coming from vocabulary server
        """
        if DR_type in ["undef", ]:
            self.DR_type = DR_type
        else:
            self.DR_type = to_plural(DR_type)
        _, attributes["id"] = is_link_id_or_value(id)
        self.dr = dr
        self.attributes = self.transform_content(attributes, dr)
        self.structure = self.transform_content(structure, dr, force_transform=True)

    @property
    def id(self):
        return self.attributes["id"]

    @staticmethod
    def transform_content_inner(key, value, dr, force_transform=False):
        if isinstance(value, str) and (force_transform or is_link_id_or_value(value)[0]):
            return dr.find_element(key, value)
        elif isinstance(value, str) and key not in ["id", ]:
            return ConstantValueObj(value)
        else:
            return value

    def transform_content(self, input_dict, dr, force_transform=False):
        """
        Transform the input dict to have only elements which are object (either DRObject -for links- or
        ConstantValueObj -for strings-).
        :param dict input_dict: input dictionary to transform
        :param DataRequest dr: reference Data Request to find elements from VS
        :param bool force_transform: boolean indicating whether all elements should be considered as linked and
        transform into DRObject (True) or alternatively to DRObject if link or ConstantValueObj if string.
        :return dict: transformed dictionary
        """
        for (key, values) in input_dict.items():
            if isinstance(values, list):
                input_dict[key] = [self.transform_content_inner(key=key, value=value, dr=dr,
                                                                force_transform=force_transform) for value in values]
            else:
                input_dict[key] = self.transform_content_inner(key=key, value=values, dr=dr,
                                                               force_transform=force_transform)
        return input_dict

    @classmethod
    def from_input(cls, dr, id, DR_type="undef", elements=dict(), structure=dict()):
        """
        Create instance of the class using specific arguments.
        :param DataRequest dr: reference Data Request objects
        :param str id: id of the object
        :param str DR_type: type of the object
        :param dict elements: attributes of the objects (coming from VS)
        :param dict structure: structure of the object through Data Request
        :return: instance of the current class.
        """
        elements["id"] = id
        return cls(dr=dr, DR_type=DR_type, structure=structure, **elements)

    def __hash__(self):
        return hash(self.id)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self.id == other.id and self.DR_type == other.DR_type and \
            self.structure == other.structure and self.attributes == other.attributes

    def __lt__(self, other):
        return self.id < other.id

    def __gt__(self, other):
        return self.id > other.id

    def __copy__(self):
        return type(self).__call__(dr=self.dr, DR_type=copy.deepcopy(self.DR_type),
                                   structure=copy.deepcopy(self.structure), **copy.deepcopy(self.attributes))

    def __deepcopy__(self, memodict={}):
        return self.__copy__()

    def check(self):
        """
        Make checks on the current object.
        :return:
        """
        pass

    def __str__(self):
        return os.linesep.join(self.print_content())

    def __repr__(self):
        return os.linesep.join(self.print_content())

    def __getattr__(self, item):
        return self.attributes.get(item, ConstantValueObj())

    def get(self, item):
        return self.__getattr__(item)

    def print_content(self, level=0, add_content=True):
        """
        Function to return a printable version of the content of the current class.
        :param level: level of indent of the result
        :param add_content: should inner content be added?
        :return: a list of strings that can be assembled to print the content.
        """
        indent = "    " * level
        return [f"{indent}{to_singular(self.DR_type)}: {self.name} (id: {is_link_id_or_value(self.id)[1]})", ]

    def filter_on_request(self, request_value, inner=True):
        """
        Check whether the current object can be filtered by the requested value.
        :param request_value: an object to be tested
        :return bool, bool: a bool indicating whether the current object can be filtered by the requested one,
                            a bool indicating whether the current object is linked to the request one.
        """
        request_type = request_value.DR_type
        filtered_found, found = self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id]
        if filtered_found is None:
            filtered_found = request_value.DR_type == self.DR_type
            if filtered_found:
                found = request_value == self
            else:
                found = False
            self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id] = (filtered_found, found)
        return filtered_found, found

    @staticmethod
    def filter_on_request_list(request_values, list_to_check, inner=True):
        if not isinstance(request_values, list):
            request_values = [request_values, ]
        iter_to_check = iter(list_to_check)
        found = False
        while not found and (elt := next(iter_to_check, None)) is not None:
            found = all(elt.filter_on_request(request_value=request_value, inner=inner)[1] for request_value in request_values)
        return found

Base object to build the ones used within the DR API. Use to define basic information needed.

Initialisation of the object. :param str id: id of the object :param DataRequest dr: reference data request object :param str DR_type: type of DR object (for reference in vocabulary server) :param dict structure: if needed, elements linked by structure to the current object :param dict attributes: attributes of the object coming from vocabulary server

Subclasses

Static methods

def filter_on_request_list(request_values, list_to_check, inner=True)
Expand source code
@staticmethod
def filter_on_request_list(request_values, list_to_check, inner=True):
    if not isinstance(request_values, list):
        request_values = [request_values, ]
    iter_to_check = iter(list_to_check)
    found = False
    while not found and (elt := next(iter_to_check, None)) is not None:
        found = all(elt.filter_on_request(request_value=request_value, inner=inner)[1] for request_value in request_values)
    return found
def from_input(dr, id, DR_type='undef', elements={}, structure={})

Create instance of the class using specific arguments. :param DataRequest dr: reference Data Request objects :param str id: id of the object :param str DR_type: type of the object :param dict elements: attributes of the objects (coming from VS) :param dict structure: structure of the object through Data Request :return: instance of the current class.

def transform_content_inner(key, value, dr, force_transform=False)
Expand source code
@staticmethod
def transform_content_inner(key, value, dr, force_transform=False):
    if isinstance(value, str) and (force_transform or is_link_id_or_value(value)[0]):
        return dr.find_element(key, value)
    elif isinstance(value, str) and key not in ["id", ]:
        return ConstantValueObj(value)
    else:
        return value

Instance variables

prop id
Expand source code
@property
def id(self):
    return self.attributes["id"]

Methods

def check(self)
Expand source code
def check(self):
    """
    Make checks on the current object.
    :return:
    """
    pass

Make checks on the current object. :return:

def filter_on_request(self, request_value, inner=True)
Expand source code
def filter_on_request(self, request_value, inner=True):
    """
    Check whether the current object can be filtered by the requested value.
    :param request_value: an object to be tested
    :return bool, bool: a bool indicating whether the current object can be filtered by the requested one,
                        a bool indicating whether the current object is linked to the request one.
    """
    request_type = request_value.DR_type
    filtered_found, found = self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id]
    if filtered_found is None:
        filtered_found = request_value.DR_type == self.DR_type
        if filtered_found:
            found = request_value == self
        else:
            found = False
        self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id] = (filtered_found, found)
    return filtered_found, found

Check whether the current object can be filtered by the requested value. :param request_value: an object to be tested :return bool, bool: a bool indicating whether the current object can be filtered by the requested one, a bool indicating whether the current object is linked to the request one.

def get(self, item)
Expand source code
def get(self, item):
    return self.__getattr__(item)
def print_content(self, level=0, add_content=True)
Expand source code
def print_content(self, level=0, add_content=True):
    """
    Function to return a printable version of the content of the current class.
    :param level: level of indent of the result
    :param add_content: should inner content be added?
    :return: a list of strings that can be assembled to print the content.
    """
    indent = "    " * level
    return [f"{indent}{to_singular(self.DR_type)}: {self.name} (id: {is_link_id_or_value(self.id)[1]})", ]

Function to return a printable version of the content of the current class. :param level: level of indent of the result :param add_content: should inner content be added? :return: a list of strings that can be assembled to print the content.

def transform_content(self, input_dict, dr, force_transform=False)
Expand source code
def transform_content(self, input_dict, dr, force_transform=False):
    """
    Transform the input dict to have only elements which are object (either DRObject -for links- or
    ConstantValueObj -for strings-).
    :param dict input_dict: input dictionary to transform
    :param DataRequest dr: reference Data Request to find elements from VS
    :param bool force_transform: boolean indicating whether all elements should be considered as linked and
    transform into DRObject (True) or alternatively to DRObject if link or ConstantValueObj if string.
    :return dict: transformed dictionary
    """
    for (key, values) in input_dict.items():
        if isinstance(values, list):
            input_dict[key] = [self.transform_content_inner(key=key, value=value, dr=dr,
                                                            force_transform=force_transform) for value in values]
        else:
            input_dict[key] = self.transform_content_inner(key=key, value=values, dr=dr,
                                                           force_transform=force_transform)
    return input_dict

Transform the input dict to have only elements which are object (either DRObject -for links- or ConstantValueObj -for strings-). :param dict input_dict: input dictionary to transform :param DataRequest dr: reference Data Request to find elements from VS :param bool force_transform: boolean indicating whether all elements should be considered as linked and transform into DRObject (True) or alternatively to DRObject if link or ConstantValueObj if string. :return dict: transformed dictionary

class DataRequest (input_database, VS, **kwargs)
Expand source code
class DataRequest(object):
    """
    Data Request API object used to navigate among the Data Request and Vocabulary Server contents.
    """

    def __init__(self, input_database, VS, **kwargs):
        """
        Initialisation of the Data Request object
        :param dict input_database: dictionary containing the DR database
        :param VocabularyServer VS: reference Vocabulary Server to et information on objects
        :param dict kwargs: additional parameters
        """
        self.VS = VS
        self.content_version = input_database["version"]
        self.structure = input_database
        self.mapping = defaultdict(lambda: defaultdict(lambda: dict))
        self.content = defaultdict(lambda: defaultdict(lambda: dict))
        for op in input_database["opportunities"]:
            self.content["opportunities"][op] = self.find_element("opportunities", op)
        self.cache = dict()
        self.cache_filtering = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: (None, None)))))
        self.filtering_structure = read_json_file(os.sep.join([os.path.dirname(os.path.abspath(__file__)), "filtering.json"]))["definition"]

    def check(self):
        """
        Method to check the content of the Data Request.
        :return:
        """
        logger = get_logger()
        logger.info("Check data request metadata")
        logger.info("... Check experiments groups")
        for elt in self.get_experiment_groups():
            elt.check()
        logger.info("... Check variables groups")
        for elt in self.get_variable_groups():
            elt.check()
        logger.info("... Check opportunities")
        for elt in self.get_opportunities():
            elt.check()

    @property
    def software_version(self):
        """
        Method to get the version of the software.
        :return str: version of the software
        """
        return version

    @property
    def version(self):
        """
        Method to get the version of both software and content
        :return str : formatted version of the software and the content
        """
        return f"Software {self.software_version} - Content {self.content_version}"

    @classmethod
    def from_input(cls, json_input, version, **kwargs):
        """
        Method to instanciate the DataRequest object from a single input.
        :param str or dict json_input: dictionary or name of the dedicated json file containing the export content
        :param str version: version of the content
        :param dict kwargs: additional parameters
        :return DataRequest: instance of the DataRequest object.
        """
        DR_content, VS_content = cls._split_content_from_input_json(json_input, version=version)
        VS = VocabularyServer(VS_content)
        return cls(input_database=DR_content, VS=VS, **kwargs)

    @classmethod
    def from_separated_inputs(cls, DR_input, VS_input, **kwargs):
        """
        Method to instanciate the DataRequestObject from two inputs.
        :param str or dict DR_input: dictionary or name of the json file containing the data request structure
        :param str or dict VS_input: dictionary or name of the json file containing the vocabulary server
        :param dict kwargs: additional parameters
        :return DataRequest: instance of the DataRequest object
        """
        logger = get_logger()
        if isinstance(DR_input, str) and os.path.isfile(DR_input):
            DR = read_json_file(DR_input)
        elif isinstance(DR_input, dict):
            DR = copy.deepcopy(DR_input)
        else:
            logger.error("DR_input should be either the name of a json file or a dictionary.")
            raise TypeError("DR_input should be either the name of a json file or a dictionary.")
        if isinstance(VS_input, str) and os.path.isfile(VS_input):
            VS = VocabularyServer.from_input(VS_input)
        elif isinstance(VS_input, dict):
            VS = VocabularyServer(copy.deepcopy(VS_input))
        else:
            logger.error("VS_input should be either the name of a json file or a dictionary.")
            raise TypeError("VS_input should be either the name of a json file or a dictionary.")
        return cls(input_database=DR, VS=VS, **kwargs)

    @staticmethod
    def _split_content_from_input_json(input_json, version):
        """
        Split the export if given through a single file and not from two files into the two dictionaries.
        :param dict or str input_json: json input containing the bases or content as a dict
        :param str version: version of the content used
        :return dict, dict: two dictionaries containing the DR and the VS
        """
        logger = get_logger()
        if not isinstance(version, str):
            logger.error(f"Version should be a string, not {type(version).__name__}.")
            raise TypeError(f"Version should be a string, not {type(version).__name__}.")
        if isinstance(input_json, str) and os.path.isfile(input_json):
            content = read_json_file(input_json)
        elif isinstance(input_json, dict):
            content = input_json
        else:
            logger.error("input_json should be either the name of a json file or a dictionary.")
            raise TypeError("input_json should be either the name of a json file or a dictionary.")
        DR, VS = transform_content(content, version=version)
        return DR, VS

    def __str__(self):
        rep = list()
        indent = "    "
        rep.append("Data Request content:")
        rep.append(f"{indent}Experiments groups:")
        for elt in self.get_experiment_groups():
            rep.extend(elt.print_content(level=2))
        rep.append(f"{indent}Variables groups:")
        for elt in self.get_variable_groups():
            rep.extend(elt.print_content(level=2))
        rep.append(f"{indent}Opportunities:")
        for elt in self.get_opportunities():
            rep.extend(elt.print_content(level=2))
        return os.linesep.join(rep)

    def _get_sorted_list(self, list_id):
        if self.cache.get(list_id) is None:
            self.cache[list_id] = [self.content[list_id][key] for key in sorted(list(self.content[list_id]))]
        return self.cache[list_id]

    def get_experiment_groups(self):
        """
        Get the ExperimentsGroup of the Data Request.
        :return list of ExperimentsGroup: list of the ExperimentsGroup of the DR content.
        """
        return self._get_sorted_list("experiment_groups")

    def get_experiment_group(self, id):
        """
        Get the ExperimentsGroup associated with a specific id.
        :param str id: id of the ExperimentsGroup
        :return ExperimentsGroup: the ExperimentsGroup associated with the input id
        """
        rep = self.find_element("experiment_groups", id, default=None)
        if rep is not None:
            return rep
        else:
            raise ValueError(f"Could not find experiments group {id} among {self.get_experiment_groups()}.")

    def get_variable_groups(self):
        """
        Get the VariablesGroup of the Data Request.
        :return list of VariablesGroup: list of the VariablesGroup of the DR content.
        """
        return self._get_sorted_list("variable_groups")

    def get_variable_group(self, id):
        """
        Get the VariablesGroup associated with a specific id.
        :param str id: id of the VariablesGroup
        :return VariablesGroup: the VariablesGroup associated with the input id
        """
        rep = self.find_element("variable_groups", id, default=None)
        if rep is not None:
            return rep
        else:
            raise ValueError(f"Could not find variables group {id}.")

    def get_opportunities(self):
        """
        Get the Opportunity of the Data Request.
        :return list of Opportunity: list of the Opportunity of the DR content.
        """
        return self._get_sorted_list("opportunities")

    def get_opportunity(self, id):
        """
        Get the Opportunity associated with a specific id.
        :param str id: id of the Opportunity
        :return Opportunity: the Opportunity associated with the input id
        """
        rep = self.find_element("opportunities", id, default=None)
        if rep is not None:
            return rep
        else:
            raise ValueError(f"Could not find opportunity {id}.")

    def get_variables(self):
        """
        Get the Variable of the Data Request.
        :return list of Variable: list of the Variable of the DR content.
        """
        if self.cache.get("variables") is None:
            rep = set()
            for var_grp in self.get_variable_groups():
                rep = rep | set(var_grp.get_variables())
            self.cache["variables"] = sorted(list(rep))
        return self.cache["variables"]

    def get_mips(self):
        """
        Get the MIPs of the Data Request.
        :return list of DRObject or ConstantValueObj: list of the MIPs of the DR content.
        """
        if self.cache.get("mips") is None:
            rep = set()
            for op in self.get_opportunities():
                rep = rep | set(op.get_mips())
            for var_grp in self.get_variable_groups():
                rep = rep | set(var_grp.get_mips())
            self.cache["mips"] = sorted(list(rep))
        return self.cache["mips"]

    def get_experiments(self):
        """
        Get the experiments of the Data Request.
        :return list of DRObject: list of the experiments of the DR content.
        """
        if self.cache.get("experiments") is None:
            rep = set()
            for exp_grp in self.get_experiment_groups():
                rep = rep | set(exp_grp.get_experiments())
            self.cache["experiments"] = sorted(list(rep))
        return self.cache["experiments"]

    def get_data_request_themes(self):
        """
        Get the themes of the Data Request.
        :return list of DRObject: list of the themes of the DR content.
        """
        if self.cache.get("data_request_themes") is None:
            rep = set()
            for op in self.get_opportunities():
                rep = rep | set(op.get_themes())
            self.cache["data_request_themes"] = sorted(list(rep))
        return self.cache["data_request_themes"]

    def find_priority_per_variable(self, variable, **filter_request):
        logger = get_logger()
        priorities = self.filter_elements_per_request(elements_to_filter="priority_level",
                                                      requests={"variable": variable, **filter_request})
        logger.debug(f"Priorities found: {priorities} ({[int(priority.value) for priority in priorities]})")
        priority = min(int(priority.value) for priority in priorities)
        logger.debug(f"Priority_retain {priority}")
        return priority

    def find_variables_per_priority(self, priority):
        """
        Find all the variables which have a specified priority.
        :param DRObjects or ConstantValueObj or str priority: priority to be considered
        :return list of Variable: list of the variables which have a specified priority.
        """
        return self.filter_elements_per_request(elements_to_filter="variables",
                                                requests=dict(priority_level=[priority, ]))

    def find_opportunities_per_theme(self, theme):
        """
        Find all the opportunities which are linked to a specified theme.
        :param DRObjects or ConstantValueObj or str theme: theme to be considered
        :return list of Opportunity: list of the opportunities which are linked to a specified theme.
        """
        return self.filter_elements_per_request(elements_to_filter="opportunities",
                                                requests=dict(data_request_themes=[theme, ]))

    def find_experiments_per_theme(self, theme):
        """
        Find all the experiments which are linked to a specified theme.
        :param DRObjects or ConstantValueObj or str theme: theme to be considered
        :return list of DRObjects or ConstantValueObj: list of the experiments which are linked to a specified theme.
        """
        return self.filter_elements_per_request(elements_to_filter="experiments",
                                                requests=dict(data_request_themes=[theme, ]))

    def find_variables_per_theme(self, theme):
        """
        Find all the variables which are linked to a specified theme.
        :param DRObjects or ConstantValueObj or str theme: theme to be considered
        :return list of Variable: list of the variables which are linked to a specified theme.
        """
        return self.filter_elements_per_request(elements_to_filter="variables",
                                                requests=dict(data_request_themes=[theme, ]))

    def find_mips_per_theme(self, theme):
        """
        Find all the MIPs which are linked to a specified theme.
        :param DRObjects or ConstantValueObj or str theme: theme to be considered
        :return list of DRObjects or ConstantValueObj: list of the MIPs which are linked to a specified theme.
        """
        return self.filter_elements_per_request(elements_to_filter="mips", requests=dict(data_request_themes=[theme, ]))

    def find_themes_per_opportunity(self, opportunity):
        """
        Find all the themes which are linked to a specified opportunity.
        :param Opportunity or str opportunity: opportunity to be considered
        :return list of DRObjects or ConstantValueObj: list of the themes which are linked to a specified opportunity.
        """
        return self.filter_elements_per_request(elements_to_filter="data_request_themes",
                                                requests=dict(opportunities=[opportunity, ]))

    def find_experiments_per_opportunity(self, opportunity):
        """
        Find all the experiments which are linked to a specified opportunity.
        :param Opportunity or str opportunity: opportunity to be considered
        :return list of DRObjects or ConstantValueObj: list of the experiments which are linked to a specified
                opportunity.
        """
        return self.filter_elements_per_request(elements_to_filter="experiments",
                                                requests=dict(opportunities=[opportunity, ]))

    def find_variables_per_opportunity(self, opportunity):
        """
        Find all the variables which are linked to a specified opportunity.
        :param Opportunity or str opportunity: opportunity to be considered
        :return list of Variable: list of the variables which are linked to a specified opportunity.
        """
        return self.filter_elements_per_request(elements_to_filter="variables",
                                                requests=dict(opportunities=[opportunity, ]))

    def find_mips_per_opportunity(self, opportunity):
        """
        Find all the MIPs which are linked to a specified opportunity.
        :param Opportunity or str opportunity: opportunity to be considered
        :return list of DRObjects or ConstantValueObj: list of the MIPs which are linked to a specified opportunity.
        """
        return self.filter_elements_per_request(elements_to_filter="mips", requests=dict(opportunities=[opportunity, ]))

    def find_opportunities_per_variable(self, variable):
        """
        Find all the opportunities which are linked to a specified variable.
        :param Variable or str variable: variable to be considered
        :return list of Opportunity: list of the opportunities which are linked to a specified variable.
        """
        return self.filter_elements_per_request(elements_to_filter="opportunities",
                                                requests=dict(variables=[variable, ]))

    def find_themes_per_variable(self, variable):
        """
        Find all the themes which are linked to a specified variable.
        :param Variable or str variable: variable to be considered
        :return list of DRObjects or ConstantValueObj: list of the themes which are linked to a specified variable.
        """
        return self.filter_elements_per_request(elements_to_filter="data_request_themes",
                                                requests=dict(variables=[variable, ]))

    def find_mips_per_variable(self, variable):
        """
        Find all the MIPs which are linked to a specified variable.
        :param Variable or str variable: variable to be considered
        :return list of DRObjects or ConstantValueObj: list of the MIPs which are linked to a specified variable.
        """
        return self.filter_elements_per_request(elements_to_filter="mips", requests=dict(variables=[variable, ]))

    def find_opportunities_per_experiment(self, experiment):
        """
        Find all the opportunities which are linked to a specified experiment.
        :param DRObjects or ConstantValueObj or str experiment: experiment to be considered
        :return list of Opportunity: list of the opportunities which are linked to a specified experiment.
        """
        return self.filter_elements_per_request(elements_to_filter="opportunities",
                                                requests=dict(experiments=[experiment, ]))

    def find_themes_per_experiment(self, experiment):
        """
        Find all the themes which are linked to a specified experiment.
        :param DRObjects or ConstantValueObj or str experiment: experiment to be considered
        :return list of DRObjects or ConstantValueObj: list of the themes which are linked to a specified experiment.
        """
        return self.filter_elements_per_request(elements_to_filter="data_request_themes",
                                                requests=dict(experiments=[experiment, ]))

    def find_element_per_identifier_from_vs(self, element_type, key, value, default=False, **kwargs):
        """
        Find an element of a specific type and specified by a value (of a given kind) from vocabulary server.
        :param str element_type: type of the element to be found (same as in vocabulary server).
        :param str key: type of the value key to be looked for ("id", "name"...)
        :param str value: value to be looked for
        :param default: default value to be used if the value is not found
        :param dict kwargs: additional attributes to be used for vocabulary server search.
        :return Opportunity or VariablesGroup or ExperimentsGroup or Variables or DRObjects or ConstantValueObj or
                default: the element found from vocabulary server or the default value if none is found.
        """
        if key in ["id", ]:
            value = build_link_from_id(value)
        init_element_type = to_plural(element_type)
        element_type = self.VS.get_element_type(element_type)
        rep = self.VS.get_element(element_type=element_type, element_id=value, id_type=key, default=default, **kwargs)
        if rep in [value, ]:
            rep = default
        elif rep not in [default, ]:
            structure = self.structure.get(element_type, dict()).get(rep["id"], dict())
            if element_type in ["opportunities", ]:
                rep = Opportunity.from_input(dr=self, **rep, **structure)
            elif element_type in ["variable_groups", ]:
                rep = VariablesGroup.from_input(dr=self, **rep, **structure)
            elif element_type in ["experiment_groups", ]:
                rep = ExperimentsGroup.from_input(dr=self, **rep, **structure)
            elif element_type in ["variables", ]:
                rep = Variable.from_input(dr=self, **rep)
            elif init_element_type in ["max_priority_levels", ]:
                rep = DRObjects.from_input(dr=self, id=rep["id"], DR_type=init_element_type, elements=rep)
            else:
                rep = DRObjects.from_input(dr=self, id=rep["id"], DR_type=element_type, elements=rep)
        return rep

    def find_element_from_vs(self, element_type, value, key="name", default=False):
        """
        Find an element of a specific type and specified by a value from vocabulary server.
        Update the content and mapping list not to have to ask the vocabulary server again for it.
        :param str element_type: kind of element to be looked for
        :param str value: value to be looked for
        :param default: default value to be returned if no value found
        :return: element corresponding to the specified value of a given type if found, else the default value
        """
        if "priority" in element_type and isinstance(value, int):
            key = "value"
        if key in ["id", ]:
            init_default = default
        else:
            init_default = None
        rep = self.find_element_per_identifier_from_vs(element_type=element_type, value=value, key="id",
                                                       default=init_default)
        if rep is None and key not in ["id", ]:
            rep = self.find_element_per_identifier_from_vs(element_type=element_type, value=value, key=key,
                                                           default=default)
        if rep not in [default, ]:
            self.content[element_type][rep.id] = rep
            self.mapping[element_type][rep.name] = rep
        return rep

    def find_element(self, element_type, value, default=False, key="name"):
        """
        Find an element of a specific type and specified by a value from mapping/content if existing,
         else from vocabulary server.
        :param str element_type: kind of element to be found
        :param str value: value to be looked for
        :param default: value to be returned if non found
        :return: the found element if existing, else the default value
        """
        check_val = is_link_id_or_value(value)[1]
        if check_val in self.content[element_type]:
            return self.content[element_type][check_val]
        elif check_val in self.mapping[element_type]:
            return self.mapping[element_type][check_val]
        else:
            return self.find_element_from_vs(element_type=element_type, value=value, default=default, key=key)

    def get_elements_per_kind(self, element_type):
        """
        Return the list of elements of kind element_type
        :param str element_type: the kind of the elements to be found
        :return list: the list of elements of kind element_type
        """
        logger = get_logger()
        element_types = to_plural(element_type)
        if element_types in ["opportunities", ]:
            elements = self.get_opportunities()
        elif element_types in ["experiment_groups", ]:
            elements = self.get_experiment_groups()
        elif element_types in ["variable_groups", ]:
            elements = self.get_variable_groups()
        elif element_types in ["variables", ]:
            elements = self.get_variables()
        elif element_types in ["experiments", ]:
            elements = self.get_experiments()
        elif element_types in ["data_request_themes", ]:
            elements = self.get_data_request_themes()
        elif element_types in ["mips", ]:
            elements = self.get_mips()
        elif element_types in self.cache:
            elements = sorted(self.cache[element_types])
        else:
            logger.debug(f"Find elements list of kind {element_type} from vocabulary server.")
            element_type, elements_ids = self.VS.get_element_type_ids(element_type)
            elements = [self.find_element(element_type, id) for id in elements_ids]
            self.cache[element_types] = elements
        return elements

    @staticmethod
    def _two_elements_filtering(filtering_elt_1, filtering_elt_2, list_to_filter, inner=True):
        """
        Check if a list of elements can be filtered by two values
        :param filtering_elt_1: first element for filtering
        :param filtering_elt_2: second element for filtering
        :param list list_to_filter: list of elements to be filtered
        :return bool, bool: a boolean to tell if it relevant to filter list_to_filter by filtering_elt_1 and
                            filtering_elt_2, a boolean to tell, if relevant, if filtering_elt_1 and filtering_elt_2 are
                             linked to list_to_filter
        """
        elt = list_to_filter[0]
        filtered_found_1, found_1 = elt.filter_on_request(filtering_elt_1)
        filtered_found_2, found_2 = elt.filter_on_request(filtering_elt_2)
        filtered_found = filtered_found_1 and filtered_found_2
        found = found_1 and found_2
        if filtered_found and not found:
            found = elt.filter_on_request_list(request_values=[filtering_elt_1, filtering_elt_2],
                                               list_to_check=list_to_filter[1:], inner=inner)
        return filtered_found, found

    def get_filtering_structure(self, DR_type):
        rep = set(self.filtering_structure.get(DR_type, list()))
        tmp_rep = copy.deepcopy(rep)
        while len(tmp_rep) > 0:
            rep = rep | tmp_rep
            to_add = set()
            for elt in tmp_rep:
                to_add = to_add | set(self.filtering_structure.get(elt, list()))
            tmp_rep, to_add = to_add, set()
        return rep

    def filter_elements_per_request(self, elements_to_filter, requests=dict(), request_operation="all",
                                    not_requests=dict(), not_request_operation="any",
                                    skip_if_missing=False, print_warning_bcv=True):
        """
        Filter the elements of kind element_type with a dictionary of requests.
        :param str or list od DRObjects elements_to_filter: kind of elements to be filtered
        :param dict requests: dictionary of the filters to be applied
        :param dict not_requests: dictionary of the filters to be applied for non requested elements
        :param str request_operation: should at least one filter from requests be applied ("any") or all filters be fulfilled ("all")
        :param str not_request_operation: should at least one filter from not_requests be applied ("any") or all filters be fulfilled ("all")
        :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised?
        :param bool print_warning_bcv: should a warning be printed if BCV variables are not included?
        :return: list of elements of kind element_type which correspond to the filtering requests
        """
        def filter_against_request(request, values, elements_to_filter, elements, elements_filtering_structure):
            logger = get_logger()
            request_filtering_structure = self.get_filtering_structure(request)
            common_filtering_structure = request_filtering_structure & elements_filtering_structure
            if len(values) == 0 or len(elements) == 0:
                rep = list()
                filtered_found = True
            elif request == elements_to_filter:
                filtered_found = True
                rep = [(elt.id, elt) for elt in set(values) & set(elements)]
            elif elements_to_filter in request_filtering_structure:
                filtered_found, _ = elements[0].filter_on_request(values[0])
                if filtered_found:
                    rep = [(val.id, elt) for (val, elt) in product(values, elements) if elt.filter_on_request(val)[1]]
            elif request in elements_filtering_structure:
                filtered_found, _ = values[0].filter_on_request(elements[0])
                if filtered_found:
                    rep = [(val.id, elt) for (val, elt) in product(values, elements) if val.filter_on_request(elt)[1]]
            else:
                if "experiment_groups" in common_filtering_structure:
                    list_to_filter = self.get_experiment_groups()
                elif "variables" in common_filtering_structure:
                    list_to_filter = self.get_variables()
                elif "variable_groups" in common_filtering_structure:
                    list_to_filter = self.get_variable_groups()
                else:
                    list_to_filter = self.get_opportunities()
                filtered_found, _ = self._two_elements_filtering(values[0], elements[0], list_to_filter)
                if filtered_found:
                    rep = [(val.id, elt) for (val, elt) in product(values, elements) if
                           self._two_elements_filtering(val, elt, list_to_filter)[1]]
                if "mips" in [request, elements_to_filter]:
                    list_to_filter = self.get_opportunities()
                    new_filtered_found, _ = self._two_elements_filtering(values[0], elements[0], list_to_filter,
                                                                         inner=False)
                    filtered_found = filtered_found or new_filtered_found
                    if new_filtered_found:
                        new_rep = [(val.id, elt) for (val, elt) in product(values, elements) if
                                   self._two_elements_filtering(val, elt, list_to_filter, inner=False)[1]]
                        rep.extend(new_rep)
            if not filtered_found:
                logger.error(f"Could not filter {elements_to_filter} by {request}")
                raise ValueError(f"Could not filter {elements_to_filter} by {request}")
            else:
                new_rep = defaultdict(list)
                for (key, val) in rep:
                    new_rep[key].append(val)
                new_rep = {key: set(val) for (key, val) in new_rep.items()}
                return new_rep

        def fill_request_dict(request_dict):
            logger = get_logger()
            rep = defaultdict(list)
            for (req, values) in request_dict.items():
                if not isinstance(values, list):
                    values = [values, ]
                for val in values:
                    if not isinstance(val, DRObjects):
                        new_val = self.find_element(element_type=req, value=val, default=None)
                    else:
                        new_val = val
                    if new_val is not None:
                        rep[new_val.DR_type].append(new_val)
                    elif skip_if_missing:
                        logger.warning(f"Could not find value {val} for element type {req}, skip it.")
                    else:
                        logger.error(f"Could not find value {val} for element type {req}.")
                        raise ValueError(f"Could not find value {val} for element type {req}.")
            return rep

        def apply_operation_on_requests_links(dict_request_links, elements, operation, void_list="full"):
            logger = get_logger()
            if len(rep) == 0:
                if void_list == "full":
                    rep_list = set(elements)
                elif void_list == "void":
                    rep_list = set()
                else:
                    logger.error(f"Unknown void_list value {void_list} (should be either 'full' or 'void').")
                    raise ValueError(f"Unknown void_list value {void_list} (should be either 'full' or 'void').")
            elif operation in ["any", ]:
                rep_list = {key: set().union(*chain(val.values())) for (key, val) in dict_request_links.items()}
                rep_list = set().union(*rep_list.values())
            elif operation in ["all", ]:
                rep_list = {key: set(elements).intersection(*chain(val.values())) for (key, val) in dict_request_links.items()}
                rep_list = set(elements).intersection(*rep_list.values())
            elif operation in ["all_of_any", ]:
                rep_list = {key: set().union(*chain(val.values())) for (key, val) in dict_request_links.items()}
                rep_list = set(elements).intersection(*rep_list.values())
            elif operation in ["any_of_all", ]:
                rep_list = {key: set(elements).intersection(*chain(val.values())) for (key, val) in dict_request_links.items()}
                rep_list = set().union(*rep_list.values())
            else:
                logger.error(f"Unknown value {operation} for request_operation (only 'all', 'any', 'any_of_all' and 'all_of_any' are available).")
                raise ValueError(f"Unknown value {operation} for request_operation (only 'all', 'any', 'any_of_all' and 'all_of_any' are available).")
            return rep_list

        logger = get_logger()
        if request_operation not in ["any", "all", "any_of_all", "all_of_any"]:
            raise ValueError(f"Operation does not accept {request_operation} as value: choose among 'any' (match at least one"
                             f" requirement) and 'all' (match all requirements)")
        else:
            # Get elements corresponding to element_type
            if isinstance(elements_to_filter, str):
                elements = self.get_elements_per_kind(elements_to_filter)
            else:
                if not isinstance(elements_to_filter, list):
                    elements = [elements_to_filter, ]
                else:
                    elements = elements_to_filter
            elements_to_filter = elements[0].DR_type
            # Find out elements linked to request
            request_dict = fill_request_dict(requests)
            elements_filtering_structure = self.get_filtering_structure(elements_to_filter)

            rep = {request: filter_against_request(request, values, elements_to_filter, elements, elements_filtering_structure)
                   for (request, values) in request_dict.items()}
            rep_list = apply_operation_on_requests_links(rep, elements, request_operation, void_list="full")
            # Find out elements linked to not_request
            not_request_dict = fill_request_dict(not_requests)
            not_rep = {request: filter_against_request(request, values, elements_to_filter, elements,
                                                       elements_filtering_structure)
                       for (request, values) in not_request_dict.items()}
            not_rep_list = apply_operation_on_requests_links(not_rep, elements, not_request_operation, void_list="void")
            # Remove not requested elements from requested elements
            rep_list = rep_list - not_rep_list

            if print_warning_bcv and elements_to_filter in ["variables", ]:
                bcv_op = self.find_element("opportunities", "Baseline Climate Variables for Earth System Modelling", default=None)
                if bcv_op is None:
                    logger.warning("Can not check that request filtering includes baseline variables, no reference found.")
                else:
                    bcv_list = set(elt for elt in self.get_variables() if bcv_op.filter_on_request(elt)[1])
                    missing_list = bcv_list - rep_list
                    if len(missing_list) > 0:
                        logger.warning("Output of the current filtering request does not include all the BCV variables.")
            return sorted(list(rep_list))

    def find_opportunities(self, operation="any", skip_if_missing=False, **kwargs):
        """
        Find the opportunities corresponding to filtering criteria.
        :param str operation: should at least one filter be applied ("any") or all filters be fulfilled ("all")
        :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised?
        :param dict kwargs: filters to be applied
        :return list of Opportunity: opportunities linked to the filters
        """
        return self.filter_elements_per_request(elements_to_filter="opportunities", requests=kwargs,
                                                request_operation=operation, skip_if_missing=skip_if_missing)

    def find_experiments(self, operation="any", skip_if_missing=False, **kwargs):
        """
        Find the experiments corresponding to filtering criteria.
        :param str operation: should at least one filter be applied ("any") or all filters be fulfilled ("all")
        :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised?
        :param dict kwargs: filters to be applied
        :return list of DRObjects: experiments linked to the filters
        """
        return self.filter_elements_per_request(elements_to_filter="experiments", requests=kwargs, request_operation=operation,
                                                skip_if_missing=skip_if_missing)

    def find_variables(self, operation="any", skip_if_missing=False, **kwargs):
        """
        Find the variables corresponding to filtering criteria.
        :param str operation: should at least one filter be applied ("any") or all filters be fulfilled ("all")
        :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised?
        :param dict kwargs: filters to be applied
        :return list of Variable: variables linked to the filters
        """
        return self.filter_elements_per_request(elements_to_filter="variables", requests=kwargs, request_operation=operation,
                                                skip_if_missing=skip_if_missing)

    def sort_func(self, data_list, sorting_request=list()):
        """
        Method to sort a list of objects based on some criteria
        :param list data_list: the list of objects to be sorted
        :param list sorting_request: list of criteria to sort the input list
        :return list: sorted list
        """
        sorting_request = copy.deepcopy(sorting_request)
        if len(sorting_request) == 0:
            return sorted(data_list, key=lambda x: x.id)
        else:
            sorting_val = sorting_request.pop(0)
            sorting_values_dict = defaultdict(list)
            for data in data_list:
                sorting_values_dict[str(data.get(sorting_val))].append(data)
            rep = list()
            for elt in sorted(list(sorting_values_dict)):
                rep.extend(self.sort_func(sorting_values_dict[elt], sorting_request))
            return rep

    def export_data(self, main_data, output_file, filtering_requests=dict(), filtering_operation="all",
                    filtering_skip_if_missing=False, export_columns_request=list(), sorting_request=list(), **kwargs):
        """
        Method to export a filtered and sorted list of data to a csv file.
        :param str main_data: kind of data to be exported
        :param str output_file: name of the output faile (csv)
        :param dict filtering_requests: filtering request to be applied to the list of object of main_data kind
        :param str filtering_operation: filtering request_operation to be applied to the list of object of main_data kind
        :param bool filtering_skip_if_missing: filtering skip_if_missing to be applied to the list of object of
                                               main_data kind
        :param list export_columns_request: columns to be putted in the output file
        :param list sorting_request: sorting criteria to be applied
        :param dict kwargs: additional arguments to be given to function write_csv_output_file_content
        :return: an output csv file
        """
        filtered_data = self.filter_elements_per_request(elements_to_filter=main_data, requests=filtering_requests,
                                                         request_operation=filtering_operation,
                                                         skip_if_missing=filtering_skip_if_missing)
        sorted_filtered_data = self.sort_func(filtered_data, sorting_request)

        export_columns_request.insert(0, "id")
        content = list()
        content.append(export_columns_request)
        for data in sorted_filtered_data:
            content.append([str(data.__getattr__(key)) for key in export_columns_request])

        write_csv_output_file_content(output_file, content, **kwargs)

    def export_summary(self, lines_data, columns_data, output_file, sorting_line="id", title_line="name",
                       sorting_column="id", title_column="name", filtering_requests=dict(), filtering_operation="all",
                       filtering_skip_if_missing=False, regroup=False, **kwargs):
        """
        Create a 2D tables of csv kind which give the linked between the two list of elements kinds specified
        :param str lines_data: kind of data to be put in row
        :param str columns_data: kind of data to be put in range
        :param str output_file: name of the output file (csv)
        :param str sorting_line: criteria to sort raw data
        :param str title_line: attribute to be used for raw header
        :param str sorting_column: criteria to sort range data
        :param str title_column: attribute to be used for range header
        :param dict filtering_requests: filtering request to be applied to the list of object of main_data kind
        :param str filtering_operation: filtering request_operation to be applied to the list of object of main_data kind
        :param bool filtering_skip_if_missing: filtering skip_if_missing to be applied to the list of object of
                                               main_data kind
        :param bool regroup: should lines/columns be regrouped by similarities
        :param dict kwargs: additional arguments to be given to function write_csv_output_file_content
        :return: a csv output file
        """
        logger = get_logger()
        logger.debug(f"Generate summary for {lines_data}/{columns_data}")
        filtered_data = self.filter_elements_per_request(elements_to_filter=lines_data, requests=filtering_requests,
                                                         request_operation=filtering_operation,
                                                         skip_if_missing=filtering_skip_if_missing)
        if not isinstance(sorting_line, list):
            sorting_line = [sorting_line, ]
        sorted_filtered_data = self.sort_func(filtered_data, sorting_request=sorting_line)
        columns_datasets = self.filter_elements_per_request(elements_to_filter=columns_data)
        if not isinstance(sorting_column, list):
            sorting_column = [sorting_column, ]
        columns_datasets = self.sort_func(columns_datasets, sorting_request=sorting_column)
        columns_title_list = [str(elt.__getattr__(title_column)) for elt in columns_datasets]
        columns_title_dict = {elt.id: title for (elt, title) in zip(columns_datasets, columns_title_list)}
        table_title = f"{lines_data} {title_line} / {columns_data} {title_column}"
        lines_title_list = [elt.__getattr__(title_line) for elt in sorted_filtered_data]
        lines_title_dict = {elt.id: title for (elt, title) in zip(sorted_filtered_data, lines_title_list)}

        nb_lines = len(sorted_filtered_data)
        logger.debug(f"{nb_lines} elements found for {lines_data}")
        logger.debug(f"{len(columns_title_list)} found elements for {columns_data}")

        logger.debug("Generate summary")
        content = defaultdict(lambda: dict())
        if len(columns_title_list) > len(lines_title_list):
            DR_type = columns_datasets[0].DR_type
            for (column_data, column_title) in zip(columns_datasets, columns_title_list):
                filter_line_datasets = self.filter_elements_per_request(elements_to_filter=sorted_filtered_data,
                                                                        requests={DR_type: column_data},
                                                                        request_operation="all", print_warning_bcv=False)
                for line in filter_line_datasets:
                    content[lines_title_dict[line.id]][column_title] = "x"
        else:
            DR_type = sorted_filtered_data[0].DR_type
            for (line_data, line_title) in zip(sorted_filtered_data, lines_title_list):
                filtered_columns = self.filter_elements_per_request(elements_to_filter=columns_datasets,
                                                                    requests={DR_type: line_data}, request_operation="all",
                                                                    print_warning_bcv=False)
                content[line_title] = {columns_title_dict[elt.id]: "x" for elt in filtered_columns}

        logger.debug("Format summary")
        if regroup:
            similar_columns = defaultdict(list)
            for column_data_title in columns_title_list:
                similar_columns[tuple([content[line_title].get(column_data_title, "")
                                       for line_title in lines_title_list])].append(column_data_title)
            new_columns_title_list = list()
            for similar_column in sorted(list(similar_columns), reverse=True, key=lambda x: (x.count("x"), x)):
                new_columns_title_list.extend(similar_columns[similar_column])
            columns_title_list = new_columns_title_list
            similar_lines = defaultdict(list)
            for line_data_title in lines_title_list:
                similar_lines[tuple([content[line_data_title].get(column_title, "")
                                     for column_title in columns_title_list])].append(line_data_title)
            new_lines_title_list = list()
            for similar_line in sorted(list(similar_lines), reverse=True, key=lambda x: (x.count("x"), x)):
                new_lines_title_list.extend(similar_lines[similar_line])
            lines_title_list = new_lines_title_list

        rep = list()
        rep.append([table_title, ] + columns_title_list)
        for line_data_title in lines_title_list:
            rep.append([line_data_title, ] +
                       [content[line_data_title].get(column_title, "") for column_title in columns_title_list])

        logger.debug("Write summary")
        write_csv_output_file_content(output_file, rep, **kwargs)

Data Request API object used to navigate among the Data Request and Vocabulary Server contents.

Initialisation of the Data Request object :param dict input_database: dictionary containing the DR database :param VocabularyServer VS: reference Vocabulary Server to et information on objects :param dict kwargs: additional parameters

Static methods

def from_input(json_input, version, **kwargs)

Method to instanciate the DataRequest object from a single input. :param str or dict json_input: dictionary or name of the dedicated json file containing the export content :param str version: version of the content :param dict kwargs: additional parameters :return DataRequest: instance of the DataRequest object.

def from_separated_inputs(DR_input, VS_input, **kwargs)

Method to instanciate the DataRequestObject from two inputs. :param str or dict DR_input: dictionary or name of the json file containing the data request structure :param str or dict VS_input: dictionary or name of the json file containing the vocabulary server :param dict kwargs: additional parameters :return DataRequest: instance of the DataRequest object

Instance variables

prop software_version
Expand source code
@property
def software_version(self):
    """
    Method to get the version of the software.
    :return str: version of the software
    """
    return version

Method to get the version of the software. :return str: version of the software

prop version
Expand source code
@property
def version(self):
    """
    Method to get the version of both software and content
    :return str : formatted version of the software and the content
    """
    return f"Software {self.software_version} - Content {self.content_version}"

Method to get the version of both software and content :return str : formatted version of the software and the content

Methods

def check(self)
Expand source code
def check(self):
    """
    Method to check the content of the Data Request.
    :return:
    """
    logger = get_logger()
    logger.info("Check data request metadata")
    logger.info("... Check experiments groups")
    for elt in self.get_experiment_groups():
        elt.check()
    logger.info("... Check variables groups")
    for elt in self.get_variable_groups():
        elt.check()
    logger.info("... Check opportunities")
    for elt in self.get_opportunities():
        elt.check()

Method to check the content of the Data Request. :return:

def export_data(self,
main_data,
output_file,
filtering_requests={},
filtering_operation='all',
filtering_skip_if_missing=False,
export_columns_request=[],
sorting_request=[],
**kwargs)
Expand source code
def export_data(self, main_data, output_file, filtering_requests=dict(), filtering_operation="all",
                filtering_skip_if_missing=False, export_columns_request=list(), sorting_request=list(), **kwargs):
    """
    Method to export a filtered and sorted list of data to a csv file.
    :param str main_data: kind of data to be exported
    :param str output_file: name of the output faile (csv)
    :param dict filtering_requests: filtering request to be applied to the list of object of main_data kind
    :param str filtering_operation: filtering request_operation to be applied to the list of object of main_data kind
    :param bool filtering_skip_if_missing: filtering skip_if_missing to be applied to the list of object of
                                           main_data kind
    :param list export_columns_request: columns to be putted in the output file
    :param list sorting_request: sorting criteria to be applied
    :param dict kwargs: additional arguments to be given to function write_csv_output_file_content
    :return: an output csv file
    """
    filtered_data = self.filter_elements_per_request(elements_to_filter=main_data, requests=filtering_requests,
                                                     request_operation=filtering_operation,
                                                     skip_if_missing=filtering_skip_if_missing)
    sorted_filtered_data = self.sort_func(filtered_data, sorting_request)

    export_columns_request.insert(0, "id")
    content = list()
    content.append(export_columns_request)
    for data in sorted_filtered_data:
        content.append([str(data.__getattr__(key)) for key in export_columns_request])

    write_csv_output_file_content(output_file, content, **kwargs)

Method to export a filtered and sorted list of data to a csv file. :param str main_data: kind of data to be exported :param str output_file: name of the output faile (csv) :param dict filtering_requests: filtering request to be applied to the list of object of main_data kind :param str filtering_operation: filtering request_operation to be applied to the list of object of main_data kind :param bool filtering_skip_if_missing: filtering skip_if_missing to be applied to the list of object of main_data kind :param list export_columns_request: columns to be putted in the output file :param list sorting_request: sorting criteria to be applied :param dict kwargs: additional arguments to be given to function write_csv_output_file_content :return: an output csv file

def export_summary(self,
lines_data,
columns_data,
output_file,
sorting_line='id',
title_line='name',
sorting_column='id',
title_column='name',
filtering_requests={},
filtering_operation='all',
filtering_skip_if_missing=False,
regroup=False,
**kwargs)
Expand source code
def export_summary(self, lines_data, columns_data, output_file, sorting_line="id", title_line="name",
                   sorting_column="id", title_column="name", filtering_requests=dict(), filtering_operation="all",
                   filtering_skip_if_missing=False, regroup=False, **kwargs):
    """
    Create a 2D tables of csv kind which give the linked between the two list of elements kinds specified
    :param str lines_data: kind of data to be put in row
    :param str columns_data: kind of data to be put in range
    :param str output_file: name of the output file (csv)
    :param str sorting_line: criteria to sort raw data
    :param str title_line: attribute to be used for raw header
    :param str sorting_column: criteria to sort range data
    :param str title_column: attribute to be used for range header
    :param dict filtering_requests: filtering request to be applied to the list of object of main_data kind
    :param str filtering_operation: filtering request_operation to be applied to the list of object of main_data kind
    :param bool filtering_skip_if_missing: filtering skip_if_missing to be applied to the list of object of
                                           main_data kind
    :param bool regroup: should lines/columns be regrouped by similarities
    :param dict kwargs: additional arguments to be given to function write_csv_output_file_content
    :return: a csv output file
    """
    logger = get_logger()
    logger.debug(f"Generate summary for {lines_data}/{columns_data}")
    filtered_data = self.filter_elements_per_request(elements_to_filter=lines_data, requests=filtering_requests,
                                                     request_operation=filtering_operation,
                                                     skip_if_missing=filtering_skip_if_missing)
    if not isinstance(sorting_line, list):
        sorting_line = [sorting_line, ]
    sorted_filtered_data = self.sort_func(filtered_data, sorting_request=sorting_line)
    columns_datasets = self.filter_elements_per_request(elements_to_filter=columns_data)
    if not isinstance(sorting_column, list):
        sorting_column = [sorting_column, ]
    columns_datasets = self.sort_func(columns_datasets, sorting_request=sorting_column)
    columns_title_list = [str(elt.__getattr__(title_column)) for elt in columns_datasets]
    columns_title_dict = {elt.id: title for (elt, title) in zip(columns_datasets, columns_title_list)}
    table_title = f"{lines_data} {title_line} / {columns_data} {title_column}"
    lines_title_list = [elt.__getattr__(title_line) for elt in sorted_filtered_data]
    lines_title_dict = {elt.id: title for (elt, title) in zip(sorted_filtered_data, lines_title_list)}

    nb_lines = len(sorted_filtered_data)
    logger.debug(f"{nb_lines} elements found for {lines_data}")
    logger.debug(f"{len(columns_title_list)} found elements for {columns_data}")

    logger.debug("Generate summary")
    content = defaultdict(lambda: dict())
    if len(columns_title_list) > len(lines_title_list):
        DR_type = columns_datasets[0].DR_type
        for (column_data, column_title) in zip(columns_datasets, columns_title_list):
            filter_line_datasets = self.filter_elements_per_request(elements_to_filter=sorted_filtered_data,
                                                                    requests={DR_type: column_data},
                                                                    request_operation="all", print_warning_bcv=False)
            for line in filter_line_datasets:
                content[lines_title_dict[line.id]][column_title] = "x"
    else:
        DR_type = sorted_filtered_data[0].DR_type
        for (line_data, line_title) in zip(sorted_filtered_data, lines_title_list):
            filtered_columns = self.filter_elements_per_request(elements_to_filter=columns_datasets,
                                                                requests={DR_type: line_data}, request_operation="all",
                                                                print_warning_bcv=False)
            content[line_title] = {columns_title_dict[elt.id]: "x" for elt in filtered_columns}

    logger.debug("Format summary")
    if regroup:
        similar_columns = defaultdict(list)
        for column_data_title in columns_title_list:
            similar_columns[tuple([content[line_title].get(column_data_title, "")
                                   for line_title in lines_title_list])].append(column_data_title)
        new_columns_title_list = list()
        for similar_column in sorted(list(similar_columns), reverse=True, key=lambda x: (x.count("x"), x)):
            new_columns_title_list.extend(similar_columns[similar_column])
        columns_title_list = new_columns_title_list
        similar_lines = defaultdict(list)
        for line_data_title in lines_title_list:
            similar_lines[tuple([content[line_data_title].get(column_title, "")
                                 for column_title in columns_title_list])].append(line_data_title)
        new_lines_title_list = list()
        for similar_line in sorted(list(similar_lines), reverse=True, key=lambda x: (x.count("x"), x)):
            new_lines_title_list.extend(similar_lines[similar_line])
        lines_title_list = new_lines_title_list

    rep = list()
    rep.append([table_title, ] + columns_title_list)
    for line_data_title in lines_title_list:
        rep.append([line_data_title, ] +
                   [content[line_data_title].get(column_title, "") for column_title in columns_title_list])

    logger.debug("Write summary")
    write_csv_output_file_content(output_file, rep, **kwargs)

Create a 2D tables of csv kind which give the linked between the two list of elements kinds specified :param str lines_data: kind of data to be put in row :param str columns_data: kind of data to be put in range :param str output_file: name of the output file (csv) :param str sorting_line: criteria to sort raw data :param str title_line: attribute to be used for raw header :param str sorting_column: criteria to sort range data :param str title_column: attribute to be used for range header :param dict filtering_requests: filtering request to be applied to the list of object of main_data kind :param str filtering_operation: filtering request_operation to be applied to the list of object of main_data kind :param bool filtering_skip_if_missing: filtering skip_if_missing to be applied to the list of object of main_data kind :param bool regroup: should lines/columns be regrouped by similarities :param dict kwargs: additional arguments to be given to function write_csv_output_file_content :return: a csv output file

def filter_elements_per_request(self,
elements_to_filter,
requests={},
request_operation='all',
not_requests={},
not_request_operation='any',
skip_if_missing=False,
print_warning_bcv=True)
Expand source code
def filter_elements_per_request(self, elements_to_filter, requests=dict(), request_operation="all",
                                not_requests=dict(), not_request_operation="any",
                                skip_if_missing=False, print_warning_bcv=True):
    """
    Filter the elements of kind element_type with a dictionary of requests.
    :param str or list od DRObjects elements_to_filter: kind of elements to be filtered
    :param dict requests: dictionary of the filters to be applied
    :param dict not_requests: dictionary of the filters to be applied for non requested elements
    :param str request_operation: should at least one filter from requests be applied ("any") or all filters be fulfilled ("all")
    :param str not_request_operation: should at least one filter from not_requests be applied ("any") or all filters be fulfilled ("all")
    :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised?
    :param bool print_warning_bcv: should a warning be printed if BCV variables are not included?
    :return: list of elements of kind element_type which correspond to the filtering requests
    """
    def filter_against_request(request, values, elements_to_filter, elements, elements_filtering_structure):
        logger = get_logger()
        request_filtering_structure = self.get_filtering_structure(request)
        common_filtering_structure = request_filtering_structure & elements_filtering_structure
        if len(values) == 0 or len(elements) == 0:
            rep = list()
            filtered_found = True
        elif request == elements_to_filter:
            filtered_found = True
            rep = [(elt.id, elt) for elt in set(values) & set(elements)]
        elif elements_to_filter in request_filtering_structure:
            filtered_found, _ = elements[0].filter_on_request(values[0])
            if filtered_found:
                rep = [(val.id, elt) for (val, elt) in product(values, elements) if elt.filter_on_request(val)[1]]
        elif request in elements_filtering_structure:
            filtered_found, _ = values[0].filter_on_request(elements[0])
            if filtered_found:
                rep = [(val.id, elt) for (val, elt) in product(values, elements) if val.filter_on_request(elt)[1]]
        else:
            if "experiment_groups" in common_filtering_structure:
                list_to_filter = self.get_experiment_groups()
            elif "variables" in common_filtering_structure:
                list_to_filter = self.get_variables()
            elif "variable_groups" in common_filtering_structure:
                list_to_filter = self.get_variable_groups()
            else:
                list_to_filter = self.get_opportunities()
            filtered_found, _ = self._two_elements_filtering(values[0], elements[0], list_to_filter)
            if filtered_found:
                rep = [(val.id, elt) for (val, elt) in product(values, elements) if
                       self._two_elements_filtering(val, elt, list_to_filter)[1]]
            if "mips" in [request, elements_to_filter]:
                list_to_filter = self.get_opportunities()
                new_filtered_found, _ = self._two_elements_filtering(values[0], elements[0], list_to_filter,
                                                                     inner=False)
                filtered_found = filtered_found or new_filtered_found
                if new_filtered_found:
                    new_rep = [(val.id, elt) for (val, elt) in product(values, elements) if
                               self._two_elements_filtering(val, elt, list_to_filter, inner=False)[1]]
                    rep.extend(new_rep)
        if not filtered_found:
            logger.error(f"Could not filter {elements_to_filter} by {request}")
            raise ValueError(f"Could not filter {elements_to_filter} by {request}")
        else:
            new_rep = defaultdict(list)
            for (key, val) in rep:
                new_rep[key].append(val)
            new_rep = {key: set(val) for (key, val) in new_rep.items()}
            return new_rep

    def fill_request_dict(request_dict):
        logger = get_logger()
        rep = defaultdict(list)
        for (req, values) in request_dict.items():
            if not isinstance(values, list):
                values = [values, ]
            for val in values:
                if not isinstance(val, DRObjects):
                    new_val = self.find_element(element_type=req, value=val, default=None)
                else:
                    new_val = val
                if new_val is not None:
                    rep[new_val.DR_type].append(new_val)
                elif skip_if_missing:
                    logger.warning(f"Could not find value {val} for element type {req}, skip it.")
                else:
                    logger.error(f"Could not find value {val} for element type {req}.")
                    raise ValueError(f"Could not find value {val} for element type {req}.")
        return rep

    def apply_operation_on_requests_links(dict_request_links, elements, operation, void_list="full"):
        logger = get_logger()
        if len(rep) == 0:
            if void_list == "full":
                rep_list = set(elements)
            elif void_list == "void":
                rep_list = set()
            else:
                logger.error(f"Unknown void_list value {void_list} (should be either 'full' or 'void').")
                raise ValueError(f"Unknown void_list value {void_list} (should be either 'full' or 'void').")
        elif operation in ["any", ]:
            rep_list = {key: set().union(*chain(val.values())) for (key, val) in dict_request_links.items()}
            rep_list = set().union(*rep_list.values())
        elif operation in ["all", ]:
            rep_list = {key: set(elements).intersection(*chain(val.values())) for (key, val) in dict_request_links.items()}
            rep_list = set(elements).intersection(*rep_list.values())
        elif operation in ["all_of_any", ]:
            rep_list = {key: set().union(*chain(val.values())) for (key, val) in dict_request_links.items()}
            rep_list = set(elements).intersection(*rep_list.values())
        elif operation in ["any_of_all", ]:
            rep_list = {key: set(elements).intersection(*chain(val.values())) for (key, val) in dict_request_links.items()}
            rep_list = set().union(*rep_list.values())
        else:
            logger.error(f"Unknown value {operation} for request_operation (only 'all', 'any', 'any_of_all' and 'all_of_any' are available).")
            raise ValueError(f"Unknown value {operation} for request_operation (only 'all', 'any', 'any_of_all' and 'all_of_any' are available).")
        return rep_list

    logger = get_logger()
    if request_operation not in ["any", "all", "any_of_all", "all_of_any"]:
        raise ValueError(f"Operation does not accept {request_operation} as value: choose among 'any' (match at least one"
                         f" requirement) and 'all' (match all requirements)")
    else:
        # Get elements corresponding to element_type
        if isinstance(elements_to_filter, str):
            elements = self.get_elements_per_kind(elements_to_filter)
        else:
            if not isinstance(elements_to_filter, list):
                elements = [elements_to_filter, ]
            else:
                elements = elements_to_filter
        elements_to_filter = elements[0].DR_type
        # Find out elements linked to request
        request_dict = fill_request_dict(requests)
        elements_filtering_structure = self.get_filtering_structure(elements_to_filter)

        rep = {request: filter_against_request(request, values, elements_to_filter, elements, elements_filtering_structure)
               for (request, values) in request_dict.items()}
        rep_list = apply_operation_on_requests_links(rep, elements, request_operation, void_list="full")
        # Find out elements linked to not_request
        not_request_dict = fill_request_dict(not_requests)
        not_rep = {request: filter_against_request(request, values, elements_to_filter, elements,
                                                   elements_filtering_structure)
                   for (request, values) in not_request_dict.items()}
        not_rep_list = apply_operation_on_requests_links(not_rep, elements, not_request_operation, void_list="void")
        # Remove not requested elements from requested elements
        rep_list = rep_list - not_rep_list

        if print_warning_bcv and elements_to_filter in ["variables", ]:
            bcv_op = self.find_element("opportunities", "Baseline Climate Variables for Earth System Modelling", default=None)
            if bcv_op is None:
                logger.warning("Can not check that request filtering includes baseline variables, no reference found.")
            else:
                bcv_list = set(elt for elt in self.get_variables() if bcv_op.filter_on_request(elt)[1])
                missing_list = bcv_list - rep_list
                if len(missing_list) > 0:
                    logger.warning("Output of the current filtering request does not include all the BCV variables.")
        return sorted(list(rep_list))

Filter the elements of kind element_type with a dictionary of requests. :param str or list od DRObjects elements_to_filter: kind of elements to be filtered :param dict requests: dictionary of the filters to be applied :param dict not_requests: dictionary of the filters to be applied for non requested elements :param str request_operation: should at least one filter from requests be applied ("any") or all filters be fulfilled ("all") :param str not_request_operation: should at least one filter from not_requests be applied ("any") or all filters be fulfilled ("all") :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised? :param bool print_warning_bcv: should a warning be printed if BCV variables are not included? :return: list of elements of kind element_type which correspond to the filtering requests

def find_element(self, element_type, value, default=False, key='name')
Expand source code
def find_element(self, element_type, value, default=False, key="name"):
    """
    Find an element of a specific type and specified by a value from mapping/content if existing,
     else from vocabulary server.
    :param str element_type: kind of element to be found
    :param str value: value to be looked for
    :param default: value to be returned if non found
    :return: the found element if existing, else the default value
    """
    check_val = is_link_id_or_value(value)[1]
    if check_val in self.content[element_type]:
        return self.content[element_type][check_val]
    elif check_val in self.mapping[element_type]:
        return self.mapping[element_type][check_val]
    else:
        return self.find_element_from_vs(element_type=element_type, value=value, default=default, key=key)

Find an element of a specific type and specified by a value from mapping/content if existing, else from vocabulary server. :param str element_type: kind of element to be found :param str value: value to be looked for :param default: value to be returned if non found :return: the found element if existing, else the default value

def find_element_from_vs(self, element_type, value, key='name', default=False)
Expand source code
def find_element_from_vs(self, element_type, value, key="name", default=False):
    """
    Find an element of a specific type and specified by a value from vocabulary server.
    Update the content and mapping list not to have to ask the vocabulary server again for it.
    :param str element_type: kind of element to be looked for
    :param str value: value to be looked for
    :param default: default value to be returned if no value found
    :return: element corresponding to the specified value of a given type if found, else the default value
    """
    if "priority" in element_type and isinstance(value, int):
        key = "value"
    if key in ["id", ]:
        init_default = default
    else:
        init_default = None
    rep = self.find_element_per_identifier_from_vs(element_type=element_type, value=value, key="id",
                                                   default=init_default)
    if rep is None and key not in ["id", ]:
        rep = self.find_element_per_identifier_from_vs(element_type=element_type, value=value, key=key,
                                                       default=default)
    if rep not in [default, ]:
        self.content[element_type][rep.id] = rep
        self.mapping[element_type][rep.name] = rep
    return rep

Find an element of a specific type and specified by a value from vocabulary server. Update the content and mapping list not to have to ask the vocabulary server again for it. :param str element_type: kind of element to be looked for :param str value: value to be looked for :param default: default value to be returned if no value found :return: element corresponding to the specified value of a given type if found, else the default value

def find_element_per_identifier_from_vs(self, element_type, key, value, default=False, **kwargs)
Expand source code
def find_element_per_identifier_from_vs(self, element_type, key, value, default=False, **kwargs):
    """
    Find an element of a specific type and specified by a value (of a given kind) from vocabulary server.
    :param str element_type: type of the element to be found (same as in vocabulary server).
    :param str key: type of the value key to be looked for ("id", "name"...)
    :param str value: value to be looked for
    :param default: default value to be used if the value is not found
    :param dict kwargs: additional attributes to be used for vocabulary server search.
    :return Opportunity or VariablesGroup or ExperimentsGroup or Variables or DRObjects or ConstantValueObj or
            default: the element found from vocabulary server or the default value if none is found.
    """
    if key in ["id", ]:
        value = build_link_from_id(value)
    init_element_type = to_plural(element_type)
    element_type = self.VS.get_element_type(element_type)
    rep = self.VS.get_element(element_type=element_type, element_id=value, id_type=key, default=default, **kwargs)
    if rep in [value, ]:
        rep = default
    elif rep not in [default, ]:
        structure = self.structure.get(element_type, dict()).get(rep["id"], dict())
        if element_type in ["opportunities", ]:
            rep = Opportunity.from_input(dr=self, **rep, **structure)
        elif element_type in ["variable_groups", ]:
            rep = VariablesGroup.from_input(dr=self, **rep, **structure)
        elif element_type in ["experiment_groups", ]:
            rep = ExperimentsGroup.from_input(dr=self, **rep, **structure)
        elif element_type in ["variables", ]:
            rep = Variable.from_input(dr=self, **rep)
        elif init_element_type in ["max_priority_levels", ]:
            rep = DRObjects.from_input(dr=self, id=rep["id"], DR_type=init_element_type, elements=rep)
        else:
            rep = DRObjects.from_input(dr=self, id=rep["id"], DR_type=element_type, elements=rep)
    return rep

Find an element of a specific type and specified by a value (of a given kind) from vocabulary server. :param str element_type: type of the element to be found (same as in vocabulary server). :param str key: type of the value key to be looked for ("id", "name"…) :param str value: value to be looked for :param default: default value to be used if the value is not found :param dict kwargs: additional attributes to be used for vocabulary server search. :return Opportunity or VariablesGroup or ExperimentsGroup or Variables or DRObjects or ConstantValueObj or default: the element found from vocabulary server or the default value if none is found.

def find_experiments(self, operation='any', skip_if_missing=False, **kwargs)
Expand source code
def find_experiments(self, operation="any", skip_if_missing=False, **kwargs):
    """
    Find the experiments corresponding to filtering criteria.
    :param str operation: should at least one filter be applied ("any") or all filters be fulfilled ("all")
    :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised?
    :param dict kwargs: filters to be applied
    :return list of DRObjects: experiments linked to the filters
    """
    return self.filter_elements_per_request(elements_to_filter="experiments", requests=kwargs, request_operation=operation,
                                            skip_if_missing=skip_if_missing)

Find the experiments corresponding to filtering criteria. :param str operation: should at least one filter be applied ("any") or all filters be fulfilled ("all") :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised? :param dict kwargs: filters to be applied :return list of DRObjects: experiments linked to the filters

def find_experiments_per_opportunity(self, opportunity)
Expand source code
def find_experiments_per_opportunity(self, opportunity):
    """
    Find all the experiments which are linked to a specified opportunity.
    :param Opportunity or str opportunity: opportunity to be considered
    :return list of DRObjects or ConstantValueObj: list of the experiments which are linked to a specified
            opportunity.
    """
    return self.filter_elements_per_request(elements_to_filter="experiments",
                                            requests=dict(opportunities=[opportunity, ]))

Find all the experiments which are linked to a specified opportunity. :param Opportunity or str opportunity: opportunity to be considered :return list of DRObjects or ConstantValueObj: list of the experiments which are linked to a specified opportunity.

def find_experiments_per_theme(self, theme)
Expand source code
def find_experiments_per_theme(self, theme):
    """
    Find all the experiments which are linked to a specified theme.
    :param DRObjects or ConstantValueObj or str theme: theme to be considered
    :return list of DRObjects or ConstantValueObj: list of the experiments which are linked to a specified theme.
    """
    return self.filter_elements_per_request(elements_to_filter="experiments",
                                            requests=dict(data_request_themes=[theme, ]))

Find all the experiments which are linked to a specified theme. :param DRObjects or ConstantValueObj or str theme: theme to be considered :return list of DRObjects or ConstantValueObj: list of the experiments which are linked to a specified theme.

def find_mips_per_opportunity(self, opportunity)
Expand source code
def find_mips_per_opportunity(self, opportunity):
    """
    Find all the MIPs which are linked to a specified opportunity.
    :param Opportunity or str opportunity: opportunity to be considered
    :return list of DRObjects or ConstantValueObj: list of the MIPs which are linked to a specified opportunity.
    """
    return self.filter_elements_per_request(elements_to_filter="mips", requests=dict(opportunities=[opportunity, ]))

Find all the MIPs which are linked to a specified opportunity. :param Opportunity or str opportunity: opportunity to be considered :return list of DRObjects or ConstantValueObj: list of the MIPs which are linked to a specified opportunity.

def find_mips_per_theme(self, theme)
Expand source code
def find_mips_per_theme(self, theme):
    """
    Find all the MIPs which are linked to a specified theme.
    :param DRObjects or ConstantValueObj or str theme: theme to be considered
    :return list of DRObjects or ConstantValueObj: list of the MIPs which are linked to a specified theme.
    """
    return self.filter_elements_per_request(elements_to_filter="mips", requests=dict(data_request_themes=[theme, ]))

Find all the MIPs which are linked to a specified theme. :param DRObjects or ConstantValueObj or str theme: theme to be considered :return list of DRObjects or ConstantValueObj: list of the MIPs which are linked to a specified theme.

def find_mips_per_variable(self, variable)
Expand source code
def find_mips_per_variable(self, variable):
    """
    Find all the MIPs which are linked to a specified variable.
    :param Variable or str variable: variable to be considered
    :return list of DRObjects or ConstantValueObj: list of the MIPs which are linked to a specified variable.
    """
    return self.filter_elements_per_request(elements_to_filter="mips", requests=dict(variables=[variable, ]))

Find all the MIPs which are linked to a specified variable. :param Variable or str variable: variable to be considered :return list of DRObjects or ConstantValueObj: list of the MIPs which are linked to a specified variable.

def find_opportunities(self, operation='any', skip_if_missing=False, **kwargs)
Expand source code
def find_opportunities(self, operation="any", skip_if_missing=False, **kwargs):
    """
    Find the opportunities corresponding to filtering criteria.
    :param str operation: should at least one filter be applied ("any") or all filters be fulfilled ("all")
    :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised?
    :param dict kwargs: filters to be applied
    :return list of Opportunity: opportunities linked to the filters
    """
    return self.filter_elements_per_request(elements_to_filter="opportunities", requests=kwargs,
                                            request_operation=operation, skip_if_missing=skip_if_missing)

Find the opportunities corresponding to filtering criteria. :param str operation: should at least one filter be applied ("any") or all filters be fulfilled ("all") :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised? :param dict kwargs: filters to be applied :return list of Opportunity: opportunities linked to the filters

def find_opportunities_per_experiment(self, experiment)
Expand source code
def find_opportunities_per_experiment(self, experiment):
    """
    Find all the opportunities which are linked to a specified experiment.
    :param DRObjects or ConstantValueObj or str experiment: experiment to be considered
    :return list of Opportunity: list of the opportunities which are linked to a specified experiment.
    """
    return self.filter_elements_per_request(elements_to_filter="opportunities",
                                            requests=dict(experiments=[experiment, ]))

Find all the opportunities which are linked to a specified experiment. :param DRObjects or ConstantValueObj or str experiment: experiment to be considered :return list of Opportunity: list of the opportunities which are linked to a specified experiment.

def find_opportunities_per_theme(self, theme)
Expand source code
def find_opportunities_per_theme(self, theme):
    """
    Find all the opportunities which are linked to a specified theme.
    :param DRObjects or ConstantValueObj or str theme: theme to be considered
    :return list of Opportunity: list of the opportunities which are linked to a specified theme.
    """
    return self.filter_elements_per_request(elements_to_filter="opportunities",
                                            requests=dict(data_request_themes=[theme, ]))

Find all the opportunities which are linked to a specified theme. :param DRObjects or ConstantValueObj or str theme: theme to be considered :return list of Opportunity: list of the opportunities which are linked to a specified theme.

def find_opportunities_per_variable(self, variable)
Expand source code
def find_opportunities_per_variable(self, variable):
    """
    Find all the opportunities which are linked to a specified variable.
    :param Variable or str variable: variable to be considered
    :return list of Opportunity: list of the opportunities which are linked to a specified variable.
    """
    return self.filter_elements_per_request(elements_to_filter="opportunities",
                                            requests=dict(variables=[variable, ]))

Find all the opportunities which are linked to a specified variable. :param Variable or str variable: variable to be considered :return list of Opportunity: list of the opportunities which are linked to a specified variable.

def find_priority_per_variable(self, variable, **filter_request)
Expand source code
def find_priority_per_variable(self, variable, **filter_request):
    logger = get_logger()
    priorities = self.filter_elements_per_request(elements_to_filter="priority_level",
                                                  requests={"variable": variable, **filter_request})
    logger.debug(f"Priorities found: {priorities} ({[int(priority.value) for priority in priorities]})")
    priority = min(int(priority.value) for priority in priorities)
    logger.debug(f"Priority_retain {priority}")
    return priority
def find_themes_per_experiment(self, experiment)
Expand source code
def find_themes_per_experiment(self, experiment):
    """
    Find all the themes which are linked to a specified experiment.
    :param DRObjects or ConstantValueObj or str experiment: experiment to be considered
    :return list of DRObjects or ConstantValueObj: list of the themes which are linked to a specified experiment.
    """
    return self.filter_elements_per_request(elements_to_filter="data_request_themes",
                                            requests=dict(experiments=[experiment, ]))

Find all the themes which are linked to a specified experiment. :param DRObjects or ConstantValueObj or str experiment: experiment to be considered :return list of DRObjects or ConstantValueObj: list of the themes which are linked to a specified experiment.

def find_themes_per_opportunity(self, opportunity)
Expand source code
def find_themes_per_opportunity(self, opportunity):
    """
    Find all the themes which are linked to a specified opportunity.
    :param Opportunity or str opportunity: opportunity to be considered
    :return list of DRObjects or ConstantValueObj: list of the themes which are linked to a specified opportunity.
    """
    return self.filter_elements_per_request(elements_to_filter="data_request_themes",
                                            requests=dict(opportunities=[opportunity, ]))

Find all the themes which are linked to a specified opportunity. :param Opportunity or str opportunity: opportunity to be considered :return list of DRObjects or ConstantValueObj: list of the themes which are linked to a specified opportunity.

def find_themes_per_variable(self, variable)
Expand source code
def find_themes_per_variable(self, variable):
    """
    Find all the themes which are linked to a specified variable.
    :param Variable or str variable: variable to be considered
    :return list of DRObjects or ConstantValueObj: list of the themes which are linked to a specified variable.
    """
    return self.filter_elements_per_request(elements_to_filter="data_request_themes",
                                            requests=dict(variables=[variable, ]))

Find all the themes which are linked to a specified variable. :param Variable or str variable: variable to be considered :return list of DRObjects or ConstantValueObj: list of the themes which are linked to a specified variable.

def find_variables(self, operation='any', skip_if_missing=False, **kwargs)
Expand source code
def find_variables(self, operation="any", skip_if_missing=False, **kwargs):
    """
    Find the variables corresponding to filtering criteria.
    :param str operation: should at least one filter be applied ("any") or all filters be fulfilled ("all")
    :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised?
    :param dict kwargs: filters to be applied
    :return list of Variable: variables linked to the filters
    """
    return self.filter_elements_per_request(elements_to_filter="variables", requests=kwargs, request_operation=operation,
                                            skip_if_missing=skip_if_missing)

Find the variables corresponding to filtering criteria. :param str operation: should at least one filter be applied ("any") or all filters be fulfilled ("all") :param bool skip_if_missing: if a request filter is missing, should it be skipped or should an error be raised? :param dict kwargs: filters to be applied :return list of Variable: variables linked to the filters

def find_variables_per_opportunity(self, opportunity)
Expand source code
def find_variables_per_opportunity(self, opportunity):
    """
    Find all the variables which are linked to a specified opportunity.
    :param Opportunity or str opportunity: opportunity to be considered
    :return list of Variable: list of the variables which are linked to a specified opportunity.
    """
    return self.filter_elements_per_request(elements_to_filter="variables",
                                            requests=dict(opportunities=[opportunity, ]))

Find all the variables which are linked to a specified opportunity. :param Opportunity or str opportunity: opportunity to be considered :return list of Variable: list of the variables which are linked to a specified opportunity.

def find_variables_per_priority(self, priority)
Expand source code
def find_variables_per_priority(self, priority):
    """
    Find all the variables which have a specified priority.
    :param DRObjects or ConstantValueObj or str priority: priority to be considered
    :return list of Variable: list of the variables which have a specified priority.
    """
    return self.filter_elements_per_request(elements_to_filter="variables",
                                            requests=dict(priority_level=[priority, ]))

Find all the variables which have a specified priority. :param DRObjects or ConstantValueObj or str priority: priority to be considered :return list of Variable: list of the variables which have a specified priority.

def find_variables_per_theme(self, theme)
Expand source code
def find_variables_per_theme(self, theme):
    """
    Find all the variables which are linked to a specified theme.
    :param DRObjects or ConstantValueObj or str theme: theme to be considered
    :return list of Variable: list of the variables which are linked to a specified theme.
    """
    return self.filter_elements_per_request(elements_to_filter="variables",
                                            requests=dict(data_request_themes=[theme, ]))

Find all the variables which are linked to a specified theme. :param DRObjects or ConstantValueObj or str theme: theme to be considered :return list of Variable: list of the variables which are linked to a specified theme.

def get_data_request_themes(self)
Expand source code
def get_data_request_themes(self):
    """
    Get the themes of the Data Request.
    :return list of DRObject: list of the themes of the DR content.
    """
    if self.cache.get("data_request_themes") is None:
        rep = set()
        for op in self.get_opportunities():
            rep = rep | set(op.get_themes())
        self.cache["data_request_themes"] = sorted(list(rep))
    return self.cache["data_request_themes"]

Get the themes of the Data Request. :return list of DRObject: list of the themes of the DR content.

def get_elements_per_kind(self, element_type)
Expand source code
def get_elements_per_kind(self, element_type):
    """
    Return the list of elements of kind element_type
    :param str element_type: the kind of the elements to be found
    :return list: the list of elements of kind element_type
    """
    logger = get_logger()
    element_types = to_plural(element_type)
    if element_types in ["opportunities", ]:
        elements = self.get_opportunities()
    elif element_types in ["experiment_groups", ]:
        elements = self.get_experiment_groups()
    elif element_types in ["variable_groups", ]:
        elements = self.get_variable_groups()
    elif element_types in ["variables", ]:
        elements = self.get_variables()
    elif element_types in ["experiments", ]:
        elements = self.get_experiments()
    elif element_types in ["data_request_themes", ]:
        elements = self.get_data_request_themes()
    elif element_types in ["mips", ]:
        elements = self.get_mips()
    elif element_types in self.cache:
        elements = sorted(self.cache[element_types])
    else:
        logger.debug(f"Find elements list of kind {element_type} from vocabulary server.")
        element_type, elements_ids = self.VS.get_element_type_ids(element_type)
        elements = [self.find_element(element_type, id) for id in elements_ids]
        self.cache[element_types] = elements
    return elements

Return the list of elements of kind element_type :param str element_type: the kind of the elements to be found :return list: the list of elements of kind element_type

def get_experiment_group(self, id)
Expand source code
def get_experiment_group(self, id):
    """
    Get the ExperimentsGroup associated with a specific id.
    :param str id: id of the ExperimentsGroup
    :return ExperimentsGroup: the ExperimentsGroup associated with the input id
    """
    rep = self.find_element("experiment_groups", id, default=None)
    if rep is not None:
        return rep
    else:
        raise ValueError(f"Could not find experiments group {id} among {self.get_experiment_groups()}.")

Get the ExperimentsGroup associated with a specific id. :param str id: id of the ExperimentsGroup :return ExperimentsGroup: the ExperimentsGroup associated with the input id

def get_experiment_groups(self)
Expand source code
def get_experiment_groups(self):
    """
    Get the ExperimentsGroup of the Data Request.
    :return list of ExperimentsGroup: list of the ExperimentsGroup of the DR content.
    """
    return self._get_sorted_list("experiment_groups")

Get the ExperimentsGroup of the Data Request. :return list of ExperimentsGroup: list of the ExperimentsGroup of the DR content.

def get_experiments(self)
Expand source code
def get_experiments(self):
    """
    Get the experiments of the Data Request.
    :return list of DRObject: list of the experiments of the DR content.
    """
    if self.cache.get("experiments") is None:
        rep = set()
        for exp_grp in self.get_experiment_groups():
            rep = rep | set(exp_grp.get_experiments())
        self.cache["experiments"] = sorted(list(rep))
    return self.cache["experiments"]

Get the experiments of the Data Request. :return list of DRObject: list of the experiments of the DR content.

def get_filtering_structure(self, DR_type)
Expand source code
def get_filtering_structure(self, DR_type):
    rep = set(self.filtering_structure.get(DR_type, list()))
    tmp_rep = copy.deepcopy(rep)
    while len(tmp_rep) > 0:
        rep = rep | tmp_rep
        to_add = set()
        for elt in tmp_rep:
            to_add = to_add | set(self.filtering_structure.get(elt, list()))
        tmp_rep, to_add = to_add, set()
    return rep
def get_mips(self)
Expand source code
def get_mips(self):
    """
    Get the MIPs of the Data Request.
    :return list of DRObject or ConstantValueObj: list of the MIPs of the DR content.
    """
    if self.cache.get("mips") is None:
        rep = set()
        for op in self.get_opportunities():
            rep = rep | set(op.get_mips())
        for var_grp in self.get_variable_groups():
            rep = rep | set(var_grp.get_mips())
        self.cache["mips"] = sorted(list(rep))
    return self.cache["mips"]

Get the MIPs of the Data Request. :return list of DRObject or ConstantValueObj: list of the MIPs of the DR content.

def get_opportunities(self)
Expand source code
def get_opportunities(self):
    """
    Get the Opportunity of the Data Request.
    :return list of Opportunity: list of the Opportunity of the DR content.
    """
    return self._get_sorted_list("opportunities")

Get the Opportunity of the Data Request. :return list of Opportunity: list of the Opportunity of the DR content.

def get_opportunity(self, id)
Expand source code
def get_opportunity(self, id):
    """
    Get the Opportunity associated with a specific id.
    :param str id: id of the Opportunity
    :return Opportunity: the Opportunity associated with the input id
    """
    rep = self.find_element("opportunities", id, default=None)
    if rep is not None:
        return rep
    else:
        raise ValueError(f"Could not find opportunity {id}.")

Get the Opportunity associated with a specific id. :param str id: id of the Opportunity :return Opportunity: the Opportunity associated with the input id

def get_variable_group(self, id)
Expand source code
def get_variable_group(self, id):
    """
    Get the VariablesGroup associated with a specific id.
    :param str id: id of the VariablesGroup
    :return VariablesGroup: the VariablesGroup associated with the input id
    """
    rep = self.find_element("variable_groups", id, default=None)
    if rep is not None:
        return rep
    else:
        raise ValueError(f"Could not find variables group {id}.")

Get the VariablesGroup associated with a specific id. :param str id: id of the VariablesGroup :return VariablesGroup: the VariablesGroup associated with the input id

def get_variable_groups(self)
Expand source code
def get_variable_groups(self):
    """
    Get the VariablesGroup of the Data Request.
    :return list of VariablesGroup: list of the VariablesGroup of the DR content.
    """
    return self._get_sorted_list("variable_groups")

Get the VariablesGroup of the Data Request. :return list of VariablesGroup: list of the VariablesGroup of the DR content.

def get_variables(self)
Expand source code
def get_variables(self):
    """
    Get the Variable of the Data Request.
    :return list of Variable: list of the Variable of the DR content.
    """
    if self.cache.get("variables") is None:
        rep = set()
        for var_grp in self.get_variable_groups():
            rep = rep | set(var_grp.get_variables())
        self.cache["variables"] = sorted(list(rep))
    return self.cache["variables"]

Get the Variable of the Data Request. :return list of Variable: list of the Variable of the DR content.

def sort_func(self, data_list, sorting_request=[])
Expand source code
def sort_func(self, data_list, sorting_request=list()):
    """
    Method to sort a list of objects based on some criteria
    :param list data_list: the list of objects to be sorted
    :param list sorting_request: list of criteria to sort the input list
    :return list: sorted list
    """
    sorting_request = copy.deepcopy(sorting_request)
    if len(sorting_request) == 0:
        return sorted(data_list, key=lambda x: x.id)
    else:
        sorting_val = sorting_request.pop(0)
        sorting_values_dict = defaultdict(list)
        for data in data_list:
            sorting_values_dict[str(data.get(sorting_val))].append(data)
        rep = list()
        for elt in sorted(list(sorting_values_dict)):
            rep.extend(self.sort_func(sorting_values_dict[elt], sorting_request))
        return rep

Method to sort a list of objects based on some criteria :param list data_list: the list of objects to be sorted :param list sorting_request: list of criteria to sort the input list :return list: sorted list

class ExperimentsGroup (id, dr, DR_type='experiment_groups', structure={'experiments': []}, **attributes)
Expand source code
class ExperimentsGroup(DRObjects):
    def __init__(self, id, dr, DR_type="experiment_groups", structure=dict(experiments=list()), **attributes):
        super().__init__(id=id, dr=dr, DR_type=DR_type, structure=structure, **attributes)

    def check(self):
        super().check()
        logger = get_logger()
        if self.count() == 0:
            logger.critical(f"No experiment defined for {self.DR_type} id {self.id}")

    def count(self):
        """
        Return the number of experiments linked to the ExperimentGroup
        :return int: number of experiments of the ExperimentGroup
        """
        return len(self.get_experiments())

    def get_experiments(self):
        """
        Return the list of experiments linked to the ExperimentGroup.
        :return list of DRObjects: list of the experiments linked to the ExperimentGroup
        """
        return self.structure["experiments"]

    def print_content(self, level=0, add_content=True):
        rep = super().print_content(level=level)
        if add_content:
            indent = "    " * (level + 1)
            rep.append(f"{indent}Experiments included:")
            for experiment in self.get_experiments():
                rep.extend(experiment.print_content(level=level + 2))
        return rep

    @classmethod
    def from_input(cls, dr, id, experiments=list(), **kwargs):
        return super().from_input(DR_type="experiment_groups", dr=dr, id=id, structure=dict(experiments=experiments),
                                  elements=kwargs)

    def filter_on_request(self, request_value, inner=True):
        request_type = request_value.DR_type
        filtered_found, found = self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id]
        if filtered_found is None:
            if request_type in ["experiments", ]:
                filtered_found = True
                found = request_value in self.get_experiments()
            else:
                filtered_found, found = super().filter_on_request(request_value=request_value)
            self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id] = (filtered_found, found)
        return filtered_found, found

Base object to build the ones used within the DR API. Use to define basic information needed.

Initialisation of the object. :param str id: id of the object :param DataRequest dr: reference data request object :param str DR_type: type of DR object (for reference in vocabulary server) :param dict structure: if needed, elements linked by structure to the current object :param dict attributes: attributes of the object coming from vocabulary server

Ancestors

Methods

def count(self)
Expand source code
def count(self):
    """
    Return the number of experiments linked to the ExperimentGroup
    :return int: number of experiments of the ExperimentGroup
    """
    return len(self.get_experiments())

Return the number of experiments linked to the ExperimentGroup :return int: number of experiments of the ExperimentGroup

def get_experiments(self)
Expand source code
def get_experiments(self):
    """
    Return the list of experiments linked to the ExperimentGroup.
    :return list of DRObjects: list of the experiments linked to the ExperimentGroup
    """
    return self.structure["experiments"]

Return the list of experiments linked to the ExperimentGroup. :return list of DRObjects: list of the experiments linked to the ExperimentGroup

Inherited members

class Opportunity (id,
dr,
DR_type='opportunities',
structure={'experiment_groups': [], 'variable_groups': [], 'data_request_themes': [], 'time_subsets': []},
**attributes)
Expand source code
class Opportunity(DRObjects):
    def __init__(self, id, dr, DR_type="opportunities",
                 structure=dict(experiment_groups=list(), variable_groups=list(), data_request_themes=list(),
                                time_subsets=list()),
                 **attributes):
        super().__init__(id=id, dr=dr, DR_type=DR_type, structure=structure, **attributes)

    def check(self):
        super().check()
        logger = get_logger()
        if len(self.get_experiment_groups()) == 0:
            logger.critical(f"No experiments group defined for {self.DR_type} id {self.id}")
        if len(self.get_variable_groups()) == 0:
            logger.critical(f"No variables group defined for {self.DR_type} id {self.id}")
        if len(self.get_data_request_themes()) == 0:
            logger.critical(f"No theme defined for {self.DR_type} id {self.id}")

    @classmethod
    def from_input(cls, dr, id, experiment_groups=list(), variable_groups=list(), data_request_themes=list(),
                   time_subsets=list(), mips=list(), **kwargs):

        return super().from_input(DR_type="opportunities", dr=dr, id=id, elements=kwargs,
                                  structure=dict(experiment_groups=experiment_groups, variable_groups=variable_groups,
                                                 data_request_themes=data_request_themes, time_subsets=time_subsets,
                                                 mips=mips))

    def get_experiment_groups(self):
        """
        Return the list of ExperimentsGroup linked to the Opportunity.
        :return list of ExperimentsGroup: list of ExperimentsGroup linked to Opportunity
        """
        return self.structure["experiment_groups"]

    def get_variable_groups(self):
        """
        Return the list of VariablesGroup linked to the Opportunity.
        :return list of VariablesGroup: list of VariablesGroup linked to Opportunity
        """
        return self.structure["variable_groups"]

    def get_data_request_themes(self):
        """
        Return the list of themes linked to the Opportunity.
        :return list of DRObject or ConstantValueObj: list of themes linked to Opportunity
        """
        return self.structure["data_request_themes"]

    def get_themes(self):
        """
        Return the list of themes linked to the Opportunity.
        :return list of DRObject or ConstantValueObj: list of themes linked to Opportunity
        """
        return self.get_data_request_themes()

    def get_time_subsets(self):
        """
        Return the list of time subsets linked to the Opportunity.
        :return list of DRObject: list of time subsets linked to Opportunity
        """
        return self.structure["time_subsets"]

    def get_mips(self):
        """
        Return the list of MIPs linked to the Opportunity.
        :return list of DRObject: list of MIPs linked to Opportunity
        """
        return self.structure["mips"]

    def print_content(self, level=0, add_content=True):
        rep = super().print_content(level=level)
        if add_content:
            indent = "    " * (level + 1)
            rep.append(f"{indent}Experiments groups included:")
            for experiments_group in self.get_experiment_groups():
                rep.extend(experiments_group.print_content(level=level + 2, add_content=False))
            rep.append(f"{indent}Variables groups included:")
            for variables_group in self.get_variable_groups():
                rep.extend(variables_group.print_content(level=level + 2, add_content=False))
            rep.append(f"{indent}Themes included:")
            for theme in self.get_data_request_themes():
                rep.extend(theme.print_content(level=level + 2, add_content=False))
            rep.append(f"{indent}Time subsets included:")
            superindent = "    " * (level + 2)
            for time_subset in self.get_time_subsets():
                if time_subset is None:
                    rep.append(superindent + "time_subset: None")
                else:
                    rep.extend(time_subset.print_content(level=level + 2, add_content=False))
        return rep

    def filter_on_request(self, request_value, inner=True):
        request_type = request_value.DR_type
        filtered_found, found = self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id]
        if filtered_found is None:
            filtered_found = True
            if request_type in ["data_request_themes", ]:
                found = request_value in self.get_data_request_themes()
            elif request_type in ["experiment_groups", ]:
                found = request_value in self.get_experiment_groups()
            elif request_type in ["variable_groups", ]:
                found = request_value in self.get_variable_groups()
            elif request_type in ["time_subsets", ]:
                found = request_value in self.get_time_subsets()
            elif request_type in ["mips", ]:
                found = request_value in self.get_mips() or \
                    (inner and self.filter_on_request_list(request_values=request_value,
                                                           list_to_check=self.get_variable_groups()))
            elif request_type in ["variables", "priority_levels", "cmip6_tables_identifiers", "temporal_shapes",
                                  "spatial_shapes", "structure_titles", "physical_parameters", "modelling_realms", "esm-bcvs",
                                  "cf_standard_names", "cell_methods", "cell_measures", "max_priority_levels", "cmip7_frequencies"]:
                found = self.filter_on_request_list(request_values=request_value,
                                                    list_to_check=self.get_variable_groups())
            elif request_type in ["experiments", ]:
                found = self.filter_on_request_list(request_values=request_value,
                                                    list_to_check=self.get_experiment_groups())
            else:
                filtered_found, found = super().filter_on_request(request_value=request_value)
            self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id] = (filtered_found, found)
        return filtered_found, found

Base object to build the ones used within the DR API. Use to define basic information needed.

Initialisation of the object. :param str id: id of the object :param DataRequest dr: reference data request object :param str DR_type: type of DR object (for reference in vocabulary server) :param dict structure: if needed, elements linked by structure to the current object :param dict attributes: attributes of the object coming from vocabulary server

Ancestors

Methods

def get_data_request_themes(self)
Expand source code
def get_data_request_themes(self):
    """
    Return the list of themes linked to the Opportunity.
    :return list of DRObject or ConstantValueObj: list of themes linked to Opportunity
    """
    return self.structure["data_request_themes"]

Return the list of themes linked to the Opportunity. :return list of DRObject or ConstantValueObj: list of themes linked to Opportunity

def get_experiment_groups(self)
Expand source code
def get_experiment_groups(self):
    """
    Return the list of ExperimentsGroup linked to the Opportunity.
    :return list of ExperimentsGroup: list of ExperimentsGroup linked to Opportunity
    """
    return self.structure["experiment_groups"]

Return the list of ExperimentsGroup linked to the Opportunity. :return list of ExperimentsGroup: list of ExperimentsGroup linked to Opportunity

def get_mips(self)
Expand source code
def get_mips(self):
    """
    Return the list of MIPs linked to the Opportunity.
    :return list of DRObject: list of MIPs linked to Opportunity
    """
    return self.structure["mips"]

Return the list of MIPs linked to the Opportunity. :return list of DRObject: list of MIPs linked to Opportunity

def get_themes(self)
Expand source code
def get_themes(self):
    """
    Return the list of themes linked to the Opportunity.
    :return list of DRObject or ConstantValueObj: list of themes linked to Opportunity
    """
    return self.get_data_request_themes()

Return the list of themes linked to the Opportunity. :return list of DRObject or ConstantValueObj: list of themes linked to Opportunity

def get_time_subsets(self)
Expand source code
def get_time_subsets(self):
    """
    Return the list of time subsets linked to the Opportunity.
    :return list of DRObject: list of time subsets linked to Opportunity
    """
    return self.structure["time_subsets"]

Return the list of time subsets linked to the Opportunity. :return list of DRObject: list of time subsets linked to Opportunity

def get_variable_groups(self)
Expand source code
def get_variable_groups(self):
    """
    Return the list of VariablesGroup linked to the Opportunity.
    :return list of VariablesGroup: list of VariablesGroup linked to Opportunity
    """
    return self.structure["variable_groups"]

Return the list of VariablesGroup linked to the Opportunity. :return list of VariablesGroup: list of VariablesGroup linked to Opportunity

Inherited members

class Variable (id, dr, DR_type='variables', structure={}, **attributes)
Expand source code
class Variable(DRObjects):
    def __init__(self, id, dr, DR_type="variables", structure=dict(), **attributes):
        super().__init__(id=id, dr=dr, DR_type=DR_type, structure=structure, **attributes)

    @classmethod
    def from_input(cls, dr, id, **kwargs):
        return super().from_input(DR_type="variables", dr=dr, id=id, elements=kwargs, structure=dict())

    def print_content(self, level=0, add_content=True):
        """
        Function to return a printable version of the content of the current class.
        :param level: level of indent of the result
        :param add_content: should inner content be added?
        :return: a list of strings that can be assembled to print the content.
        """
        indent = "    " * level
        return [f"{indent}{self.DR_type.rstrip('s')}: {self.physical_parameter.name} at frequency "
                f"{self.cmip7_frequency.name} (id: {is_link_id_or_value(self.id)[1]}, title: {self.title})", ]

    def filter_on_request(self, request_value, inner=True):
        request_type = request_value.DR_type
        filtered_found, found = self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id]
        if filtered_found is None:
            filtered_found = True
            if request_type in ["cmip6_tables_identifiers", ]:
                found = request_value == self.cmip6_tables_identifier
            elif request_type in ["temporal_shapes", ]:
                found = request_value == self.temporal_shape
            elif request_type in ["spatial_shapes", ]:
                found = request_value == self.spatial_shape
            elif request_type in ["structures", "structure_titles"]:
                found = request_value in self.structure_title
            elif request_type in ["physical_parameters", ]:
                found = request_value == self.physical_parameter
            elif request_type in ["modelling_realms", ]:
                found = request_value in self.modelling_realm
            elif request_type in ["esm-bcvs", ]:
                found = request_value == self.__getattr__("esm-bcv")
            elif request_type in ["cf_standard_names", ]:
                found = request_value == self.physical_parameter.cf_standard_name
            elif request_type in ["cell_methods", ]:
                found = request_value == self.cell_methods
            elif request_type in ["cell_measures", ]:
                found = request_value in self.cell_measures
            elif request_type in ["cmip7_frequencies", ]:
                found = request_value == self.cmip7_frequency
            elif request_type in ["cmip6_frequencies", ]:
                found = request_value == self.cmip6_frequency
            else:
                filtered_found, found = super().filter_on_request(request_value)
            self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id] = (filtered_found, found)
        return filtered_found, found

Base object to build the ones used within the DR API. Use to define basic information needed.

Initialisation of the object. :param str id: id of the object :param DataRequest dr: reference data request object :param str DR_type: type of DR object (for reference in vocabulary server) :param dict structure: if needed, elements linked by structure to the current object :param dict attributes: attributes of the object coming from vocabulary server

Ancestors

Inherited members

class VariablesGroup (id,
dr,
DR_type='variable_groups',
structure={'variables': [], 'mips': [], 'priority_level': 'High'},
**attributes)
Expand source code
class VariablesGroup(DRObjects):
    def __init__(self, id, dr, DR_type="variable_groups",
                 structure=dict(variables=list(), mips=list(), priority_level="High"), **attributes):
        super().__init__(id=id, dr=dr, DR_type=DR_type, structure=structure, **attributes)

    def check(self):
        super().check()
        logger = get_logger()
        if self.count() == 0:
            logger.critical(f"No variable defined for {self.DR_type} id {self.id}")

    @classmethod
    def from_input(cls, dr, id, variables=list(), mips=list(), priority_level="High", **kwargs):
        return super().from_input(DR_type="variable_groups", dr=dr, id=id, elements=kwargs,
                                  structure=dict(variables=variables, mips=mips, priority_level=priority_level))

    def count(self):
        """
        Count the number of variables linked to the VariablesGroup.
        :return int: number of variables linked to the VariablesGroup
        """
        return len(self.get_variables())

    def get_variables(self):
        """
        Return the list of Variables linked to the VariablesGroup.
        :return list of Variable: list of Variable linked to VariablesGroup
        """
        return self.structure["variables"]

    def get_mips(self):
        """
        Return the list of MIPs linked to the VariablesGroup.
        :return list of DrObject: list of MIPs linked to VariablesGroup
        """
        return self.structure["mips"]

    def get_priority_level(self):
        """
        Return the priority level of the VariablesGroup.
        :return DrObject: priority level of VariablesGroup
        """
        return self.structure["priority_level"]

    def print_content(self, level=0, add_content=True):
        rep = super().print_content(level=level)
        if add_content:
            indent = "    " * (level + 1)
            rep.append(f"{indent}Variables included:")
            for variable in self.get_variables():
                rep.extend(variable.print_content(level=level + 2))
        return rep

    def filter_on_request(self, request_value, inner=True):
        request_type = request_value.DR_type
        filtered_found, found = self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id]
        if filtered_found is None:
            filtered_found = True
            if request_type in ["variables", ]:
                found = request_value in self.get_variables()
            elif request_type in ["mips", ]:
                found = request_value in self.get_mips()
            elif request_type in ["max_priority_levels", ]:
                priority = self.dr.find_element("priority_level", self.get_priority_level().id)
                req_priority = self.dr.find_element("priority_level", request_value.id)
                found = priority.value <= req_priority.value
            elif request_type in ["priority_levels", ]:
                _, priority = is_link_id_or_value(self.get_priority_level().id)
                _, req_priority = is_link_id_or_value(request_value.id)
                found = req_priority == priority
            elif request_type in ["cmip6_tables_identifiers", "temporal_shapes", "spatial_shapes", "structures", "structure_titles",
                                  "physical_parameters", "modelling_realms", "esm-bcvs", "cf_standard_names", "cell_methods",
                                  "cell_measures", "cmip7_frequencies"]:
                found = self.filter_on_request_list(request_values=request_value, list_to_check=self.get_variables())
            else:
                filtered_found, found = super().filter_on_request(request_value=request_value)
            self.dr.cache_filtering[self.DR_type][self.id][request_type][request_value.id] = (filtered_found, found)
            if request_type not in ["max_priority_levels", ]:
                self.dr.cache_filtering[request_type][request_value.id][self.DR_type][self.id] = (filtered_found, found)
        return filtered_found, found

Base object to build the ones used within the DR API. Use to define basic information needed.

Initialisation of the object. :param str id: id of the object :param DataRequest dr: reference data request object :param str DR_type: type of DR object (for reference in vocabulary server) :param dict structure: if needed, elements linked by structure to the current object :param dict attributes: attributes of the object coming from vocabulary server

Ancestors

Methods

def count(self)
Expand source code
def count(self):
    """
    Count the number of variables linked to the VariablesGroup.
    :return int: number of variables linked to the VariablesGroup
    """
    return len(self.get_variables())

Count the number of variables linked to the VariablesGroup. :return int: number of variables linked to the VariablesGroup

def get_mips(self)
Expand source code
def get_mips(self):
    """
    Return the list of MIPs linked to the VariablesGroup.
    :return list of DrObject: list of MIPs linked to VariablesGroup
    """
    return self.structure["mips"]

Return the list of MIPs linked to the VariablesGroup. :return list of DrObject: list of MIPs linked to VariablesGroup

def get_priority_level(self)
Expand source code
def get_priority_level(self):
    """
    Return the priority level of the VariablesGroup.
    :return DrObject: priority level of VariablesGroup
    """
    return self.structure["priority_level"]

Return the priority level of the VariablesGroup. :return DrObject: priority level of VariablesGroup

def get_variables(self)
Expand source code
def get_variables(self):
    """
    Return the list of Variables linked to the VariablesGroup.
    :return list of Variable: list of Variable linked to VariablesGroup
    """
    return self.structure["variables"]

Return the list of Variables linked to the VariablesGroup. :return list of Variable: list of Variable linked to VariablesGroup

Inherited members