UNFCCC_CRF_reader_prod.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. #import re
  2. #
  3. #from treelib import Tree
  4. #import pandas as pd
  5. import xarray as xr
  6. import primap2 as pm2
  7. import numpy as np
  8. import pycountry
  9. import datalad.api
  10. from datetime import date
  11. from pathlib import Path
  12. from typing import Optional
  13. from . import crf_specifications as crf
  14. from .UNFCCC_CRF_reader_core import read_crf_table
  15. from .UNFCCC_CRF_reader_core import convert_crf_table_to_pm2if
  16. from .UNFCCC_CRF_reader_core import get_latest_date_for_country
  17. from .UNFCCC_CRF_reader_core import get_crf_files
  18. from .UNFCCC_CRF_reader_devel import save_unknown_categories_info
  19. from .UNFCCC_CRF_reader_devel import save_last_row_info
  20. from .utils import code_path, log_path, \
  21. custom_country_mapping, extracted_data_path, root_path
  22. import sys
  23. sys.path.append('../UNFCCC_reader')
  24. from UNFCCC_reader.get_submissions_info import get_country_code
# functions:
# * testing functions
# ** read one or more table(s) for all countries
#    (and, if desired, only a single year) and write
#    output files with missing sectors etc.
# **
# TODO: add function to read several / all countries
# general approach:
# main code in a function that reads one table from one file.
# return raw pandas DF for use in different functions
# wrappers around this function to read for a whole country or for test reading where we also
# write files with missing sectors etc.
# merging functions use native pm2 format
  38. def read_crf_for_country(
  39. country_code: str,
  40. submission_year: int,
  41. submission_date: Optional[str]=None,
  42. ) -> xr.Dataset:
  43. """
  44. Read CRF data for given submission year and country. All tables
  45. available in the specification will be read for all years. Result
  46. will be written to appropriate country folder.
  47. If you want to read data for more countries of from a different folder
  48. use the test_read_crf_data function.
  49. IMPORTANT NOTE:
  50. Currently there is no consistency check between data for the same category
  51. read from different tables
  52. The folder can either be given explicitly or if not given folders are determined
  53. from the submission_year and country_code variables.
  54. The output is a primap2 dataset (xarray based).
  55. We only save the data in the country folder if there were no messages like
  56. unknown rows to make sure that data that goes into the repository is complete.
  57. The result dataframe is returned in any case. In case log messages appeared
  58. they are saved in the folder 'log' under the file name
  59. 'country_reading_<country_code>_<date>_X.csv'.
  60. Parameters
  61. __________
  62. country_codes: str
  63. ISO 3-letter country code
  64. submission_year: int
  65. Year of the submission of the data
  66. submission_data: Optional(str)
  67. Read for a specific submission date (given as string as in the file names)
  68. If not specified latest data will be read
  69. Returns
  70. _______
  71. return value is a Pandas DataFrame with the read data in PRIMAP2 format
  72. """
  73. # get country name
  74. country_name = get_country_name(country_code)
  75. # get specification and available tables
  76. try:
  77. crf_spec = getattr(crf, f"CRF{submission_year}")
  78. #print(table_spec)
  79. except:
  80. raise ValueError(f"No terminology exists for submission year {submission_year}")
  81. tables = [table for table in crf_spec.keys()
  82. if crf_spec[table]["status"] == "tested"]
  83. print(f"The following tables are available in the " \
  84. f"CRF{submission_year} specification: {tables}")
  85. if submission_date is None:
  86. submission_date = get_latest_date_for_country(country_code, submission_year)
  87. ds_all = None
  88. unknown_categories = []
  89. last_row_info = []
  90. for table in tables:
  91. # read table for all years
  92. ds_table, new_unknown_categories, new_last_row_info = read_crf_table(
  93. country_code, table, submission_year, date=submission_date)#, data_year=[1990])
  94. # collect messages on unknown rows etc
  95. unknown_categories = unknown_categories + new_unknown_categories
  96. last_row_info = last_row_info + new_last_row_info
  97. # convert to PRIMAP2 IF
  98. # first drop the orig_cat_name col as it can have multiple values for
  99. # one category
  100. ds_table = ds_table.drop(columns=["orig_cat_name"])
  101. # if we need to map entities pass this info to the conversion function
  102. if "entity_mapping" in crf_spec[table]:
  103. entity_mapping = crf_spec[table]["entity_mapping"]
  104. else:
  105. entity_mapping = None
  106. ds_table_if = convert_crf_table_to_pm2if(
  107. ds_table,
  108. 2021,
  109. meta_data_input={"title": f"Data submitted in {submission_year} to the UNFCCC "
  110. f"in the common reporting format (CRF) by {country_name}. "
  111. f"Submission date: {submission_date}"},
  112. entity_mapping=entity_mapping,
  113. )
  114. # now convert to native PRIMAP2 format
  115. ds_table_pm2 = pm2.pm2io.from_interchange_format(ds_table_if)
  116. # combine per table DS
  117. if ds_all is None:
  118. ds_all = ds_table_pm2
  119. else:
  120. ds_all = ds_all.combine_first(ds_table_pm2)
  121. # check if there were log messages.
  122. save_data = True
  123. if len(unknown_categories) > 0:
  124. save_data = False
  125. today = date.today()
  126. log_location = log_path / f"CRF{submission_year}" \
  127. / f"{country_code}_unknown_categories_{today.strftime('%Y-%m-%d')}.csv"
  128. print(f"Unknown rows found for {country_code}. Not saving data. Savin log to "
  129. f"{log_location}" )
  130. save_unknown_categories_info(unknown_categories, log_location)
  131. if len(last_row_info) > 0:
  132. save_data = False
  133. today = date.today()
  134. log_location = log_path / f"CRF{submission_year}" \
  135. / f"{country_code}_last_row_info_{today.strftime('%Y-%m-%d')}.csv"
  136. print(f"Data found in the last row found for {country_code}. Not saving data. Savin log to "
  137. f"{log_location}")
  138. save_last_row_info(last_row_info, log_location)
  139. if save_data:
  140. compression = dict(zlib=True, complevel=9)
  141. output_folder = extracted_data_path / country_name.replace(" ", "_")
  142. output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"
  143. if not output_folder.exists():
  144. output_folder.mkdir()
  145. # folder mapping has to be updated !!!
  146. # if we do it here we will do it a lot of times when reading several countries at once
  147. # write data in interchange format
  148. pm2.pm2io.write_interchange_format(output_folder / output_filename,
  149. ds_all.pr.to_interchange_format())
  150. # write data in native PRIAMP2 formart
  151. encoding = {var: compression for var in ds_all.data_vars}
  152. ds_all.pr.to_netcdf(output_folder / (output_filename + ".nc"),
  153. encoding=encoding)
  154. return ds_all
  155. def read_crf_for_country_datalad(
  156. country: str,
  157. submission_year: int,
  158. submission_date: Optional[str]=None,
  159. ) -> None:
  160. """
  161. Wrapper around read_crf_for_country which takes care of selecting input
  162. and output files and using datalad run to trigger the data reading
  163. Parameters
  164. __________
  165. country_codes: str
  166. ISO 3-letter country code
  167. submission_year: int
  168. Year of the submission of the data
  169. submission_date: Optional(str)
  170. Read for a specific submission date (given as string as in the file names)
  171. If not specified latest data will be read
  172. """
  173. # get the country code and name
  174. # both could be given as input, so we need this two step process
  175. if country in custom_country_mapping:
  176. country_code = country
  177. else:
  178. country_code = get_country_code(country)
  179. # now get the country name
  180. country_name = get_country_name(country_code)
  181. print(f"Attempting to read data for CRF{submission_year} from {country}.")
  182. print("#"*80)
  183. print("")
  184. print(f"Using the UNFCCC_CRF_reader")
  185. print("")
  186. # get possible input files
  187. input_files = get_crf_files(country_codes=country_code,
  188. submission_year=submission_year,
  189. date=submission_date)
  190. if not input_files:
  191. if submission_date is not None:
  192. print(f"No possible input files found for {country}, CRF{submission_year}, "
  193. f"v{submission_date}. Are they already submitted and included in the "
  194. f"repository?")
  195. else:
  196. print(f"No possible input files found for {country}, CRF{submission_year}. "
  197. f"Are they already submitted and included in the repository?")
  198. else:
  199. print(f"Found the following input_files:")
  200. for file in input_files:
  201. print(file.name)
  202. print("")
  203. # convert file's path to str
  204. input_files = [file.as_posix() for file in input_files]
  205. # get output file
  206. if submission_date is None:
  207. submission_date = get_latest_date_for_country(country_code, submission_year)
  208. output_folder = extracted_data_path / country_name.replace(" ", "_")
  209. output_files = [output_folder / f"{country_code}_CRF{submission_year}"
  210. f"_{submission_date}.{suffix}" for suffix
  211. in ['yaml', 'csv', 'nc']]
  212. print(f"The following files are considered as output_files:")
  213. for file in output_files:
  214. print(file)
  215. print("")
  216. # convert file paths to str
  217. output_files = [file.as_posix() for file in output_files]
  218. print(f"Run the script using datalad run via the python api")
  219. script = code_path / "UNFCCC_CRF_reader" / "read_UNFCCC_CRF_submission.py"
  220. datalad.api.run(
  221. cmd=f"./venv/bin/python3 {script.name} --country={country} "
  222. f"--submission_year={submission_year} --submission_date=={submission_date}",
  223. dataset=root_path,
  224. message=f"Read data for {country}, CRF{submission_year}, {submission_date}.",
  225. inputs=input_files,
  226. outputs=output_files,
  227. dry_run=None,
  228. explicit=True,
  229. )
  230. def get_country_name(
  231. country_code: str,
  232. ) -> str:
  233. """get country name from code """
  234. if country_code in custom_country_mapping:
  235. country_name = custom_country_mapping(country_code)
  236. else:
  237. try:
  238. country = pycountry.countries.get(alpha_3=country_code)
  239. country_name = country.name
  240. except:
  241. raise ValueError(f"Country code {country_code} can not be mapped to "
  242. f"any country")
  243. return country_name