Add function to read DI for whole country group (currently non-AnnexI only)

Johannes Gütschow 1 year ago
Parent
Commit bc0afd2eaf
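
The commit renames read_UNFCCC_DI_for_all_countries to read_UNFCCC_DI_for_country_group and replaces the non_AnnexI flag with annexI (default False, i.e. non-AnnexI). A minimal usage sketch of the new entry point, assuming the package is importable from the repository venv:

    # sketch: read DI data for all non-AnnexI countries and return the
    # combined primap2 dataset; annexI=True currently raises a ValueError
    from UNFCCC_GHG_data.UNFCCC_DI_reader import read_UNFCCC_DI_for_country_group

    data_all = read_UNFCCC_DI_for_country_group(annexI=False)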

+ 80 - 89
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_core.py

@@ -31,6 +31,7 @@ from UNFCCC_GHG_data.helper import custom_country_mapping
 from UNFCCC_GHG_data.helper import get_country_code, get_country_name
 from UNFCCC_GHG_data.helper import extracted_data_path_UNFCCC, root_path, code_path
 from UNFCCC_GHG_data.helper import dataset_path_UNFCCC
+from UNFCCC_GHG_data.helper import convert_categories
 
 
 def read_UNFCCC_DI_for_country(
@@ -780,7 +781,7 @@ def save_DI_country_data(
 def save_DI_dataset(
         data_pm2: xr.Dataset,
         raw: bool=True,
-        non_AnnexI: bool=True,
+        annexI: bool=False,
 ):
     '''
     save primap2 and IF data to dataset folder
@@ -789,10 +790,11 @@ def save_DI_dataset(
 
     # preparations
     data_if = data_pm2.pr.to_interchange_format()
-    if non_AnnexI:
-        country_group = "non-AnnexI"
-    else:
+    if annexI:
         country_group = "AnnexI"
+    else:
+        country_group = "non-AnnexI"
+
 
     ## get timestamp
     scenario_col = data_pm2.attrs['scen']
@@ -811,7 +813,7 @@ def save_DI_dataset(
 
     # get the filename with the hash and check if it exists (separate for pm2 format
     # and IF to fix broken datasets if necessary)
-    filename_hash = determine_dataset_filename(token, raw, non_AnnexI=non_AnnexI,
+    filename_hash = root_path / determine_dataset_filename(token, raw, annexI=annexI,
                                                hash=True)
     # primap2 native format
     filename_hash_nc = filename_hash.parent / (filename_hash.name + '.nc')
@@ -836,8 +838,8 @@ def save_DI_dataset(
         print(f"Data unchanged for {country_group}. Create symlinks.")
 
     # get the filename with the date
-    filename_date = determine_dataset_filename(date_str, raw=raw,
-                                               non_AnnexI=non_AnnexI, hash=False)
+    filename_date = root_path / determine_dataset_filename(date_str, raw=raw,
+                                               annexI=annexI, hash=False)
 
     # create the symlinks to the actual data (with the hash)
     suffixes = ['.nc', '.csv', '.yaml']
@@ -850,25 +852,23 @@ def save_DI_dataset(
 
 
 ## functions for multiple country reading
-def read_UNFCCC_DI_for_all_countries(
-        non_AnnexI: bool=True,
+def read_UNFCCC_DI_for_country_group(
+        annexI: bool=False,
 ) -> xr.Dataset:
     '''
     This function reads DI data for all countries in a group (annexI or non-AnnexI)
     TODO: currently only non-annexI is implemented
     The function reads all data in one go using datalad run. as the output data file
     names are unknown beforehand datalad run uses explicit=false
-    TODO: decide if dataset creation goes in here as well. Makes sense, I think. Then
-    the function can return the xarray dataset
     '''
 
     today = date.today()
     date_str = today.strftime(DI_date_format)
 
-    if non_AnnexI:
-        countries = nAI_countries
-    else:
+    if annexI:
         raise ValueError("Bulk reading for AnnexI countries not implemented yet")
+    else:
+        countries = nAI_countries
 
     # read the data
     data_all = None
@@ -897,10 +897,17 @@ def read_UNFCCC_DI_for_all_countries(
     # TODO: write metadata
 
     # save the data
-    #save_DI_dataset(data_all, raw=True, non_AnnexI=non_AnnexI)
+    save_DI_dataset(data_all, raw=True, annexI=annexI)
 
     return data_all
 
+# TODO: add interface functions and scripts to read all data
+# add process-all functions and scripts
+# merge into main
+# run reading procedure
+# add config for all DI data
+# re-run CRF etc.
+
 
 ## datalad and pydoit interface functions
 def read_DI_for_country_datalad(
@@ -1004,13 +1011,57 @@ def process_DI_for_country_datalad(
         print(ex.message)
 
 
-def read_DI_for_all_countries_datalad(
-        non_AnnexI: bool=True,
-):
-    '''
-    This function calls datalad run to read all data in one go. as the output data file
-    names are unknown beforehand datalad run uses explicit=false
-    '''
+def read_DI_for_country_group_datalad(
+        annexI: bool=False,
+) -> None:
+    """
+    Wrapper around read_UNFCCC_DI_for_country_group which takes care of selecting input
+    and output files and using datalad run to trigger the data processing
+
+    Parameters
+    ----------
+
+    annexI: bool, default False
+        If True, read data for AnnexI countries; if False (default), read
+        data for non-AnnexI countries. Bulk reading for AnnexI countries is
+        not implemented yet, so annexI=True currently raises a ValueError in
+        read_UNFCCC_DI_for_country_group.
+    """
+
+    if annexI:
+        country_group = "AnnexI"
+    else:
+        country_group = "non-AnnexI"
+
+    print(f"Attempting to read DI data for {country_group}.")
+    print("#"*80)
+    print("")
+    print(f"Using the UNFCCC_DI_reader")
+    print("")
+    print(f"Run the script using datalad run via the python api")
+    script = code_path / "UNFCCC_DI_reader" / "read_UNFCCC_DI_for_country_group.py"
+    script = script.relative_to(root_path)
+
+    cmd = f"./venv/bin/python3 {script.as_posix()} "
+    if annexI:
+        cmd = cmd + f" --annexI"
+
+    try:
+        datalad.api.run(
+            cmd=cmd,
+            dataset=root_path,
+            message=f"Read DI data for {country_group}.",
+            inputs=[],
+            outputs=[],
+            dry_run=None,
+            explicit=False,
+        )
+    except IncompleteResultsError as IRE:
+        print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
+    except Exception as ex:
+        print(f"Exception occurred when running {cmd}")
+        print(ex)
+
 
 ## helper functions
 def determine_filename(
@@ -1078,7 +1129,7 @@ def determine_filename(
 def determine_dataset_filename(
         date_or_hash: str,
         raw: bool=False,
-        non_AnnexI: bool=True,
+        annexI: bool=False,
         hash: bool = False,
 ) -> Path:
     """
@@ -1090,8 +1141,8 @@ def determine_dataset_filename(
         formatted date string
     raw: bool
         bool specifying if filename for raw or processed data should be returned
-    non_AnnexI: bool
-        True if non-AnnexI False if AnnexI
+    annexI: bool, default False
+        True if AnnexI data, False if non-AnnexI data
     hash: bool
 
     Returns
@@ -1100,12 +1151,12 @@ def determine_dataset_filename(
     """
 
     # get the country folder
-    if non_AnnexI:
-        current_dataset_path = dataset_path_UNFCCC / "DI_non_AnnexI"
-        filename = f"DI_non_AnnexI_{date_or_hash}"
-    else:
+    if annexI:
         current_dataset_path = dataset_path_UNFCCC / "DI_AnnexI"
         filename = f"DI_AnnexI_{date_or_hash}"
+    else:
+        current_dataset_path = dataset_path_UNFCCC / "DI_non_AnnexI"
+        filename = f"DI_non_AnnexI_{date_or_hash}"
 
     if not current_dataset_path.exists():
         current_dataset_path.mkdir()
@@ -1119,66 +1170,6 @@ def determine_dataset_filename(
     return filename.relative_to(root_path)
 
 
-def convert_categories(
-        ds_input: xr.Dataset,
-        conversion: Dict[str, Dict[str, str]],
-        #terminology_from: str,
-        terminology_to: str,
-        debug: bool=False,
-        tolerance: float=0.01,
-)->xr.Dataset:
-    ds_converted = ds_input.copy(deep=True)
-    ds_converted.attrs = deepcopy(ds_input.attrs)
-
-    # change category terminology
-    cat_dim = ds_converted.attrs["cat"]
-    ds_converted.attrs["cat"] = f"category ({terminology_to})"
-    ds_converted = ds_converted.rename({cat_dim: ds_converted.attrs["cat"]})
-
-    # find categories present in dataset
-    cats_present = list(ds_converted.coords[f'category ({terminology_to})'])
-
-    # restrict categories and map category names
-    if 'mapping' in conversion.keys():
-        mapping_cats_present = [cat for cat in list(conversion['mapping'].keys()) if
-                                cat in cats_present]
-        ds_converted = ds_converted.pr.loc[
-            {'category': mapping_cats_present}]
-
-        from_cats = ds_converted.coords[f'category ({terminology_to})'].values
-        to_cats = pd.Series(from_cats).replace(conversion['mapping'])
-        ds_converted = ds_converted.assign_coords({f'category ({terminology_to})':
-                                                   (f'category ({terminology_to})',
-                                                    to_cats)})
-
-    # redo the list of present cats after mapping, as we have new categories in the
-    # target terminology now
-    cats_present_mapped = list(ds_converted.coords[f'category ({terminology_to})'])
-    # aggregate categories
-    if 'aggregate' in conversion:
-        aggregate_cats = conversion['aggregate']
-        for cat_to_agg in aggregate_cats:
-            if debug:
-                print(f"Category: {cat_to_agg}")
-            source_cats = [cat for cat in aggregate_cats[cat_to_agg]['sources'] if
-                           cat in cats_present_mapped]
-            data_agg = ds_converted.pr.loc[{'category': source_cats}].pr.sum(
-                dim='category', skipna=True, min_count=1)
-            nan_vars = [var for var in data_agg.data_vars if
-                        data_agg[var].isnull().all().data == True]
-            data_agg = data_agg.drop(nan_vars)
-            if len(data_agg.data_vars) > 0:
-                data_agg = data_agg.expand_dims([f'category ({terminology_to})'])
-                data_agg = data_agg.assign_coords(
-                    coords={f'category ({terminology_to})':
-                                (f'category ({terminology_to})', [cat_to_agg])})
-                ds_converted = ds_converted.pr.merge(data_agg, tolerance=tolerance)
-            else:
-                print(f"no data to aggregate category {cat_to_agg}")
-
-    return ds_converted
-
-
 def get_input_and_output_files_for_country_DI(
         country: str,
         date_str: str,

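Because the boolean flag is inverted across save_DI_dataset, determine_dataset_filename and the reading functions, existing callers need their arguments flipped. A sketch of the equivalence, assuming data_pm2 is a primap2 dataset:

    # before this commit: save the raw non-AnnexI dataset
    # save_DI_dataset(data_pm2, raw=True, non_AnnexI=True)
    # after this commit, the same call reads:
    save_DI_dataset(data_pm2, raw=True, annexI=False)
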
+ 3 - 3
UNFCCC_GHG_data/UNFCCC_DI_reader/__init__.py

@@ -6,8 +6,7 @@ from .UNFCCC_DI_reader_core import \
     process_UNFCCC_DI_for_country, process_and_save_UNFCCC_DI_for_country, \
     process_DI_for_country_datalad, \
     convert_DI_data_to_pm2_if, convert_DI_IF_data_to_pm2, determine_filename, \
-    read_UNFCCC_DI_for_all_countries
-
+    read_UNFCCC_DI_for_country_group, read_DI_for_country_group_datalad
 
 
 __all__ = [
@@ -19,5 +18,6 @@ __all__ = [
     "convert_DI_data_to_pm2_if",
     "convert_DI_IF_data_to_pm2",
     "determine_filename",
-    "read_UNFCCC_DI_for_all_countries",
+    "read_UNFCCC_DI_for_country_group",
+    "read_DI_for_country_group_datalad",
 ]

+ 19 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_for_country_group.py

@@ -0,0 +1,19 @@
+"""
+This script is a wrapper around the read_UNFCCC_DI_for_country_group
+function such that it can be called from datalad
+"""
+
+import argparse
+from UNFCCC_GHG_data.UNFCCC_DI_reader import \
+    read_UNFCCC_DI_for_country_group
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--annexI', help='read for AnnexI countries (default is for '
+                                     'non-AnnexI)', action='store_true')
+args = parser.parse_args()
+annexI = args.annexI
+
+read_UNFCCC_DI_for_country_group(
+    annexI=annexI,
+)

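The script is normally launched via datalad run from read_DI_for_country_group_datalad, but it can presumably also be invoked directly from the repository root:

    ./venv/bin/python3 UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_for_country_group.py
    # AnnexI bulk reading is not implemented yet and raises a ValueError:
    ./venv/bin/python3 UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_for_country_group.py --annexI
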
+ 19 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_for_country_group_datalad.py

@@ -0,0 +1,19 @@
+"""
+Wrapper around read_DI_for_country_group_datalad such that it can be called
+from doit in the current setup where doit runs on the system python and
+not in the venv.
+"""
+
+from UNFCCC_GHG_data.UNFCCC_DI_reader import \
+    read_DI_for_country_group_datalad
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--annexI', help='read for AnnexI countries (default is for '
+                                     'non-AnnexI)', action='store_true')
+args = parser.parse_args()
+annexI = args.annexI
+
+read_DI_for_country_group_datalad(
+    annexI=annexI,
+)

+ 2 - 1
UNFCCC_GHG_data/helper/__init__.py

@@ -4,7 +4,7 @@ from .definitions import legacy_data_path
 from .definitions import downloaded_data_path, downloaded_data_path_UNFCCC
 from .definitions import dataset_path, dataset_path_UNFCCC
 from .definitions import custom_country_mapping, custom_folders
-from .functions import get_country_code, get_country_name
+from .functions import get_country_code, get_country_name, convert_categories
 
 __all__ = [
     "root_path",
@@ -21,4 +21,5 @@ __all__ = [
     "custom_folders",
     "get_country_code",
     "get_country_name",
+    "convert_categories",
 ]

+ 66 - 0
UNFCCC_GHG_data/helper/functions.py

@@ -1,5 +1,8 @@
 import pycountry
 import json
+import pandas as pd
+import xarray as xr
+from copy import deepcopy
 from typing import Dict, List
 from pathlib import Path
 from .definitions import custom_country_mapping, custom_folders
@@ -7,6 +9,69 @@ from .definitions import root_path, downloaded_data_path, extracted_data_path
 from .definitions import legacy_data_path, code_path
 
 
+def convert_categories(
+        ds_input: xr.Dataset,
+        conversion: Dict[str, Dict[str, str]],
+        #terminology_from: str,
+        terminology_to: str,
+        debug: bool=False,
+        tolerance: float=0.01,
+)->xr.Dataset:
+    """
+    convert data from one category terminology to another
+    """
+    ds_converted = ds_input.copy(deep=True)
+    ds_converted.attrs = deepcopy(ds_input.attrs)
+
+    # change category terminology
+    cat_dim = ds_converted.attrs["cat"]
+    ds_converted.attrs["cat"] = f"category ({terminology_to})"
+    ds_converted = ds_converted.rename({cat_dim: ds_converted.attrs["cat"]})
+
+    # find categories present in dataset
+    cats_present = list(ds_converted.coords[f'category ({terminology_to})'])
+
+    # restrict categories and map category names
+    if 'mapping' in conversion.keys():
+        mapping_cats_present = [cat for cat in list(conversion['mapping'].keys()) if
+                                cat in cats_present]
+        ds_converted = ds_converted.pr.loc[
+            {'category': mapping_cats_present}]
+
+        from_cats = ds_converted.coords[f'category ({terminology_to})'].values
+        to_cats = pd.Series(from_cats).replace(conversion['mapping'])
+        ds_converted = ds_converted.assign_coords({f'category ({terminology_to})':
+                                                   (f'category ({terminology_to})',
+                                                    to_cats)})
+
+    # redo the list of present cats after mapping, as we have new categories in the
+    # target terminology now
+    cats_present_mapped = list(ds_converted.coords[f'category ({terminology_to})'])
+    # aggregate categories
+    if 'aggregate' in conversion:
+        aggregate_cats = conversion['aggregate']
+        for cat_to_agg in aggregate_cats:
+            if debug:
+                print(f"Category: {cat_to_agg}")
+            source_cats = [cat for cat in aggregate_cats[cat_to_agg]['sources'] if
+                           cat in cats_present_mapped]
+            data_agg = ds_converted.pr.loc[{'category': source_cats}].pr.sum(
+                dim='category', skipna=True, min_count=1)
+            nan_vars = [var for var in data_agg.data_vars if
+                        data_agg[var].isnull().all().data == True]
+            data_agg = data_agg.drop(nan_vars)
+            if len(data_agg.data_vars) > 0:
+                data_agg = data_agg.expand_dims([f'category ({terminology_to})'])
+                data_agg = data_agg.assign_coords(
+                    coords={f'category ({terminology_to})':
+                                (f'category ({terminology_to})', [cat_to_agg])})
+                ds_converted = ds_converted.pr.merge(data_agg, tolerance=tolerance)
+            else:
+                print(f"no data to aggregate category {cat_to_agg}")
+
+    return ds_converted
+
+
 def get_country_name(
         country_code: str,
 ) -> str:

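convert_categories expects the conversion argument as a nested dict with optional 'mapping' (source category -> target category) and 'aggregate' (target category -> {'sources': [...]}) entries. A minimal sketch with hypothetical category codes, assuming ds_di is a primap2 dataset in the source terminology:

    from UNFCCC_GHG_data.helper import convert_categories

    # hypothetical conversion spec for illustration only
    conversion = {
        'mapping': {
            '1': '1',  # relabel source categories to the target terminology
            '2': '2',
        },
        'aggregate': {
            # sum the listed source categories into a new target category
            '0': {'sources': ['1', '2']},
        },
    }
    ds_converted = convert_categories(ds_di, conversion, terminology_to='IPCC2006')
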
+ 18 - 0
dodo.py

@@ -250,6 +250,7 @@ def task_read_new_unfccc_crf_for_year():
 read_config_di = {
     "country": get_var('country', None),
     "date": get_var('date', None),
+    "annexI": get_var('annexI', False),
     #"countries": get_var('countries', None),
 }
 
@@ -285,6 +286,23 @@ def task_process_unfccc_di_for_country():
         'setup': ['setup_venv'],
     }
 
+def task_read_unfccc_di_for_country_group():
+    """ Read DI data for a country """
+    actions = [
+        f"./venv/bin/python "
+        f"UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_for_country_group_datalad.py",
+        f"./venv/bin/python UNFCCC_GHG_data/helper/folder_mapping.py "
+        f"--folder=extracted_data/UNFCCC"
+        ]
+    if read_config_di["annexI"] == "True":
+        actions[0] = actions[0] + " --annexI"
+
+    return {
+        'actions': actions,
+        'task_dep': ['set_env'],
+        'verbosity': 2,
+        'setup': ['setup_venv'],
+    }
 
 
 # general tasks
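
With the get_var based configuration above, the new task_read_unfccc_di_for_country_group task would presumably be invoked as:

    doit read_unfccc_di_for_country_group              # non-AnnexI (default)
    doit read_unfccc_di_for_country_group annexI=True  # AnnexI countries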