Browse Source

DI reading and processing for single countries

Johannes Gütschow 1 year ago
parent
commit
5cc55dc7f2

+ 1 - 0
.gitignore

@@ -9,5 +9,6 @@ UNFCCC_GHG_data/datasets
 UNFCCC_GHG_data/UNFCCC_DI_reader/test_UNFCCC_DI_reader.ipynb
 UNFCCC_GHG_data/UNFCCC_DI_reader/.ipynb_checkpoints/
 datasets/UNFCCC/DI_NAI/
+*.autosave
 #UNFCCC_GHG_data/UNFCCC_DI_reader
 

+ 4 - 2
UNFCCC_GHG_data/UNFCCC_CRF_reader/UNFCCC_CRF_reader_core.py

@@ -1022,7 +1022,7 @@ def get_submission_parties(
                          f"the function's purpose is to return available parties.")
 
     if folder.exists():
-        files = filter_filenames(folder.glob("*.xlsx"), **file_filter)
+        files = filter_filenames(list(folder.glob("*.xlsx")), **file_filter)
     else:
         raise ValueError(f"Folder {folder} does not exist")
 
@@ -1034,6 +1034,7 @@ def get_submission_parties(
 
 def find_latest_date(
         dates: List[str],
+        date_format: str='%d%m%Y',
 )-> str:
     """
     Returns the latest date in a list of dates as str in the format
@@ -1050,7 +1051,8 @@ def find_latest_date(
     """
 
     if len(dates) > 0:
-        dates_datetime = [[date, datetime.strptime(date, "%d%m%Y")] for date in dates]
+        dates_datetime = [[date, datetime.strptime(date, date_format)] for date in
+                          dates]
         dates_datetime = sorted(dates_datetime, key=itemgetter(1))
     else:
         raise ValueError(f"Passed list of dates is empty")

+ 529 - 185
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_core.py

@@ -8,19 +8,26 @@ import json
 import copy
 import xarray as xr
 import datalad.api
+import re
+from datalad.support.exceptions import IncompleteResultsError
 from datetime import date
 from typing import Optional, Dict, List, Union
 from pathlib import Path
 from copy import deepcopy
+from dask.base import tokenize
+
+from UNFCCC_GHG_data.UNFCCC_CRF_reader.UNFCCC_CRF_reader_core import find_latest_date
 
 from .UNFCCC_DI_reader_config import di_to_pm2if_template_nai
 from .UNFCCC_DI_reader_config import di_to_pm2if_template_ai
 from .UNFCCC_DI_reader_config import di_query_filters
+from .UNFCCC_DI_reader_config import di_processing_info
 from .UNFCCC_DI_reader_config import cat_conversion
-from .util import NoDIDataError, extracted_data_path, \
-    get_country_name, get_country_code
+from .UNFCCC_DI_reader_config import gas_baskets
+from .util import NoDIDataError, get_country_name, get_country_code
 from .util import nAI_countries, AI_countries, custom_country_mapping
-from .util import code_path, root_path
+from .util import code_path, root_path, extracted_data_path
+from .util import DI_date_format, regex_date
 
 
 def read_UNFCCC_DI_for_country(
@@ -48,19 +55,13 @@ def read_UNFCCC_DI_for_country(
 
     # set date_str if not given
     if date_str is None:
-        date_str = str(date.today())
-
-    # determine filename
-    if save_data:
-        filename = determine_filename(country_code, date_str, True)
-    else:
-        filename = None
+        today = date.today()
+        date_str = today.strftime(DI_date_format)
 
     # convert raw data to pm2 interchange format and save
     data_if = convert_DI_data_to_pm2_if(
         data=data_df,
-        pm2if_specifications=pm2if_specifications,
-        filename=filename,
+        pm2if_specifications=deepcopy(pm2if_specifications),
         default_gwp=default_gwp,
         date_str=date_str,
         debug=debug,
@@ -69,19 +70,73 @@ def read_UNFCCC_DI_for_country(
     # convert raw data to native pm2 format and save that
     data_pm2 = convert_DI_IF_data_to_pm2(
         data_di_if=data_if,
-        filename=filename,
     )
 
+    # save
+    if save_data:
+        save_DI_country_data(data_pm2, raw=True)
+
     return data_pm2
 
 
+def process_and_save_UNFCCC_DI_for_country(
+        country_code: str,
+        date_str: Union[str, None]=None,
+) -> xr.Dataset:
+    '''
+    process data and save them to disk using default parameters
+    '''
+
+    # get latest dataset if no date given
+    if date_str is None:
+        # get the latest date
+        raw_data_file = find_latest_DI_data(country_code, raw=True)
+    else:
+        raw_data_file = determine_filename(country_code, date_str, raw=True,
+                                           hash=False)
+
+        raw_data_file = raw_data_file.parent / (raw_data_file.name + '.nc')
+        print(f"process {raw_data_file.name}")
+        if not raw_data_file.exists():
+            raise ValueError(f"File {raw_data_file.name} does not exist. Check if it "
+                             "has been read.")
+
+    # load the data
+    data_to_process = pm2.open_dataset(raw_data_file)
+
+    # get parameters
+    countries = list(data_to_process.coords[data_to_process.attrs['area']].values)
+    if len(countries) > 1:
+        raise ValueError(
+            f"Found {len(countries)} countries. Only single country data "
+            f"can be processed by this function. countries: {countries}")
+    else:
+        country_code = countries[0]
+    processing_info_country = di_processing_info[country_code]
+    entities_to_ignore = [] # TODO: check and make default list
+
+    # process
+    data_processed = process_UNFCCC_DI_for_country(
+        data_country=data_to_process,
+        entities_to_ignore=entities_to_ignore,
+        gas_baskets=gas_baskets,
+        cat_conversion=cat_conversion,
+        sectors=None,
+        processing_info_country=processing_info_country,
+    )
+
+    # save
+    save_DI_country_data(data_processed, raw=False)
+
+    return data_processed
+
+
 def process_UNFCCC_DI_for_country(
         data_country: xr.Dataset,
-        country: str,
-        cat_terminology_in: str,
         entities_to_ignore: List[str],
-        sectors: List[str],
         gas_baskets: Dict[str, List[str]],
+        cat_conversion: Dict[str, Dict] = None,
+        sectors: List[str] = None,
         processing_info_country: Dict = None,
 ) -> xr.Dataset:
     """
@@ -91,6 +146,19 @@ def process_UNFCCC_DI_for_country(
         * Conversion to IPCC2006 categories
         * general sector and gas basket aggregation (in new categories)
     """
+    #### 0: gather information
+    countries = list(data_country.coords[data_country.attrs['area']].values)
+    if len(countries) > 1:
+        raise ValueError(
+            f"Found {len(countries)} countries. Only single country data "
+            f"can be processed by this function. countries: {countries}")
+    else:
+        country_code = countries[0]
+
+    cat_col = data_country.attrs['cat']
+    temp = re.findall(r'\((.*)\)', cat_col)
+    cat_terminology_in = temp[0]
+
     #### 1: general processing
     # remove unused cats
     data_country = data_country.dropna(f'category ({cat_terminology_in})', how='all')
@@ -107,119 +175,144 @@ def process_UNFCCC_DI_for_country(
     data_country = data_country.drop_vars(entities_ignore_present)
 
     #### 2: country specific processing
+
+
     if processing_info_country is not None:
-        if 'tolerance' in processing_info_country:
-            tolerance = processing_info_country["tolerance"]
+        # get scenario
+        scenarios = list(data_country.coords[data_country.attrs['scen']].values)
+        if len(scenarios) > 1:
+            raise ValueError(
+                f"Found {len(scenarios)} scenarios. Only single scenario data "
+                f"can be processed by this function. Scenarios: {scenarios}")
         else:
-            tolerance = 0.01
-
-        # take only desired years
-        if 'years' in processing_info_country:
-            data_country = data_country.pr.loc[
-                {'time': processing_info_country['years']}]
-
-        # remove timeseries if desired
-        if 'remove_ts' in processing_info_country:
-            for case in processing_info_country['remove_ts']:
-                remove_info = processing_info_country['remove_ts'][case]
-                entities = remove_info.pop("entities")
-                for entity in entities:
-                    data_country[entity].pr.loc[remove_info] = \
-                        data_country[entity].pr.loc[remove_info] * np.nan
-
-        # remove all data for given years if necessary
-        if 'remove_years' in processing_info_country:
-            data_country.pr.loc[{'time': processing_info_country['remove_years']}] = \
-                data_country.pr.loc[{'time': processing_info_country[
-                    'remove_years']}] * np.nan
-
-        # subtract categories
-        if 'subtract_cats' in processing_info_country:
-            subtract_cats_current = processing_info_country['subtract_cats']
-            if 'entities' in subtract_cats_current.keys():
-                entities_current = subtract_cats_current['entities']
+            scenario = scenarios[0]
+            if scenario in processing_info_country.keys():
+                processing_info_country_scen = processing_info_country[scenario]
             else:
-                entities_current = list(data_country.data_vars)
-            print(f"Subtracting categories for country {country}, entities "
-                  f"{entities_current}")
-            for cat_to_generate in subtract_cats_current:
-                cats_to_subtract = subtract_cats_current[cat_to_generate]['subtract']
-                data_sub = data_country.pr.loc[{'category': cats_to_subtract}].pr.sum(
-                    dim='category', skipna=True, min_count=1)
-                data_parent = data_country.pr.loc[
-                    {'category': subtract_cats_current[cat_to_generate]['parent']}]
-                data_agg = data_parent - data_sub
-                nan_vars = [var for var in data_agg.data_vars if
-                            data_agg[var].isnull().all().data == True]
-                data_agg = data_agg.drop(nan_vars)
-                if len(data_agg.data_vars) > 0:
-                    print(f"Generating {cat_to_generate} through subtraction")
-                    data_agg = data_agg.expand_dims([f'category ('
-                                                     f'{cat_terminology_in})'])
-                    data_agg = data_agg.assign_coords(
-                        coords={f'category ({cat_terminology_in})':
-                                    (f'category ({cat_terminology_in})',
-                                     [cat_to_generate])})
-                    data_country = data_country.pr.merge(data_agg, tolerance=tolerance)
-                else:
-                    print(f"no data to generate category {cat_to_generate}")
-
-        # downscaling
-        if 'downscale' in processing_info_country:
-            if 'sectors' in processing_info_country['downscale']:
-                sector_downscaling = processing_info_country['downscale']['sectors']
-                for case in sector_downscaling.keys():
-                    print(f"Downscaling for {case}.")
-                    sector_downscaling_current = sector_downscaling[case]
-                    entities = sector_downscaling_current.pop('entities')
+                processing_info_country_scen = processing_info_country['default']
+
+
+            if 'tolerance' in processing_info_country_scen:
+                tolerance = processing_info_country_scen["tolerance"]
+            else:
+                tolerance = 0.01
+
+            # take only desired years
+            if 'years' in processing_info_country_scen:
+                data_country = data_country.pr.loc[
+                    {'time': processing_info_country_scen['years']}]
+
+            # remove timeseries if desired
+            if 'remove_ts' in processing_info_country_scen:
+                for case in processing_info_country_scen['remove_ts']:
+                    remove_info = processing_info_country_scen['remove_ts'][case]
+                    entities = remove_info.pop("entities")
                     for entity in entities:
-                        data_country[entity] = data_country[
-                            entity].pr.downscale_timeseries(
-                            **sector_downscaling_current)  # , skipna_evaluation_dims=None)
-
-            if 'entities' in processing_info_country['downscale']:
-                entity_downscaling = processing_info_country['downscale']['entities']
-                for case in entity_downscaling.keys():
-                    #print(case)
-                    print(data_country.coords[f'category ('
-                                              f'{cat_terminology_in})'].values)
-                    data_country = data_country.pr.downscale_gas_timeseries(
-                        **entity_downscaling[case], skipna=True,
-                        skipna_evaluation_dims=None)
-
-        # aggregate categories
-        if 'aggregate_cats' in processing_info_country:
-            aggregate_cats_current = processing_info_country['aggregate_cats']
-            print(
-                f"Aggregating categories for country {country}")
-            for cat_to_agg in aggregate_cats_current:
-                print(f"Category: {cat_to_agg}")
-                source_cats = aggregate_cats_current[cat_to_agg]['sources']
-                data_agg = data_country.pr.loc[{'category': source_cats}].pr.sum(
-                    dim='category', skipna=True, min_count=1)
-                nan_vars = [var for var in data_agg.data_vars if
-                            data_agg[var].isnull().all().data == True]
-                data_agg = data_agg.drop(nan_vars)
-                if len(data_agg.data_vars) > 0:
-                    data_agg = data_agg.expand_dims([f'category ('
-                                                     f'{cat_terminology_in})'])
-                    data_agg = data_agg.assign_coords(
-                        coords={f'category ({cat_terminology_in})':
-                                    (f'category ({cat_terminology_in})', [cat_to_agg])})
-                    data_country = data_country.pr.merge(data_agg, tolerance=tolerance)
+                        data_country[entity].pr.loc[remove_info] = \
+                            data_country[entity].pr.loc[remove_info] * np.nan
+
+            # remove all data for given years if necessary
+            if 'remove_years' in processing_info_country_scen:
+                data_country.pr.loc[{'time': processing_info_country_scen[
+                    'remove_years']}] = \
+                    data_country.pr.loc[{'time': processing_info_country_scen[
+                        'remove_years']}] * np.nan
+
+            # subtract categories
+            if 'subtract_cats' in processing_info_country_scen:
+                subtract_cats_current = processing_info_country_scen['subtract_cats']
+                if 'entities' in subtract_cats_current.keys():
+                    entities_current = subtract_cats_current['entities']
                 else:
-                    print(f"no data to aggregate category {cat_to_agg}")
-
-        # aggregate gases if desired
-        if 'aggregate_gases' in processing_info_country:
-            for case in processing_info_country['aggregate_gases'].keys():
-                case_info = processing_info_country['aggregate_gases'][case]
-                data_country[case_info['basket']] = \
-                    data_country.pr.fill_na_gas_basket_from_contents(
-                        **case_info)
+                    entities_current = list(data_country.data_vars)
+                print(f"Subtracting categories for country {country_code}, entities "
+                      f"{entities_current}")
+                for cat_to_generate in subtract_cats_current:
+                    cats_to_subtract = \
+                        subtract_cats_current[cat_to_generate]['subtract']
+                    data_sub = \
+                        data_country.pr.loc[{'category': cats_to_subtract}].pr.sum(
+                        dim='category', skipna=True, min_count=1)
+                    data_parent = data_country.pr.loc[
+                        {'category': subtract_cats_current[cat_to_generate]['parent']}]
+                    data_agg = data_parent - data_sub
+                    nan_vars = [var for var in data_agg.data_vars if
+                                data_agg[var].isnull().all().data == True]
+                    data_agg = data_agg.drop(nan_vars)
+                    if len(data_agg.data_vars) > 0:
+                        print(f"Generating {cat_to_generate} through subtraction")
+                        data_agg = data_agg.expand_dims([f'category ('
+                                                         f'{cat_terminology_in})'])
+                        data_agg = data_agg.assign_coords(
+                            coords={f'category ({cat_terminology_in})':
+                                        (f'category ({cat_terminology_in})',
+                                         [cat_to_generate])})
+                        data_country = data_country.pr.merge(data_agg,
+                                                             tolerance=tolerance)
+                    else:
+                        print(f"no data to generate category {cat_to_generate}")
+
+            # downscaling
+            if 'downscale' in processing_info_country_scen:
+                if 'sectors' in processing_info_country_scen['downscale']:
+                    sector_downscaling = \
+                        processing_info_country_scen['downscale']['sectors']
+                    for case in sector_downscaling.keys():
+                        print(f"Downscaling for {case}.")
+                        sector_downscaling_current = sector_downscaling[case]
+                        entities = sector_downscaling_current.pop('entities')
+                        for entity in entities:
+                            data_country[entity] = data_country[
+                                entity].pr.downscale_timeseries(
+                                **sector_downscaling_current)
+                            # , skipna_evaluation_dims=None)
+
+                if 'entities' in processing_info_country_scen['downscale']:
+                    entity_downscaling = \
+                        processing_info_country_scen['downscale']['entities']
+                    for case in entity_downscaling.keys():
+                        #print(case)
+                        print(data_country.coords[f'category ('
+                                                  f'{cat_terminology_in})'].values)
+                        data_country = data_country.pr.downscale_gas_timeseries(
+                            **entity_downscaling[case], skipna=True,
+                            skipna_evaluation_dims=None)
+
+            # aggregate categories
+            if 'aggregate_cats' in processing_info_country_scen:
+                aggregate_cats_current = processing_info_country_scen['aggregate_cats']
+                print(
+                    f"Aggregating categories for country {country_code}")
+                for cat_to_agg in aggregate_cats_current:
+                    print(f"Category: {cat_to_agg}")
+                    source_cats = aggregate_cats_current[cat_to_agg]['sources']
+                    data_agg = data_country.pr.loc[{'category': source_cats}].pr.sum(
+                        dim='category', skipna=True, min_count=1)
+                    nan_vars = [var for var in data_agg.data_vars if
+                                data_agg[var].isnull().all().data == True]
+                    data_agg = data_agg.drop(nan_vars)
+                    if len(data_agg.data_vars) > 0:
+                        data_agg = data_agg.expand_dims([f'category ('
+                                                         f'{cat_terminology_in})'])
+                        data_agg = data_agg.assign_coords(
+                            coords={f'category ({cat_terminology_in})':
+                                        (f'category ({cat_terminology_in})',
+                                         [cat_to_agg])})
+                        data_country = data_country.pr.merge(data_agg,
+                                                             tolerance=tolerance)
+                    else:
+                        print(f"no data to aggregate category {cat_to_agg}")
+
+            # aggregate gases if desired
+            if 'aggregate_gases' in processing_info_country_scen:
+                for case in processing_info_country_scen['aggregate_gases'].keys():
+                    case_info = processing_info_country_scen['aggregate_gases'][case]
+                    data_country[case_info['basket']] = \
+                        data_country.pr.fill_na_gas_basket_from_contents(
+                            **case_info)
 
     #### 3: map categories
-    if country in nAI_countries:
+    if country_code in nAI_countries:
         # conversion from BURDI to IPCC2006_PRIMAP needed
         cat_terminology_out = 'IPCC2006_PRIMAP'
         data_country = convert_categories(
@@ -234,10 +327,11 @@ def process_UNFCCC_DI_for_country(
 
     # more general processing
     # reduce categories to output cats
-    cats_to_keep = [cat for cat in
-                    data_country.coords[f'category ({cat_terminology_out})'].values if
-                    cat in sectors]
-    data_country = data_country.pr.loc[{'category': cats_to_keep}]
+    if sectors is not None:
+        cats_to_keep = [cat for cat in
+                        data_country.coords[f'category ({cat_terminology_out})'].values if
+                        cat in sectors]
+        data_country = data_country.pr.loc[{'category': cats_to_keep}]
 
     # create gas baskets
     entities_present = set(data_country.data_vars)
@@ -260,7 +354,7 @@ def process_UNFCCC_DI_for_country(
                         basket=basket, basket_contents=basket_contents_present,
                         min_count=1)
                 except:
-                    print(f"No gas basket created for {country}")
+                    print(f"No gas basket created for {country_code}")
 
     # amend title and comment
     data_country.attrs["comment"] = data_country.attrs["comment"] + f" Processed on " \
@@ -447,7 +541,6 @@ def read_UNFCCC_DI_for_country_df(
 def convert_DI_data_to_pm2_if(
         data: pd.DataFrame,
         pm2if_specifications: Optional[dict]=None,
-        filename: Optional[Path]=None,
         default_gwp: Optional[str]=None,
         date_str: Optional[str]=None,
         debug: bool = False,
@@ -508,34 +601,37 @@ def convert_DI_data_to_pm2_if(
 
     if pm2if_specifications is None:
         if ai_dataset:
-            pm2if_specifications = di_to_pm2if_template_ai.copy()
+            pm2if_specifications = deepcopy(di_to_pm2if_template_ai)
         else:
-            pm2if_specifications = di_to_pm2if_template_nai.copy()
+            pm2if_specifications = deepcopy(di_to_pm2if_template_nai)
 
     # modify specifications
     #pm2if_specifications["filter_remove"].update(filter_activity_factors)
 
     # set the scenario to today's date if not given explicitly
-    if date_str is None:
+    if date_str == "country":
+        pm2if_specifications["coords_defaults"]["scenario"] = f"DIrolling"
+    elif date_str is None:
         date_str = str(date.today())
     pm2if_specifications["coords_defaults"]["scenario"] = f"DI{date_str}"
 
     # set metadata
     countries = data["party"].unique()
     if len(countries) > 1:
-        pm2if_specifications["meta_data"]["title"] = "Data submitted to the UNFCCC " \
-                                                     f"by countries {countries} as " \
-                                                     "available in the DI interface."
+        pm2if_specifications["meta_data"]["title"] = \
+            f"Data submitted to the UNFCCC by countries {countries} as " \
+            f"available in the DI interface on {date_str}."
     else:
         try:
             country_info = pycountry.countries.get(alpha_3=countries[0])
             country_name = country_info.name
         except:
             country_name = countries[0]
-        pm2if_specifications["meta_data"]["title"] = "Data submitted to the UNFCCC " \
-                                                     f"by country {country_name} as " \
-                                                     "available in the DI interface " \
-                                                     f"on {date_str}."
+
+        pm2if_specifications["meta_data"]["title"] = \
+            f"Data submitted to the UNFCCC by country {country_name} as " \
+            f"available in the DI interface on {date_str}."
+
     pm2if_specifications["meta_data"]["comment"] = \
         pm2if_specifications["meta_data"]["comment"] + f" Data read on {date_str}."
 
@@ -584,41 +680,97 @@ def convert_DI_data_to_pm2_if(
         **pm2if_specifications,
     )
 
-    if filename is not None:
-        print(f"Save data to {filename.name + '.csv/.yaml'}")
-        pm2.pm2io.write_interchange_format(filename, data_pm2if)
-
     return data_pm2if
 
 
 def convert_DI_IF_data_to_pm2(
         data_di_if: pd.DataFrame,
-        filename: Optional[Path]=None,
 )-> xr.Dataset:
     if_index_cols = set(itertools.chain(*data_di_if.attrs["dimensions"].values()))
     time_cols = set(data_di_if.columns.values) - if_index_cols
     data_di_if.dropna(subset=time_cols, inplace=True, how="all")
 
-    #try:
-        # try to convert all in one go
+    try:
         # use a copy as from_interchange_format modifies the input DF
-    data_pm2 = pm2.pm2io.from_interchange_format(data_di_if.copy(deep=True),
-                                                 attrs=copy.deepcopy(data_di_if.attrs))
-    #except ValueError: # better more specific error in primap2
-    #    print()
+        data_pm2 = pm2.pm2io.from_interchange_format(
+            data_di_if.copy(deep=True), attrs=copy.deepcopy(data_di_if.attrs))
+    except Exception as ex: # better more specific error in primap2
+        print(f'Error on conversion to PRIMAP2 native format: {ex}')
 
-    if filename is not None:
-        compression = dict(zlib=True, complevel=9)
+    return data_pm2
+
+
+def save_DI_country_data(
+        data_pm2: xr.Dataset,
+        raw: bool=True,
+):
+    '''
+    save primap2 and IF data to country folder
+    can be used for raw and processed data but for a single country only
+    '''
 
-        if not filename.parent.exists():
-            filename.parent.mkdir()
+    # preparations
+    data_if = data_pm2.pr.to_interchange_format()
 
-         # write data in native PRIMAP2 format
+    ## get country
+    countries = data_if[data_pm2.attrs['area']].unique()
+    if len(countries) > 1:
+        raise ValueError(f"More than one country in input data. This function can only"
+                         f"handle single country data. Countries: {countries}")
+    else:
+        country_code = countries[0]
+
+    ## get timestamp
+    scenario_col = data_pm2.attrs['scen']
+    scenarios = data_if[scenario_col].unique()
+    if len(scenarios) > 1:
+        raise ValueError(f"More than one scenario in input data. This function can only"
+                         f"handle single scenario data. Scenarios: {scenarios}")
+    else:
+        scenario = scenarios[0]
+
+    date_str = scenario[2:]
+
+    # calculate the hash of the data to see if it's identical to present data
+    data_for_token = data_if.drop(columns=[scenario_col])
+    token = tokenize(data_for_token)
+
+    # get the filename with the hash and check if it exists (separate for pm2 format
+    # and IF to fix broken datasets if necessary)
+    filename_hash = determine_filename(country_code, token, raw, hash=True)
+
+    # primap2 native format
+    filename_hash_nc = filename_hash.parent / (filename_hash.name + '.nc')
+    if not filename_hash_nc.exists():
+        # if parent dir does not exist create it
+        if not filename_hash.parent.exists():
+            filename_hash.parent.mkdir()
+        # save the data
+        print(f"Data has changed. Save to {filename_hash_nc.name}")
+        compression = dict(zlib=True, complevel=9)
         encoding = {var: compression for var in data_pm2.data_vars}
-        data_pm2.pr.to_netcdf(filename.parent / (filename.name + ".nc"),
-                            encoding=encoding)
+        data_pm2.pr.to_netcdf(filename_hash_nc, encoding=encoding)
+
+    # primap2 IF
+    filename_hash_csv = filename_hash.parent / (filename_hash.name + '.csv')
+    if not filename_hash_csv.exists():
+        # save the data
+        print(f"Data has changed. Save to {filename_hash.name + '.csv/.yaml'}")
+        pm2.pm2io.write_interchange_format(filename_hash, data_if)
+    else:
+        print(f"Data unchanged for {country_code}. Create symlinks.")
 
-    return data_pm2
+    # get the filename with the date
+    filename_date = determine_filename(country_code, date_str, raw)
+
+    # create the symlinks to the actual data (with the hash)
+    suffixes = ['.nc', '.csv', '.yaml']
+    for suffix in suffixes:
+        file_date = filename_date.parent / (filename_date.name + suffix)
+        file_hash = filename_hash.name + suffix
+        if file_date.exists():
+            file_date.unlink()
+        file_date.symlink_to(file_hash)
 
 
 ## datalad and pydoit interface functions
@@ -626,13 +778,13 @@ def read_DI_for_country_datalad(
         country: str,
 ) -> None:
     """
-    Wrapper around read_DI_for_country which takes care of selecting input
+    Wrapper around read_UNFCCC_DI_for_country which takes care of selecting input
     and output files and using datalad run to trigger the data reading
 
     Parameters
     __________
 
-    country_codes: str
+    country: str
         country name or ISO 3-letter country code
 
     """
@@ -641,8 +793,8 @@ def read_DI_for_country_datalad(
     date_str = str(date.today())
 
     # get all the info for the country
-    country_info = get_output_files_for_country_DI(country, date_str,
-                                                   raw=True, verbose=True)
+    country_info = get_input_and_output_files_for_country_DI(country, date_str,
+                                                             raw=True, verbose=True)
 
     print(f"Attempting to read DI data for {country_info['name']}.")
     print("#"*80)
@@ -651,25 +803,85 @@ def read_DI_for_country_datalad(
     print("")
     print(f"Run the script using datalad run via the python api")
     script = code_path / "UNFCCC_DI_reader" / "read_UNFCCC_DI_for_country.py"
+    script = script.relative_to(root_path)
 
     cmd = f"./venv/bin/python3 {script.as_posix()} --country={country_info['code']} " \
           f"--date={date_str}"
-    datalad.api.run(
-        cmd=cmd,
-        dataset=root_path,
-        message=f"Read DI data for {country_info['name']}.",
-        inputs=country_info["input"],
-        outputs=country_info["output"],
-        dry_run=None,
-        explicit=True,
-    )
+    try:
+        datalad.api.run(
+            cmd=cmd,
+            dataset=root_path,
+            message=f"Read DI data for {country_info['name']}.",
+            inputs=country_info["input"],
+            outputs=country_info["output"],
+            dry_run=None,
+            explicit=True,
+        )
+    except IncompleteResultsError as IRE:
+        print(f"IncompleteResultsError occured when running {cmd}: {IRE}")
+    except Exception as ex:
+        print(f"Exception occurred when running {cmd}")
+        print(ex.message)
+
+
+def process_DI_for_country_datalad(
+        country: str,
+        date_str: Union[str, None],
+) -> None:
+    """
+    Wrapper around process_UNFCCC_DI_for_country which takes care of selecting input
+    and output files and using datalad run to trigger the data processing
+
+    Parameters
+    __________
+
+    country: str
+        country name or ISO 3-letter country code
+    date_str: str
+        Date of the data to be processed in the format %Y-%m-%d (e.g. 2023-01-30). If
+        no date is given the last data read will be processed.
+    """
+
+    # get all the info for the country
+    country_info = get_input_and_output_files_for_country_DI(country, date_str,
+                                                             raw=True, verbose=True)
+
+    print(f"Attempting to process DI data for {country_info['name']}.")
+    print("#"*80)
+    print("")
+    print(f"Using the UNFCCC_DI_reader")
+    print("")
+    print(f"Run the script using datalad run via the python api")
+    script = code_path / "UNFCCC_DI_reader" / "process_UNFCCC_DI_for_country.py"
+    script = script.relative_to(root_path)
+
+    cmd = f"./venv/bin/python3 {script.as_posix()} --country={country_info['code']} " \
+          f"--date={date_str}"
+    try:
+        datalad.api.run(
+            cmd=cmd,
+            dataset=root_path,
+            message=f"Read DI data for {country_info['name']}.",
+            inputs=country_info["input"],
+            outputs=country_info["output"],
+            dry_run=None,
+            explicit=True,
+        )
+    except IncompleteResultsError as IRE:
+        print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
+    except Exception as ex:
+        print(f"Exception occurred when running {cmd}")
+        print(ex.message)
+
 
 ## helper functions
 
+
 def determine_filename(
         country_code: str,
-        date_str: str,
+        date_or_hash: str,
         raw: bool=False,
+        hash: bool=False,
 )->Path:
     """
     Determine the filename for a dataset from given country code and date string.
@@ -700,8 +912,7 @@ def determine_filename(
         country_folders = folder_mapping[country_code]
         if isinstance(country_folders, str):
             # only one folder
-            filename = Path(country_folders) / f"{country_code}_DI_{date_str}"
-
+            country_folder = extracted_data_path / country_folders
         else:
             raise ValueError("More than one output folder for country "
                              f"{country_code}. This should not happen.")
@@ -709,6 +920,7 @@ def determine_filename(
         # folder not in mapping. It will be created if not present yet
         country_name = get_country_name(country_code)
         country_folder = extracted_data_path / country_name.replace(" ", "_")
+
         if country_folder.exists():
            print(f"Output folder {country_name.replace(' ', '_')} for country "
                  f"{country_code} exists but is not in folder mapping. Update "
@@ -716,12 +928,14 @@ def determine_filename(
         else:
             country_folder.mkdir()
 
-        if raw:
-            filename = Path(country_folder) / f"{country_code}_DI_{date_str}_raw"
-        else:
-            filename = Path(country_folder) / f"{country_code}_DI_{date_str}"
+    filename = f"{country_code}_DI_{date_or_hash}"
+    if raw:
+        filename = f"{filename}_raw"
+    if hash:
+        filename = f"{filename}_hash"
+    filename = country_folder / filename
 
-    return filename
+    return filename.relative_to(root_path)
 
 
 def convert_categories(
@@ -784,7 +998,7 @@ def convert_categories(
     return ds_converted
 
 
-def get_output_files_for_country_DI(
+def get_input_and_output_files_for_country_DI(
         country: str,
         date_str: str,
         raw: bool,
@@ -812,11 +1026,40 @@ def get_output_files_for_country_DI(
     # determine latest data
     print(f"Determining output files for {country_name}")
 
+    # get input files (only for processing)
+    if raw:
+        input_files = []
+    else:
+        # get latest dataset if no date given
+        if date_str is None:
+            # get the latest date
+            input_file = [find_latest_DI_data(country_code, raw=True)]
+        else:
+            input_file = [determine_filename(country_code, date_str, raw=False,
+                                               hash=False)]
+            if input_file[0].is_symlink():
+                # also get the file with the actual data
+                input_file.append(input_file[0].readlink())
+            else:
+                # DI processing input files wit date labels should always be symlinks
+                # to the files with hashes holding the actual data.
+                raise(ValueError, f"Input file {input_file[0].name} is not a symlink "
+                                  f" or not existent. Check if the data you want to "
+                                  f"process exists and if your repository is ")
+
+        input_files = [f"{input_file.as_posix()}.{suffix}" for
+                        suffix in ['yaml', 'csv', 'nc']]
+
+        if verbose:
+            print(f"The following files are considered as input_files:")
+            for file in input_files:
+                print(file)
+            print("")
+
     # get output files
     output_file = determine_filename(country_code, date_str, raw=raw)
-
-    output_files = [f"{str(output_file)}.{suffix}" for suffix
-                    in ['yaml', 'csv', 'nc']]
+    output_files = [f"{output_file.as_posix()}.{suffix}" for
+                    suffix in ['yaml', 'csv', 'nc']]
 
     if verbose:
         print(f"The following files are considered as output_files:")
@@ -824,12 +1067,113 @@ def get_output_files_for_country_DI(
             print(file)
         print("")
 
-    # add to country infor
-    country_info["input"] = []
-    country_info["output"] = output_files
+    # add to country info
+    country_info["input"] = input_files
+    country_info["output"] = [] #output_files # not used because we don't know the
+    # hash in advance
 
     return country_info
 
+
+def get_present_hashes_for_country_DI(
+        country_code: str,
+        raw: bool,
+) -> List:
+    '''
+    Get the hashes of outputs
+    '''
+
+    regex_hash = r"_([a-f0-9]*)_"
+    if raw:
+        regex_hash = regex_hash + "raw_hash\.nc"
+    else:
+        regex_hash = regex_hash + "hash\.nc"
+
+    # get the country folder
+    with open(extracted_data_path / "folder_mapping.json", "r") as mapping_file:
+        folder_mapping = json.load(mapping_file)
+
+    if country_code in folder_mapping:
+        file_filter = {}
+        file_filter["party"] = country_code
+        country_folders = folder_mapping[country_code]
+        if isinstance(country_folders, str):
+            # only one folder
+            country_folder = extracted_data_path / country_folders
+        else:
+            raise ValueError("More than one output folder for country "
+                             f"{country_code}. This should not happen.")
+
+        files_list = list(country_folder.glob("*_hash.nc"))
+        # filter according to raw flag
+        if raw:
+            files_list = [file.name for file in files_list if
+                          re.search(r'_raw_hash', file.name)]
+        else:
+            files_list = [file.name for file in files_list if
+                          not re.search(r'_raw_hash', file.name)]
+
+        hash_list = [re.findall(regex_hash, file)[0] for file in files_list]
+        return hash_list
+
+    else:
+        # folder not in mapping.
+        return []
+
+
+def find_latest_DI_data(
+        country_code: str,
+        raw: bool=True,
+)->Union[Path, None]:
+    '''
+    Find the path to the nc file with the latest DI data for a given country
+    '''
+
+    if raw:
+        regex = regex_date + r"_raw\.nc"
+    else:
+        regex = regex_date + r"\.nc"
+
+    # get the country folder
+    with open(extracted_data_path / "folder_mapping.json", "r") as mapping_file:
+        folder_mapping = json.load(mapping_file)
+
+    if country_code in folder_mapping:
+        file_filter = {}
+        file_filter["party"] = country_code
+        country_folders = folder_mapping[country_code]
+        if isinstance(country_folders, str):
+            # only one folder
+            country_folder = extracted_data_path / country_folders
+        else:
+            raise ValueError("More than one output folder for country "
+                             f"{country_code}. This should not happen.")
+
+        files_path_list = list(country_folder.glob("*.nc"))
+        # remove files with hash
+        files_list = [file.name for file in files_path_list
+                      if not re.search(r'_hash\.nc', file.name)]
+        # filter according to raw flag
+        if raw:
+            files_list = [file for file in files_list if
+                          re.search(r'_raw\.nc', file)]
+        else:
+            files_list = [file for file in files_list if
+                          not re.search(r'_raw\.nc', file)]
+
+        if len(files_list) > 0:
+            date_list = [re.findall(regex, file)[0] for file in files_list]
+            latest_date = find_latest_date(date_list, '%Y-%m-%d')
+            latest_file = [file for file in files_path_list if re.search(latest_date,
+                                                                         file.name)][0]
+            return latest_file
+        else:
+            return None
+
+    else:
+        # folder not in mapping.
+        return None
+
 # TODO
 
 # functions

+ 11 - 5
UNFCCC_GHG_data/UNFCCC_DI_reader/__init__.py

@@ -1,14 +1,20 @@
 # submodule to read data from UNFCCC DI API using the unfccc_di_api package
 
 #import unfccc_di_api
-from .UNFCCC_DI_reader_core import read_UNFCCC_DI_for_country_df, \
-    convert_DI_data_to_pm2_if, convert_DI_IF_data_to_pm2, determine_filename, \
-    read_DI_for_country_datalad
+from .UNFCCC_DI_reader_core import \
+    read_UNFCCC_DI_for_country, read_DI_for_country_datalad, \
+    process_UNFCCC_DI_for_country, process_and_save_UNFCCC_DI_for_country, \
+    process_DI_for_country_datalad, \
+    convert_DI_data_to_pm2_if, convert_DI_IF_data_to_pm2, determine_filename
+
 
 __all__ = [
-    "read_UNFCCC_DI_for_country_df",
+    "read_UNFCCC_DI_for_country",
+    "read_DI_for_country_datalad",
+    "process_UNFCCC_DI_for_country",
+    "process_and_save_UNFCCC_DI_for_country",
+    "process_DI_for_country_datalad",
     "convert_DI_data_to_pm2_if",
     "convert_DI_IF_data_to_pm2",
     "determine_filename",
-    "read_DI_for_country_datalad",
 ]

+ 26 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/process_UNFCCC_DI_for_country.py

@@ -0,0 +1,26 @@
+"""
+This script is a wrapper around the read__for_country
+function such that it can be called from datalad
+"""
+
+import argparse
+from UNFCCC_GHG_data.UNFCCC_DI_reader import \
+    process_and_save_UNFCCC_DI_for_country
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--country', help='Country code')
+parser.add_argument('--date', help='String with date to read and process. If not '
+                                   'given latest data will be used')
+args = parser.parse_args()
+
+country_code = args.country
+date_str = args.date
+
+if date_str == "None":
+    date_str = None
+
+process_and_save_UNFCCC_DI_for_country(
+    country_code=country_code,
+    date_str=date_str,
+)

+ 22 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/process_UNFCCC_DI_for_country_datalad.py

@@ -0,0 +1,22 @@
+"""
+wrapper around read_crf_for_country_datalad such that it can be called
+from doit in the current setup where doit runs on system python and
+not in the venv.
+"""
+
+from UNFCCC_GHG_data.UNFCCC_DI_reader import \
+    process_DI_for_country_datalad
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--country', help='Country name or code')
+parser.add_argument('--date', help='String with date to read and process. If not '
+                                   'given latest data will be used')
+args = parser.parse_args()
+country = args.country
+date_str = args.date
+
+if date_str == "None":
+    date_str = None
+
+process_DI_for_country_datalad(country, date_str=date_str)

+ 0 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_country_datalad.py → UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_for_country_datalad.py


+ 4 - 1
UNFCCC_GHG_data/UNFCCC_DI_reader/util.py

@@ -6,7 +6,7 @@ import pycountry
 root_path = Path(__file__).parents[2].absolute()
 root_path = root_path.resolve()
 log_path = root_path / "log"
-code_path = root_path / "code"
+code_path = root_path / "UNFCCC_GHG_data"
 downloaded_data_path = root_path / "downloaded_data" / "UNFCCC"
 extracted_data_path = root_path / "extracted_data" / "UNFCCC"
 
@@ -15,6 +15,9 @@ reader = unfccc_di_api.UNFCCCApiReader()
 nAI_countries = list(reader.non_annex_one_reader.parties["code"])
 AI_countries = list(reader.annex_one_reader.parties["code"])
 
+DI_date_format = '%Y-%m-%d'
+regex_date = r"([0-9]{4}-[0-9]{2}-[0-9]{2})"
+
 class NoDIDataError(Exception):
     pass
 

+ 18 - 1
dodo.py

@@ -225,6 +225,7 @@ def task_read_new_unfccc_crf_for_year():
 # datalad run is called from within the read_UNFCCC_DI_for_country.py script
 read_config_di = {
     "country": get_var('country', None),
+    "date": get_var('date', None),
     #"countries": get_var('countries', None),
 }
 
@@ -232,7 +233,7 @@ def task_read_unfccc_di_for_country():
     """ Read DI data for a country """
     actions = [
         f"./venv/bin/python "
-        f"UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_country_datalad.py "
+        f"UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_for_country_datalad.py "
         f"--country={read_config_di['country']}",
         f"./venv/bin/python UNFCCC_GHG_data/UNFCCC_reader/folder_mapping.py "
         f"--folder=extracted_data/UNFCCC"
@@ -243,6 +244,22 @@ def task_read_unfccc_di_for_country():
         'setup': ['setup_venv'],
     }
 
+def task_process_unfccc_di_for_country():
+    """ Process DI data for a country """
+    actions = [
+        f"./venv/bin/python "
+        f"UNFCCC_GHG_data/UNFCCC_DI_reader/process_UNFCCC_DI_for_country_datalad.py "
+        f"--country={read_config_di['country']} --date={read_config_di['date']}",
+        f"./venv/bin/python UNFCCC_GHG_data/UNFCCC_reader/folder_mapping.py "
+        f"--folder=extracted_data/UNFCCC"
+        ]
+    return {
+        'actions': actions,
+        'verbosity': 2,
+        'setup': ['setup_venv'],
+    }
+
+
 
 # general tasks
 def task_country_info():

+ 1 - 0
extracted_data/UNFCCC/Brazil/BRA_DI_2023-05-15_raw.csv

@@ -0,0 +1 @@
+BRA_DI_1ed04062596059c1d50fe1e1dcf570c3_raw_hash.csv

+ 1 - 0
extracted_data/UNFCCC/Brazil/BRA_DI_2023-05-15_raw.nc

@@ -0,0 +1 @@
+BRA_DI_1ed04062596059c1d50fe1e1dcf570c3_raw_hash.nc

+ 1 - 0
extracted_data/UNFCCC/Brazil/BRA_DI_2023-05-15_raw.yaml

@@ -0,0 +1 @@
+BRA_DI_1ed04062596059c1d50fe1e1dcf570c3_raw_hash.yaml

+ 1 - 0
extracted_data/UNFCCC/folder_mapping.json

@@ -5,6 +5,7 @@
     "MEX": "Mexico",
     "FRA": "France",
     "LIE": "Liechtenstein",
+    "BRA": "Brazil",
     "MLT": "Malta",
     "SVN": "Slovenia",
     "BGR": "Bulgaria",

+ 2 - 0
setup.cfg

@@ -46,6 +46,7 @@ install_requires =
     opencv-python
     ghostscript
     unfccc_di_api
+    dask
 
 [options.extras_require]
 dev =
@@ -65,6 +66,7 @@ dev =
     ghostscript
     ipykernel
     jupyter
+    dask
 
 
 [options.package_data]