Procházet zdrojové kódy

Improve integration of CRF reading into doit. Develop a function to read all new CRF submissions for a year.

Johannes Gütschow před 2 roky
rodič
revize
1d9fb4e0d9

+ 7 - 1
code/UNFCCC_CRF_reader/UNFCCC_CRF_reader_core.py

@@ -17,7 +17,7 @@ from collections import Counter
 from typing import Dict, List, Optional, Tuple, Union
 from datetime import datetime
 import crf_specifications as crf
-from util import downloaded_data_path
+from util import downloaded_data_path, NoCRFFilesError
 
 
 ### reading functions
@@ -207,6 +207,12 @@ def read_crf_table(
                                 data_year=data_year,
                                 date=date,
                                 folder=folder)
+    if input_files == []:
+        raise NoCRFFilesError(f"No files found for {country_codes}, "
+                              f"submission_year={submission_year}, "
+                              f"data_year={data_year}, "
+                              f"date={date}, "
+                              f"folder={folder}.")
 
     # get specification
     try:

+ 296 - 96
code/UNFCCC_CRF_reader/UNFCCC_CRF_reader_prod.py

@@ -11,7 +11,7 @@ import pycountry
 import datalad.api
 from datetime import date
 from pathlib import Path
-from typing import Optional
+from typing import Optional, List
 
 #from . import crf_specifications as crf
 import crf_specifications as crf
@@ -24,7 +24,8 @@ from UNFCCC_CRF_reader_devel import save_unknown_categories_info
 from UNFCCC_CRF_reader_devel import save_last_row_info
 
 from util import code_path, log_path, \
-    custom_country_mapping, extracted_data_path, root_path
+    custom_country_mapping, extracted_data_path, root_path, \
+    all_crf_countries, NoCRFFilesError
 
 import sys
 sys.path.append(code_path.name)
@@ -54,23 +55,23 @@ def read_crf_for_country(
         country_code: str,
         submission_year: int,
         submission_date: Optional[str]=None,
+        re_read: Optional[bool]=True,
 ) -> xr.Dataset:
     """
     Read CRF data for given submission year and country. All tables
     available in the specification will be read for all years. Result
     will be written to appropriate country folder.
 
-    If you want to read data for more countries of from a different folder
-    use the test_read_crf_data function.
+    Folders are determined from the submission_year and country_code variables.
+    The output is a primap2 dataset (xarray based).
+
+    If you want to read data for more countries or from a different folder
+    use the read_latest_crf_submissions_for_year or test_read_crf_data function.
 
     IMPORTANT NOTE:
     Currently there is no consistency check between data for the same category
     read from different tables
 
-    The folder can either be given explicitly or if not given folders are determined
-    from the submission_year and country_code variables.
-    The output is a primap2 dataset (xarray based).
-
     We only save the data in the country folder if there were no messages like
     unknown rows to make sure that data that goes into the repository is complete.
     The result dataframe is returned in any case. In case log messages appeared
@@ -91,6 +92,9 @@ def read_crf_for_country(
         Read for a specific submission date (given as string as in the file names)
         If not specified latest data will be read
 
+    re_read: Optional(bool) default: True
+        Read the data also if it's already present
+
     Returns
     _______
         return value is an xarray Dataset with the read data in PRIMAP2 format
@@ -114,84 +118,105 @@ def read_crf_for_country(
     if submission_date is None:
         submission_date = get_latest_date_for_country(country_code, submission_year)
 
+    # check if data has been read already
+    output_folder = extracted_data_path / country_name.replace(" ", "_")
+    output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"
+    read_data = True
+    print(f"re_read: {re_read}")
+    if not re_read:
+        print("test")
+        if output_folder.exists():
+            existing_files = output_folder.glob(f"{output_filename}.*")
+            existing_suffixes= [file.suffix for file in existing_files]
+            if all(suffix in existing_suffixes for suffix in [".nc", ".yaml", ".csv"]):
+                print(f"Data already available for {country_code}, "
+                      f"CRF{submission_year}, version {submission_date}. "
+                      "Skipping.")
+            else:
+                print(f"Partial data available for {country_code}, "
+                      f"CRF{submission_year}, version {submission_date}. "
+                      "Reading for completion. Please check if all files "
+                      "have been written.")
+
     ds_all = None
-    unknown_categories = []
-    last_row_info = []
-    for table in tables:
-        # read table for all years
-        ds_table, new_unknown_categories, new_last_row_info = read_crf_table(
-            country_code, table, submission_year, date=submission_date)#, data_year=[1990])
-
-        # collect messages on unknown rows etc
-        unknown_categories = unknown_categories + new_unknown_categories
-        last_row_info = last_row_info + new_last_row_info
-
-        # convert to PRIMAP2 IF
-        # first drop the orig_cat_name col as it can have multiple values for
-        # one category
-        ds_table = ds_table.drop(columns=["orig_cat_name"])
-
-        # if we need to map entities pass this info to the conversion function
-        if "entity_mapping" in crf_spec[table]:
-            entity_mapping = crf_spec[table]["entity_mapping"]
-        else:
-            entity_mapping = None
-        ds_table_if = convert_crf_table_to_pm2if(
-            ds_table,
-            2021,
-            meta_data_input={"title": f"Data submitted in {submission_year} to the UNFCCC "
-                                      f"in the common reporting format (CRF) by {country_name}. "
-                                      f"Submission date: {submission_date}"},
-            entity_mapping=entity_mapping,
-        )
-
-        # now convert to native PRIMAP2 format
-        ds_table_pm2 = pm2.pm2io.from_interchange_format(ds_table_if)
-
-        # combine per table DS
-        if ds_all is None:
-            ds_all = ds_table_pm2
-        else:
-            ds_all = ds_all.combine_first(ds_table_pm2)
-
-    # check if there were log messages.
-    save_data = True
-    if len(unknown_categories) > 0:
-        save_data = False
-        today = date.today()
-        log_location = log_path / f"CRF{submission_year}" \
-                       / f"{country_code}_unknown_categories_{today.strftime('%Y-%m-%d')}.csv"
-        print(f"Unknown rows found for {country_code}. Not saving data. Savin log to "
-              f"{log_location}" )
-        save_unknown_categories_info(unknown_categories, log_location)
-
-    if len(last_row_info) > 0:
-        save_data = False
-        today = date.today()
-        log_location = log_path / f"CRF{submission_year}" \
-                       / f"{country_code}_last_row_info_{today.strftime('%Y-%m-%d')}.csv"
-        print(f"Data found in the last row found for {country_code}. Not saving data. Savin log to "
-              f"{log_location}")
-        save_last_row_info(last_row_info, log_location)
-
-    if save_data:
-        compression = dict(zlib=True, complevel=9)
-        output_folder = extracted_data_path / country_name.replace(" ", "_")
-        output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"
-
-        if not output_folder.exists():
-            output_folder.mkdir()
-            # folder mapping has to be updated !!!
-            # if we do it here we will do it a lot of times when reading several countries at once
-
-        # write data in interchange format
-        pm2.pm2io.write_interchange_format(output_folder / output_filename,
-                                           ds_all.pr.to_interchange_format())
-
-        # write data in native PRIAMP2 format
-        encoding = {var: compression for var in ds_all.data_vars}
-        ds_all.pr.to_netcdf(output_folder / (output_filename + ".nc"),
-                              encoding=encoding)
+    if read_data:
+        unknown_categories = []
+        last_row_info = []
+        for table in tables:
+            # read table for all years
+            ds_table, new_unknown_categories, new_last_row_info = read_crf_table(
+                country_code, table, submission_year, date=submission_date)#, data_year=[1990])
+
+            # collect messages on unknown rows etc
+            unknown_categories = unknown_categories + new_unknown_categories
+            last_row_info = last_row_info + new_last_row_info
+
+            # convert to PRIMAP2 IF
+            # first drop the orig_cat_name col as it can have multiple values for
+            # one category
+            ds_table = ds_table.drop(columns=["orig_cat_name"])
+
+            # if we need to map entities pass this info to the conversion function
+            if "entity_mapping" in crf_spec[table]:
+                entity_mapping = crf_spec[table]["entity_mapping"]
+            else:
+                entity_mapping = None
+            ds_table_if = convert_crf_table_to_pm2if(
+                ds_table,
+                2021,
+                meta_data_input={"title": f"Data submitted in {submission_year} to the UNFCCC "
+                                          f"in the common reporting format (CRF) by {country_name}. "
+                                          f"Submission date: {submission_date}"},
+                entity_mapping=entity_mapping,
+            )
+
+            # now convert to native PRIMAP2 format
+            ds_table_pm2 = pm2.pm2io.from_interchange_format(ds_table_if)
+
+            # combine per table DS
+            if ds_all is None:
+                ds_all = ds_table_pm2
+            else:
+                ds_all = ds_all.combine_first(ds_table_pm2)
+
+        # check if there were log messages.
+        save_data = True
+        if len(unknown_categories) > 0:
+            save_data = False
+            today = date.today()
+            log_location = log_path / f"CRF{submission_year}" \
+                           / f"{country_code}_unknown_categories_{today.strftime('%Y-%m-%d')}.csv"
+            print(f"Unknown rows found for {country_code}. Not saving data. Savin log to "
+                  f"{log_location}" )
+            save_unknown_categories_info(unknown_categories, log_location)
+
+        if len(last_row_info) > 0:
+            save_data = False
+            today = date.today()
+            log_location = log_path / f"CRF{submission_year}" \
+                           / f"{country_code}_last_row_info_{today.strftime('%Y-%m-%d')}.csv"
+            print(f"Data found in the last row found for {country_code}. Not saving data. Savin log to "
+                  f"{log_location}")
+            save_last_row_info(last_row_info, log_location)
+
+        if save_data:
+            compression = dict(zlib=True, complevel=9)
+            output_folder = extracted_data_path / country_name.replace(" ", "_")
+            output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"
+
+            if not output_folder.exists():
+                output_folder.mkdir()
+                # folder mapping has to be updated !!!
+                # if we do it here we will do it a lot of times when reading several countries at once
+
+            # write data in interchange format
+            pm2.pm2io.write_interchange_format(output_folder / output_filename,
+                                               ds_all.pr.to_interchange_format())
+
+            # write data in native PRIMAP2 format
+            encoding = {var: compression for var in ds_all.data_vars}
+            ds_all.pr.to_netcdf(output_folder / (output_filename + ".nc"),
+                                  encoding=encoding)
 
     return ds_all
 
@@ -236,18 +261,29 @@ def read_crf_for_country_datalad(
     print(f"Using the UNFCCC_CRF_reader")
     print("")
 
+    # determine latest data
+    if submission_date is None:
+        print(f"No submission date given, find latest date.")
+        submission_date = get_latest_date_for_country(country_code, submission_year)
+    else:
+        print(f"Using given submissions date {submission_date}")
+
+    if submission_date is None:
+        # there is no data. Raise an exception
+        raise NoCRFFilesError(f"No submissions found for {country_code}, "
+                              f"submission_year={submission_year}, "
+                              f"date={date}")
+    else:
+        print(f"Latest submission date for CRF{submission_year} is {submission_date}")
+
     # get possible input files
     input_files = get_crf_files(country_codes=country_code,
                                 submission_year=submission_year,
                                 date=submission_date)
     if not input_files:
-        if submission_date is not None:
-            print(f"No possible input files found for {country}, CRF{submission_year}, "
-                  f"v{submission_date}. Are they already submitted and included in the "
-                  f"repository?")
-        else:
-            print(f"No possible input files found for {country}, CRF{submission_year}. "
-                  f"Are they already submitted and included in the repository?")
+        raise NoCRFFilesError(f"No possible input files found for {country}, CRF{submission_year}, "
+                              f"v{submission_date}. Are they already submitted and included in the "
+                              f"repository?")
     else:
         print(f"Found the following input_files:")
         for file in input_files:
@@ -257,6 +293,173 @@ def read_crf_for_country_datalad(
     # convert file's path to str
     input_files = [file.as_posix() for file in input_files]
 
+    # get output file
+    output_folder = extracted_data_path / country_name.replace(" ", "_")
+    output_files = [output_folder / f"{country_code}_CRF{submission_year}"
+                    f"_{submission_date}.{suffix}" for suffix
+                    in ['yaml', 'csv', 'nc']]
+    print(f"The following files are considered as output_files:")
+    for file in output_files:
+        print(file)
+    print("")
+
+    # convert file paths to str
+    output_files = [file.as_posix() for file in output_files]
+
+    print(f"Run the script using datalad run via the python api")
+    script = code_path / "UNFCCC_CRF_reader" / "read_UNFCCC_CRF_submission.py"
+
+    datalad.api.run(
+        cmd=f"./venv/bin/python3 {script.as_posix()} --country={country} "
+            f"--submission_year={submission_year} --submission_date={submission_date}",
+        dataset=root_path,
+        message=f"Read data for {country}, CRF{submission_year}, {submission_date}.",
+        inputs=input_files,
+        outputs=output_files,
+        dry_run=None,
+        explicit=True,
+    )
+
+
+def read_new_crf_for_year(
+        submission_year: int,
+        countries: Optional[List[str]]=None,
+        re_read: Optional[bool]=False,
+) -> dict:
+    """
+    Read CRF data for given submission year for all countries in
+    `countries` that have submitted data. If no `countries` list is
+    given, all countries are used.
+    When updated submissions exist, the latest will be read.
+    All tables available in the specification will be read for all years.
+    Results will be written to appropriate country folders.
+
+    If you want to read data from a different folder use the
+    test_read_crf_data function.
+
+    IMPORTANT NOTE:
+    Currently there is no consistency check between data for the same category
+    read from different tables
+
+    Parameters
+    __________
+
+    submission_year: int
+        Year of the submission of the data
+
+    countries: List[str] (optional)
+        List of countries to read. If not given reading is tried for all
+        CRF countries
+
+    re_read: bool (optional, default=False)
+        If true data will be read even if already read before.
+
+    TODO: write log with failed countries and what has been read
+
+    Returns
+    _______
+        dict: maps each country to its status ("read", "skipped", "no data" or "failed")
+
+    """
+
+    if countries is None:
+        countries = all_crf_countries
+
+    read_countries = {}
+    for country in countries:
+        try:
+            country_df = read_crf_for_country(country, submission_year, re_read=re_read)
+            if country_df is None:
+                read_countries[country] = "skipped"
+            else:
+                read_countries[country] = "read"
+        except NoCRFFilesError:
+            print(f"No data for country {country}, {submission_year}")
+            read_countries[country] = "no data"
+        except:
+            print(f"Data for country {country}, {submission_year} could not be read")
+            read_countries[country]= "failed"
+
+    # print overview
+    successful_countries = [country for country in read_countries if read_countries[country] == "read"]
+    skipped_countries = [country for country in read_countries if read_countries[country] == "skipped"]
+    failed_countries = [country for country in read_countries if read_countries[country] == "failed"]
+    no_data_countries = [country for country in read_countries if read_countries[country] == "no data"]
+
+    print(f"Read data for countries {successful_countries}")
+    print(f"Skipped countries {skipped_countries}")
+    print(f"No data for countries {no_data_countries}")
+    print(f"!!!!! Reading failed for {failed_countries}. Check why")
+    return(read_countries)
+
+
+def read_new_crf_for_year_datalad(
+        submission_year: int,
+        countries: Optional[List[str]] = None,
+        re_read: Optional[bool] = False,
+) -> None:
+    """
+    TODO: this is just a copy of the one country function
+    Wrapper around read_crf_for_year_datalad which takes care of selecting input
+    and output files and using datalad run to trigger the data reading
+
+    Parameters
+    __________
+
+    submission_year: int
+        Year of the submission of the data
+
+    countries: List[str] (optional)
+        List of countries to read. If not given reading is tried for all
+        CRF countries
+
+    re_read: bool (optional, default=False)
+        If true data will be read even if already read before.
+
+    """
+
+    input_files = []
+    output_files = []
+    # loop over countries to collect input and output files
+    for country in countries:
+        # get the country code and name
+        # both could be given as input, so we need this two step process
+        if country in custom_country_mapping:
+            country_code = country
+        else:
+            country_code = get_country_code(country)
+        # now get the country name
+        country_name = get_country_name(country_code)
+
+        print(f"Attempting to read data for CRF{submission_year} from {country}.")
+        print("#"*80)
+        print("")
+
+        print(f"Using the UNFCCC_CRF_reader")
+        print("")
+
+        # get possible input files
+        new_input_files = get_crf_files(country_codes=country_code,
+                                    submission_year=submission_year,
+                                    date=submission_date)
+        new_input
+        if not input_files:
+            if submission_date is not None:
+                print(f"No possible input files found for {country}, CRF{submission_year}, "
+                      f"v{submission_date}. Are they already submitted and included in the "
+                      f"repository?")
+            else:
+                print(f"No possible input files found for {country}, CRF{submission_year}. "
+                      f"Are they already submitted and included in the repository?")
+        else:
+            print(f"Found the following input_files:")
+            for file in input_files:
+                print(file.name)
+            print("")
+
+    # convert file's path to str
+    input_files = [file.as_posix() for file in input_files]
+
     # get output file
     if submission_date is None:
         submission_date = get_latest_date_for_country(country_code, submission_year)
@@ -275,7 +478,7 @@ def read_crf_for_country_datalad(
 
     print(f"Run the script using datalad run via the python api")
     script = code_path / "UNFCCC_CRF_reader" / "read_UNFCCC_CRF_submission.py"
-    #try:
+
     datalad.api.run(
         cmd=f"./venv/bin/python3 {script.as_posix()} --country={country} "
             f"--submission_year={submission_year} --submission_date={submission_date}",
@@ -286,9 +489,6 @@ def read_crf_for_country_datalad(
         dry_run=None,
         explicit=True,
     )
-    #except IncompleteResultsError as exce:
-    #    print(f"Code did not run successfully:")
-    #    print(exce.failed)
 
 
 # function to read all available data (or list of countries?)

+ 8 - 4
code/UNFCCC_CRF_reader/read_UNFCCC_CRF_submission.py

@@ -8,20 +8,24 @@ import argparse
 
 parser = argparse.ArgumentParser()
 parser.add_argument('--country', help='Country name or code')
-parser.add_argument('--submission_year', help='Submission round to read')
+parser.add_argument('--submission_year', help='Submission round to read', type=int)
 parser.add_argument('--submission_date', help='Date of submission to read', default=None)
+parser.add_argument('--re_read', help='Read data also if already read before',
+                    action='store_true')
 
 args = parser.parse_args()
 
 country = args.country
 submission_year = args.submission_year
 submission_date = args.submission_date
-
+re_read = args.re_read
 if submission_date == 'None':
     submission_date = None
 
 read_crf_for_country(
     country,
-    submission_year=int(submission_year),
-    submission_date=submission_date)
+    submission_year=submission_year,
+    submission_date=submission_date,
+    re_read=re_read
+)
 

+ 10 - 4
code/UNFCCC_CRF_reader/read_UNFCCC_CRF_submission_datalad.py

@@ -11,17 +11,23 @@ parser = argparse.ArgumentParser()
 parser.add_argument('--country', help='Country name or code')
 parser.add_argument('--submission_year', help='Submission round to read')
 parser.add_argument('--submission_date', help='Date of submission to read', default=None)
+parser.add_argument('--re_read', help='Read data also if already read before',
+                    action='store_true')
 
 args = parser.parse_args()
 
 country = args.country
 submission_year = args.submission_year
 submission_date = args.submission_date
+re_read = args.re_read
+
 
 if submission_date == "None":
-        submission_date = None
+    submission_date = None
 
 read_crf_for_country_datalad(
-        country,
-        submission_year=int(submission_year),
-        submission_date=submission_date)
+    country,
+    submission_year=int(submission_year),
+    submission_date=submission_date,
+    re_read=re_read
+)

+ 30 - 0
code/UNFCCC_CRF_reader/read_new_UNFCCC_CRF_for_year.py

@@ -0,0 +1,30 @@
+"""
+This script is a wrapper around the read_new_crf_for_year
+function such that it can be called from datalad
+"""
+
+from UNFCCC_CRF_reader_prod import read_new_crf_for_year
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--countries', help='List of country codes', default=None)
+parser.add_argument('--submission_year', help='Submission round to read', type=int)
+parser.add_argument('--submission_date', help='Date of submission to read', default=None)
+parser.add_argument('--re_read', help='Read data also if already read before',
+                    action='store_false')
+
+args = parser.parse_args()
+
+countries = args.countries
+if countries == "None":
+    countries = None
+submission_year = args.submission_year
+re_read = args.re_read
+
+read_new_crf_for_year(
+    submission_year=int(submission_year),
+    countries=countries,
+    re_read=re_read
+)
+
+

+ 32 - 0
code/UNFCCC_CRF_reader/read_new_UNFCCC_CRF_for_year_datalad.py

@@ -0,0 +1,32 @@
+"""
+wrapper around read_new_crf_for_year_datalad such that it can be called
+from doit in the current setup where doit runs on system python and
+not in the venv.
+"""
+
+from UNFCCC_CRF_reader_prod import read_new_crf_for_year_datalad
+from util import NoCRFFilesError
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--countries', help='List of country codes', default=None)
+parser.add_argument('--submission_year', help='Submission round to read')
+parser.add_argument('--re_read', help='Read data also if already read before',
+                    action='store_false')
+
+args = parser.parse_args()
+
+countries = args.countries
+if countries == "None":
+    countries = None
+submission_year = args.submission_year
+re_read = args.re_read
+
+try:
+    read_new_crf_for_year_datalad(
+        submission_year=int(submission_year),
+        countries=countries,
+        re_read=re_read
+    )
+except NoCRFFilesError as err:
+    print(f"NoCRFFilesError: {err}")

+ 22 - 2
dodo.py

@@ -150,21 +150,41 @@ def task_read_unfccc_submission():
         'setup': ['setup_venv'],
     }
 
+
 # read UNFCCC submissions.
 # datalad run is called from within the read_UNFCCC_submission.py script
 read_config_crf = {
     "country": get_var('country', None),
     "submission_year": get_var('submission_year', None),
     "submission_date": get_var('submission_date', None),
+    "re_read": get_var('re_read', False),
+    "countries": get_var('countries', None),
 }
 
 def task_read_unfccc_crf_submission():
-    """ Read CRF submission for a country """
+    """ Read CRF submission for a country (will re-read if data already present)"""
     return {
         'actions': [f"./venv/bin/python code/UNFCCC_CRF_reader/read_UNFCCC_CRF_submission_datalad.py "
                     f"--country={read_config_crf['country']} "
                     f"--submission_year={read_config_crf['submission_year']} "
-                    f"--submission_date={read_config_crf['submission_date']}"],
+                    f"--submission_date={read_config_crf['submission_date']} "],
+        'verbosity': 2,
+        'setup': ['setup_venv'],
+    }
+
+
+def task_read_new_unfccc_crf_for_year():
+    """ Read CRF submission for all countries for given submission year. By default only reads
+    data not present yet. Only reads the latest updated submission for each country."""
+    actions = [f"./venv/bin/python code/UNFCCC_CRF_reader/read_new_UNFCCC_CRF_for_year_datalad.py "
+               f"--submission_year={read_config_crf['submission_year']} "]
+    if read_config_crf["countries"] is not None:
+            actions[0] = actions[0] + f"--countries={read_config_crf['countries']} "
+    if read_config_crf["re_read"]:
+        actions[0] = actions[0] + "--re_read"
+    return {
+        #'basename': "Read_CRF_year",
+        'actions': actions,
         'verbosity': 2,
         'setup': ['setup_venv'],
     }