Module data_request_api.tests.test_dreq_query

Functions

def monkeypatch(monkeypatch)
Expand source code
@pytest.fixture(scope="function")
def monkeypatch(monkeypatch):
    return monkeypatch
def temp_config_file(tmp_path_factory)
Expand source code
@pytest.fixture(scope="function")
def temp_config_file(tmp_path_factory):
    temp_dir = tmp_path_factory.mktemp("data")
    config_file = temp_dir / ".CMIP7_data_request_api_config"
    cfg = {"cache_dir": str(temp_dir)}
    with open(config_file, "w") as f:
        yaml.dump(cfg, f)
    try:
        yield config_file
    finally:
        config_file.unlink(missing_ok=True)
        dreqcfg.CONFIG = {}
def test_get_requested_variables_combined_request(temp_config_file, monkeypatch)
Expand source code
def test_get_requested_variables_combined_request(temp_config_file, monkeypatch):
    dreqcfg.CONFIG = {}
    monkeypatch.setattr(
        "data_request_api.utilities.config.CONFIG_FILE", temp_config_file
    )
    dc._dreq_res = os.path.dirname(temp_config_file)
    dc.versions = {"tags": [], "branches": []}
    dc._versions_retrieved_last = {"tags": 0, "branches": 0}
    use_dreq_version = "v1.2.2.2"
    data = dc.load(use_dreq_version)
    assert dreqcfg.CONFIG == {
        **dreqcfg.DEFAULT_CONFIG,
        "cache_dir": str(os.path.dirname(temp_config_file)),
    }
    base = dq.create_dreq_tables_for_request(data, use_dreq_version)

    # Run get_requested_variables
    no_cr = dq.get_requested_variables(
        base,
        use_dreq_version,
        use_opps="all",
        time_subsets=False,
        combined_request=False,
        verbose=False,
    )
    inc_cr = dq.get_requested_variables(
        base,
        use_dreq_version,
        use_opps="all",
        time_subsets=False,
        combined_request=True,
        verbose=False,
    )

    # Check that time subsets are included and that the same variables are included
    assert sorted(list(inc_cr["experiment"].keys())) == sorted(
        list(no_cr["experiment"].keys())
        + ["all_experiments", "historical_experiments", "scenario_experiments"]
    )
    joint_request = dict()
    for prio in ["Core", "High", "Medium", "Low"]:
        joint_request[prio] = [
            var for prioreq in no_cr["experiment"].values() for var in prioreq[prio]
        ]
        joint_request[prio] = set(joint_request[prio])

    # Remove higher priority vars from low priority requests
    joint_request["High"] = joint_request["High"].difference(joint_request["Core"])

    joint_request["Medium"] = joint_request["Medium"].difference(joint_request["Core"])
    joint_request["Medium"] = joint_request["Medium"].difference(joint_request["High"])

    joint_request["Low"] = joint_request["Low"].difference(joint_request["Core"])
    joint_request["Low"] = joint_request["Low"].difference(joint_request["High"])
    joint_request["Low"] = joint_request["Low"].difference(joint_request["Medium"])

    # Check that all variables are included in the joint request
    for prio in ["Core", "High", "Medium", "Low"]:
        assert set(joint_request[prio]) == set(
            inc_cr["experiment"]["all_experiments"][prio]
        )
def test_get_requested_variables_time_subsets(temp_config_file, monkeypatch)
Expand source code
def test_get_requested_variables_time_subsets(temp_config_file, monkeypatch):
    dreqcfg.CONFIG = {}
    monkeypatch.setattr(
        "data_request_api.utilities.config.CONFIG_FILE", temp_config_file
    )
    dc._dreq_res = os.path.dirname(temp_config_file)
    dc.versions = {"tags": [], "branches": []}
    dc._versions_retrieved_last = {"tags": 0, "branches": 0}
    use_dreq_version = "v1.2.2.2"
    data = dc.load(use_dreq_version)
    assert dreqcfg.CONFIG == {
        **dreqcfg.DEFAULT_CONFIG,
        "cache_dir": str(os.path.dirname(temp_config_file)),
    }
    base = dq.create_dreq_tables_for_request(data, use_dreq_version)

    # Select opportunities
    dreq_opps = base["Opportunity"]
    all_opp_ids = [opp.opportunity_id for opp in dreq_opps.records.values()]
    assert len(all_opp_ids) == len(set(all_opp_ids))
    oppid2title = {
        int(opp.opportunity_id): opp.title for opp in dreq_opps.records.values()
    }
    opp_ids = [1, 69, 20]
    use_opps = []
    for opp_id in opp_ids:
        use_opps.append(oppid2title[opp_id])

    # Run get_requested_variables
    no_ts = dq.get_requested_variables(
        base, use_dreq_version, use_opps=use_opps, time_subsets=False, verbose=False
    )
    inc_ts = dq.get_requested_variables(
        base, use_dreq_version, use_opps=use_opps, time_subsets=True, verbose=False
    )

    # Check that time subsets are included and that the same variables are included
    assert list(inc_ts["experiment"].keys()) == list(no_ts["experiment"].keys())
    for exp, req in inc_ts["experiment"].items():
        assert all(req["Core"][i] == ["all"] for i in req["Core"].keys())
        for prio in ["Core", "High", "Medium", "Low"]:
            # This may actually not be true depending on the selected opportunities and dreq version:
            #   one opp could request a variable for time subset hist72 at medium priority and another
            #   one the same variable for hist36 at High priority
            assert list(req[prio].keys()) == no_ts["experiment"][exp][prio]
def test_get_requested_variables_time_subsets_combined_request(temp_config_file, monkeypatch)
Expand source code
def test_get_requested_variables_time_subsets_combined_request(
    temp_config_file, monkeypatch
):
    dreqcfg.CONFIG = {}
    monkeypatch.setattr(
        "data_request_api.utilities.config.CONFIG_FILE", temp_config_file
    )
    dc._dreq_res = os.path.dirname(temp_config_file)
    dc.versions = {"tags": [], "branches": []}
    dc._versions_retrieved_last = {"tags": 0, "branches": 0}
    use_dreq_version = "v1.2.2.2"
    data = dc.load(use_dreq_version)
    assert dreqcfg.CONFIG == {
        **dreqcfg.DEFAULT_CONFIG,
        "cache_dir": str(os.path.dirname(temp_config_file)),
    }
    base = dq.create_dreq_tables_for_request(data, use_dreq_version)

    # Run get_requested_variables
    tscr = dq.get_requested_variables(
        base,
        use_dreq_version,
        use_opps="all",
        time_subsets=True,
        combined_request=True,
        verbose=False,
    )

    # Check that time subsets are included and that the same variables are included
    assert all(
        exp in tscr["experiment"].keys()
        for exp in ["all_experiments", "historical_experiments", "scenario_experiments"]
    )

    # Loop over all experiments
    #  We expect to find "duplicates": variables that are requested in multiple
    #    priority levels with differing time_subsets for one and the same experiment
    #    if we consider all opportunities
    duplicate_found = False
    for e in tscr["experiment"]:
        seen = {}

        for p in tscr["experiment"][e]:
            for v in tscr["experiment"][e][p]:
                try:
                    if v in seen:
                        duplicate_found = True
                        seen[v].append((p, tscr["experiment"][e][p][v]))
                    else:
                        seen[v] = [(p, tscr["experiment"][e][p][v])]
                except TypeError:
                    # This branch handles unhashable values
                    if v in seen:
                        duplicate_found = True
                        seen[v].append(p)
                    else:
                        seen[v] = [p]

    assert duplicate_found, "No 'duplicates' found in time subsets combined request"