Browse Source

DI bulk processing

Johannes Gütschow 1 year ago
parent
commit
263c9a015e

+ 59 - 1
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_datalad.py

@@ -1,5 +1,5 @@
 from datetime import date
-from typing import Union
+from typing import Union, Optional
 import datalad.api
 from datalad.support.exceptions import IncompleteResultsError
 from UNFCCC_GHG_data.helper import root_path, code_path
@@ -156,6 +156,64 @@ def read_DI_for_country_group_datalad(
         )
     except IncompleteResultsError as IRE:
         print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
+    except Exception as ex:
+        print(f"Exception occurred when running {cmd}")
+        print(ex)
+
+
+def process_DI_for_country_group_datalad(
+        annexI: bool=False,
+        date_str: Optional[str]=None,
+) -> None:
+    """
+    Wrapper around read_UNFCCC_DI_for_country_group which takes care of selecting input
+    and output files and using datalad run to trigger the data processing
+
+    Parameters
+    ----------
+
+    annexI: bool (default False)
+        If True process all annexI countries (not implemented yet), else all non-AnnexI
+        countries.
+    date_str: str (default None)
+        Date of the data to be processed in the format %Y-%m-%d (e.g. 2023-01-30). If
+        no date is given the last data read will be processed.
+    """
+
+    if annexI:
+        country_group = "AnnexI"
+    else:
+        country_group = "non-AnnexI"
+
+    print(f"Attempting to process DI data for {country_group}.")
+    print("#"*80)
+    print("")
+    print(f"Using the UNFCCC_DI_reader")
+    print("")
+    print(f"Run the script using datalad run via the python api")
+    script = code_path / "UNFCCC_DI_reader" / "process_UNFCCC_DI_for_country_group.py"
+    script = script.relative_to(root_path)
+
+    cmd = f"./venv/bin/python3 {script.as_posix()} "
+    if annexI:
+        cmd = cmd + f" --annexI"
+    if date_str is not None:
+        cmd = cmd + f" --date={date_str}"
+    else:
+        date_str = "latest"
+
+    try:
+        datalad.api.run(
+            cmd=cmd,
+            dataset=root_path,
+            message=f"Process DI data for {country_group} for date {date_str}",
+            inputs=[],
+            outputs=[],
+            dry_run=None,
+            explicit=False,
+        )
+    except IncompleteResultsError as IRE:
+        print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
     except Exception as ex:
         print(f"Exception occurred when running {cmd}")
         print(ex.message)

+ 8 - 5
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_helper.py

@@ -249,9 +249,9 @@ def find_latest_DI_data(
     '''
 
     if raw:
-        regex = regex_date + r"_raw\.nc"
+        regex = f"{country_code}_DI_{regex_date}" + r"_raw\.nc"
     else:
-        regex = regex_date + r"\.nc"
+        regex = f"{country_code}_DI_{regex_date}" + r"\.nc"
 
     # get the country folder
     with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
@@ -272,6 +272,9 @@ def find_latest_DI_data(
         # remove files with hash
         files_list = [file.name for file in files_path_list
                       if not re.search(r'_hash\.nc', file.name)]
+        # remove files that don't begin with country_code_DI
+        files_list = [file for file in files_list
+                      if re.search(f'^{country_code}_DI_', file)]
         # filter according to raw flag
         if raw:
             files_list = [file for file in files_list if
@@ -283,9 +286,9 @@ def find_latest_DI_data(
         if len(files_list) > 0:
             date_list = [re.findall(regex, file)[0] for file in files_list]
             latest_date = find_latest_date(date_list, '%Y-%m-%d')
-            latest_file = [file for file in files_path_list if re.search(latest_date,
-                                                                         file.name)][0]
-            return latest_file
+            latest_file = [file for file in files_list if re.search(latest_date,
+                                                                         file)][0]
+            return country_folder / latest_file
         else:
             return None
 

+ 51 - 27
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_proc.py

@@ -34,16 +34,20 @@ def process_and_save_UNFCCC_DI_for_country(
     if date_str is None:
         # get the latest date
         raw_data_file = find_latest_DI_data(country_code, raw=True)
+        if raw_data_file is None:
+            raise ValueError(f"No raw data available for {country_code}.")
     else:
         raw_data_file = determine_filename(country_code, date_str, raw=True,
                                            hash=False)
 
         raw_data_file = raw_data_file.parent / (raw_data_file.name + '.nc')
-        print(f"process {raw_data_file.name}")
+
         if not raw_data_file.exists():
             raise ValueError(f"File {raw_data_file.name} does not exist. Check if it "
                              "has been read.")
 
+    print(f"process {raw_data_file.name}")
+
     # load the data
     data_to_process = pm2.open_dataset(raw_data_file)
 
@@ -55,7 +59,10 @@ def process_and_save_UNFCCC_DI_for_country(
             f"can be processed by this function. countries: {countries}")
     else:
         country_code = countries[0]
-    processing_info_country = di_processing_info[country_code]
+    if country_code in di_processing_info.keys():
+        processing_info_country = di_processing_info[country_code]
+    else:
+        processing_info_country = None
     entities_to_ignore = []  # TODO: check and make default list
 
     # process
@@ -344,57 +351,74 @@ def process_UNFCCC_DI_for_country(
 
 def process_UNFCCC_DI_for_country_group(
         annexI: bool = False,
+        date_str: Optional[str] = None,
 ) -> xr.Dataset:
     """
     This function processes DI data for all countries in a group (annexI or non-AnnexI)
-    
-    The function processes all data in one go using datalad run. as the output data file
-    names are unknown beforehand datalad run uses explicit=false
 
-    TODO: use the latest
+    Parameters
+    ----------
 
+    annexI: bool (default False)
+        If True process all annexI countries (not implemented yet), else all non-AnnexI
+        countries.
+    date_str: str (default None)
+        Date of the data to be processed in the format %Y-%m-%d (e.g. 2023-01-30). If
+        no date is given the last data read will be processed.
 
     """
-
     today = date.today()
-    date_str = today.strftime(DI_date_format)
+    date_str_today = today.strftime(DI_date_format)
 
     if annexI:
-        raise ValueError("Bulk reading for AnnexI countries not implemented yet")
+        raise ValueError("Bulk processing for AnnexI countries not implemented yet")
+        countries = AI_countries
+        #data_all_if = None
+        country_group = "AnnexI"
     else:
         countries = nAI_countries
+        data_all = None
+        country_group = "non-AnnexI"
 
     # read the data
-    data_all = None
-    for country in countries[0:5]:
-        print(f"reading DI data for country {country}")
+    exception_countries = []
+    for country in countries:
+        print(f"processing DI data for country {country}")
 
         try:
-            data_country = read_UNFCCC_DI_for_country(
+            data_country = process_and_save_UNFCCC_DI_for_country(
                 country_code=country,
-                category_groups=None,  # read all categories
-                read_subsectors=False,  # not applicable as we read all categories
                 date_str=date_str,
-                pm2if_specifications=None,
-                # automatically use the right specs for AI and NAI
-                default_gwp=None,  # automatically uses right default GWP for AI and NAI
-                debug=False)
+            )
+
+            # change the scenario to today's date
+            data_country = data_country.assign_coords({"scenario (Access_Date)": [
+                f"DI{date_str_today}"]})
+            scen_dim = data_country.attrs["scen"]
+            data_country.attrs["scen"] = f"scenario (Process_Date)"
+            data_country = data_country.rename({scen_dim: data_country.attrs["scen"]})
 
             if data_all is None:
                 data_all = data_country
             else:
                 data_all = data_all.pr.merge(data_country)
-        except unfccc_di_api.NoDataError as err:
-            print(f"No data for {country}.")
+        except Exception as err:
+            exception_countries.append(country)
+            print(f"Error occurred when processing data for {country}.")
             print(err)
 
-    # TODO: write metadata
+    # update metadata
+    countries_present = list(data_all.coords[data_all.attrs['area']].values)
+    data_all.attrs["title"] = f"Data submitted by the following {country_group} " \
+                              f"countries and available in the DI interface, " \
+                              f"converted to IPCC2006 categories and downscaled " \
+                              f"where applicable. For download date see scenario. " \
+                              f"Countries: {', '.join(countries_present)}"
 
-    # save the data
-    save_DI_dataset(data_all, raw=True, annexI=annexI)
 
+    # save the data
+    save_DI_dataset(data_all, raw=False, annexI=annexI)
+    print(data_all.coords["scenario (Process_Date)"].values)
+    print(f"Errors occured for countries: {exception_countries}")
     return data_all
 
-# TODO:
-# add process all sfunctios and scripts
-# config for all DI data

+ 3 - 1
UNFCCC_GHG_data/UNFCCC_DI_reader/__init__.py

@@ -9,7 +9,8 @@ from .UNFCCC_DI_reader_proc import process_UNFCCC_DI_for_country, \
     process_and_save_UNFCCC_DI_for_country, process_UNFCCC_DI_for_country_group
 
 from .UNFCCC_DI_reader_datalad import read_DI_for_country_datalad, \
-read_DI_for_country_group_datalad, process_DI_for_country_datalad
+read_DI_for_country_group_datalad, process_DI_for_country_datalad, \
+    process_DI_for_country_group_datalad
 
 from .UNFCCC_DI_reader_helper import determine_filename
 
@@ -21,6 +22,7 @@ __all__ = [
     "process_UNFCCC_DI_for_country",
     "process_and_save_UNFCCC_DI_for_country",
     "process_UNFCCC_DI_for_country_group",
+    "process_DI_for_country_group_datalad",
     "read_DI_for_country_datalad",
     "process_DI_for_country_datalad",
     "read_DI_for_country_group_datalad",

+ 1 - 1
UNFCCC_GHG_data/UNFCCC_DI_reader/process_UNFCCC_DI_for_country.py

@@ -11,7 +11,7 @@ from UNFCCC_GHG_data.UNFCCC_DI_reader import \
 parser = argparse.ArgumentParser()
 parser.add_argument('--country', help='Country code')
 parser.add_argument('--date', help='String with date to read and process. If not '
-                                   'given latest data will be used')
+                                   'given latest data will be used', default=None)
 args = parser.parse_args()
 
 country_code = args.country

+ 1 - 1
UNFCCC_GHG_data/UNFCCC_DI_reader/process_UNFCCC_DI_for_country_datalad.py

@@ -11,7 +11,7 @@ import argparse
 parser = argparse.ArgumentParser()
 parser.add_argument('--country', help='Country name or code')
 parser.add_argument('--date', help='String with date to read and process. If not '
-                                   'given latest data will be used')
+                                   'given latest data will be used', default=None)
 args = parser.parse_args()
 country = args.country
 date_str = args.date

+ 25 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/process_UNFCCC_DI_for_country_group.py

@@ -0,0 +1,25 @@
+"""
+This script is a wrapper around the process_UNFCCC_DI_for_country_group
+function such that it can be called from datalad
+"""
+
+import argparse
+from UNFCCC_GHG_data.UNFCCC_DI_reader import \
+    process_UNFCCC_DI_for_country_group
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--annexI', help='process for AnnexI countries (default is for '
+                                     'non-AnnexI)', action='store_true')
+parser.add_argument('--date', help='date of input data to use (default is None '
+                                       'to read latest data)', default=None)
+args = parser.parse_args()
+annexI = args.annexI
+date_str = args.date
+if date_str == "None":
+    date_str = None
+
+process_UNFCCC_DI_for_country_group(
+    annexI=annexI,
+    date_str=date_str,
+)

+ 25 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/process_UNFCCC_DI_for_country_group_datalad.py

@@ -0,0 +1,25 @@
+"""
+wrapper around process_DI_for_country_group_datalad such that it can be called
+from doit in the current setup where doit runs on system python and
+not in the venv.
+"""
+
+from UNFCCC_GHG_data.UNFCCC_DI_reader import \
+    process_DI_for_country_group_datalad
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--annexI', help='process for AnnexI countries (default is for '
+                                     'non-AnnexI)', action='store_true')
+parser.add_argument('--date', help='date of input data to use (default is None '
+                                       'to read latest data)', default=None)
+args = parser.parse_args()
+annexI = args.annexI
+date_str = args.date
+if date_str == "None":
+    date_str = None
+
+process_DI_for_country_group_datalad(
+    annexI=annexI,
+    date_str=date_str
+)

+ 21 - 1
dodo.py

@@ -326,7 +326,7 @@ def task_process_unfccc_di_for_country():
     }
 
 def task_read_unfccc_di_for_country_group():
-    """ Read DI data for a country """
+    """ Read DI data for a country group """
     actions = [
         f"./venv/bin/python "
         f"UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_for_country_group_datalad.py",
@@ -344,6 +344,26 @@ def task_read_unfccc_di_for_country_group():
     }
 
 
+def task_process_unfccc_di_for_country_group():
+    """ Process DI data for a country group """
+    actions = [
+        f"./venv/bin/python "
+        f"UNFCCC_GHG_data/UNFCCC_DI_reader/process_UNFCCC_DI_for_country_group_datalad.py",
+        f"./venv/bin/python UNFCCC_GHG_data/helper/folder_mapping.py "
+        f"--folder=extracted_data/UNFCCC"
+        ]
+    if read_config_di["annexI"] == "True":
+        actions[0] = actions[0] + " --annexI"
+    if read_config_di["date"] is not None:
+        actions[0] = actions[0] + f" --date={read_config_di['date']}"
+
+    return {
+        'actions': actions,
+        'task_dep': ['set_env'],
+        'verbosity': 2,
+        'setup': ['setup_venv'],
+    }
+
 # general tasks
 def task_country_info():
     """ Print information on submissions and datasets