- """
- This file holds functions that are used in CRF reading development like
- adding new tables or new submission years (and according country specific
- categories). Thue functions are tailored towards debug output and reading
- of single years in contrast to the production functions which are tailored
- towards the creation of full datasets including storage in the
- """
import pandas as pd
import xarray as xr
import primap2 as pm2
from typing import List, Optional
from pathlib import Path
from datetime import date

from util import all_crf_countries
from util import log_path
import crf_specifications as crf
from UNFCCC_CRF_reader_core import get_country_name
from UNFCCC_CRF_reader_core import get_latest_date_for_country, read_crf_table
from UNFCCC_CRF_reader_core import convert_crf_table_to_pm2if


def read_year_to_test_specs(
        submission_year: int,
        data_year: Optional[int] = None,
) -> xr.Dataset:
    """
    Read one xlsx file (i.e. one data year) for each country for a submission
    year to create log files and extend the specifications.
    """
    if data_year is None:
        data_year = 2000

    unknown_categories = []
    last_row_info = []
    ds_all = None
    print(f"CRF test reading for CRF{submission_year}. Using data year {data_year}")
    print("#" * 80)
    try:
        crf_spec = getattr(crf, f"CRF{submission_year}")
    except AttributeError:
        raise ValueError(f"No CRF specification exists for submission year "
                         f"{submission_year}")

    tables = [table for table in crf_spec.keys()
              if crf_spec[table]["status"] == "tested"]
    print(f"The following tables are available in the "
          f"CRF{submission_year} specification: {tables}")
    print("#" * 80)
    for country_code in all_crf_countries:
        # get the country name for the country code
        country_name = get_country_name(country_code)
        print(f"Reading for {country_name}")
        # get the date of the latest submission for the country
        try:
            submission_date = get_latest_date_for_country(country_code,
                                                          submission_year)
        except Exception:
            print(f"No submissions for country {country_name}, "
                  f"CRF{submission_year}")
            submission_date = None

        if submission_date is not None:
            for table in tables:
                # read the table for the given data year
                ds_table, new_unknown_categories, new_last_row_info = \
                    read_crf_table(country_code, table, submission_year,
                                   date=submission_date, data_year=[data_year])
                # collect messages on unknown rows etc.
                unknown_categories = unknown_categories + new_unknown_categories
                last_row_info = last_row_info + new_last_row_info
                # convert to PRIMAP2 interchange format (IF)
                # first drop the orig_cat_name column as it can have multiple
                # values for one category
                ds_table = ds_table.drop(columns=["orig_cat_name"])

                # TODO: catch entity conversion errors and make a list of
                #  error entities
                # if we need to map entities, pass this info to the
                # conversion function
                if "entity_mapping" in crf_spec[table]:
                    entity_mapping = crf_spec[table]["entity_mapping"]
                else:
                    entity_mapping = None
                try:
                    ds_table_if = convert_crf_table_to_pm2if(
                        ds_table,
                        submission_year,
                        meta_data_input={
                            "title": f"Data submitted in {submission_year} to the "
                                     f"UNFCCC in the common reporting format (CRF) "
                                     f"by {country_name}. "
                                     f"Submission date: {submission_date}"},
                        entity_mapping=entity_mapping,
                    )

                    # now convert to native PRIMAP2 format
                    ds_table_pm2 = pm2.pm2io.from_interchange_format(ds_table_if)

                    # combine the per-table datasets
                    if ds_all is None:
                        ds_all = ds_table_pm2
                    else:
                        ds_all = ds_all.combine_first(ds_table_pm2)
                except Exception:
                    print(f"Error occurred when converting table {table} for "
                          f"{country_name} to PRIMAP2 IF.")
                    # TODO: error handling and logging
    # process the log messages
    today = date.today()
    if len(unknown_categories) > 0:
        log_location = log_path / f"CRF{submission_year}" \
            / f"{data_year}_unknown_categories_{today.strftime('%Y-%m-%d')}.csv"
        print(f"Unknown rows found. Saving log to {log_location}")
        save_unknown_categories_info(unknown_categories, log_location)

    if len(last_row_info) > 0:
        log_location = log_path / f"CRF{submission_year}" \
            / f"{data_year}_last_row_info_{today.strftime('%Y-%m-%d')}.csv"
        print(f"Data found in the last row. Saving log to {log_location}")
        save_last_row_info(last_row_info, log_location)
    # save the data
    compression = dict(zlib=True, complevel=9)
    output_folder = log_path / f"test_read_CRF{submission_year}"
    output_filename = f"CRF{submission_year}_{today.strftime('%Y-%m-%d')}"
    if not output_folder.exists():
        output_folder.mkdir()

    # write data in interchange format
    pm2.pm2io.write_interchange_format(output_folder / output_filename,
                                       ds_all.pr.to_interchange_format())

    # write data in native PRIMAP2 format
    encoding = {var: compression for var in ds_all.data_vars}
    ds_all.pr.to_netcdf(output_folder / (output_filename + ".nc"),
                        encoding=encoding)

    return ds_all
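
# A minimal usage sketch for read_year_to_test_specs (the years below are
# placeholders; any submission year with a specification in
# crf_specifications works):
#
#     ds = read_year_to_test_specs(submission_year=2021, data_year=2010)
#
# This reads the 2010 data year from each country's latest CRF2021
# submission, writes the unknown-category and last-row logs under log_path,
# and stores the combined dataset in interchange and native PRIMAP2 format.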


def save_unknown_categories_info(
        unknown_categories: List[List],
        file: Path,
) -> None:
    """
    Save information on unknown categories to a csv file.

    Parameters
    ----------
    unknown_categories: List[List]
        List of lists with information on the unknown categories
        (which table, country, year, and which categories)
    file: pathlib.Path
        File including path where the data should be stored
    """
    # process the unknown categories
    df_unknown_cats = pd.DataFrame(unknown_categories,
                                   columns=["Table", "Country", "Category", "Year"])

    processed_cats = []
    all_tables = df_unknown_cats["Table"].unique()
    all_years = set(df_unknown_cats["Year"].unique())
    all_years = set(year for year in all_years if isinstance(year, int))
    all_years = set(year for year in all_years if year > 1989)
    for table in all_tables:
        df_cats_current_table = df_unknown_cats[df_unknown_cats["Table"] == table]
        cats_current_table = list(df_cats_current_table["Category"].unique())
        for cat in cats_current_table:
            df_current_cat_table = df_cats_current_table[
                df_cats_current_table["Category"] == cat]
            all_countries = df_current_cat_table["Country"].unique()
            countries_cat = ""
            for country in all_countries:
                years_country = df_current_cat_table[
                    df_current_cat_table["Country"] == country]["Year"].unique()
                # list the country alone if the category is unknown for all
                # years, otherwise list the affected years as well
                if set(years_country) == all_years:
                    countries_cat = f"{countries_cat}; {country}"
                else:
                    countries_cat = f"{countries_cat}; {country} ({years_country})"
            processed_cats.append([table, cat, countries_cat])

    # make sure the folder for the output file exists
    if not file.parents[1].exists():
        file.parents[1].mkdir()
    if not file.parents[0].exists():
        file.parents[0].mkdir()

    df_processed_cats = pd.DataFrame(processed_cats,
                                     columns=["Table", "Category", "Countries"])
    df_processed_cats.to_csv(file, index=False)
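
# The log list passed to save_unknown_categories_info is expected to hold one
# [table, country, category, year] entry per finding, matching the DataFrame
# columns above. A hypothetical call (all values are made up):
#
#     save_unknown_categories_info(
#         [["Table1", "DEU", "1.A.1.x", 2010]],
#         log_path / "CRF2021" / "unknown_categories_example.csv",
#     )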


def save_last_row_info(
        last_row_info: List[List],
        file: Path,
) -> None:
    """
    Save information on data found in the last row read for a table.

    The last row read should not contain data. If it does contain data,
    this is a hint that the table size is larger for some countries than
    given in the specification, and thus we might not read the full table.

    Parameters
    ----------
    last_row_info: List[List]
        List of lists with information on data found in the last row
        (which table, country, year, and which categories)
    file: pathlib.Path
        File including path where the data should be stored
    """
    # process the last row information messages
    df_last_row_info = pd.DataFrame(last_row_info,
                                    columns=["Table", "Country", "Category", "Year"])

    processed_last_row_info = []
    all_tables = df_last_row_info["Table"].unique()
    all_years = set(df_last_row_info["Year"].unique())
    all_years = set(year for year in all_years if isinstance(year, int))
    all_years = set(year for year in all_years if year > 1989)
    for table in all_tables:
        df_last_row_current_table = df_last_row_info[
            df_last_row_info["Table"] == table]
        all_countries = df_last_row_current_table["Country"].unique()
        for country in all_countries:
            df_current_country_table = df_last_row_current_table[
                df_last_row_current_table["Country"] == country]
            all_categories = df_current_country_table["Category"].unique()
            cats_country = ""
            for cat in all_categories:
                years_category = df_current_country_table[
                    df_current_country_table["Category"] == cat]["Year"].unique()
                # list the category alone if data was found for all years,
                # otherwise list the affected years as well
                if set(years_category) == all_years:
                    cats_country = f"{cats_country}; {cat}"
                else:
                    cats_country = f"{cats_country}; {cat} ({years_category})"
            processed_last_row_info.append([table, country, cats_country])

    # make sure the folder for the output file exists
    if not file.parents[1].exists():
        file.parents[1].mkdir()
    if not file.parents[0].exists():
        file.parents[0].mkdir()

    df_processed_last_row_info = pd.DataFrame(
        processed_last_row_info, columns=["Table", "Country", "Categories"])
    df_processed_last_row_info.to_csv(file, index=False)
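

# A minimal sketch of driving this module directly; the submission and data
# years below are placeholders and must match an existing CRF specification.
if __name__ == "__main__":
    # test-read one data year for all countries and write the debug logs
    ds = read_year_to_test_specs(submission_year=2021, data_year=2000)
    print(ds)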