#import re
#
#from treelib import Tree
#import pandas as pd
import xarray as xr
import primap2 as pm2
#import numpy as np
#import pycountry
import datalad.api
from datetime import date
#from pathlib import Path
from typing import Optional, List, Dict, Union

#from . import crf_specifications as crf
import crf_specifications as crf

from UNFCCC_CRF_reader_core import read_crf_table
from UNFCCC_CRF_reader_core import convert_crf_table_to_pm2if
from UNFCCC_CRF_reader_core import get_latest_date_for_country
from UNFCCC_CRF_reader_core import get_crf_files
from UNFCCC_CRF_reader_core import get_country_name
from UNFCCC_CRF_reader_devel import save_unknown_categories_info
from UNFCCC_CRF_reader_devel import save_last_row_info

from util import code_path, log_path, \
    custom_country_mapping, extracted_data_path, root_path, \
    all_crf_countries, NoCRFFilesError

import sys
sys.path.append(code_path.name)

from UNFCCC_reader.get_submissions_info import get_country_code

# functions:
# * testing functions
# ** read one or more table(s) for all countries
#    (and, if desired, only a single year) and write
#    output files with missing sectors etc.
# **
# TODO: add function to read several / all countries

# general approach:
# The main code is in a function that reads one table from one file and
# returns a raw pandas DataFrame for use in different functions.
# Wrappers around this function read data for a whole country, or do test
# reading where we also write files with missing sectors etc.
# Merging functions use the native PRIMAP2 format.


def read_crf_for_country(
    country_code: str,
    submission_year: int,
    submission_date: Optional[str] = None,
    re_read: Optional[bool] = True,
) -> xr.Dataset:
    """
    Read CRF data for a given submission year and country. All tables
    available in the specification will be read for all years. The result
    will be written to the appropriate country folder.

    Folders are determined from the submission_year and country_code variables.
    The output is a PRIMAP2 dataset (xarray based).

    If you want to read data for more countries or from a different folder
    use the read_latest_crf_submissions_for_year or test_read_crf_data function.

    IMPORTANT NOTE:
    Currently there is no consistency check between data for the same category
    read from different tables.

    We only save the data in the country folder if there were no messages like
    unknown rows, to make sure that data that goes into the repository is complete.
    The resulting dataset is returned in any case. In case log messages appeared
    they are saved in the folder 'log' under the file name
    'country_reading_<country_code>_<date>_X.csv'.

    Parameters
    ----------
    country_code: str
        ISO 3-letter country code
    submission_year: int
        Year of the submission of the data
    submission_date: Optional(str)
        Read for a specific submission date (given as string as in the file names).
        If not specified the latest data will be read.
    re_read: Optional(bool) default: True
        Read the data also if it's already present.

    Returns
    -------
    xr.Dataset with the read data in native PRIMAP2 format
    """
    # get country name
    country_name = get_country_name(country_code)

    # get specification and available tables
    try:
        crf_spec = getattr(crf, f"CRF{submission_year}")
        #print(table_spec)
    except AttributeError:
        raise ValueError(f"No terminology exists for submission year {submission_year}")

    tables = [table for table in crf_spec.keys()
              if crf_spec[table]["status"] == "tested"]
    print(f"The following tables are available in the "
          f"CRF{submission_year} specification: {tables}")

    if submission_date is None:
        submission_date = get_latest_date_for_country(country_code, submission_year)

    # check if data has been read already
    read_data = not submission_has_been_read(
        country_code, country_name, submission_year=submission_year,
        submission_date=submission_date, verbose=True,
    )

    ds_all = None
    if read_data or re_read:
        unknown_categories = []
        last_row_info = []
        for table in tables:
            # read table for all years
            ds_table, new_unknown_categories, new_last_row_info = read_crf_table(
                country_code, table, submission_year, date=submission_date)  #, data_year=[1990])

            # collect messages on unknown rows etc.
            unknown_categories = unknown_categories + new_unknown_categories
            last_row_info = last_row_info + new_last_row_info

            # convert to PRIMAP2 IF
            # first drop the orig_cat_name col as it can have multiple values for
            # one category
            ds_table = ds_table.drop(columns=["orig_cat_name"])

            # if we need to map entities pass this info to the conversion function
            if "entity_mapping" in crf_spec[table]:
                entity_mapping = crf_spec[table]["entity_mapping"]
            else:
                entity_mapping = None
            ds_table_if = convert_crf_table_to_pm2if(
                ds_table,
                submission_year,
                meta_data_input={"title": f"Data submitted in {submission_year} to the UNFCCC "
                                          f"in the common reporting format (CRF) by {country_name}. "
                                          f"Submission date: {submission_date}"},
                entity_mapping=entity_mapping,
            )

            # now convert to native PRIMAP2 format
            ds_table_pm2 = pm2.pm2io.from_interchange_format(ds_table_if)

            # combine per-table datasets
            if ds_all is None:
                ds_all = ds_table_pm2
            else:
                ds_all = ds_all.combine_first(ds_table_pm2)

        # check if there were log messages.
        save_data = True
        if len(unknown_categories) > 0:
            save_data = False
            today = date.today()
            log_location = log_path / f"CRF{submission_year}" \
                           / f"{country_code}_unknown_categories_{today.strftime('%Y-%m-%d')}.csv"
            print(f"Unknown rows found for {country_code}. Not saving data. Saving log to "
                  f"{log_location}")
            save_unknown_categories_info(unknown_categories, log_location)

        if len(last_row_info) > 0:
            save_data = False
            today = date.today()
            log_location = log_path / f"CRF{submission_year}" \
                           / f"{country_code}_last_row_info_{today.strftime('%Y-%m-%d')}.csv"
            print(f"Data found in the last row for {country_code}. Not saving data. Saving log to "
                  f"{log_location}")
            save_last_row_info(last_row_info, log_location)

        if save_data:
            compression = dict(zlib=True, complevel=9)
            output_folder = extracted_data_path / country_name.replace(" ", "_")
            output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"

            if not output_folder.exists():
                output_folder.mkdir()

            # write data in interchange format
            pm2.pm2io.write_interchange_format(output_folder / output_filename,
                                               ds_all.pr.to_interchange_format())

            # write data in native PRIMAP2 format
            encoding = {var: compression for var in ds_all.data_vars}
            ds_all.pr.to_netcdf(output_folder / (output_filename + ".nc"),
                                encoding=encoding)

    return ds_all
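

# Minimal usage sketch for read_crf_for_country. Illustrative only: the country
# code and submission year below are assumed example values, not defaults, and
# this helper is not part of the production workflow.
def _example_read_single_country() -> Optional[xr.Dataset]:
    # read all tested tables of the (assumed) CRF 2021 submission of Germany
    ds = read_crf_for_country("DEU", submission_year=2021, re_read=False)
    if ds is not None:
        # the result is a PRIMAP2 / xarray dataset; it can be converted back to
        # the interchange format (a pandas DataFrame) for quick inspection
        print(ds.pr.to_interchange_format().head())
    return ds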


def read_crf_for_country_datalad(
    country: str,
    submission_year: int,
    submission_date: Optional[str] = None,
    re_read: Optional[bool] = True,
) -> None:
    """
    Wrapper around read_crf_for_country which takes care of selecting input
    and output files and uses datalad run to trigger the data reading.

    Parameters
    ----------
    country: str
        ISO 3-letter country code
    submission_year: int
        Year of the submission of the data
    submission_date: Optional(str)
        Read for a specific submission date (given as string as in the file names).
        If not specified the latest data will be read.
    re_read: Optional(bool) default: True
        Read the data also if it's already present.
    """
    # get all the info for the country
    country_info = get_input_and_output_files_for_country(
        country, submission_year=submission_year, verbose=True)

    print(f"Attempting to read data for CRF{submission_year} from {country}.")
    print("#" * 80)
    print("")
    print("Using the UNFCCC_CRF_reader")
    print("")
    print("Run the script using datalad run via the python api")
    script = code_path / "UNFCCC_CRF_reader" / "read_UNFCCC_CRF_submission.py"

    cmd = f"./venv/bin/python3 {script.as_posix()} --country={country} " \
          f"--submission_year={submission_year} --submission_date={submission_date}"
    if re_read:
        cmd = cmd + " --re_read"
    datalad.api.run(
        cmd=cmd,
        dataset=root_path,
        message=f"Read data for {country}, CRF{submission_year}, {submission_date}.",
        inputs=country_info["input"],
        outputs=country_info["output"],
        dry_run=None,
        explicit=True,
    )
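

# Usage sketch for the datalad wrapper. Illustrative only: country code and
# submission year are assumed example values; the wrapper resolves input and
# output files and delegates the actual reading to read_UNFCCC_CRF_submission.py
# via datalad run.
def _example_read_single_country_datalad() -> None:
    read_crf_for_country_datalad("DEU", submission_year=2021, re_read=False)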


def read_new_crf_for_year(
    submission_year: int,
    countries: Optional[List[str]] = None,
    re_read: Optional[bool] = False,
) -> dict:
    """
    Read CRF data for a given submission year for all countries in
    `countries` that have submitted data. If no `countries` list is
    given, all CRF countries are used.

    When updated submissions exist, the latest will be read.
    All tables available in the specification will be read for all years.
    Results will be written to the appropriate country folders.

    If you want to read data from a different folder use the
    test_read_crf_data function.

    IMPORTANT NOTE:
    Currently there is no consistency check between data for the same category
    read from different tables.

    Parameters
    ----------
    submission_year: int
        Year of the submission of the data
    countries: List[str] (optional)
        List of countries to read. If not given, reading is tried for all
        CRF countries.
    re_read: bool (optional, default=False)
        If true, data will be read even if it has already been read before.

    TODO: write log with failed countries and what has been read

    Returns
    -------
    dict mapping country codes to the reading status
    ("read", "skipped", "no data", or "failed")
    """
    if countries is None:
        countries = all_crf_countries

    read_countries = {}
    for country in countries:
        try:
            country_ds = read_crf_for_country(country, submission_year, re_read=re_read)
            if country_ds is None:
                read_countries[country] = "skipped"
            else:
                read_countries[country] = "read"
        except NoCRFFilesError:
            print(f"No data for country {country}, {submission_year}")
            read_countries[country] = "no data"
        except Exception as ex:
            print(f"Data for country {country}, {submission_year} could not be read")
            print(f"The following error occurred: {ex}")
            read_countries[country] = "failed"

    # print overview
    successful_countries = [country for country in read_countries if read_countries[country] == "read"]
    skipped_countries = [country for country in read_countries if read_countries[country] == "skipped"]
    failed_countries = [country for country in read_countries if read_countries[country] == "failed"]
    no_data_countries = [country for country in read_countries if read_countries[country] == "no data"]

    print(f"Read data for countries {successful_countries}")
    print(f"Skipped countries {skipped_countries}")
    print(f"No data for countries {no_data_countries}")
    print(f"!!!!! Reading failed for {failed_countries}. Check why")

    return read_countries
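

# Usage sketch for read_new_crf_for_year. Illustrative only: the country list
# and the submission year are assumed example values. The returned dict maps
# country codes to one of "read", "skipped", "no data", or "failed".
def _example_read_selected_countries() -> None:
    status = read_new_crf_for_year(2021, countries=["DEU", "FRA", "AUS"])
    failed = [code for code, state in status.items() if state == "failed"]
    print(f"Reading failed for: {failed}")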


def read_new_crf_for_year_datalad(
    submission_year: int,
    countries: Optional[List[str]] = None,
    re_read: Optional[bool] = False,
) -> None:
    """
    Wrapper around read_new_crf_for_year which takes care of selecting input
    and output files and uses datalad run to trigger the data reading.

    Parameters
    ----------
    submission_year: int
        Year of the submission of the data
    countries: List[str] (optional)
        List of countries to read. If not given, reading is tried for all
        CRF countries.
    re_read: bool (optional, default=False)
        If true, data will be read even if it has already been read before.
    """
    if countries is not None:
        print(f"Reading CRF{submission_year} for countries {countries} using UNFCCC_CRF_reader.")
    else:
        print(f"Reading CRF{submission_year} for all countries using UNFCCC_CRF_reader.")
        countries = all_crf_countries
    print("#" * 80)
    print("")
    if re_read:
        print("Re-reading all latest submissions.")
    else:
        print("Only reading new submissions not read yet.")

    input_files = []
    output_files = []
    # loop over countries to collect input and output files
    print("Collect input and output files to pass to datalad")
    for country in countries:
        try:
            country_info = get_input_and_output_files_for_country(
                country, submission_year=submission_year, verbose=False)
            # check if the submission has been read already
            if re_read:
                input_files = input_files + country_info["input"]
                output_files = output_files + country_info["output"]
            else:
                data_read = submission_has_been_read(
                    country_info["code"], country_info["name"],
                    submission_year=submission_year,
                    submission_date=country_info["date"],
                    verbose=False,
                )
                if not data_read:
                    input_files = input_files + country_info["input"]
                    output_files = output_files + country_info["output"]
        except Exception:
            # no error handling here as that is done in the function that does the actual reading
            pass

    print("Run the script using datalad run via the python api")
    script = code_path / "UNFCCC_CRF_reader" / "read_new_UNFCCC_CRF_for_year.py"

    #cmd = f"./venv/bin/python3 {script.as_posix()} --countries={countries} " \
    #      f"--submission_year={submission_year}"
    cmd = f"./venv/bin/python3 {script.as_posix()} " \
          f"--submission_year={submission_year}"
    if re_read:
        cmd = cmd + " --re_read"
    datalad.api.run(
        cmd=cmd,
        dataset=root_path,
        message=f"Read data for {countries}, CRF{submission_year}. Re-reading: {re_read}",
        inputs=input_files,
        outputs=output_files,
        dry_run=None,
        #explicit=True,
    )


# function to read all available data (or a list of countries?)
# make sure it works when not all countries have submitted data
# give the option to only read new data (no output yet), but also the option to
# read all data, e.g. when specifications have changed


def get_input_and_output_files_for_country(
    country: str,
    submission_year: int,
    submission_date: Optional[str] = None,
    verbose: Optional[bool] = True,
) -> Dict[str, Union[List, str]]:
    """
    Get input and output files for a given country.
    """
    country_info = {}

    if country in custom_country_mapping:
        country_code = country
    else:
        country_code = get_country_code(country)
    # now get the country name
    country_name = get_country_name(country_code)
    country_info["code"] = country_code
    country_info["name"] = country_name

    # determine latest data
    print(f"Determining input and output files for {country}")
    if submission_date is None:
        if verbose:
            print("No submission date given, find latest date.")
        submission_date = get_latest_date_for_country(country_code, submission_year)
    else:
        if verbose:
            print(f"Using given submission date {submission_date}")

    if submission_date is None:
        # there is no data. Raise an exception
        raise NoCRFFilesError(f"No submissions found for {country_code}, "
                              f"submission_year={submission_year}, "
                              f"date={submission_date}")
    else:
        if verbose:
            print(f"Latest submission date for CRF{submission_year} is {submission_date}")
    country_info["date"] = submission_date

    # get possible input files
    input_files = get_crf_files(country_codes=country_code,
                                submission_year=submission_year,
                                date=submission_date)
    if not input_files:
        raise NoCRFFilesError(f"No possible input files found for {country}, CRF{submission_year}, "
                              f"v{submission_date}. Are they already submitted and included in the "
                              f"repository?")
    elif verbose:
        print("Found the following input_files:")
        for file in input_files:
            print(file.name)
        print("")
    # convert file paths to str
    input_files = [file.as_posix() for file in input_files]
    country_info["input"] = input_files

    # get output files
    output_folder = extracted_data_path / country_name.replace(" ", "_")
    output_files = [output_folder / f"{country_code}_CRF{submission_year}"
                    f"_{submission_date}.{suffix}" for suffix
                    in ['yaml', 'csv', 'nc']]
    if verbose:
        print("The following files are considered as output_files:")
        for file in output_files:
            print(file)
        print("")

    # check if output data present
    # convert file paths to str
    output_files = [file.as_posix() for file in output_files]
    country_info["output"] = output_files

    return country_info
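

# Usage sketch for get_input_and_output_files_for_country. Illustrative only:
# country code and submission year are assumed example values. The returned
# dict holds the country code and name, the submission date, and the input and
# output file lists that are passed to datalad run.
def _example_show_country_files() -> None:
    info = get_input_and_output_files_for_country("DEU", submission_year=2021)
    print(f"Submission date: {info['date']}")
    print(f"Input files: {info['input']}")
    print(f"Output files: {info['output']}")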


def submission_has_been_read(
    country_code: str,
    country_name: str,
    submission_year: int,
    submission_date: str,
    verbose: Optional[bool] = True,
) -> bool:
    """
    Check if a CRF submission has already been read.
    """
    output_folder = extracted_data_path / country_name.replace(" ", "_")
    output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"
    if output_folder.exists():
        existing_files = output_folder.glob(f"{output_filename}.*")
        existing_suffixes = [file.suffix for file in existing_files]
        if all(suffix in existing_suffixes for suffix in [".nc", ".yaml", ".csv"]):
            has_been_read = True
            if verbose:
                print(f"Data already available for {country_code}, "
                      f"CRF{submission_year}, version {submission_date}.")
        else:
            has_been_read = False
            if verbose:
                print(f"Partial data available for {country_code}, "
                      f"CRF{submission_year}, version {submission_date}. "
                      "Please check if all files have been written after "
                      "reading.")
    else:
        has_been_read = False
        if verbose:
            print(f"No read data available for {country_code}, "
                  f"CRF{submission_year}, version {submission_date}.")
    return has_been_read
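

# Usage sketch for submission_has_been_read. Illustrative only: all argument
# values below are assumptions (the submission date must match the string used
# in the file names). The check is purely file based: it looks for the .yaml,
# .csv and .nc output files of the given submission.
def _example_check_submission_read() -> None:
    already_read = submission_has_been_read(
        "DEU", "Germany",
        submission_year=2021,
        submission_date="12042021",
        verbose=True,
    )
    print(f"Already read: {already_read}")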