|
@@ -20,8 +20,8 @@ from util import NoDIDataError, extracted_data_path, get_country_name
|
|
|
from util import nAI_countries, AI_countries
|
|
|
|
|
|
|
|
|
-def read_UNFCCC_DI_for_party(
|
|
|
- party_code: str,
|
|
|
+def read_UNFCCC_DI_for_country(
|
|
|
+ country_code: str,
|
|
|
category_groups: Optional[Dict]=None,
|
|
|
read_subsectors: bool=False,
|
|
|
save_data: Optional[bool]=True,
|
|
@@ -31,13 +31,13 @@ def read_UNFCCC_DI_for_party(
|
|
|
debug: Optional[bool]=False,
|
|
|
):
|
|
|
"""
|
|
|
- reads data for a party from the UNFCCC DI interface and saves to native and
|
|
|
+ reads data for a country from the UNFCCC DI interface and saves to native and
|
|
|
interchange format
|
|
|
"""
|
|
|
|
|
|
# read the data
|
|
|
- data_df = read_UNFCCC_DI_for_party_df(
|
|
|
- party_code=party_code,
|
|
|
+ data_df = read_UNFCCC_DI_for_country_df(
|
|
|
+ country_code=country_code,
|
|
|
category_groups=category_groups,
|
|
|
read_subsectors=read_subsectors,
|
|
|
debug=debug,
|
|
@@ -49,7 +49,7 @@ def read_UNFCCC_DI_for_party(
|
|
|
|
|
|
# determine filename
|
|
|
if save_data:
|
|
|
- filename = determine_filename(party_code, date_str, True)
|
|
|
+ filename = determine_filename(country_code, date_str, True)
|
|
|
else:
|
|
|
filename = None
|
|
|
|
|
@@ -72,7 +72,7 @@ def read_UNFCCC_DI_for_party(
|
|
|
return data_pm2
|
|
|
|
|
|
|
|
|
-def process_UNFCCC_DI_for_party(
|
|
|
+def process_UNFCCC_DI_for_country(
|
|
|
data_country: xr.Dataset,
|
|
|
country: str,
|
|
|
cat_terminology_in: str,
|
|
@@ -268,8 +268,8 @@ def process_UNFCCC_DI_for_party(
|
|
|
return data_country
|
|
|
|
|
|
|
|
|
-def read_UNFCCC_DI_for_party_df(
|
|
|
- party_code: str,
|
|
|
+def read_UNFCCC_DI_for_country_df(
|
|
|
+ country_code: str,
|
|
|
category_groups: Optional[Dict]=None,
|
|
|
read_subsectors: bool=False,
|
|
|
debug: Optional[bool]=False,
|
|
@@ -281,8 +281,8 @@ def read_UNFCCC_DI_for_party_df(
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
- party: str
|
|
|
- ISO3 code of the party (country names don't work, use the wrapper function)
|
|
|
+ country_code: str
|
|
|
+ ISO3 code of the country (country names don't work, use the wrapper function)
|
|
|
|
|
|
category_groups: dict (optional)
|
|
|
define which categories to read including filters on classification, measure,
|
|
@@ -307,26 +307,25 @@ def read_UNFCCC_DI_for_party_df(
|
|
|
|
|
|
# template for the query to the DI API
|
|
|
query_template = {
|
|
|
- "party_codes": [party_code],
|
|
|
+ "party_codes": [country_code],
|
|
|
"normalize_gas_names": True
|
|
|
}
|
|
|
|
|
|
# find country group
|
|
|
- if party_code in list(reader.non_annex_one_reader.parties["code"]):
|
|
|
+ if country_code in list(reader.non_annex_one_reader.parties["code"]):
|
|
|
ai_country = False
|
|
|
- elif party_code in list(reader.annex_one_reader.parties["code"]):
|
|
|
+ elif country_code in list(reader.annex_one_reader.parties["code"]):
|
|
|
ai_country = True
|
|
|
#di_data = reader.annex_one_reader.query(**query)
|
|
|
else:
|
|
|
- raise ValueError(f"Party code {party_code} found neither in AnnexI nor "
|
|
|
- f"non-AnnexI "
|
|
|
- f"party lists.")
|
|
|
+ raise ValueError(f"Country code {country_code} found neither in AnnexI nor "
|
|
|
+ f"non-AnnexI countrz lists.")
|
|
|
|
|
|
if category_groups is None:
|
|
|
# no category defs given, so use default which is all categories,
|
|
|
# all gases, but no other data
|
|
|
if debug:
|
|
|
- print(f"Using default config to read for party {party_code}")
|
|
|
+ print(f"Using default config to read for country {country_code}")
|
|
|
if ai_country:
|
|
|
all_gases = reader.annex_one_reader.gases["name"]
|
|
|
query = query_template
|
|
@@ -420,12 +419,12 @@ def read_UNFCCC_DI_for_party_df(
|
|
|
|
|
|
# if data has been collected print some information and save the data
|
|
|
if di_data is None:
|
|
|
- raise ValueError(f"No data collected for party {party_code} and category "
|
|
|
+ raise ValueError(f"No data collected for country {country_code} and category "
|
|
|
f"groups "
|
|
|
f"{category_groups}")
|
|
|
elif debug:
|
|
|
# print some information on collected data
|
|
|
- print(f"Collected data for party {party_code}")
|
|
|
+ print(f"Collected data for country {country_code}")
|
|
|
print("### Categories ###")
|
|
|
categories = di_data["category"].unique()
|
|
|
categories.sort()
|
|
@@ -618,9 +617,49 @@ def convert_DI_IF_data_to_pm2(
|
|
|
|
|
|
return data_pm2
|
|
|
|
|
|
+## datalad and pydoit interface functions
|
|
|
+def read_DI_for_country_datalad(
|
|
|
+ country: str,
|
|
|
+) -> None:
|
|
|
+ """
|
|
|
+ Wrapper around read_DI_for_country which takes care of selecting input
|
|
|
+ and output files and using datalad run to trigger the data reading
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ __________
|
|
|
+
|
|
|
+ country_codes: str
|
|
|
+ ISO 3-letter country code
|
|
|
+
|
|
|
+ """
|
|
|
+
|
|
|
+ # get all the info for the country
|
|
|
+ country_info = get_input_and_output_files_for_country(
|
|
|
+ country, submission_year=submission_year, verbose=True)
|
|
|
+
|
|
|
+ print(f"Attempting to read DI data for {country}.")
|
|
|
+ print("#"*80)
|
|
|
+ print("")
|
|
|
+ print(f"Using the UNFCCC_DI_reader")
|
|
|
+ print("")
|
|
|
+ print(f"Run the script using datalad run via the python api")
|
|
|
+ script = code_path / "UNFCCC_DI_reader" / "read_UNFCCC_DI_country.py"
|
|
|
+
|
|
|
+ cmd = f"./venv/bin/python3 {script.as_posix()} --country={country} ""
|
|
|
+ datalad.api.run(
|
|
|
+ cmd=cmd,
|
|
|
+ dataset=root_path,
|
|
|
+ message=f"Read DI data for {country}.",
|
|
|
+ inputs=country_info["input"],
|
|
|
+ outputs=country_info["output"],
|
|
|
+ dry_run=None,
|
|
|
+ explicit=True,
|
|
|
+ )
|
|
|
+
|
|
|
+## helper functions
|
|
|
|
|
|
def determine_filename(
|
|
|
- party_code: str,
|
|
|
+ country_code: str,
|
|
|
date_str: str,
|
|
|
raw: bool=False,
|
|
|
)->Path:
|
|
@@ -630,7 +669,7 @@ def determine_filename(
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
- party_code: str
|
|
|
+ country_code: str
|
|
|
ISO 3 letter code of the country
|
|
|
date_str:
|
|
|
formatted date string
|
|
@@ -647,32 +686,32 @@ def determine_filename(
|
|
|
with open(extracted_data_path / "folder_mapping.json", "r") as mapping_file:
|
|
|
folder_mapping = json.load(mapping_file)
|
|
|
|
|
|
- if party_code in folder_mapping:
|
|
|
+ if country_code in folder_mapping:
|
|
|
file_filter = {}
|
|
|
- file_filter["party"] = party_code
|
|
|
- country_folders = folder_mapping[party_code]
|
|
|
+ file_filter["party"] = country_code
|
|
|
+ country_folders = folder_mapping[country_code]
|
|
|
if isinstance(country_folders, str):
|
|
|
# only one folder
|
|
|
- filename = Path(country_folders) / f"{party_code}_DI_{date_str}"
|
|
|
+ filename = Path(country_folders) / f"{country_code}_DI_{date_str}"
|
|
|
|
|
|
else:
|
|
|
raise ValueError("More than one output folder for country "
|
|
|
- f"{party_code}. This should not happen.")
|
|
|
+ f"{country_code}. This should not happen.")
|
|
|
else:
|
|
|
# folder not in mapping. It will be created if not present yet
|
|
|
- party_name = get_country_name(party_code)
|
|
|
- country_folder = extracted_data_path / party_name.replace(" ", "_")
|
|
|
+ country_name = get_country_name(country_code)
|
|
|
+ country_folder = extracted_data_path / country_name.replace(" ", "_")
|
|
|
if country_folder.exists():
|
|
|
- print(f"Output folder {party_name.replace(' ', '_')} for country "
|
|
|
- f"{party_code} exists but is not in folder mapping. Update "
|
|
|
+ print(f"Output folder {country_name.replace(' ', '_')} for country "
|
|
|
+ f"{country_code} exists but is not in folder mapping. Update "
|
|
|
"folder mapping")
|
|
|
else:
|
|
|
country_folder.mkdir()
|
|
|
|
|
|
if raw:
|
|
|
- filename = Path(country_folder) / f"{party_code}_DI_{date_str}_raw"
|
|
|
+ filename = Path(country_folder) / f"{country_code}_DI_{date_str}_raw"
|
|
|
else:
|
|
|
- filename = Path(country_folder) / f"{party_code}_DI_{date_str}"
|
|
|
+ filename = Path(country_folder) / f"{country_code}_DI_{date_str}"
|
|
|
|
|
|
return filename
|
|
|
|
|
@@ -736,6 +775,85 @@ def convert_categories(
|
|
|
|
|
|
return ds_converted
|
|
|
|
|
|
+def get_input_and_output_files_for_country(
|
|
|
+ country: str,
|
|
|
+ submission_year: int,
|
|
|
+ submission_date: Optional[str]=None,
|
|
|
+ verbose: Optional[bool]=True,
|
|
|
+) -> Dict[str, Union[List, str]]:
|
|
|
+ """
|
|
|
+ Get input and output files for a given country
|
|
|
+ """
|
|
|
+
|
|
|
+ country_info = {}
|
|
|
+
|
|
|
+ if country in custom_country_mapping:
|
|
|
+ country_code = country
|
|
|
+ else:
|
|
|
+ country_code = get_country_code(country)
|
|
|
+ # now get the country name
|
|
|
+ country_name = get_country_name(country_code)
|
|
|
+ country_info["code"] = country_code
|
|
|
+ country_info["name"] = country_name
|
|
|
+
|
|
|
+ # determine latest data
|
|
|
+ print(f"Determining input and output files for {country}")
|
|
|
+ if submission_date is None:
|
|
|
+ if verbose:
|
|
|
+ print(f"No submission date given, find latest date.")
|
|
|
+ submission_date = get_latest_date_for_country(country_code, submission_year)
|
|
|
+ else:
|
|
|
+ if verbose:
|
|
|
+ print(f"Using given submissions date {submission_date}")
|
|
|
+
|
|
|
+ if submission_date is None:
|
|
|
+ # there is no data. Raise an exception
|
|
|
+ raise NoCRFFilesError(f"No submissions found for {country_code}, "
|
|
|
+ f"submission_year={submission_year}, "
|
|
|
+ f"date={date}")
|
|
|
+ else:
|
|
|
+ if verbose:
|
|
|
+ print(f"Latest submission date for CRF{submission_year} is {submission_date}")
|
|
|
+ country_info["date"] = submission_date
|
|
|
+
|
|
|
+ # get possible input files
|
|
|
+ input_files = get_crf_files(country_codes=country_code,
|
|
|
+ submission_year=submission_year,
|
|
|
+ date=submission_date)
|
|
|
+ if not input_files:
|
|
|
+ raise NoCRFFilesError(f"No possible input files found for {country}, CRF{submission_year}, "
|
|
|
+ f"v{submission_date}. Are they already submitted and included in the "
|
|
|
+ f"repository?")
|
|
|
+ elif verbose:
|
|
|
+ print(f"Found the following input_files:")
|
|
|
+ for file in input_files:
|
|
|
+ print(file.name)
|
|
|
+ print("")
|
|
|
+
|
|
|
+
|
|
|
+ # convert file's path to str
|
|
|
+ input_files = [file.as_posix() for file in input_files]
|
|
|
+ country_info["input"] = input_files
|
|
|
+
|
|
|
+ # get output file
|
|
|
+ output_folder = extracted_data_path / country_name.replace(" ", "_")
|
|
|
+ output_files = [output_folder / f"{country_code}_CRF{submission_year}"
|
|
|
+ f"_{submission_date}.{suffix}" for suffix
|
|
|
+ in ['yaml', 'csv', 'nc']]
|
|
|
+ if verbose:
|
|
|
+ print(f"The following files are considered as output_files:")
|
|
|
+ for file in output_files:
|
|
|
+ print(file)
|
|
|
+ print("")
|
|
|
+
|
|
|
+ # check if output data present
|
|
|
+
|
|
|
+ # convert file paths to str
|
|
|
+ output_files = [file.as_posix() for file in output_files]
|
|
|
+ country_info["output"] = output_files
|
|
|
+
|
|
|
+ return country_info
|
|
|
+
|
|
|
# TODO
|
|
|
|
|
|
# functions
|