Module data_request_api.query.dreq_query

Functions to extract information from the data request. E.g., get variables requested for each experiment.

The module has two basic sections:

1) Functions that take the data request content and convert it to python objects (instances of classes defined in dreq_classes.py).

2) Functions that interrogate the data request, usually using output from (1) as their input.

Functions

def create_dreq_tables_for_request(content, dreq_version, **kwargs)
Expand source code
@append_kwargs_from_config
def create_dreq_tables_for_request(content, dreq_version, **kwargs):
    '''
    For the "request" part of the data request content (Opportunities, Variable Groups, etc),
    render airtable export content as DreqTable objects.

    For the "data" part of the data request, the corresponding function is create_dreq_tables_for_variables().

    Parameters
    ----------
    content : dict
        Airtable export (from json file). Dict is keyed by base name, for example:
        {'Data Request Opportunities (Public)' : {
            'Opportunity' : {...},
            ...
            },
         'Data Request Variables (Public)' : {
            'Variables' : {...}
            ...
            }
        }
    dreq_version : str
        Version string identifier for Data Request Content
    **kwargs
        Optional settings. Only 'consolidate' (default True) is read in this function.
        NOTE(review): presumably the append_kwargs_from_config decorator fills these in
        from the package configuration -- confirm against its definition.

    Returns
    -------
    Dict 'base' whose keys are table names and values are DreqTable objects.
    This is the dict returned by _get_base_dict(), modified in place.

    Raises
    ------
    AssertionError
        If a renamed table would overwrite an existing table, or if no variable groups
        attribute can be identified in the Opportunity table.
    Exception
        If the quality control check removes every Opportunity, or if records in the
        Variables table already have a compound_name attribute when one is to be assigned.
    '''
    base, content_type = _get_base_dict(content, dreq_version, purpose='request')

    # Config default is consolidate=True, overridden by the input args if given
    consolidate = kwargs.get('consolidate', True)

    # Create objects representing data request tables
    table_id2name = get_table_id2name(base)
    for table_name, table in base.items():
        base[table_name] = DreqTable(table, table_id2name)

    # Change names of tables if needed
    # (insulates downstream code from upstream name changes that don't affect functionality)
    change_table_names = {}
    if content_type == 'raw':
        change_table_names = {
            # old name : new name
            'Experiment': 'Experiments',
            'Priority level': 'Priority Level'
        }
    for old, new in change_table_names.items():
        assert new not in base, 'New table name already exists: ' + new
        if old not in base:
            # Table to be renamed is unavailable, so there is nothing to rename
            continue
        base[new] = base.pop(old)

    # Make some adjustments that are specific to the Opportunity table
    dreq_opps = base['Opportunity']
    dreq_opps.rename_attr('title_of_opportunity', 'title')  # rename title attribute for brevity in downstream code
    for opp in dreq_opps.records.values():
        opp.title = opp.title.strip()
    if content_type == 'raw':
        if 'variable_groups' not in dreq_opps.attr2field:
            # Try alternate names for the latest variable groups.
            # The first name found is used, so the working/updated groups take
            # precedence over the originally requested groups.
            try_vg_attr = [
                'working_updated_variable_groups',
                'originally_requested_variable_groups',
            ]
            for vg_attr in try_vg_attr:
                if vg_attr in dreq_opps.attr2field:
                    dreq_opps.rename_attr(vg_attr, 'variable_groups')
                    break
            # This is a property of the whole table, not of one opportunity, so the
            # message must not refer to a loop variable (which is unbound for an empty table).
            assert 'variable_groups' in dreq_opps.attr2field, \
                f'unable to determine variable groups attribute for the Opportunity table (tried: {try_vg_attr})'

    # Quality control: exclude opportunities lacking experiment groups or variable groups
    exclude_opps = set()
    for opp_id, opp in dreq_opps.records.items():
        if not hasattr(opp, 'experiment_groups'):
            print(f' * WARNING *    no experiment groups found for Opportunity: {opp.title}')
            exclude_opps.add(opp_id)
        if not hasattr(opp, 'variable_groups'):
            print(f' * WARNING *    no variable groups found for Opportunity: {opp.title}')
            exclude_opps.add(opp_id)
    if len(exclude_opps) > 0:
        print('Quality control check is excluding these Opportunities:')
        for opp_id in exclude_opps:
            opp = dreq_opps.records[opp_id]
            print(f'  {opp.title}')
            dreq_opps.delete_record(opp_id)
        print()
    if len(dreq_opps.records) == 0:
        # If there are no opportunities left, there's no point in continuing!
        # This check is here because if something changes upstream in Airtable, it might cause
        # the above code to erroneously remove all opportunities.
        raise Exception(' * ERROR *    All Opportunities were removed!')

    # Determine which compound name to use
    if consolidate:
        USE_COMPOUND_NAME = 'cmip6_compound_name'
    else:
        version_tuple = get_dreq_version_tuple(dreq_version)
        if version_tuple[:2] >= (1, 2):
            USE_COMPOUND_NAME = 'cmip6_compound_name'
        else:
            USE_COMPOUND_NAME = 'compound_name'
    if USE_COMPOUND_NAME != 'compound_name':
        # Expose the chosen name as "compound_name" on every variable record,
        # refusing to overwrite an attribute of that name if one already exists.
        table_name = 'Variables'
        for rec in base[table_name].records.values():
            if hasattr(rec, 'compound_name'):
                raise Exception(f'compound_name attribute is already defined for table "{table_name}"')
            rec.compound_name = getattr(rec, USE_COMPOUND_NAME)

    return base

For the "request" part of the data request content (Opportunities, Variable Groups, etc), render airtable export content as DreqTable objects.

For the "data" part of the data request, the corresponding function is create_dreq_tables_for_variables().

Parameters

content : dict
Airtable export (from json file). Dict is keyed by base name, for example: {'Data Request Opportunities (Public)' : { 'Opportunity' : {…}, … }, 'Data Request Variables (Public)' : { 'Variables' : {…} … } }
dreq_version : str
Version string identifier for Data Request Content

Returns

Dict 'base' whose keys are table names and values are DreqTable objects.

def create_dreq_tables_for_variables(content, dreq_version)
Expand source code
def create_dreq_tables_for_variables(content, dreq_version):
    '''
    Render the "data" part of the airtable export content (Variables, Cell Methods etc)
    as DreqTable objects.

    The counterpart for the "request" part of the data request is create_dreq_tables_for_request().

    Parameters
    ----------
    content : dict
        Data request content, passed to _get_base_dict().
    dreq_version : str
        Version string identifier, passed to _get_base_dict().

    Returns
    -------
    Dict 'base' (as obtained from _get_base_dict() and modified in place) whose keys
    are table names and values are DreqTable objects.

    Raises
    ------
    AssertionError if a new table name is already in use.
    KeyError if, for 'raw' content, a table that should be renamed is missing.
    '''
    base, content_type = _get_base_dict(content, dreq_version, purpose='variables')

    # Wrap each table in a DreqTable object (keys of base are unchanged by this step)
    id2name = get_table_id2name(base)
    for name in list(base):
        base[name] = DreqTable(base[name], id2name)

    # Raw content uses older table names; map them to the names that downstream code
    # expects, so that upstream renaming doesn't affect functionality.
    if content_type == 'raw':
        renames = {
            # old name : new name
            'Variable': 'Variables',
            'Coordinate or Dimension': 'Coordinates and Dimensions',
            'Physical Parameter': 'Physical Parameters',
        }
    else:
        renames = {}
    for old_name, new_name in renames.items():
        assert new_name not in base, 'New table name already exists: ' + new_name
        base[new_name] = base.pop(old_name)

    return base

For the "data" part of the data request content (Variables, Cell Methods etc), render airtable export content as DreqTable objects.

For the "request" part of the data request, the corresponding function is create_dreq_tables_for_request().

def get_dimension_sizes(dreq_tables)
Expand source code
def get_dimension_sizes(dreq_tables):
    '''
    Create lookup table of dimension sizes by examining records in the
    Coordinates and Dimensions table and in the Spatial Shape table.

    Parameters
    ----------
    dreq_tables: dict
        Dict values are DreqTable objects for the required tables, e.g.:
        dreq_tables = {
            'coordinates and dimensions': base['Coordinates and Dimensions'],
            'spatial shape': base['Spatial Shape'],
        }

    Returns
    -------
    OrderedDict keyed by dimension name (names sorted case-insensitively).
    Each value is one of:
        - the size found for the dimension (if several different sizes are found,
          a warning is printed and the largest is used),
        - the str 'model' or 'options' if the dimension's grid class says the size is user-determined,
        - None if no size could be found (a warning is printed; for 'xant' and 'yant' 200 is assumed).

    Raises
    ------
    AssertionError
        If dimension names are not unique, or a requested_values attribute is not a str.
    ValueError
        If a dimension has an unknown grid class, or is labelled with more than one
        user-determined grid class.
    '''
    dimensions = dreq_tables['coordinates and dimensions']

    dim_names = [dimension.name for dimension in dimensions.records.values()]
    assert len(set(dim_names)) == len(dim_names), 'dimension names are not unique'
    dim_names.sort(key=str.lower)
    # Initialize dict having names of all dimensions in the data request (to ensure we don't miss any).
    # Each entry is a set(), and below we determine dimension sizes by any available method,
    # and then after the fact check to see if the answers were consistent.
    dim_sizes = OrderedDict((dim, set()) for dim in dim_names)

    # Determine dimension sizes based on their records in the Coordinates & Dimensions table.
    for dimension in dimensions.records.values():
        dim = dimension.name
        if hasattr(dimension, 'grid_class'):
            # Get size based on what type of grid this dimension is labelled as.
            if dimension.grid_class in ['model', 'options']:
                dim_sizes[dim].add(dimension.grid_class)
            elif dimension.grid_class in ['fixedScalar', 'fixedScaler']:  # fixedScaler = typo in Airtable
                dim_sizes[dim].add(1)
            elif dimension.grid_class in ['fixed', 'fixedExternal']:
                # Nothing to add based on the grid class alone. For 'fixed' grids the size
                # attribute (if present) is picked up by the check below, so a record
                # without a size attribute can still get its size from requested_values.
                pass
            else:
                raise ValueError(f'Unknown grid class for dimension {dim}: {dimension.grid_class}')
        if hasattr(dimension, 'size'):
            # Use the size attribute, if it exists.
            dim_sizes[dim].add(dimension.size)
        if hasattr(dimension, 'requested_values'):
            # If a set of requested values is specified (e.g. for pressure levels grids like "plev19"),
            # use the length of the list of values.
            # The list is stored in Airtable as a space-delimited string.
            assert isinstance(dimension.requested_values, str), \
                f'Expected str for dimension.requested_values, received: {type(dimension.requested_values)}'
            values = dimension.requested_values.split()
            dim_sizes[dim].add(len(values))

    # Determine dimension sizes where possible by looking in the Spatial Shape table records.
    # This is an extra consistency check on the results from dimensions, but it doesn't seem to change
    # the results (as tested on dreq v1.2 content).
    for spatial_shape in dreq_tables['spatial shape'].records.values():
        if hasattr(spatial_shape, 'dimensions'):
            # Follow links from Spatial Shape to dimensions, if they exist
            for link in spatial_shape.dimensions:
                dimension = dimensions.get_record(link)
                dim = dimension.name
                if hasattr(dimension, 'axis_flag') and dimension.axis_flag == 'Z':
                    dim_sizes[dim].add(spatial_shape.number_of_levels)
                if hasattr(dimension, 'size'):
                    dim_sizes[dim].add(dimension.size)

    # Check that the results make sense
    # Each dimension should have only one size
    # User-determined sizes are indicated by the grid_class values listed in user_sizes
    user_sizes = {'options', 'model'}
    for dim, sizes in dim_sizes.items():
        if len(user_sizes.intersection(sizes)) > 1:
            # Raise error if more than one user-determined size option is given, because
            # the result is ambiguous (which should be used?).
            raise ValueError(f'Unexpected sizes: {sizes}')
        for grid_class in user_sizes:
            if grid_class in sizes:
                # A user-determined size overrides any other size that was found
                sizes = {grid_class}

        if len(sizes) == 1:
            size = list(sizes)[0]
        elif len(sizes) > 1:
            size = max(sizes)
            print(f'Warning: found sizes {sorted(sizes)} for dimension "{dim}", assuming size = {size}')
        else:
            size = None
            msg = f'Warning: found no size for dimension "{dim}"'
            if dim in ['xant', 'yant']:
                size = 200
                msg += f', assuming size = {size}'
            print(msg)

        # Replace the set of candidate sizes by the final answer
        dim_sizes[dim] = size

    return dim_sizes

Create lookup table of dimension sizes by examining records in the Coordinates and Dimensions table and the Spatial Shape table.

Parameters

dreq_tables : dict
Dict values are DreqTable objects for the required tables, e.g.: dreq_tables = { 'coordinates and dimensions': base['Coordinates and Dimensions'], 'spatial shape': base['Spatial Shape'], }
def get_dreq_version_tuple(version: str)
Expand source code
def get_dreq_version_tuple(version: str):
    '''
    Convert a version string into a tuple of its major, minor (etc) version numbers.

    The special version 'dev' gives (0,). Otherwise the string must contain exactly one
    run of digits and dots that ends with a digit, which is split on '.' to give the numbers.

    Examples:
        get_dreq_version_tuple('v1.2') --> (1,2)
        get_dreq_version_tuple('v1.0beta') --> (1,0)

    Raises ValueError if the number of such runs found in the string is not exactly one.
    '''
    # Is a tuple needed/useful for 'dev' versions? One is set just in case.
    if version == 'dev':
        return (0,)

    candidates = re.findall('[0-9.]*[0-9]', version)
    if len(candidates) != 1:
        raise ValueError('Ambiguous version string: ' + version)
    (number_str,) = candidates
    return tuple(int(part) for part in number_str.split('.'))

Parse version string to return tuple giving version major, minor (etc) numbers.

Examples

get_dreq_version_tuple('v1.2') --> (1,2)
get_dreq_version_tuple('v1.0beta') --> (1,0)

def get_opp_expts(opp, expt_groups, expts, verbose=False)
Expand source code
def get_opp_expts(opp, expt_groups, expts, verbose=False):
    '''
    Collect the names of the experiments requested by one Opportunity.
    None of the inputs are modified.

    Parameters
    ----------
    opp : DreqRecord
        One record from the Opportunity table
    expt_groups : DreqTable
        Experiment Group table
    expts : DreqTable
        Experiments table
    verbose : bool
        True ==> print the experiment groups that are followed

    Returns
    -------
    Set giving names of experiments from which the Opportunity requests output.
    Example: {'historical', 'piControl'}
    '''
    group_links = opp.experiment_groups
    if verbose:
        print('  Experiment Groups ({}):'.format(len(group_links)))

    requested = set()  # names of experiments requested by this Opportunity
    for group_link in group_links:
        # Follow the link from the Opportunity to one of its experiment groups
        group = expt_groups.records[group_link.record_id]
        if hasattr(group, 'experiments'):
            if verbose:
                print(f'    {group.name}  ({len(group.experiments)} experiments)')
            # Follow the links from the experiment group to its experiments
            requested.update(expts.records[expt_link.record_id].experiment
                             for expt_link in group.experiments)
    return requested

For one Opportunity, get its requested experiments. Input parameters are not modified.

Parameters

opp : DreqRecord
One record from the Opportunity table
expt_groups : DreqTable
Experiment Group table
expts : DreqTable
Experiments table

Returns

Set giving names of experiments from which the Opportunity requests output.
Example : {'historical', 'piControl'}
 
def get_opp_ids(use_opps, dreq_opps, verbose=False, quality_control=True)
Expand source code
def get_opp_ids(use_opps, dreq_opps, verbose=False, quality_control=True):
    '''
    Select opportunities and return the list of their unique identifiers.

    Parameters
    ----------
    use_opps : str or list
        "all" : select all available opportunities
        list of str : select the opportunities having the listed titles
        Any other value (including a list containing non-str items) selects nothing.
    dreq_opps : DreqTable
        table object representing the opportunities table
    verbose : bool
        True ==> print the titles of the opportunities that were found
    quality_control : bool
        True ==> discard selected opportunities that have a status attribute whose value is
        not 'Accepted' or 'Under review'. Discarded opportunities are also deleted from dreq_opps.

    Returns
    -------
    List of opportunity ids (keys of dreq_opps.records).

    Raises
    ------
    AssertionError if opportunity titles or the selected ids are not unique.
    Exception if a listed title is not found in the table.
    '''
    records = dreq_opps.records
    opp_ids = []
    if use_opps == 'all':
        # Include all opportunities
        opp_ids = list(records)
    elif isinstance(use_opps, list):
        titles = sorted(set(use_opps))
        if all(isinstance(title, str) for title in titles):
            # Look up the requested titles, which requires titles to be unique
            id_by_title = {}
            for rec_id, rec in records.items():
                id_by_title[rec.title] = rec_id
            assert len(records) == len(id_by_title), 'Opportunity titles are not unique'
            for title in titles:
                if title not in id_by_title:
                    raise Exception(f'\n* ERROR *    The specified Opportunity is not found: {title}\n')
                opp_ids.append(id_by_title[title])

    assert len(set(opp_ids)) == len(opp_ids), 'found repeated opportunity ids'

    if quality_control:
        accepted_status = ['Accepted', 'Under review']
        # First decide which opportunities to discard, then remove them
        # (both from the returned list and from the table itself).
        rejected = []
        for opp_id in opp_ids:
            opp = dreq_opps.get_record(opp_id)
            if hasattr(opp, 'status') and opp.status not in accepted_status:
                rejected.append(opp_id)
        for opp_id in rejected:
            dreq_opps.delete_record(opp_id)
            opp_ids.remove(opp_id)

    if verbose:
        if len(opp_ids) == 0:
            print('No Opportunities found')
        else:
            print('Found {} Opportunities:'.format(len(opp_ids)))
            for opp_id in opp_ids:
                print('  ' + records[opp_id].title)

    return opp_ids

Return list of unique opportunity identifiers.

Parameters

use_opps : str or list
"all" : return all available ids; list of str : return ids for the opportunities with the listed titles
dreq_opps : DreqTable
table object representing the opportunities table
def get_opp_vars(opp, priority_levels, var_groups, dreq_vars, dreq_priorities=None, verbose=False)
Expand source code
def get_opp_vars(opp, priority_levels, var_groups, dreq_vars, dreq_priorities=None, verbose=False):
    '''
    For one Opportunity, get its requested variables grouped by priority level.
    Input parameters are not modified.

    Parameters
    ----------
    opp : DreqRecord
        One record from the Opportunity table
    priority_levels : list[str]
        Priority levels to get, example: ['High', 'Medium']
    var_groups : DreqTable
        Variable Group table
    dreq_vars : DreqTable
        Variables table
    dreq_priorities : DreqTable
        Required if var_group.priority_level is link to dreq_priorities table
    verbose : bool
        True ==> print the variable groups that are followed

    Returns
    -------
    Dict giving set of variables requested at each specified priority level.
    Every level in priority_levels is a key, even if no variables are requested at that level.
    Example: {'High' : {'Amon.tas', 'day.tas'}, 'Medium' : {'day.ua'}}
    '''
    # Follow links to variable groups to find names of requested variables
    opp_vars = {p: set() for p in priority_levels}
    if verbose:
        print('  Variable Groups ({}):'.format(len(opp.variable_groups)))
    for group_link in opp.variable_groups:
        var_group = var_groups.records[group_link.record_id]

        # Ignore variable groups whose priority level was not asked for
        priority_level = get_var_group_priority(var_group, dreq_priorities)
        if priority_level not in priority_levels:
            continue

        # A group record without a "variables" attribute contributes no variables.
        # Skip it rather than fail with AttributeError (get_opp_expts() treats experiment
        # groups lacking an "experiments" attribute the same way).
        if not hasattr(var_group, 'variables'):
            continue

        if verbose:
            print(f'    {var_group.name}  ({len(var_group.variables)} variables, {priority_level} priority)')

        for var_link in var_group.variables:
            var = dreq_vars.records[var_link.record_id]
            var_name = get_unique_var_name(var)
            # Add this variable to the set of requested variables at the specified priority
            opp_vars[priority_level].add(var_name)
    return opp_vars

For one Opportunity, get its requested variables grouped by priority level. Input parameters are not modified.

Parameters

opp : DreqRecord
One record from the Opportunity table
priority_levels : list[str]
Priority levels to get, example: ['High', 'Medium']
var_groups : DreqTable
Variable Group table
dreq_vars : DreqTable
Variables table
dreq_priorities : DreqTable
Required if var_group.priority_level is link to dreq_priorities table

Returns

Dict giving set of variables requested at each specified priority level
 
Example : {'High' : {'Amon.tas', 'day.tas'}, 'Medium' : {'day.ua'}}
 
def get_priority_levels()
Expand source code
def get_priority_levels():
    '''
    Return the valid priority levels in the data request as a list of str,
    ordered from highest priority to lowest priority.

    Raises AssertionError if the first (highest) level is not 'Core'.
    '''
    # The priorities are specified in PRIORITY_LEVELS from dreq_classes; capitalize each name.
    levels = []
    for level in PRIORITY_LEVELS:
        levels.append(level.capitalize())

    # Safety check that 'Core' is the highest priority.
    # The 'Core' priority represents the Baseline Climate Variables (BCVs, https://doi.org/10.5194/egusphere-2024-2363).
    # It can only fail to be highest if something was mistakenly modified in dreq_classes.py,
    # so this check should NEVER fail and is here only to be EXTRA safe.
    assert levels[0] == 'Core', 'error in PRIORITY_LEVELS: highest priority should be Core (BCVs)'

    return levels

Return list of all valid priority levels (str) in the data request. List is ordered from highest to lowest priority.

def get_requested_variables(content,
dreq_version,
use_opps='all',
priority_cutoff='Low',
verbose=True,
check_core_variables=True)
Expand source code
def get_requested_variables(content, dreq_version,
                            use_opps='all', priority_cutoff='Low',
                            verbose=True, check_core_variables=True):
    '''
    Return variables requested for each experiment, as a function of opportunities supported and priority level of variables.

    Parameters
    ----------
    content : dict
        Dict containing either:
        - data request content as exported from airtable
        OR
        - DreqTable objects representing tables (dict keys are table names)
    dreq_version : str
        Version string identifier for Data Request Content
    use_opps : str or list of str
        Identifies the opportunities being supported. Passed to get_opp_ids(). Options:
            'all' : include all available opportunities
            list of strings : include opportunities identified by their titles
        NOTE(review): selecting opportunities by integer IDs was also described here, but
        get_opp_ids() only handles 'all' or a list of title strings -- confirm whether
        integer IDs are meant to be supported.
    priority_cutoff : str
        Only return variables of equal or higher priority level than priority_cutoff.
        E.g., priority_cutoff='Low' means all priority levels are returned.
        The value is capitalized before use, so it is not case-sensitive in its first letter.
    verbose : bool
        True ==> print the opportunities, experiment groups and variable groups as they are processed.
    check_core_variables : bool
        True ==> check that all experiments contain a non-empty list of Core variables,
        and that it's the same list for all experiments.

    Returns
    -------
    Dict with a 'Header' entry and an 'experiment' entry. The 'experiment' entry is keyed
    by experiment name, giving prioritized variables for each experiment.
    Example:
    {   'Header' : ... (Header contains info about where this request comes from)
        'experiment' : {
            'historical' : {
                'High' : ['Amon.tas', 'day.tas', ...],
                'Medium' : ...
            },
            ...
        }
    }

    Raises
    ------
    AssertionError
        If the 'Priority Level' table (when present) disagrees with get_priority_levels(),
        or if check_core_variables is True and an experiment has no 'Core' entry.
    ValueError
        If priority_cutoff is not a valid priority level, or if check_core_variables is True
        and the Core variables are empty or differ between experiments.

    Notes
    -----
    get_opp_ids() is called with its default quality_control=True, so opportunities that fail
    its status check are deleted from the Opportunity table object used here.
    '''
    base = _get_base_dreq_tables(content, dreq_version, purpose='request')

    # Tables used in this function
    dreq_tables = {
        'opps': base['Opportunity'],
        'expt groups': base['Experiment Group'],
        'expts': base['Experiments'],
        'var groups': base['Variable Group'],
        'vars': base['Variables']
    }
    opp_ids = get_opp_ids(use_opps, dreq_tables['opps'], verbose=verbose)

    # all_priority_levels = ['Core', 'High', 'Medium', 'Low']
    # all_priority_levels = [s.capitalize() for s in PRIORITY_LEVELS]
    all_priority_levels = get_priority_levels()

    if 'Priority Level' in base:
        # Cross-check the priority levels named in the table against the expected ones
        dreq_tables['priority level'] = base['Priority Level']
        priority_levels_from_table = [rec.name for rec in dreq_tables['priority level'].records.values()]
        assert set(all_priority_levels) == set(priority_levels_from_table), \
            'inconsistent priority levels:\n  ' + str(all_priority_levels) + '\n  ' + str(priority_levels_from_table)
    else:
        dreq_tables['priority level'] = None
    priority_cutoff = priority_cutoff.capitalize()
    if priority_cutoff not in all_priority_levels:
        raise ValueError('Invalid priority level cutoff: ' + priority_cutoff + '\nCould not determine priority levels to include.')
    # all_priority_levels is ordered from highest to lowest priority, so the levels to
    # include are those up to and including the cutoff.
    m = all_priority_levels.index(priority_cutoff)
    priority_levels = all_priority_levels[:m + 1]
    del priority_cutoff

    # Loop over Opportunities to get prioritized lists of variables
    request = {}  # dict to hold aggregated request
    for opp_id in opp_ids:
        opp = dreq_tables['opps'].records[opp_id]  # one record from the Opportunity table

        if verbose:
            print(f'Opportunity: {opp.title}')

        opp_expts = get_opp_expts(opp,
                                  dreq_tables['expt groups'],
                                  dreq_tables['expts'],
                                  verbose=verbose)

        opp_vars = get_opp_vars(opp,
                                priority_levels,
                                dreq_tables['var groups'],
                                dreq_tables['vars'],
                                dreq_tables['priority level'],
                                verbose=verbose)

        # Aggregate this Opportunity's request into the master list of requests
        for expt_name in opp_expts:
            if expt_name not in request:
                # If we haven't encountered this experiment yet, initialize an ExptRequest object for it
                request[expt_name] = ExptRequest(expt_name)

            # Add this Opportunity's variables request to the ExptRequest object
            for priority_level, var_names in opp_vars.items():
                request[expt_name].add_vars(var_names, priority_level)

    # Assemble the output: a header recording where the request comes from, followed by
    # the request for each experiment.
    opp_titles = sorted([dreq_tables['opps'].get_record(opp_id).title for opp_id in opp_ids])
    requested_vars = {
        'Header': {
            'Opportunities': opp_titles,
            'dreq version': dreq_version,
        },
        'experiment': {},
    }
    for expt_name, expt_req in request.items():
        requested_vars['experiment'].update(expt_req.to_dict())

    if check_core_variables:
        # Confirm that 'Core' priority level variables are included, and identical for each experiment.
        # The setting of priority_levels list, above, should guarantee this.
        # Putting this extra check here just to be extra sure.
        core_vars = set()
        for expt_name, expt_req in requested_vars['experiment'].items():
            assert 'Core' in expt_req, 'Missing Core variables for experiment: ' + expt_name
            vars = set(expt_req['Core'])
            if len(vars) == 0:
                msg = 'Empty Core variables list for experiment: ' + expt_name
                raise ValueError(msg)
            if len(core_vars) == 0:
                # The first experiment examined provides the reference set of Core variables
                core_vars = vars
            if vars != core_vars:
                msg = 'Inconsistent Core variables for experiment: ' + expt_name + \
                    f'\n{len(core_vars)} {len(vars)} {len(core_vars.intersection(vars))}'
                raise ValueError(msg)

    return requested_vars

Return variables requested for each experiment, as a function of opportunities supported and priority level of variables.

Parameters

content : dict
Dict containing either: - data request content as exported from airtable OR - DreqTable objects representing tables (dict keys are table names)
dreq_version : str
Version string identifier for Data Request Content
use_opps : str or list of str/int
Identifies the opportunities being supported. Options: 'all' : include all available opportunities; integers : include opportunities identified by their integer IDs; strings : include opportunities identified by their titles
priority_cutoff : str
Only return variables of equal or higher priority level than priority_cutoff. E.g., priority_cutoff='Low' means all priority levels are returned.
check_core_variables : bool
True ==> check that all experiments contain a non-empty list of Core variables, and that it's the same list for all experiments.

Returns

Dict keyed by experiment name, giving prioritized variables for each experiment.
Example:
 

{
    'Header' : … (Header contains info about where this request comes from)
    'experiment' : {
        'historical' : {
            'High' : ['Amon.tas', 'day.tas', …],
            'Medium' : …
        },
        …
    }
}

def get_table_id2name(base)
Expand source code
def get_table_id2name(base):
    '''
    Build the mapping from table id to table name for the tables in base.

    Note that a "version" entry, if present, is removed from base (in place) so that
    only tables remain in it.

    Parameters
    ----------
    base : dict
        Dict whose values (other than the optional "version" entry) are tables
        having 'id' and 'name' entries.

    Returns
    -------
    Dict mapping each table's id to its name.

    Raises AssertionError if two tables share the same id.
    '''
    # Drop the non-table entry before looping over the tables
    base.pop("version", None)
    id2name = {table['id']: table['name'] for table in base.values()}
    assert len(id2name) == len(base), 'table ids are not unique!'
    return id2name

Get a mapping from table id to table name

def get_unique_var_name(var)
Expand source code
def get_unique_var_name(var):
    '''
    Return the name used to uniquely identify a variable.
    Having a function for this keeps the choice of identifier in one place
    (e.g., if compound_name is used initially, but something else is chosen later).
    The choice is controlled by UNIQUE_VAR_NAME.

    Parameters
    ----------
    var : DreqRecord
        Object representing a variable

    Returns
    -------
    str that uniquely identifies a variable in the data request

    Raises
    ------
    ValueError if UNIQUE_VAR_NAME is not a recognized choice.
    '''
    # 'compound name' is the only identifier currently handled
    if UNIQUE_VAR_NAME != 'compound name':
        raise ValueError('Unknown identifier for UNIQUE_VAR_NAME: ' + UNIQUE_VAR_NAME +
                         '\nHow should the unique variable name be determined?')
    return var.compound_name

Return name that uniquely identifies a variable. Reason to make this a function is to control this choice in one place. E.g., if compound_name is used initially, but something else chosen later.

Parameters

var : DreqRecord
Object representing a variable

Returns

str that uniquely identifies a variable in the data request
 
def get_var_group_priority(var_group, dreq_priorities=None)
Expand source code
def get_var_group_priority(var_group, dreq_priorities=None):
    '''
    Return a string stating the priority level of a variable group.

    Parameters
    ----------
    var_group : DreqRecord
        Object representing a variable group.
        Its "priority_level" attribute specifies the priority as either a string or
        a list holding one link to the dreq_priorities table.
    dreq_priorities : DreqTable
        Required if var_group.priority_level is link to dreq_priorities table

    Returns
    -------
    str that states the priority level, e.g. "High".
    If var_group has no "priority_level" attribute, "Undefined" is returned.

    Raises
    ------
    AssertionError if the priority is given as links but not exactly one link,
        or if dreq_priorities is not a DreqTable when a link has to be followed.
    Exception if the "priority_level" attribute is neither a str nor a list.
    TypeError if the priority level that was found is not a str.
    '''
    if not hasattr(var_group, 'priority_level'):
        return 'Undefined'

    level = var_group.priority_level
    if isinstance(level, str):
        # Priority is given directly by name
        priority_level = level
    elif isinstance(level, list):
        # Priority is given as a link to a record in the priority levels table
        assert len(level) == 1, 'Variable group should have one specified priority level'
        link = level[0]
        assert isinstance(dreq_priorities, DreqTable)
        priority_level = dreq_priorities.records[link.record_id].name
    else:
        raise Exception('Unable to determine variable group priority level')

    if isinstance(priority_level, str):
        return priority_level
    raise TypeError('Priority level should be str, instead got {}'.format(type(priority_level)))

Returns string stating the priority level of variable group.

Parameters

var_group : DreqRecord
Object representing a variable group. Its "priority_level" attribute specifies the priority as either string or link to dreq_priorities table
dreq_priorities : DreqTable
Required if var_group.priority_level is link to dreq_priorities table

Returns

str that states the priority level, e.g. "High"
 
def get_variables_metadata(content, dreq_version, compound_names=None, cmor_tables=None, cmor_variables=None)
Expand source code
def get_variables_metadata(content, dreq_version,
                           compound_names=None, cmor_tables=None, cmor_variables=None):
    '''
    Get metadata for CMOR variables (dimensions, cell_methods, out_name, ...).

    Parameters
    ----------
    content : dict
        Dict containing either:
        - data request content as exported from airtable
        OR
        - DreqTable objects representing tables (dict keys are table names)
    dreq_version : str
        Version string identifier for Data Request Content
    compound_names : list[str]
        Compound names of variables to include. If not given, all are included.
        Example: ['Amon.tas', 'Omon.sos']
    cmor_tables : list[str]
        Names of CMOR tables to include. If not given, all are included.
        Example: ['Amon', 'Omon']
    cmor_variables : list[str]
        Names of CMOR variables to include. If not given, all are included.
        Here the out_name is used as the CMOR variable name.
        Example: ['tas', 'siconc']

    Returns
    -------
    all_var_info : OrderedDict
        Dictionary indexed by unique variable name (sorted case-insensitively),
        giving metadata for each retained variable as an OrderedDict of stripped strings.
        Only variable entries are returned; provenance information (data request version
        used, etc) is added by write_variables_metadata() when it writes json output.

    Raises
    ------
    ValueError
        If the frequency link attribute or the table of CMOR table identifiers cannot be
        found, or if a physical parameter links to a CF standard name but the
        'CF Standard Names' table is not available.
    KeyError
        If a variable links to a structure record but the 'Structure' table is not available.
    Exception
        If a variable does not link to exactly one CMOR table.
    AssertionError
        If one of the internal consistency checks fails (e.g. non-unique variable names).
    '''
    # NOTE(review): purpose='request' is used here although the tables read below
    # (Variables, Spatial Shape, ...) describe the variables. The helper is not visible
    # from this function, so confirm it returns those tables for this purpose.
    base = _get_base_dreq_tables(content, dreq_version, purpose='request')

    # Some variables in these dreq versions lack a 'frequency' attribute; use the legacy CMIP6 frequency for them
    dreq_versions_substitute_cmip6_freq = ['v1.0', 'v1.1']

    # Use dict dreq_tables to store instances of the DreqTable class that are used in this function.
    # Mostly this would be the same as simply using base[table name], but in some cases there's a choice
    # of which table to use. Using dreq_tables as a mapping makes this choice explicit.
    dreq_tables = {
        'variables': base['Variables']
    }
    # The Variables table is the master list of variables in the data request.
    # Each entry (row) is a CMOR variable, containing the variable's metadata.
    # Many of these entries are links to other tables in the database (see below).

    # Set frequency table and (if necessary) frequency attribute of variables table
    freq_table_name = 'CMIP7 Frequency'
    dreq_tables['frequency'] = base[freq_table_name]
    if 'frequency' not in dreq_tables['variables'].attr2field:
        # The code below assumes each variable has an attribute called 'frequency'.
        # Here adjust for the possibility that the variables table may not yet have an attribute with this name.
        freq_attr_name = format_attribute_name(freq_table_name)
        if freq_attr_name in dreq_tables['variables'].attr2field:
            # If the attribute name corresponding to this table name is available, rename it as 'frequency'
            dreq_tables['variables'].rename_attr(freq_attr_name, 'frequency')
        else:
            raise ValueError(f'Expected attribute {freq_attr_name} linking to table {freq_table_name}')
        # Confirm that the 'frequency' attribute points to the correct table
        # (this is checking if the above change was made self-consistently).
        assert dreq_tables['variables'].links['frequency'] == freq_table_name, \
            'inconsistent table link for frequency attribute'

    # Get other tables from the database that are required to find all of a variable's metadata used by CMOR.
    dreq_tables.update({
        'spatial shape': base['Spatial Shape'],
        'coordinates and dimensions': base['Coordinates and Dimensions'],
        'temporal shape': base['Temporal Shape'],
        'cell methods': base['Cell Methods'],
        'physical parameters': base['Physical Parameters'],
        'realm': base['Modelling Realm'],
        'cell measures': base['Cell Measures'],
        'CF standard name': None,
    })
    # Optional tables: only some versions of the content provide them
    if 'CF Standard Names' in base:
        dreq_tables['CF standard name'] = base['CF Standard Names']
    if 'Structure' in base:
        dreq_tables['structure'] = base['Structure']

    # The table of CMOR table identifiers (and the names of the variable attributes
    # linking to it and to the realm table) depends on which table the content provides.
    if 'Table Identifiers' in base:
        dreq_tables['CMOR tables'] = base['Table Identifiers']
        attr_table = 'table'
        attr_realm = 'modelling_realm'
    elif 'CMIP6 Table Identifiers (legacy)' in base:
        dreq_tables['CMOR tables'] = base['CMIP6 Table Identifiers (legacy)']
        attr_table = 'cmip6_table_legacy'
        attr_realm = 'modelling_realm___primary'
    else:
        raise ValueError('Which table contains CMOR table identifiers?')

    if dreq_version in dreq_versions_substitute_cmip6_freq:
        # needed for corrections below
        dreq_tables['CMIP6 frequency'] = base['CMIP6 Frequency (legacy)']

    coords_dims = dreq_tables['coordinates and dimensions']

    def linked_dim_names(record, attr):
        '''
        Follow the links stored in attribute attr of record into the coordinates & dimensions
        table and return the names of the linked records (empty list if attr is absent).
        '''
        return [coords_dims.get_record(link).name for link in getattr(record, attr, [])]

    # Check uniqueness of chosen variable names.
    variable_records = dreq_tables['variables'].records
    unique_names = {get_unique_var_name(record) for record in variable_records.values()}
    assert len(unique_names) == len(variable_records), \
        f'Variable names from UNIQUE_VAR_NAME="{UNIQUE_VAR_NAME}" do not uniquely map to variable record ids'

    if cmor_tables:
        print('Retaining only these CMOR tables: ' + ', '.join(cmor_tables))
    if cmor_variables:
        print('Retaining only these CMOR variables: ' + ', '.join(cmor_variables))
    if compound_names:
        print('Retaining only these compound names: ' + ', '.join(compound_names))

    substitute = {
        # replacement character(s) : [characters to replace with the replacement character]
        '_': ['\\_']
    }
    all_var_info = {}
    for var in variable_records.values():

        if compound_names:
            if var.compound_name not in compound_names:
                continue

        var_name = get_unique_var_name(var)

        link_table = getattr(var, attr_table)
        if len(link_table) != 1:
            raise Exception(f'variable {var_name} should have one table link, found: ' + str(link_table))
        table_id = dreq_tables['CMOR tables'].get_record(link_table[0]).name
        if cmor_tables:
            # Filter by CMOR table name
            if table_id not in cmor_tables:
                continue

        if not hasattr(var, 'frequency') and dreq_version in dreq_versions_substitute_cmip6_freq:
            # seems to be an error for some vars in v1.0, so instead use their CMIP6 frequency
            assert len(var.cmip6_frequency_legacy) == 1
            link = var.cmip6_frequency_legacy[0]
            var.frequency = [dreq_tables['CMIP6 frequency'].get_record(link).name]

        if isinstance(var.frequency[0], str):
            # retain this option for non-consolidated airtable export?
            assert isinstance(var.frequency, list)
            frequency = var.frequency[0]
        else:
            link = var.frequency[0]
            freq = dreq_tables['frequency'].get_record(link)
            frequency = freq.name

        cell_methods = ''
        area_label_dd = ''
        if hasattr(var, 'cell_methods'):
            assert len(var.cell_methods) == 1
            link = var.cell_methods[0]
            cm = dreq_tables['cell methods'].get_record(link)
            cell_methods = cm.cell_methods
            if hasattr(cm, 'brand_id'):
                area_label_dd = cm.brand_id

        # Get dimensions by
        # 1) using dimensions attribute from variable table, if given
        # 2) following database links
        dimensions_var = None
        if hasattr(var, 'dimensions'):
            # The variable table record gives the dimensions
            # dreq versions before v1.2 don't have a dimensions attribute in the variables table
            assert isinstance(var.dimensions, str), \
                f'Expected comma-delimited string giving the dimensions for {var_name}'
            dims_given = [s.strip() for s in var.dimensions.split(',')]
            dimensions_var = ' '.join(dims_given)

            # As an extra check, confirm each name in the list corresponds to a record in the coords+dims table.
            # get_attr_record() with unique=True will fail if the name doesn't uniquely correspond
            # to a coordinates & dimensions table record (the returned record itself is not needed).
            for dim_name in dims_given:
                coords_dims.get_attr_record('name', dim_name, unique=True)

        # Create dimensions list by following the relevant database links.
        # Get the 'Spatial Shape' record, which contains info about dimensions
        assert len(var.spatial_shape) == 1
        spatial_shape = dreq_tables['spatial shape'].get_record(var.spatial_shape[0])
        dims_linked = linked_dim_names(spatial_shape, 'dimensions')
        # Add any dimensions present in structure record, if given
        # (A 'structure' link gives dimensions besides spatial & temporal ones, e.g. 'tau')
        if hasattr(var, 'structure_title'):
            if 'structure' not in dreq_tables:
                # Same exception type as the bare dict lookup would give, but with an explanation
                raise KeyError(f'variable {var_name} links to a structure record, '
                               'but the content has no "Structure" table')
            structure = dreq_tables['structure'].get_record(var.structure_title[0])
            dims_linked += linked_dim_names(structure, 'dimensions')
        # Add temporal dimensions
        temporal_shape = dreq_tables['temporal shape'].get_record(var.temporal_shape[0])
        # temporal_shape.name is not itself used as a dimension name:
        # an example of temporal_shape.name is 'time-point', but the equivalent dimensions list
        # entry for this is 'time1'.
        dims_linked += linked_dim_names(temporal_shape, 'dimensions')
        # Add any coordinates
        dims_linked += linked_dim_names(var, 'coordinates')

        dimensions_linked = ' '.join(dims_linked)

        compare_dims = False
        if compare_dims and dimensions_var:
            # Compare dimensions obtained from links vs. variable table record.
            # This check is expected to fail for some variables for v1.2 onward because the
            # Structure table was removed from the release base. It's left here in the code
            # as an internal option because it can be useful for debugging.
            if dimensions_linked != dimensions_var:
                msg = f'Inconsistent dimensions for {var_name}:\n  {dimensions_var}\n  {dimensions_linked}'
                print(msg)

        # Dimensions given explicitly by the variable record take precedence over linked ones
        dimensions = dimensions_var if dimensions_var else dimensions_linked

        # Get physical parameter record and use its name as out_name.
        # (Comparison with CMIP6 CMOR tables shows that out_name is the same as physical parameter name
        # for almost all variables in dreq v1.2.1.)
        link = var.physical_parameter[0]
        phys_param = dreq_tables['physical parameters'].get_record(link)
        out_name = phys_param.name

        if cmor_variables:
            # Filter by CMOR variable name
            if out_name not in cmor_variables:
                continue

        # Get CF standard name, if it exists
        standard_name = ''
        standard_name_proposed = ''
        if hasattr(phys_param, 'cf_standard_name'):
            if isinstance(phys_param.cf_standard_name, str):
                # retain this option for non-consolidated airtable export?
                standard_name = phys_param.cf_standard_name
            else:
                if dreq_tables['CF standard name'] is None:
                    raise ValueError(f'variable {var_name} links to a CF standard name record, '
                                     'but the content has no "CF Standard Names" table')
                link = phys_param.cf_standard_name[0]
                cfsn = dreq_tables['CF standard name'].get_record(link)
                standard_name = cfsn.name
        else:
            standard_name_proposed = phys_param.proposed_cf_standard_name

        link_realm = getattr(var, attr_realm)
        modeling_realm = [dreq_tables['realm'].get_record(link).id for link in link_realm]

        cell_measures = ''
        if hasattr(var, 'cell_measures'):
            cell_measures = [dreq_tables['cell measures'].get_record(link).name for link in var.cell_measures]

        positive = ''
        if hasattr(var, 'positive_direction'):
            positive = var.positive_direction

        comment = ''
        if hasattr(var, 'description'):
            comment = var.description

        var_info = OrderedDict()
        # Insert fields in order given by CMIP6 cmor tables (https://github.com/PCMDI/cmip6-cmor-tables)
        var_info.update({
            'frequency': frequency,
            'modeling_realm': ' '.join(modeling_realm),
        })
        if standard_name != '':
            var_info['standard_name'] = standard_name
        else:
            var_info['standard_name_proposed'] = standard_name_proposed
        var_info.update({
            'units': phys_param.units,
            'cell_methods': cell_methods,
            'cell_measures': ' '.join(cell_measures),

            'long_name': var.title,
            'comment': comment,

            'dimensions': dimensions,

            'out_name': out_name,
            'type': var.type,
            'positive': positive,

            'spatial_shape': spatial_shape.name,
            'temporal_shape': temporal_shape.name,

            # 'temporalLabelDD' : temporal_shape.brand,
            # 'verticalLabelDD' : spatial_shape.vertical_label_dd,
            # 'horizontalLabelDD' : spatial_shape.hor_label_dd,
            # 'areaLabelDD' : area_label_dd,

            'cmip6_table': table_id,
            'physical_parameter_name': phys_param.name,
        })

        # Get info on branded variable name, if available
        if hasattr(var, 'branded_variable_name'):
            branded_variable_name = var.branded_variable_name
            assert branded_variable_name.count('_') == 1, \
                'Expected one (and only one) underscore in branded variable name: ' + branded_variable_name
            variableRootDD = branded_variable_name.split('_')[0]
            var_info.update({
                'variableRootDD': variableRootDD,
                'branded_variable_name': branded_variable_name,
            })

        # Strip whitespace and apply character substitutions.
        # A new dict is built (same key order) rather than reassigning entries of the
        # dict that is being iterated over.
        cleaned = OrderedDict()
        for k, v in var_info.items():
            v = v.strip()
            for replacement, targets in substitute.items():
                for s in targets:
                    v = v.replace(s, replacement)
            cleaned[k] = v

        assert var_name not in all_var_info, 'non-unique variable name: ' + var_name
        all_var_info[var_name] = cleaned

    # Sort the all-variables dict (case-insensitive order of the unique variable names)
    return OrderedDict((name, all_var_info[name]) for name in sorted(all_var_info, key=str.lower))

Get metadata for CMOR variables (dimensions, cell_methods, out_name, …).

Parameters:

content : dict Dict containing either: - data request content as exported from airtable OR - DreqTable objects representing tables (dict keys are table names) dreq_version : str Version string identifier for Data Request Content compound_names : list[str] Compound names of variables to include. If not given, all are included. Example: ['Amon.tas', 'Omon.sos'] cmor_tables : list[str] Names of CMOR tables to include. If not given, all are included. Example: ['Amon', 'Omon'] cmor_variables : list[str] Names of CMOR variables to include. If not given, all are included. Here the out_name is used as the CMOR variable name. Example: ['tas', 'siconc']

Returns:

all_var_info : dict Dictionary indexed by unique variable name, giving metadata for each variable. Also includes a header giving info on provenance of the info (data request version used, etc).

def show_requested_vars_summary(expt_vars, dreq_version)
Expand source code
def show_requested_vars_summary(expt_vars, dreq_version):
    '''
    Display quick summary to stdout of variables requested.

    Parameters
    ----------
    expt_vars : dict
        Output dict from dq.get_requested_variables(). Its 'experiment' entry maps each
        experiment name to a dict that maps priority level to the requested variables.
    dreq_version : str
        Version string identifier for Data Request Content (only used in the printed text)

    Returns
    -------
    None. For each experiment (in sorted order) one line is printed giving the number of
    variables at each priority level returned by get_priority_levels(), and their total.
    A priority level that is absent for an experiment is counted as 0.
    '''
    print(f'\nFor data request version {dreq_version}, number of requested variables found by experiment:')
    priority_levels = get_priority_levels()
    experiments = expt_vars['experiment']
    # Sort on the experiment names only (no need to build and compare (name, dict) tuples)
    for expt in sorted(experiments):
        req = experiments[expt]
        counts = {p: (len(req[p]) if p in req else 0) for p in priority_levels}
        n_total = sum(counts.values())
        # Use ', ' as separator throughout, consistent with the ', TOTAL=' that follows
        summary = ', '.join(f'{p}={n}' for p, n in counts.items())
        print(f'  {expt} : {summary}, TOTAL={n_total}')

Display quick summary to stdout of variables requested. expt_vars is the output dict from dq.get_requested_variables().

def write_requested_vars_json(outfile, expt_vars, dreq_version, priority_cutoff, content_path)
Expand source code
def write_requested_vars_json(outfile, expt_vars, dreq_version, priority_cutoff, content_path):
    '''
    Write a nicely formatted json file with lists of requested variables by experiment.

    Parameters
    ----------
    outfile : str
        Path of the json file to write
    expt_vars : dict
        Output dict from dq.get_requested_variables(). Used here:
        expt_vars['Header']['Opportunities'] and expt_vars['experiment'], which maps each
        experiment name to a dict that maps priority level to requested variables.
        This dict is not modified.
    dreq_version : str
        Version string identifier for Data Request Content (recorded in the file header)
    priority_cutoff : str
        Lowest priority level that is supported. It is capitalized and must then be one of
        the levels returned by get_priority_levels(); levels listed after it are unsupported.
    content_path : str
        Path to the data request content file. Its basename and sha256 hash are recorded
        in the file header.

    Raises
    ------
    ValueError
        If priority_cutoff is not a known priority level, or if an experiment has
        variables requested at an unsupported priority level.
    '''

    header = OrderedDict({
        'Description': 'This file gives the names of output variables that are requested from CMIP experiments by the supported Opportunities. The variables requested from each experiment are listed under each experiment name, grouped according to the priority level at which they are requested. For each experiment, the prioritized list of variables was determined by compiling together all requests made by the supported Opportunities for output from that experiment.',
        'Opportunities supported': sorted(expt_vars['Header']['Opportunities'], key=str.lower)
    })

    # List supported priority levels
    priority_levels = get_priority_levels()
    priority_cutoff = priority_cutoff.capitalize()
    m = priority_levels.index(priority_cutoff) + 1  # raises ValueError for an unknown level
    supported_levels = priority_levels[:m]
    unsupported_levels = priority_levels[m:]
    header.update({
        'Priority levels supported': supported_levels
    })

    # Unsupported priority levels are left out of the output, so they should not hold any
    # variables. An explicit check is used (not assert, which is skipped under python -O),
    # a missing level is treated like an empty one, and the input dict is left untouched.
    experiments = expt_vars['experiment']
    for expt_name, req in experiments.items():
        for p in unsupported_levels:
            if req.get(p):
                raise ValueError(
                    f'Experiment {expt_name} has variables requested at unsupported priority level {p}')

    # List included experiments
    expt_names = sorted(experiments.keys(), key=str.lower)
    header.update({
        'Experiments included': expt_names
    })

    # Get provenance of content to include in the header
    with open(content_path, 'rb') as f:
        content_hash = hashlib.sha256(f.read()).hexdigest()
    header.update({
        'dreq content version': dreq_version,
        'dreq content file': os.path.basename(os.path.normpath(content_path)),
        'dreq content sha256 hash': content_hash,
        'dreq api version': api_version,
    })

    out = {
        'Header': header,
        'experiment': OrderedDict(),
    }
    # Put sorted contents of expt_vars into OrderedDict, keeping only the supported
    # priority levels (in the order given by get_priority_levels())
    for expt_name in expt_names:
        req = experiments[expt_name]
        out['experiment'][expt_name] = OrderedDict(
            (p, req[p]) for p in supported_levels if p in req
        )

    # Write the results to json
    with open(outfile, 'w') as f:
        json.dump(out, f, indent=4)
        print('\nWrote requested variables to ' + outfile)

Write a nicely formatted json file with lists of requested variables by experiment. expt_vars is the output dict from dq.get_requested_variables().

def write_variables_metadata(all_var_info, dreq_version, filepath, api_version=None, content_path=None)
Expand source code
def write_variables_metadata(all_var_info, dreq_version, filepath,
                             api_version=None, content_path=None):
    '''
    Write metadata of CMOR variables to a json or csv file.

    The output format is chosen from the extension of filepath ('.json' or '.csv').

    Parameters
    ----------
    all_var_info : dict
        Dictionary indexed by unique variable name, giving a dict of metadata for each
        variable (e.g. the output of get_variables_metadata()).
    dreq_version : str
        Version string identifier for Data Request Content. Recorded in the json header
        and shown in the message printed after writing.
    filepath : str
        Path of the output file
    api_version : str
        Version of the API, recorded in the json header. Must be given (non-empty),
        despite the default value of None.
    content_path : str
        Path to the data request content file. Must be given (non-empty), despite the
        default value of None. For json output its basename and sha256 hash are
        recorded in the header.

    Raises
    ------
    ValueError
        If api_version or content_path is not provided.
    '''

    # File extension including the leading dot, e.g. '.json'
    ext = os.path.splitext(filepath)[-1]

    # Both are required for either output format, even though only the json branch uses them
    if not api_version:
        raise ValueError(f'Must provide API version, received: {api_version}')
    if not content_path:
        raise ValueError(f'Must provide path to data request content, received: {content_path}')

    if ext == '.json':
        # Get provenance of content to include in the header
        with open(content_path, 'rb') as f:
            content_hash = hashlib.sha256(f.read()).hexdigest()

        # Create output dict
        out = OrderedDict({
            'Header': OrderedDict({
                'Description': 'Metadata attributes that characterize CMOR variables. Each variable is uniquely idenfied by a compound name comprised of a CMIP6-era table name and a short variable name.',
                'no. of variables': len(all_var_info),
                'dreq content version': dreq_version,
                'dreq content file': os.path.basename(os.path.normpath(content_path)),
                'dreq content sha256 hash': content_hash,
                'dreq api version': api_version,
            }),
            'Compound Name': all_var_info,
        })

        # Write variables metadata to json
        with open(filepath, 'w') as f:
            json.dump(out, f, indent=4)
            print(f'Wrote {filepath} for {len(all_var_info)} variables, dreq version = {dreq_version}')

    elif ext == '.csv':
        # Write variables metadata to csv
        # Column names are taken from the first variable's metadata.
        # NOTE(review): next() raises StopIteration if all_var_info is empty, and attributes
        # that only later variables have (presumably e.g. 'branded_variable_name') would
        # get no column - confirm all variables carry the same keys.
        var_info = next(iter(all_var_info.values()))
        attrs = list(var_info.keys())
        columns = ['Compound Name']
        # A variable has either 'standard_name' or 'standard_name_proposed' in the output of
        # get_variables_metadata(), so both columns are always added, placed first.
        columns.append('standard_name')
        columns.append('standard_name_proposed')
        columns += [s for s in attrs if s not in columns]
        rows = [columns]  # column header line
        # Add each variable as a row
        for var_name, var_info in all_var_info.items():
            row = []
            for col in columns:
                if col == 'Compound Name':
                    # The dict key (unique variable name) fills the first column
                    val = var_name
                elif col in var_info:
                    val = var_info[col]
                else:
                    # Attribute not available for this variable: leave the cell empty
                    val = ''
                row.append(val)
            rows.append(row)
        write_csv_output_file_content(filepath, rows)
        n = len(all_var_info)
        print(f'Wrote {filepath} for {n} variables, dreq version = {dreq_version}')
    # NOTE(review): no branch handling other file extensions is visible in this listing;
    # confirm whether an unsupported extension should raise an error.