UNFCCC_CRF_reader_prod.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. #import re
  2. #
  3. #from treelib import Tree
  4. #import pandas as pd
  5. import xarray as xr
  6. import primap2 as pm2
  7. import numpy as np
  8. import pycountry
  9. import datalad.api
  10. from datetime import date
  11. from pathlib import Path
  12. from typing import Optional, List, Dict, Union
  13. #from . import crf_specifications as crf
  14. import crf_specifications as crf
  15. from UNFCCC_CRF_reader_core import read_crf_table
  16. from UNFCCC_CRF_reader_core import convert_crf_table_to_pm2if
  17. from UNFCCC_CRF_reader_core import get_latest_date_for_country
  18. from UNFCCC_CRF_reader_core import get_crf_files
  19. from UNFCCC_CRF_reader_core import get_country_name
  20. from UNFCCC_CRF_reader_devel import save_unknown_categories_info
  21. from UNFCCC_CRF_reader_devel import save_last_row_info
  22. from util import code_path, log_path, \
  23. custom_country_mapping, extracted_data_path, root_path, \
  24. all_crf_countries, NoCRFFilesError
  25. import sys
  26. sys.path.append(code_path.name)
  27. from UNFCCC_reader.get_submissions_info import get_country_code
  28. # functions:
  29. # * testing functions
  30. # ** read one or more table(s) for all countries
  31. # (and, if desired, only a single year) and write
  32. # output files with missing sectors etc
  33. # **
  34. # TODO: add function to read several / all countries
  35. # general approach:
  36. # main code in a function that reads one table from one file.
  37. # return raw pandas DF for use in different functions
  38. # wrappers around this function to read for a whole country or for test reading where we also
  39. # write files with missing sectors etc.
  40. # merging functions use native pm2 format
  41. def read_crf_for_country(
  42. country_code: str,
  43. submission_year: int,
  44. submission_date: Optional[str]=None,
  45. re_read: Optional[bool]=True,
  46. ) -> xr.Dataset:
  47. """
  48. Read CRF data for given submission year and country. All tables
  49. available in the specification will be read for all years. Result
  50. will be written to appropriate country folder.
  51. Folders are determined from the submission_year and country_code variables.
  52. The output is a primap2 dataset (xarray based).
  53. If you want to read data for more countries or from a different folder
  54. use the read_latest_crf_submissions_for_year or test_read_crf_data function.
  55. IMPORTANT NOTE:
  56. Currently there is no consistency check between data for the same category
  57. read from different tables
  58. We only save the data in the country folder if there were no messages like
  59. unknown rows to make sure that data that goes into the repository is complete.
  60. The result dataframe is returned in any case. In case log messages appeared
  61. they are saved in the folder 'log' under the file name
  62. 'country_reading_<country_code>_<date>_X.csv'.
  63. Parameters
  64. __________
  65. country_codes: str
  66. ISO 3-letter country code
  67. submission_year: int
  68. Year of the submission of the data
  69. submission_data: Optional(str)
  70. Read for a specific submission date (given as string as in the file names)
  71. If not specified latest data will be read
  72. re_read: Optional(bool) default: True
  73. Read the data also if it's already present
  74. Returns
  75. _______
  76. return value is a Pandas DataFrame with the read data in PRIMAP2 format
  77. """
  78. # get country name
  79. country_name = get_country_name(country_code)
  80. # get specification and available tables
  81. try:
  82. crf_spec = getattr(crf, f"CRF{submission_year}")
  83. #print(table_spec)
  84. except:
  85. raise ValueError(f"No terminology exists for submission year {submission_year}")
  86. tables = [table for table in crf_spec.keys()
  87. if crf_spec[table]["status"] == "tested"]
  88. print(f"The following tables are available in the " \
  89. f"CRF{submission_year} specification: {tables}")
  90. if submission_date is None:
  91. submission_date = get_latest_date_for_country(country_code, submission_year)
  92. # check if data has been read already
  93. read_data = not submission_has_been_read(
  94. country_code, country_name, submission_year=submission_year,
  95. submission_date=submission_date, verbose=True,
  96. )
  97. ds_all = None
  98. if read_data or re_read:
  99. unknown_categories = []
  100. last_row_info = []
  101. for table in tables:
  102. # read table for all years
  103. ds_table, new_unknown_categories, new_last_row_info = read_crf_table(
  104. country_code, table, submission_year, date=submission_date)#, data_year=[1990])
  105. # collect messages on unknown rows etc
  106. unknown_categories = unknown_categories + new_unknown_categories
  107. last_row_info = last_row_info + new_last_row_info
  108. # convert to PRIMAP2 IF
  109. # first drop the orig_cat_name col as it can have multiple values for
  110. # one category
  111. ds_table = ds_table.drop(columns=["orig_cat_name"])
  112. # if we need to map entities pass this info to the conversion function
  113. if "entity_mapping" in crf_spec[table]:
  114. entity_mapping = crf_spec[table]["entity_mapping"]
  115. else:
  116. entity_mapping = None
  117. ds_table_if = convert_crf_table_to_pm2if(
  118. ds_table,
  119. submission_year,
  120. meta_data_input={"title": f"Data submitted in {submission_year} to the UNFCCC "
  121. f"in the common reporting format (CRF) by {country_name}. "
  122. f"Submission date: {submission_date}"},
  123. entity_mapping=entity_mapping,
  124. )
  125. # now convert to native PRIMAP2 format
  126. ds_table_pm2 = pm2.pm2io.from_interchange_format(ds_table_if)
  127. # combine per table DS
  128. if ds_all is None:
  129. ds_all = ds_table_pm2
  130. else:
  131. ds_all = ds_all.combine_first(ds_table_pm2)
  132. # check if there were log messages.
  133. save_data = True
  134. if len(unknown_categories) > 0:
  135. save_data = False
  136. today = date.today()
  137. log_location = log_path / f"CRF{submission_year}" \
  138. / f"{country_code}_unknown_categories_{today.strftime('%Y-%m-%d')}.csv"
  139. print(f"Unknown rows found for {country_code}. Not saving data. Savin log to "
  140. f"{log_location}" )
  141. save_unknown_categories_info(unknown_categories, log_location)
  142. if len(last_row_info) > 0:
  143. save_data = False
  144. today = date.today()
  145. log_location = log_path / f"CRF{submission_year}" \
  146. / f"{country_code}_last_row_info_{today.strftime('%Y-%m-%d')}.csv"
  147. print(f"Data found in the last row found for {country_code}. Not saving data. Savin log to "
  148. f"{log_location}")
  149. save_last_row_info(last_row_info, log_location)
  150. if save_data:
  151. compression = dict(zlib=True, complevel=9)
  152. output_folder = extracted_data_path / country_name.replace(" ", "_")
  153. output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"
  154. if not output_folder.exists():
  155. output_folder.mkdir()
  156. # write data in interchange format
  157. pm2.pm2io.write_interchange_format(output_folder / output_filename,
  158. ds_all.pr.to_interchange_format())
  159. # write data in native PRIMAP2 format
  160. encoding = {var: compression for var in ds_all.data_vars}
  161. ds_all.pr.to_netcdf(output_folder / (output_filename + ".nc"),
  162. encoding=encoding)
  163. return ds_all
  164. def read_crf_for_country_datalad(
  165. country: str,
  166. submission_year: int,
  167. submission_date: Optional[str]=None,
  168. re_read: Optional[bool]=True
  169. ) -> None:
  170. """
  171. Wrapper around read_crf_for_country which takes care of selecting input
  172. and output files and using datalad run to trigger the data reading
  173. Parameters
  174. __________
  175. country_codes: str
  176. ISO 3-letter country code
  177. submission_year: int
  178. Year of the submission of the data
  179. submission_date: Optional(str)
  180. Read for a specific submission date (given as string as in the file names)
  181. If not specified latest data will be read
  182. """
  183. # get all the info for the country
  184. country_info = get_input_and_output_files_for_country(
  185. country, submission_year=submission_year, verbose=True)
  186. print(f"Attempting to read data for CRF{submission_year} from {country}.")
  187. print("#"*80)
  188. print("")
  189. print(f"Using the UNFCCC_CRF_reader")
  190. print("")
  191. print(f"Run the script using datalad run via the python api")
  192. script = code_path / "UNFCCC_CRF_reader" / "read_UNFCCC_CRF_submission.py"
  193. cmd = f"./venv/bin/python3 {script.as_posix()} --country={country} "\
  194. f"--submission_year={submission_year} --submission_date={submission_date}"
  195. if re_read:
  196. cmd = cmd + f" --re_read"
  197. datalad.api.run(
  198. cmd=cmd,
  199. dataset=root_path,
  200. message=f"Read data for {country}, CRF{submission_year}, {submission_date}.",
  201. inputs=country_info["input"],
  202. outputs=country_info["output"],
  203. dry_run=None,
  204. explicit=True,
  205. )
  206. def read_new_crf_for_year(
  207. submission_year: int,
  208. countries: Optional[List[str]]=None,
  209. re_read: Optional[bool]=False,
  210. ) -> dict:
  211. """
  212. Read CRF data for given submission year for all countries in
  213. `countries` that have submitted data. If no `countries` list is
  214. given, all countries are used.
  215. When updated submission exist the latest will be read.
  216. All tables available in the specification will be read for all years.
  217. Results will be written to appropriate country folders.
  218. If you want to read data from a different folder use the
  219. test_read_crf_data function.
  220. IMPORTANT NOTE:
  221. Currently there is no consistency check between data for the same category
  222. read from different tables
  223. Parameters
  224. __________
  225. submission_year: int
  226. Year of the submission of the data
  227. countries: List[int] (optional)
  228. List of countries to read. If not given reading is tried for all
  229. CRF countries
  230. re_read: bool (optional, default=False)
  231. If true data will be read even if already read before.
  232. TODO: write log with failed countries and what has been read
  233. Returns
  234. _______
  235. list[str]: list with country codes for which the data has been read
  236. """
  237. if countries is None:
  238. countries = all_crf_countries
  239. read_countries = {}
  240. for country in countries:
  241. try:
  242. country_df = read_crf_for_country(country, submission_year, re_read=re_read)
  243. if country_df is None:
  244. read_countries[country] = "skipped"
  245. else:
  246. read_countries[country] = "read"
  247. except NoCRFFilesError:
  248. print(f"No data for country {country}, {submission_year}")
  249. read_countries[country] = "no data"
  250. except Exception as ex:
  251. print(f"Data for country {country}, {submission_year} could not be read")
  252. print(f"The following error occurred: {ex}")
  253. read_countries[country]= "failed"
  254. # print overview
  255. successful_countries = [country for country in read_countries if read_countries[country] == "read"]
  256. skipped_countries = [country for country in read_countries if read_countries[country] == "skipped"]
  257. failed_countries = [country for country in read_countries if read_countries[country] == "failed"]
  258. no_data_countries = [country for country in read_countries if read_countries[country] == "no data"]
  259. print(f"Read data for countries {successful_countries}")
  260. print(f"Skipped countries {skipped_countries}")
  261. print(f"No data for countries {no_data_countries}")
  262. print(f"!!!!! Reading failed for {failed_countries}. Check why")
  263. return(read_countries)
  264. def read_new_crf_for_year_datalad(
  265. submission_year: int,
  266. countries: Optional[List[str]] = None,
  267. re_read: Optional[bool] = False,
  268. ) -> None:
  269. """
  270. Wrapper around read_crf_for_year_datalad which takes care of selecting input
  271. and output files and using datalad run to trigger the data reading
  272. Parameters
  273. __________
  274. submission_year: int
  275. Year of the submission of the data
  276. countries: List[int] (optional)
  277. List of countries to read. If not given reading is tried for all
  278. CRF countries
  279. re_read: bool (optional, default=False)
  280. If true data will be read even if already read before.
  281. """
  282. if countries is not None:
  283. print(f"Reading CRF{submission_year} for countries {countries} using UNFCCC_CRF_reader.")
  284. else:
  285. print(f"Reading CRF{submission_year} for all countries using UNFCCC_CRF_reader.")
  286. countries = all_crf_countries
  287. print("#" * 80)
  288. print("")
  289. if re_read:
  290. print("Re-reading all latest submissions.")
  291. else:
  292. print("Only reading new submissions not read yet.")
  293. input_files = []
  294. output_files = []
  295. # loop over countries to collect input and output files
  296. print("Collect input and output files to pass to datalad")
  297. for country in countries:
  298. try:
  299. country_info = get_input_and_output_files_for_country(
  300. country, submission_year=submission_year, verbose=False)
  301. # check if the submission has been read already
  302. if re_read:
  303. input_files = input_files + country_info["input"]
  304. output_files = output_files + country_info["output"]
  305. else:
  306. data_read = submission_has_been_read(
  307. country_info["code"], country_info["name"],
  308. submission_year=submission_year,
  309. submission_date=country_info["date"],
  310. verbose=False,
  311. )
  312. if not data_read:
  313. input_files = input_files + country_info["input"]
  314. output_files = output_files + country_info["output"]
  315. except:
  316. # no error handling here as that is done in the function that does the actual reading
  317. pass
  318. print(f"Run the script using datalad run via the python api")
  319. script = code_path / "UNFCCC_CRF_reader" / "read_new_UNFCCC_CRF_for_year.py"
  320. #cmd = f"./venv/bin/python3 {script.as_posix()} --countries={countries} "\
  321. # f"--submission_year={submission_year}"
  322. cmd = f"./venv/bin/python3 {script.as_posix()} " \
  323. f"--submission_year={submission_year}"
  324. if re_read:
  325. cmd = cmd + " --re_read"
  326. datalad.api.run(
  327. cmd=cmd,
  328. dataset=root_path,
  329. message=f"Read data for {countries}, CRF{submission_year}. Re-reading: {re_read}",
  330. inputs=input_files,
  331. outputs=output_files,
  332. dry_run=None,
  333. #explicit=True,
  334. )
  335. # function to read all available data (or list of countries?)
  336. # make sure it works when not all countries have submitted data
  337. # give option to only read new data (no output yet), but also option to
  338. # read all data, e.g. when specifications have changed
  339. def get_input_and_output_files_for_country(
  340. country: str,
  341. submission_year: int,
  342. submission_date: Optional[str]=None,
  343. verbose: Optional[bool]=True,
  344. ) -> Dict[str, Union[List, str]]:
  345. """
  346. Get input and output files for a given country
  347. """
  348. country_info = {}
  349. if country in custom_country_mapping:
  350. country_code = country
  351. else:
  352. country_code = get_country_code(country)
  353. # now get the country name
  354. country_name = get_country_name(country_code)
  355. country_info["code"] = country_code
  356. country_info["name"] = country_name
  357. # determine latest data
  358. print(f"Determining input and output files for {country}")
  359. if submission_date is None:
  360. if verbose:
  361. print(f"No submission date given, find latest date.")
  362. submission_date = get_latest_date_for_country(country_code, submission_year)
  363. else:
  364. if verbose:
  365. print(f"Using given submissions date {submission_date}")
  366. if submission_date is None:
  367. # there is no data. Raise an exception
  368. raise NoCRFFilesError(f"No submissions found for {country_code}, "
  369. f"submission_year={submission_year}, "
  370. f"date={date}")
  371. else:
  372. if verbose:
  373. print(f"Latest submission date for CRF{submission_year} is {submission_date}")
  374. country_info["date"] = submission_date
  375. # get possible input files
  376. input_files = get_crf_files(country_codes=country_code,
  377. submission_year=submission_year,
  378. date=submission_date)
  379. if not input_files:
  380. raise NoCRFFilesError(f"No possible input files found for {country}, CRF{submission_year}, "
  381. f"v{submission_date}. Are they already submitted and included in the "
  382. f"repository?")
  383. elif verbose:
  384. print(f"Found the following input_files:")
  385. for file in input_files:
  386. print(file.name)
  387. print("")
  388. # convert file's path to str
  389. input_files = [file.as_posix() for file in input_files]
  390. country_info["input"] = input_files
  391. # get output file
  392. output_folder = extracted_data_path / country_name.replace(" ", "_")
  393. output_files = [output_folder / f"{country_code}_CRF{submission_year}"
  394. f"_{submission_date}.{suffix}" for suffix
  395. in ['yaml', 'csv', 'nc']]
  396. if verbose:
  397. print(f"The following files are considered as output_files:")
  398. for file in output_files:
  399. print(file)
  400. print("")
  401. # check if output data present
  402. # convert file paths to str
  403. output_files = [file.as_posix() for file in output_files]
  404. country_info["output"] = output_files
  405. return country_info
  406. def submission_has_been_read(
  407. country_code: str,
  408. country_name: str,
  409. submission_year: int,
  410. submission_date: str,
  411. verbose: Optional[bool]=True,
  412. ) -> bool:
  413. """
  414. Check if a CRF submission has already been read
  415. """
  416. output_folder = extracted_data_path / country_name.replace(" ", "_")
  417. output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"
  418. if output_folder.exists():
  419. existing_files = output_folder.glob(f"{output_filename}.*")
  420. existing_suffixes = [file.suffix for file in existing_files]
  421. if all(suffix in existing_suffixes for suffix in [".nc", ".yaml", ".csv"]):
  422. has_been_read = True
  423. if verbose:
  424. print(f"Data already available for {country_code}, "
  425. f"CRF{submission_year}, version {submission_date}.")
  426. else:
  427. has_been_read = False
  428. if verbose:
  429. print(f"Partial data available for {country_code}, "
  430. f"CRF{submission_year}, version {submission_date}. "
  431. "Please check if all files have been written after "
  432. "reading.")
  433. else:
  434. has_been_read = False
  435. if verbose:
  436. print(f"No read data available for {country_code}, "
  437. f"CRF{submission_year}, version {submission_date}. ")
  438. return has_been_read