Browse Source

wrapper function and script for DI data reading

Johannes Gütschow 2 years ago
parent
commit
13135bbc8a

+ 106 - 31
code/UNFCCC_DI_reader/UNFCCC_DI_reader_core.py

@@ -5,19 +5,66 @@ import pycountry
 import itertools
 import json
 import copy
+import xarray as xr
 from datetime import date
 from typing import Optional, Dict
 from pathlib import Path
 
-from .UNFCCC_DI_reader_config import di_to_pm2if_template_nai
-from .UNFCCC_DI_reader_config import di_to_pm2if_template_ai
-from .UNFCCC_DI_reader_config import di_query_filters
-from util import NoDIDataError, extracted_data_path
+from UNFCCC_DI_reader_config import di_to_pm2if_template_nai
+from UNFCCC_DI_reader_config import di_to_pm2if_template_ai
+from UNFCCC_DI_reader_config import di_query_filters
+from util import NoDIDataError, extracted_data_path, get_country_name
 
 
def read_UNFCCC_DI_for_party(
        party_code: str,
        category_groups: Optional[Dict] = None,
        read_subsectors: bool = False,
        date_str: Optional[str] = None,
        pm2if_specifications: Optional[dict] = None,
        default_gwp: Optional[str] = None,
        debug: Optional[bool] = False,
):
    """
    Read DI data for a single party, convert it to the PRIMAP2 interchange
    format and the native PRIMAP2 format, and save both to disk.

    Parameters
    ----------
    party_code: str
        ISO 3 letter code of the party to read data for
    category_groups: Optional[Dict]
        Definition of the category groups to query. If None the default
        configuration is used (all categories, all gases).
    read_subsectors: bool
        If True also read subsector data (not applicable when reading all
        categories).
    date_str: Optional[str]
        Date string used both in the scenario name and in the output
        filename. If None today's date is used. Passing it explicitly keeps
        filename and scenario consistent when a run crosses midnight.
    pm2if_specifications: Optional[dict]
        Specifications for the conversion to the PRIMAP2 interchange format.
        If None the defaults for AnnexI / non-AnnexI countries are used.
    default_gwp: Optional[str]
        Default GWP specification for CO2-equivalent data without an
        explicit GWP.
    debug: Optional[bool]
        If True print additional information while reading.

    Returns
    -------
    Dataset with the read data in native PRIMAP2 format
    """

    # query the DI API and collect the raw data as a DataFrame
    data_df = read_UNFCCC_DI_for_party_df(
        party_code=party_code,
        category_groups=category_groups,
        read_subsectors=read_subsectors,
        debug=debug,
    )

    # set date_str if not given so filename and scenario agree
    if date_str is None:
        date_str = str(date.today())

    # determine filename
    filename = determine_filename(party_code, date_str)

    # convert it to pm2 interchange format and save
    data_if = convert_DI_data_to_pm2_if(
        data=data_df,
        pm2if_specifications=pm2if_specifications,
        filename=filename,
        default_gwp=default_gwp,
        date_str=date_str,
        debug=debug,
    )

    # convert to native pm2 format and save that
    data_pm2 = convert_DI_IF_data_to_pm2(
        data_di_if=data_if,
        filename=filename,
    )

    return data_pm2
+
 
 def read_UNFCCC_DI_for_party_df(
-        party: str,
+        party_code: str,
         category_groups: Optional[Dict]=None,
         read_subsectors: bool=False,
         debug: Optional[bool]=False,
@@ -55,26 +102,26 @@ def read_UNFCCC_DI_for_party_df(
 
     # template for the query to the DI API
     query_template = {
-        "party_codes": [party],
+        "party_codes": [party_code],
         "normalize_gas_names": True
     }
 
-
     # find country group
-    if party in list(reader.non_annex_one_reader.parties["code"]):
+    if party_code in list(reader.non_annex_one_reader.parties["code"]):
         ai_country = False
-    elif party in list(reader.annex_one_reader.parties["code"]):
+    elif party_code in list(reader.annex_one_reader.parties["code"]):
         ai_country = True
         #di_data = reader.annex_one_reader.query(**query)
     else:
-        raise ValueError(f"Party code {party} found neither in AnnexI nor non-AnnexI "
+        raise ValueError(f"Party code {party_code} found neither in AnnexI nor "
+                         f"non-AnnexI "
                          f"party lists.")
 
     if category_groups is None:
         # no category defs given, so use default which is all categories,
         # all gases, but no other data
         if debug:
-            print(f"Using default config to read for party {party}")
+            print(f"Using default config to read for party {party_code}")
         if ai_country:
             all_gases = reader.annex_one_reader.gases["name"]
             query = query_template
@@ -168,11 +215,12 @@ def read_UNFCCC_DI_for_party_df(
 
     # if data has been collected print some information and save the data
     if di_data is None:
-        raise ValueError(f"No data collected for party {party} and category groups "
+        raise ValueError(f"No data collected for party {party_code} and category "
+                         f"groups "
                          f"{category_groups}")
     elif debug:
         # print some information on collected data
-        print(f"Collected data for party {party}")
+        print(f"Collected data for party {party_code}")
         print("### Categories ###")
         categories = di_data["category"].unique()
         categories.sort()
@@ -192,8 +240,9 @@ def read_UNFCCC_DI_for_party_df(
 def convert_DI_data_to_pm2_if(
         data: pd.DataFrame,
         pm2if_specifications: Optional[dict]=None,
-        filename: str = "",
+        filename: Optional[Path]=None,
         default_gwp: Optional[str]=None,
+        date_str: Optional[str]=None,
         debug: bool = False,
 ) -> pd.DataFrame:
     """
@@ -259,8 +308,9 @@ def convert_DI_data_to_pm2_if(
     # modify specifications
     #pm2if_specifications["filter_remove"].update(filter_activity_factors)
 
-    # set the scenario to today's date
-    date_str = str(date.today())
+    # set the scenario to today's date if not given explicitly
+    if date_str is None:
+        date_str = str(date.today())
     pm2if_specifications["coords_defaults"]["scenario"] = f"DI{date_str}"
 
     # set metadata
@@ -303,13 +353,12 @@ def convert_DI_data_to_pm2_if(
         data_temp.loc[row_idx_co2eq, "gas"] = data_temp.loc[row_idx_co2eq, "gas"] + \
                                               " (SARGWP100)"
 
-
     # combine numeric and string values
     nan_idx = data_temp["numberValue"].isna()
     data_temp.loc[nan_idx, "numberValue"] = data_temp.loc[nan_idx, "stringValue"]
     data_temp = data_temp.drop(columns=["stringValue"])
 
-    # Currently in primap2 data reading a column can only be used once.
+    # Currently in primap2 data reading, a column can only be used once.
     # We want to use the category column both for the primap2 "category"
     # column (which contains the code only) and an additional column which stores
     # the full name as available from the DI API. As a workaround we create a
@@ -328,14 +377,17 @@ def convert_DI_data_to_pm2_if(
         **pm2if_specifications,
     )
 
-    if filename != "":
-        print(f"Save data to {filename + '.csv/.yaml'}")
+    if filename is not None:
+        print(f"Save data to {filename.name + '.csv/.yaml'}")
         pm2.pm2io.write_interchange_format(filename, data_pm2if)
 
     return data_pm2if
 
 
-def convert_DI_IF_data_to_pm2(data_di_if: pd.DataFrame)-> xr.Dataset:
+def convert_DI_IF_data_to_pm2(
+        data_di_if: pd.DataFrame,
+        filename: Optional[Path]=None,
+)-> xr.Dataset:
     if_index_cols = set(itertools.chain(*data_di_if.attrs["dimensions"].values()))
     time_cols = set(data_di_if.columns.values) - if_index_cols
     data_di_if.dropna(subset=time_cols, inplace=True)
@@ -348,17 +400,31 @@ def convert_DI_IF_data_to_pm2(data_di_if: pd.DataFrame)-> xr.Dataset:
     #except ValueError: # better more specific error in primap2
     #    print()
 
+    if filename is not None:
+        compression = dict(zlib=True, complevel=9)
+
+        if not filename.parent.exists():
+            filename.parent.mkdir()
+
+         # write data in native PRIMAP2 format
+        encoding = {var: compression for var in data_pm2.data_vars}
+        data_pm2.pr.to_netcdf(filename.parent / (filename.name + ".nc"),
+                            encoding=encoding)
+
     return data_pm2
 
 
-def determine_filename(country_code, date_str)->Path:
+def determine_filename(
+        party_code: str,
+        date_str: str
+)->Path:
     """
-    Determine the filename for a dataset from given country code and data string.
+    Determine the filename for a dataset from given country code and date string.
 
 
     Parameters
     ----------
-    country_code: str
+    party_code: str
         ISO 3 letter code of the country
     date_str:
         formatted date string
@@ -373,20 +439,29 @@ def determine_filename(country_code, date_str)->Path:
     with open(extracted_data_path / "folder_mapping.json", "r") as mapping_file:
         folder_mapping = json.load(mapping_file)
 
-    if country_code in folder_mapping:
+    if party_code in folder_mapping:
         file_filter = {}
-        file_filter["party"] = country_code
-        country_folders = folder_mapping[country_code]
+        file_filter["party"] = party_code
+        country_folders = folder_mapping[party_code]
         if isinstance(country_folders, str):
             # only one folder
-            filename = Path(country_folders) / f"{country_code}_DI_{date_str}"
+            filename = Path(country_folders) / f"{party_code}_DI_{date_str}"
 
         else:
             raise ValueError("More than one output folder for country "
-                             f"{country_code}. This should not happen.")
+                             f"{party_code}. This should not happen.")
     else:
-        raise ValueError(f"No output data folder found for country {country_code}. "
-                         f"Check if folder mapping is up to date.")
+        # folder not in mapping. It will be created if not present yet
+        party_name = get_country_name(party_code)
+        country_folder = extracted_data_path / party_name.replace(" ", "_")
+        if country_folder.exists():
+           print(f"Output folder {party_name.replace(' ', '_')} for country "
+                 f"{party_code} exists but is not in folder mapping. Update "
+                 "folder mapping")
+        else:
+            country_folder.mkdir()
+
+        filename = Path(country_folder) / f"{party_code}_DI_{date_str}"
 
     return filename
 

+ 118 - 71
code/UNFCCC_DI_reader/read_UNFCCC_DI_for_country.py

@@ -3,9 +3,18 @@ This script is a wrapper around the read_crf_for_country
 function such that it can be called from datalad
 """
 
-from UNFCCC_DI_reader import read_UNFCCC_DI_for_party
-from UNFCCC_DI_reader import determine_filename
 import argparse
+import sys
+from datetime import date
+from util import code_path
+#from UNFCCC_CRF_reader import custom_country_mapping
+sys.path.append(code_path.name)
+from UNFCCC_DI_reader_core import read_UNFCCC_DI_for_party
+from UNFCCC_DI_reader_core import determine_filename
+from util import custom_country_mapping
+from util import get_country_name
+from util import get_country_code
+
 #from pathlib import Path
 
 suffixes = ["nc", "yaml", "csv"]
@@ -16,90 +25,128 @@ args = parser.parse_args()
 
 country = args.country
 
-#TODO: get country name and code
+#country_info = {}
+if country in custom_country_mapping:
+    country_code = country
+else:
+    country_code = get_country_code(country)
+# now get the country name
+country_name = get_country_name(country_code)
+#country_info["code"] = country_code
+#country_info["name"] = country_name
+
+# get current date to pass on to other functions in case reading is done over night
+# and the date changes
+date_str = str(date.today())
+
 
 # TODO: this function: get output files and run datalad.
 # problem: the data. should be the same in file name and scenario but is determined
 # at two different places and might differ when running over night.  so it should be
 # a parameter determined in this function and passed on to the datalad function / script
 
-
 print(f"Attempting to read DI data for {country}.")
 print("#"*80)
 print("")
+print(f"Using the UNFCCC_CRF_reader")
+print("")
 
 # determine output files
-filename_base = determine_filename(country)
+filename_base = determine_filename(country_code, date_str)
 
 # we have no input files as data is read from DI API
 
-
-
-
-read_crf_for_country(
-    country,
-    submission_year=submission_year,
-    submission_date=submission_date,
-    re_read=re_read
+read_UNFCCC_DI_for_party(
+    party_code=country_code,
+    category_groups=None, # read all categories
+    read_subsectors=False, # not applicable as we read all categories
+    date_str=date_str,
+    pm2if_specifications=None, # automatically use the right specs for AI and NAI
+    default_gwp=None, # automatically uses right default GWP for AI and NAI
+    debug=False,
 )
 
-#######################
-
 
 
-
-country = args.country
-submission = args.submission
-
-codepath = Path(__file__).parent
-rootpath = codepath / ".." / ".."
-rootpath = rootpath.resolve()
-
-if script_name is not None:
-    print(f"Found code file {script_name}")
-    print("")
-
-    # get possible input files
-    input_files = get_possible_inputs(country, submission)
-    if not input_files:
-        print(f"No possible input files found for {country}, {submission}. "
-              f"Something might be wrong here.")
-    else:
-        print(f"Found the following input_files:")
-        for file in input_files:
-            print(file)
-        print("")
-    # make input files absolute to avoid datalad confusions when
-    # root directory is via symlink
-    input_files = [rootpath / file for file in input_files]
-    # convert file's path to str
-    input_files = [file.as_posix() for file in input_files]
-
-    # get possible output files
-    output_files = get_possible_outputs(country, submission)
-    if not output_files:
-        print(f"No possible output files found for {country}, {submission}. "
-              f"This is either the first run or something is wrong.")
-    else:
-        print(f"Found the following output_files:")
-        for file in output_files:
-            print(file)
-        print("")
-    # convert file path's to str
-    output_files = [file.as_posix() for file in output_files]
-
-    print(f"Run the script using datalad run via the python api")
-    datalad.api.run(
-        cmd=f"./venv/bin/python3 {script_name.as_posix()}",
-        dataset=rootpath,
-        message=f"Read data for {country}, {submission}.",
-        inputs=input_files,
-        outputs=output_files,
-        dry_run=None,
-        explicit=True,
-    )
-else:
-    # no code found.
-    print(f"No code found to read {submission} from {country}")
-    print(f"Use 'doit country_info --country={country} to get "
-          f"a list of available submissions and datasets.")
+#    print(f"Run the script using datalad run via the python api")
+#    script = code_path / "UNFCCC_CRF_reader" / "read_UNFCCC_CRF_submission.py"
+#
+#     cmd = f"./venv/bin/python3 {script.as_posix()} --country={country} "\
+#           f"--submission_year={submission_year} --submission_date={submission_date}"
+#     if re_read:
+#         cmd = cmd + f" --re_read"
+#     datalad.api.run(
+#         cmd=cmd,
+#         dataset=root_path,
+#         message=f"Read data for {country}, CRF{submission_year}, {submission_date}.",
+#         inputs=country_info["input"],
+#         outputs=country_info["output"],
+#         dry_run=None,
+#         explicit=True,
+#     )
+
+
+
+
+################################3
+
+
+
+
+
+
+# country = args.country
+# submission = args.submission
+#
+# codepath = Path(__file__).parent
+# rootpath = codepath / ".." / ".."
+# rootpath = rootpath.resolve()
+#
+# if script_name is not None:
+#     print(f"Found code file {script_name}")
+#     print("")
+#
+#     # get possible input files
+#     input_files = get_possible_inputs(country, submission)
+#     if not input_files:
+#         print(f"No possible input files found for {country}, {submission}. "
+#               f"Something might be wrong here.")
+#     else:
+#         print(f"Found the following input_files:")
+#         for file in input_files:
+#             print(file)
+#         print("")
+#     # make input files absolute to avoid datalad confusions when
+#     # root directory is via symlink
+#     input_files = [rootpath / file for file in input_files]
+#     # convert file's path to str
+#     input_files = [file.as_posix() for file in input_files]
+#
+#     # get possible output files
+#     output_files = get_possible_outputs(country, submission)
+#     if not output_files:
+#         print(f"No possible output files found for {country}, {submission}. "
+#               f"This is either the first run or something is wrong.")
+#     else:
+#         print(f"Found the following output_files:")
+#         for file in output_files:
+#             print(file)
+#         print("")
+#     # convert file path's to str
+#     output_files = [file.as_posix() for file in output_files]
+#
+#     print(f"Run the script using datalad run via the python api")
+#     datalad.api.run(
+#         cmd=f"./venv/bin/python3 {script_name.as_posix()}",
+#         dataset=rootpath,
+#         message=f"Read data for {country}, {submission}.",
+#         inputs=input_files,
+#         outputs=output_files,
+#         dry_run=None,
+#         explicit=True,
+#     )
+# else:
+#     # no code found.
+#     print(f"No code found to read {submission} from {country}")
+#     print(f"Use 'doit country_info --country={country} to get "
+#           f"a list of available submissions and datasets.")

+ 71 - 1
code/UNFCCC_DI_reader/util.py

@@ -1,4 +1,6 @@
 from pathlib import Path
+# imports for copied functions
+import pycountry
 
 root_path = Path(__file__).parents[2].absolute()
 root_path = root_path.resolve()
@@ -7,5 +9,73 @@ code_path = root_path / "code"
 downloaded_data_path = root_path / "downloaded_data" / "UNFCCC"
 extracted_data_path = root_path / "extracted_data" / "UNFCCC"
 
+
+
 class NoDIDataError(Exception):
-    pass
+    pass
+
+
+# the following is copied from other sub-packages
+# TODO: move these functions to a common location to allow easy importing into all modules
# Mapping of party codes used by the UNFCCC DI API that are not plain
# ISO 3166-1 alpha-3 codes (e.g. EU aggregates or split territories) to
# country names. Codes listed here bypass the pycountry lookup.
custom_country_mapping = {
    "EUA": "European Union",
    "EUC": "European Union",
    "FRK": "France",
    "DKE": "Denmark",
    "DNM": "Denmark",
    "GBK": "United Kingdom of Great Britain and Northern Ireland",
}
+
+
def get_country_name(
        country_code: str,
) -> str:
    """
    Get the country name for a given three-letter code.

    Custom (non-ISO) codes from `custom_country_mapping` take precedence;
    all other codes are resolved as ISO 3166-1 alpha-3 codes via pycountry.

    Parameters
    ----------
    country_code: str
        three-letter code of the country

    Returns
    -------
    str: name of the country

    Raises
    ------
    ValueError
        if the code is neither a custom code nor a known alpha-3 code
    """
    if country_code in custom_country_mapping:
        return custom_country_mapping[country_code]

    # pycountry returns None for unknown codes instead of raising, so test
    # explicitly rather than masking an AttributeError with a bare except
    country = pycountry.countries.get(alpha_3=country_code)
    if country is None:
        raise ValueError(f"Country code {country_code} can not be mapped to "
                         f"any country")
    return country.name
+
+
def get_country_code(
        country_name: str,
) -> str:
    """
    Obtain the three-letter country code. If the input already is a
    three-letter code it is returned unchanged; otherwise a fuzzy name
    search is performed.

    Parameters
    ----------
    country_name: str
        Country code or name to get the three-letter code for.

    Returns
    -------
    str: three-letter code of the country

    Raises
    ------
    ValueError
        if the name can not be mapped to a single country code
    """
    # check if the input already is a 3 letter code; pycountry returns None
    # for unknown codes instead of raising
    country = pycountry.countries.get(alpha_3=country_name)
    if country is not None:
        return country.alpha_3

    try:
        candidates = pycountry.countries.search_fuzzy(country_name)
    except LookupError as err:
        raise ValueError(f"Country name {country_name} can not be mapped to "
                         f"any country code") from err

    if len(candidates) > 1:
        # several fuzzy matches: accept only an exact name match.
        # NOTE: previously the first fuzzy match unconditionally overwrote an
        # exact match found here; return immediately to fix that.
        for candidate in candidates:
            if candidate.name == country_name:
                return candidate.alpha_3
        raise ValueError(f"Country name {country_name} has {len(candidates)} "
                         f"possible results for country codes.")

    return candidates[0].alpha_3