@@ -5,9 +5,535 @@ well as for test-reading to check for new categories etc.
import re
-from typing import Dict, Union, List, Optional, Union
+import json
+import pandas as pd
+import xarray as xr
+import primap2 as pm2
from pathlib import Path
from treelib import Tree
+from operator import itemgetter
+from collections import Counter
+from typing import Dict, List, Optional, Tuple, Union
+from datetime import datetime
+import crf_specifications as crf
+### reading functions
+def convert_crf_table_to_pm2if(
+ df_table: pd.DataFrame,
+ submission_year: int,
+ entity_mapping: Optional[Dict[str,str]]=None,
+ coords_defaults_input: Optional[Dict[str,str]]=None,
+ filter_remove_input: Optional[Dict[str,Dict[str,Union[str,List]]]]=None,
+ filter_keep_input: Optional[Dict[str,Dict[str,Union[str,List]]]]=None,
+ meta_data_input: Optional[Dict[str,str]]=None,
+) -> pd.DataFrame:
+ """
+ Converts a given pandas long format crf table to PRIMAP2 interchange format
+ Parameters
+ __________
+ df_table: pd.DataFrame
+ Data to convert
+ submission_year: int
+ Year of submission
+ entity_mapping: Optional[Dict[str,str]]
+ Mapping of entities to PRIMAP2 format. Not necessary for all tables
+ coords_defaults_input: Optional[Dict[str,str]],
+ Additional default values for coordinates. (e.g. "Total" for `type`)
+ filter_remove_input: Optional[Dict[str,Dict[str,Union[str,List]]]]
+ Filter to remove data during conversion. The format is as in
+ filter_keep_input: Optional[Dict[str,Dict[str,Union[str,List]]]]
+ Filter to keep only specified data during conversion.
+ The format is as in PRIMAP2
+ meta_data_input: Optional[Dict[str,str]]
+ Meta data information. If values filled by this function automatically
+ are given as input the automatic values are overwritten.
+ Returns
+ _______
+ pd.DataFrame:
+ Pandas DataFrame containing the data in PRIMAP2 interchange format
+ Metadata is stored as attrs in the DataFrame
+ """
+ coords_cols = {
+ "category": "category",
+ "entity": "entity",
+ "unit": "unit",
+ "sec_cats__type": "type",
+ "area": "country",
+ "data": "data",
+ }
+ add_coords_cols = {
+ # "orig_cat_name": ["orig_cat_name", "category"],
+ }
+ coords_terminologies = {
+ "area": "ISO3",
+ "category": f"CRF2013_{submission_year}",
+ "scenario": "PRIMAP",
+ "type": "CRF2013",
+ }
+ coords_defaults = {
+ "source": "UNFCCC",
+ "provenance": "measured",
+ "scenario": f"CRF{submission_year}",
+ }
+ if coords_defaults_input is not None:
+ for key in coords_defaults_input.keys():
+ coords_defaults[key] = coords_defaults_input[key]
+ coords_value_mapping = {
+ "unit": "PRIMAP1",
+ "entity": "PRIMAP1",
+ }
+ if entity_mapping is not None:
+ coords_value_mapping["entity"] = entity_mapping
+ #coords_value_filling_template = {
+ #}
+ filter_remove = {
+ "f1": {
+ "category": ["\IGNORE"],
+ }
+ }
+ if filter_remove_input is not None:
+ for key in filter_remove_input.keys():
+ filter_remove[key] = filter_remove_input[key]
+ filter_keep = {
+ }
+ if filter_keep_input is not None:
+ for key in filter_keep_input.keys():
+ filter_keep[key] = filter_keep_input[key]
+ meta_data = {
+ "references": f"https://unfccc.int/ghg-inventories-annex-i-parties/{submission_year}",
+ "rights": "XXXX",
+ "contact": "johannes.guetschow@pik-potsdam.de",
+ "title": f"Data submitted in {submission_year} to the UNFCCC in the common reporting format (CRF)",
+ "comment": "Read fom xlsx file by Johannes Gütschow",
+ "institution": "United Nations Framework Convention on Climate Change (www.unfccc.int)",
+ }
+ if meta_data_input is not None:
+ for key in meta_data_input.keys():
+ meta_data[key] = meta_data_input[key]
+ df_table_if = pm2.pm2io.convert_long_dataframe_if(
+ df_table,
+ coords_cols=coords_cols,
+ add_coords_cols=add_coords_cols,
+ coords_defaults=coords_defaults,
+ coords_terminologies=coords_terminologies,
+ coords_value_mapping=coords_value_mapping,
+ #coords_value_filling=coords_value_filling,
+ filter_remove=filter_remove,
+ filter_keep=filter_keep,
+ meta_data=meta_data
+ )
+ return df_table_if
+def read_crf_table(
+ country_codes: Union[str, List[str]],
+ table: str,
+ submission_year: int,
+ data_year: Optional[Union[int, List[int]]]=None,
+ date: Optional[str]=None,
+ folder: Optional[str]=None,
+) -> Tuple[pd.DataFrame, List[List], List[List]]:
+ """
+ Read CRF table for given submission year and country / or countries
+ This function can read for multiple years and countries but only a single
+ table. The reason is that combining data from different tables needs
+ consistency checks while combining for different years and countries does not.
+ The folder can either be given explicitly or if not given folders are determined
+ from the submission_year and country_code variables
+ Parameters
+ __________
+ country_codes: str or list[str]
+ ISO 3-letter country code or list of country codes
+ table: str
+ name of the table sheet in the CRF xlsx file
+ submission_year: int
+ Year of the submission of the data
+ data_year: int or List of int (optional)
+ if int a single data year will be read. if a list of ints is given these
+ years will be read. If no nothing is given all data years will be read
+ date: str (optional, default is "latest")
+ readonly submission from the given date
+ folder: str (optional)
+ Folder that contains the xls files. If not given fodlers are determined by the
+ submissions_year and country_code variables
+ Returns
+ _______
+ Tuple[pd.DataFrame, List[List], List[List]]:
+ * First return parameter is the data as a pandas DataFrame in long format
+ * Second return parameter is a list of unknown categories / row headers
+ * Third return parameter holds information on data found in the last read row.
+ This is used as a hint to check if table specifications might have to be adapted
+ as country submitted tables are longer than expected.
+ """
+ if isinstance(country_codes, str):
+ country_codes = [country_codes]
+ # get file names and locations
+ # we're filtering for country and submission year here but in the repository setup
+ # we should only have files for one country and submission in the folder. But the
+ # function can also be used on a given folder and then the filter is useful.
+ input_files = []
+ if folder is None:
+ root = Path(__file__).parents[3]
+ #root = Path(os.getcwd()).parents
+ data_folder = root / "downloaded_data" / "UNFCCC"
+ submission_folder = f"CRF{submission_year}"
+ with open(data_folder / "folder_mapping.json", "r") as mapping_file:
+ folder_mapping = json.load(mapping_file)
+ # use country default folders
+ country_folders = []
+ for country_code in country_codes:
+ if country_code in folder_mapping:
+ new_country_folders = folder_mapping[country_code]
+ if isinstance(new_country_folders, str):
+ # only one folder
+ country_folders = country_folders + \
+ [data_folder / new_country_folders / submission_folder]
+ else:
+ country_folders = country_folders + \
+ [data_folder / folder / submission_folder
+ for folder in new_country_folders]
+ else:
+ raise ValueError(f"No data folder found for country {country_code}. "
+ f"Check if folder mapping is up to date.")
+ else:
+ country_folders = [folder]
+ file_filter_template = {}
+ file_filter_template["submission_year"] = submission_year
+ file_filter_template["party"] = country_codes
+ if data_year is not None:
+ file_filter_template["data_year"] = data_year
+ for input_folder in country_folders:
+ input_folder = Path(input_folder)
+ if input_folder.exists():
+ # if desired find the latest date and only read that
+ # has to be done per country
+ if date == "latest":
+ for country in country_codes:
+ file_filter = file_filter_template.copy()
+ file_filter["party"] = country
+ dates = get_submission_dates(folder, file_filter)
+ file_filter["date"] = find_latest_date(dates)
+ input_files = input_files + \
+ filter_filenames(input_folder.glob("*.xlsx"),
+ **file_filter)
+ else:
+ file_filter = file_filter_template.copy()
+ if date is not None:
+ file_filter["date"] = date
+ input_files = input_files + \
+ filter_filenames(input_folder.glob("*.xlsx"),
+ **file_filter)
+ else:
+ raise ValueError(f"Folder {input_folder} does not exist")
+ # get specification
+ try:
+ crf_spec = getattr(crf, f"CRF{submission_year}")
+ except:
+ raise ValueError(f"No terminology exists for submission year {submission_year}")
+ # now loop over files and read them
+ df_all = None
+ unknown_rows = []
+ last_row_info = []
+ for file in input_files:
+ df_this_file, unknown_rows_this_file, last_row_info_this_file = \
+ read_crf_table_from_file(file, table, crf_spec[table])
+ if df_all is None:
+ df_all = df_this_file.copy(deep=True)
+ unknown_rows = unknown_rows_this_file
+ last_row_info = last_row_info_this_file
+ else:
+ df_all = pd.concat([df_this_file, df_all])
+ unknown_rows = unknown_rows + unknown_rows_this_file
+ last_row_info = last_row_info + last_row_info_this_file
+ return df_all, unknown_rows, last_row_info
+def read_crf_table_from_file(
+ file: Path,
+ table: str,
+ table_spec: Dict[str, Dict],
+) -> Tuple[pd.DataFrame, List[List], List[List]]:
+ """
+ Read a single CRF table from a given file. This is the core function of the CRF
+ reading process as it reads the data from xls and performs the category mapping.
+ Parameters
+ __________
+ file: Path
+ file to read from
+ table: str
+ table to read (name of the sheet in the xlsx file)
+ table_spec: Dict[str, Dict]
+ Specification for the given table, e.g. CRF2021["Table4"]
+ Returns
+ _______
+ Tuple[pd.DataFrame, List[List], List[List]]:
+ * First return parameter is the data as a pandas DataFrame in long format
+ * Second return parameter is a list of unknown categories / row headers
+ * Third return parameter holds information on data found in the last read row.
+ This is used as a hint to check if table specifications might have to be adapted
+ as country submitted tables are longer than expected.
+ TODO: add verbosity option for debugging?
+ """
+ table_properties = table_spec["table"]
+ file_info = get_info_from_crf_filename(file.name)
+ # find non-unique categories in mapping
+ all_cats_mapping = table_spec["sector_mapping"]
+ all_cats = [cat[0] for cat in all_cats_mapping]
+ unique_cats = [cat for (cat, count) in Counter(all_cats).items() if count == 1]
+ unique_cat_tuples = [mapping for mapping in all_cats_mapping if mapping[0] in unique_cats]
+ unique_mapping = dict(zip([tup[0] for tup in unique_cat_tuples],
+ [tup[1] for tup in unique_cat_tuples]))
+ non_unique_cats = [cat for (cat, count) in Counter(all_cats).items() if count > 1]
+ # prepare the sector hierarchy
+ if non_unique_cats:
+ # if we have non-unique categories present we need the information on
+ # levels within the category hierarchy
+ category_tree = create_category_tree(all_cats_mapping, table, file_info["party"])
+ # prepare index colum information
+ cat_col = table_properties["col_for_categories"]
+ index_cols = table_properties["categories"] + [cat_col]
+ cols_for_space_stripping = [table_properties["col_for_categories"]]
+ # read the data
+ print(f"Reading table {table} for year {file_info['data_year']} from {file.name}.")
+ skiprows = table_properties["firstrow"] - 1
+ nrows = table_properties["lastrow"] - skiprows + 1 # read one row more to check if we reached the end
+ # we read with user specific NaN treatment as the NaN treatment is part of the conversion to
+ # PRIMAP2 format.
+ df_raw = pd.read_excel(file, sheet_name=table, skiprows=skiprows , nrows=nrows, engine="openpyxl",
+ na_values=['-1.#IND', '-1.#QNAN', '-NaN', '-nan', '1.#IND', '1.#QNAN',
+ 'NULL', 'NaN', ''], keep_default_na=False)
+ if len(df_raw) < nrows:
+ #print(f"read data truncated because of all-nan rows")
+ last_row_nan = True
+ else:
+ last_row_nan = False
+ #### prepare the header (2 row header, first entity, then unit)
+ # We do this before removing columns and any other processing to
+ # have consistent column names in the configuration and to avoid
+ # "Unnamed: X" column names which appear after reading of merged
+ # cells
+ # the filling leads to long and a bit confusing headers, but as long
+ # as pandas can not fill values of merged cells in all individual cells
+ # we have to use some filling algorithm.
+ df_header = df_raw.iloc[0:len(table_properties["header"])-1].copy(deep=True)
+ df_header.loc[-1] = df_header.columns.values
+ df_header.index = df_header.index + 1
+ # replace "Unnamed: X" colum names by nan to fill from left in next step
+ df_header = df_header.sort_index()
+ df_header = df_header.replace(r"Unnamed: [0-9]{1,2}", np.nan, regex=True)
+ header = []
+ # fill nans with the last value from the left
+ for row in range(0, len(df_header)):
+ header.append(list(df_header.iloc[row].fillna(method="ffill")))
+ # combine all non-unit rows into one
+ entities = None
+ units = None
+ for idx, row in enumerate(header):
+ if table_properties["header"][idx] == "unit":
+ units = row
+ else:
+ if entities is None:
+ entities = row
+ else:
+ for col, value in enumerate(row):
+ if str(value) != "nan":
+ entities[col] = f"{entities[col]} {value}"
+ if units is None:
+ raise ValueError(f"Specification for table {table} does not contain unit information.")
+ # remove double spaces
+ entities = [entity.strip() for entity in entities]
+ entities = [re.sub('\s+', ' ', entity) for entity in entities]
+ # replace the old header
+ if len(header) > 2:
+ df_current = df_raw.drop(index=df_raw.iloc[0:len(header)-2].index)
+ else:
+ df_current = df_raw
+ df_current.iloc[0] = units
+ df_current.columns = entities
+ #### standardized header is finalized
+ # remove all columns to ignore
+ df_current = df_current.drop(columns=table_properties["cols_to_ignore"])
+ # remove double spaces
+ for col in cols_for_space_stripping:
+ df_current[col] = df_current[col].str.strip()
+ df_current[col] = df_current[col].replace('\s+', ' ', regex=True)
+ # prepare for sector mapping by initializing result lists and
+ # variables
+ new_cats = [[''] * len(table_properties["categories"])] * len(df_current)
+ # copy the header rows which are not part of the index (unit)
+ new_cats[0] = [df_current.iloc[0][cat_col]] * len(table_properties["categories"])
+ # do the sector mapping here as we need to keep track of unmapped categories
+ # and also need to consider the order of elements for the mapping
+ unknown_categories = []
+ info_last_row = []
+ if non_unique_cats:
+ # need to initialize the tree parsing.
+ last_parent = category_tree.get_node("root")
+ all_nodes = set([category_tree.get_node(node).tag for node in category_tree.nodes])
+ for idx in range(1, len(df_current)):
+ current_cat = df_current.iloc[idx][cat_col]
+ if current_cat in table_properties["stop_cats"]:
+ # we've reached the end of the table, so stop processing
+ # and remove all further rows
+ df_current = df_current.drop(df_current.index[idx:])
+ new_cats = new_cats[0:idx]
+ break
+ # check if current category is a child of the last node
+ children = dict([[child.tag, child.identifier]
+ for child in category_tree.children(last_parent.identifier)])
+ if current_cat in children.keys():
+ # the current category is a child of the current parent
+ # do the mapping
+ node = category_tree.get_node(children[current_cat])
+ new_cats[idx] = node.data[1]
+ # check if the node has children
+ new_children = category_tree.children(node.identifier)
+ if new_children:
+ last_parent = node
+ else:
+ # two possibilities
+ # 1. The category is at a higher point in the hierarchy
+ # 2. It's missing in the hierarchy
+ # we have to first move up the hierarchy
+ # first check if category is present at all
+ if current_cat in all_nodes:
+ old_parent = last_parent
+ while (current_cat not in children.keys()) and \
+ (last_parent.identifier != "root"):
+ last_parent = category_tree.get_node(
+ last_parent.predecessor(category_tree.identifier))
+ children = dict([[child.tag, child.identifier]
+ for child in category_tree.children(last_parent.identifier)])
+ if (last_parent.identifier == "root") and \
+ (current_cat not in children.keys()):
+ # we have not found the category as direct child of any of the
+ # predecessors. Thus it is missing in the specification in
+ # that place
+ print(f"Unknown category '{current_cat}' found in {table} for {file_info['party']}, "
+ f"{file_info['data_year']} (last parent: {old_parent.tag}).")
+ unknown_categories.append([table, file_info["party"], current_cat, file_info['data_year']])
+ # copy back the parent info to continue with next category
+ last_parent = old_parent
+ else:
+ # do the mapping
+ node = category_tree.get_node(children[current_cat])
+ new_cats[idx] = node.data[1]
+ # check if the node has children
+ new_children = category_tree.children(node.identifier)
+ if new_children:
+ last_parent = node
+ else:
+ print(f"Unknown category '{current_cat}' found in {table} for {file_info['party']}, {file_info['data_year']}.")
+ unknown_categories.append([table, file_info["party"], current_cat, file_info['data_year']])
+ else:
+ for idx in range(1, len(df_current)):
+ current_cat = df_current.iloc[idx][cat_col]
+ if current_cat in table_properties["stop_cats"]:
+ # we've reached the end of the table, so stop processing
+ # and remove all further rows
+ df_current = df_current.drop(df_current.index[idx:])
+ new_cats = new_cats[0:idx]
+ break
+ if current_cat in all_cats:
+ new_cats[idx] = unique_mapping[current_cat]
+ if (idx == len(df_current) - 1) and not last_row_nan:
+ print(f"found information in last row: category {current_cat}, row {idx}")
+ info_last_row.append([table, file_info["party"], current_cat, file_info['data_year']])
+ else:
+ print(f"Unknown category '{current_cat}' found in {table} for {file_info['party']}, {file_info['data_year']}.")
+ unknown_categories.append([table, file_info["party"], current_cat, file_info['data_year']])
+ for idx, col in enumerate(table_properties["categories"]):
+ df_current.insert(loc=idx, column=col, value=
+ [cat[idx] for cat in new_cats])
+ # set index
+ df_current = df_current.set_index(index_cols)
+ # process the unit information using the primap2 functions
+ df_current = pm2.pm2io.nir_add_unit_information(df_current, **table_properties["unit_info"])
+ # convert to long format
+ header_long = table_properties["categories"] + \
+ ["orig_cat_name", "entity", "unit", "time", "data"]
+ df_long = pm2.pm2io.nir_convert_df_to_long(
+ df_current, file_info["data_year"], header_long=header_long)
+ # add country information
+ df_long.insert(0, column="country", value=file_info["party"])
+ #df_long.insert(1, column="submission", value=f"CRF{file_info['submission_year']}")
+ if "coords_defaults" in table_spec.keys():
+ for col in table_spec["coords_defaults"]:
+ df_long.insert(2, column=col, value=table_spec["coords_defaults"][col])
+ return df_long, unknown_categories, info_last_row
def get_info_from_crf_filename(
filename: str,
@@ -287,3 +813,97 @@ def filter_category(
return new_mapping
+def get_submission_dates(
+ folder: Path,
+ file_filter: Dict[str, Union[str, int, List]],
+ """
+ Returns all submission dates available in a folder
+ Parameters
+ __________
+ folder: Path
+ Folder to analyze
+ file_filter: Dict[str, Union[str, int, List]]
+ Dict with possible fields "party", "submission_year", "data_year"
+ Returns
+ _______
+ List[str]:
+ List of dates as str
+ """
+ if "date" in file_filter:
+ raise ValueError(f"'date' present in 'file_filter'. This makes no sense as "
+ f"the function's purpose is to return available dates.")
+ if folder.exists():
+ files = filter_filenames(folder.glob("*.xlsx"), **file_filter)
+ else:
+ raise ValueError(f"Folder {folder} does not exist")
+ dates = [get_info_from_crf_filename(file.name)["date"] for file in files]
+ dates = list(set(dates))
+ return dates
+def get_submission_parties(
+ folder: Path,
+ file_filter: Dict[str, Union[str, int, List]],
+ """
+ Returns all submission dates available in a folder
+ Parameters
+ __________
+ folder: Path
+ Folder to analyze
+ file_filter: Dict[str, Union[str, int, List]]
+ Dict with possible fields "submission_year", "data_year", "date"
+ Returns
+ _______
+ List[str]:
+ List of parties as str
+ """
+ if "party" in file_filter:
+ raise ValueError(f"'party' present in 'file_filter'. This makes no sense as "
+ f"the function's purpose is to return available parties.")
+ if folder.exists():
+ files = filter_filenames(folder.glob("*.xlsx"), **file_filter)
+ else:
+ raise ValueError(f"Folder {folder} does not exist")
+ parties = [get_info_from_crf_filename(file.name)["party"] for file in files]
+ parties = list(set(parties))
+ return parties
+def find_latest_date(
+ dates: List[str],
+)-> str:
+ """
+ Returns the latest date in a list of dates as str in the format
+ ddmmyyyy
+ Parameters
+ __________
+ dates: List[str]
+ List of dates
+ Returns
+ _______
+ str: latest date
+ """
+ dates_datetime = [[date, datetime.strptime(date, "%d%m%Y")] for date in dates]
+ dates_datetime = sorted(dates_datetime, key=itemgetter(1))
+ return dates_datetime[-1][0]