UNFCCC_CRF_reader_prod.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. #import re
  2. #
  3. #from treelib import Tree
  4. #import pandas as pd
  5. import xarray as xr
  6. import primap2 as pm2
  7. import numpy as np
  8. import pycountry
  9. import datalad.api
  10. from datetime import date
  11. from pathlib import Path
  12. from typing import Optional, List
  13. #from . import crf_specifications as crf
  14. import crf_specifications as crf
  15. from UNFCCC_CRF_reader_core import read_crf_table
  16. from UNFCCC_CRF_reader_core import convert_crf_table_to_pm2if
  17. from UNFCCC_CRF_reader_core import get_latest_date_for_country
  18. from UNFCCC_CRF_reader_core import get_crf_files
  19. from UNFCCC_CRF_reader_devel import save_unknown_categories_info
  20. from UNFCCC_CRF_reader_devel import save_last_row_info
  21. from util import code_path, log_path, \
  22. custom_country_mapping, extracted_data_path, root_path, \
  23. all_crf_countries, NoCRFFilesError
  24. import sys
  25. sys.path.append(code_path.name)
  26. from UNFCCC_reader.get_submissions_info import get_country_code
  27. # functions:
  28. # * testing functions
  29. # ** read one or more table(s) for all countries
  30. # (and, if desired, only a single year) and write
  31. # output files with missing sectors etc
  32. # **
  33. # TODO: add function to read several / all countries
  34. # general approach:
  35. # main code in a function that reads one table from one file.
  36. # return raw pandas DF for use in different functions
  37. # wrappers around this function to read for a whole country or for test reading where we also
  38. # write files with missing sectors etc.
  39. # merging functions use native pm2 format
  def read_crf_for_country(
      country_code: str,
      submission_year: int,
      submission_date: Optional[str] = None,
      re_read: Optional[bool] = True,
  ) -> Optional[xr.Dataset]:
      """
      Read CRF data for given submission year and country. All tables
      marked as "tested" in the specification will be read for all years.
      Result will be written to appropriate country folder.

      Folders are determined from the submission_year and country_code variables.
      The output is a primap2 dataset (xarray based).

      If you want to read data for more countries or from a different folder
      use the read_latest_crf_submissions_for_year or test_read_crf_data function.

      IMPORTANT NOTE:
      Currently there is no consistency check between data for the same category
      read from different tables

      We only save the data in the country folder if there were no messages like
      unknown rows to make sure that data that goes into the repository is complete.
      The resulting dataset is returned in any case. In case log messages appeared
      they are saved in the log folder under 'CRF<submission_year>' with the
      file names '<country_code>_unknown_categories_<date>.csv' and
      '<country_code>_last_row_info_<date>.csv'.

      Parameters
      __________

      country_code: str
          ISO 3-letter country code

      submission_year: int
          Year of the submission of the data

      submission_date: Optional(str)
          Read for a specific submission date (given as string as in the file names)
          If not specified latest data will be read

      re_read: Optional(bool) default: True
          Read the data also if it's already present. If False and the
          ".nc", ".yaml" and ".csv" output files all exist already, nothing
          is read.

      Returns
      _______
          primap2 dataset (xr.Dataset) with the combined data of all tables,
          or None if reading was skipped because the data is already present
          (also None if the specification contains no tested table)

      Raises
      ______
          ValueError if there is no specification for the submission year
      """
      # get country name
      country_name = get_country_name(country_code)

      # get specification and available tables
      try:
          crf_spec = getattr(crf, f"CRF{submission_year}")
      except AttributeError as ex:
          # only a missing attribute means "no specification for that year"
          raise ValueError(
              f"No terminology exists for submission year {submission_year}"
          ) from ex

      tables = [table for table, table_spec in crf_spec.items()
                if table_spec["status"] == "tested"]
      print(f"The following tables are available in the "
            f"CRF{submission_year} specification: {tables}")

      if submission_date is None:
          submission_date = get_latest_date_for_country(country_code, submission_year)

      # check if data has been read already
      output_folder = extracted_data_path / country_name.replace(" ", "_")
      output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"
      read_data = True
      if not re_read and output_folder.exists():
          existing_suffixes = {
              file.suffix for file in output_folder.glob(f"{output_filename}.*")
          }
          if {".nc", ".yaml", ".csv"} <= existing_suffixes:
              print(f"Data already available for {country_code}, "
                    f"CRF{submission_year}, version {submission_date}. "
                    "Skipping.")
              # without this the "Skipping." branch would still re-read everything
              read_data = False
          else:
              print(f"Partial data available for {country_code}, "
                    f"CRF{submission_year}, version {submission_date}. "
                    "Reading for completion. Please check if all files "
                    "have been written.")

      if not read_data:
          return None

      ds_all = None
      unknown_categories = []
      last_row_info = []
      for table in tables:
          # read table for all years
          ds_table, new_unknown_categories, new_last_row_info = read_crf_table(
              country_code, table, submission_year, date=submission_date)

          # collect messages on unknown rows etc
          unknown_categories = unknown_categories + new_unknown_categories
          last_row_info = last_row_info + new_last_row_info

          # convert to PRIMAP2 IF
          # first drop the orig_cat_name col as it can have multiple values for
          # one category
          ds_table = ds_table.drop(columns=["orig_cat_name"])

          # if we need to map entities pass this info to the conversion function
          entity_mapping = crf_spec[table].get("entity_mapping")

          # pass the actual submission year instead of a hard-coded 2021
          ds_table_if = convert_crf_table_to_pm2if(
              ds_table,
              submission_year,
              meta_data_input={"title": f"Data submitted in {submission_year} to the UNFCCC "
                                        f"in the common reporting format (CRF) by {country_name}. "
                                        f"Submission date: {submission_date}"},
              entity_mapping=entity_mapping,
          )

          # now convert to native PRIMAP2 format
          ds_table_pm2 = pm2.pm2io.from_interchange_format(ds_table_if)

          # combine per table DS
          if ds_all is None:
              ds_all = ds_table_pm2
          else:
              ds_all = ds_all.combine_first(ds_table_pm2)

      # check if there were log messages. Data is only saved if there were none
      save_data = True
      today_str = date.today().strftime('%Y-%m-%d')
      log_folder = log_path / f"CRF{submission_year}"

      if unknown_categories:
          save_data = False
          log_location = log_folder / f"{country_code}_unknown_categories_{today_str}.csv"
          print(f"Unknown rows found for {country_code}. Not saving data. Savin log to "
                f"{log_location}" )
          save_unknown_categories_info(unknown_categories, log_location)

      if last_row_info:
          save_data = False
          log_location = log_folder / f"{country_code}_last_row_info_{today_str}.csv"
          print(f"Data found in the last row found for {country_code}. Not saving data. Savin log to "
                f"{log_location}")
          save_last_row_info(last_row_info, log_location)

      if save_data:
          compression = dict(zlib=True, complevel=9)
          if not output_folder.exists():
              output_folder.mkdir()

          # folder mapping has to be updated !!!
          # if we do it here we will do it a lot of times when reading several countries at once

          # write data in interchange format
          pm2.pm2io.write_interchange_format(output_folder / output_filename,
                                             ds_all.pr.to_interchange_format())

          # write data in native PRIMAP2 format
          encoding = {var: compression for var in ds_all.data_vars}
          ds_all.pr.to_netcdf(output_folder / (output_filename + ".nc"),
                              encoding=encoding)

      return ds_all
  def read_crf_for_country_datalad(
      country: str,
      submission_year: int,
      submission_date: Optional[str] = None,
  ) -> None:
      """
      Wrapper around read_crf_for_country which takes care of selecting input
      and output files and using datalad run to trigger the data reading

      Parameters
      __________

      country: str
          Country name or ISO 3-letter country code. Codes listed in
          custom_country_mapping are used as they are, everything else is
          resolved using get_country_code.

      submission_year: int
          Year of the submission of the data

      submission_date: Optional(str)
          Read for a specific submission date (given as string as in the file names)
          If not specified latest data will be read

      Raises
      ______
          NoCRFFilesError if no submission date could be determined or if no
          input files are found for the country, year and date
      """
      # get the country code and name
      # both could be given as input, so we need this two step process
      if country in custom_country_mapping:
          country_code = country
      else:
          country_code = get_country_code(country)
      # now get the country name
      country_name = get_country_name(country_code)

      print(f"Attempting to read data for CRF{submission_year} from {country}.")
      print("#"*80)
      print("")
      print("Using the UNFCCC_CRF_reader")
      print("")

      # determine latest data
      if submission_date is None:
          print("No submission date given, find latest date.")
          submission_date = get_latest_date_for_country(country_code, submission_year)
      else:
          print(f"Using given submissions date {submission_date}")

      if submission_date is None:
          # there is no data. Raise an exception
          # (report the submission date here, not the imported `date` class)
          raise NoCRFFilesError(f"No submissions found for {country_code}, "
                                f"submission_year={submission_year}, "
                                f"date={submission_date}")
      print(f"Latest submission date for CRF{submission_year} is {submission_date}")

      # get possible input files
      input_files = get_crf_files(country_codes=country_code,
                                  submission_year=submission_year,
                                  date=submission_date)
      if not input_files:
          raise NoCRFFilesError(f"No possible input files found for {country}, CRF{submission_year}, "
                                f"v{submission_date}. Are they already submitted and included in the "
                                f"repository?")

      print("Found the following input_files:")
      for file in input_files:
          print(file.name)
      print("")

      # convert file's path to str
      input_files = [file.as_posix() for file in input_files]

      # get output file
      output_folder = extracted_data_path / country_name.replace(" ", "_")
      output_files = [output_folder / f"{country_code}_CRF{submission_year}"
                                      f"_{submission_date}.{suffix}" for suffix
                      in ['yaml', 'csv', 'nc']]
      print("The following files are considered as output_files:")
      for file in output_files:
          print(file)
      print("")

      # convert file paths to str
      output_files = [file.as_posix() for file in output_files]

      print("Run the script using datalad run via the python api")
      script = code_path / "UNFCCC_CRF_reader" / "read_UNFCCC_CRF_submission.py"
      datalad.api.run(
          cmd=f"./venv/bin/python3 {script.as_posix()} --country={country} "
              f"--submission_year={submission_year} --submission_date={submission_date}",
          dataset=root_path,
          message=f"Read data for {country}, CRF{submission_year}, {submission_date}.",
          inputs=input_files,
          outputs=output_files,
          dry_run=None,
          explicit=True,
      )
  def read_new_crf_for_year(
      submission_year: int,
      countries: Optional[List[str]] = None,
      re_read: Optional[bool] = False,
  ) -> dict:
      """
      Read CRF data for given submission year for all countries in
      `countries` that have submitted data. If no `countries` list is
      given, all countries are used.
      When updated submission exist the latest will be read.
      All tables available in the specification will be read for all years.
      Results will be written to appropriate country folders.

      If you want to read data from a different folder use the
      test_read_crf_data function.

      IMPORTANT NOTE:
      Currently there is no consistency check between data for the same category
      read from different tables

      Parameters
      __________

      submission_year: int
          Year of the submission of the data

      countries: List[str] (optional)
          List of countries to read. If not given reading is tried for all
          CRF countries

      re_read: bool (optional, default=False)
          If true data will be read even if already read before.

      TODO: write log with failed countries and what has been read

      Returns
      _______
          dict: maps each country to one of the status strings
          "read", "skipped" (read_crf_for_country returned None),
          "no data" (NoCRFFilesError was raised) or "failed" (any other error)
      """
      if countries is None:
          countries = all_crf_countries

      read_countries = {}
      for country in countries:
          try:
              country_ds = read_crf_for_country(country, submission_year, re_read=re_read)
          except NoCRFFilesError:
              print(f"No data for country {country}, {submission_year}")
              read_countries[country] = "no data"
          except Exception as ex:
              # Exception (not a bare except) so that KeyboardInterrupt and
              # SystemExit still stop the loop; keep going for real errors
              print(f"Data for country {country}, {submission_year} could not be read")
              print(f"Reason: {ex!r}")
              read_countries[country] = "failed"
          else:
              read_countries[country] = "skipped" if country_ds is None else "read"

      # print overview
      def countries_with_status(status: str) -> list:
          return [country for country, result in read_countries.items()
                  if result == status]

      successful_countries = countries_with_status("read")
      skipped_countries = countries_with_status("skipped")
      failed_countries = countries_with_status("failed")
      no_data_countries = countries_with_status("no data")

      print(f"Read data for countries {successful_countries}")
      print(f"Skipped countries {skipped_countries}")
      print(f"No data for countries {no_data_countries}")
      print(f"!!!!! Reading failed for {failed_countries}. Check why")
      return read_countries
  def read_new_crf_for_year_datalad(
      submission_year: int,
      countries: Optional[List[str]] = None,
      re_read: Optional[bool] = False,
  ) -> None:
      """
      Read the latest CRF submission of a year for several countries using
      datalad run.

      For each country the latest submission date is determined and the
      reading is delegated to read_crf_for_country_datalad, i.e. there is one
      datalad run call per country.

      NOTE(review): the previous body was an unfinished copy of the one
      country function (it used `submission_date` before assignment and
      never used `re_read`). Running one datalad command per country is the
      option that only relies on the script already used for single
      countries; confirm whether a single run for all countries is preferred.

      Parameters
      __________

      submission_year: int
          Year of the submission of the data

      countries: List[str] (optional)
          List of countries to read. If not given reading is tried for all
          CRF countries

      re_read: bool (optional, default=False)
          If true data will be read even if already read before. If false
          countries for which the ".nc", ".yaml" and ".csv" output files of
          the latest submission exist already are skipped.
      """
      if countries is None:
          countries = all_crf_countries

      for country in countries:
          # get the country code and name
          # both could be given as input, so we need this two step process
          if country in custom_country_mapping:
              country_code = country
          else:
              country_code = get_country_code(country)
          # now get the country name
          country_name = get_country_name(country_code)

          # determine the latest submission. Other code in this module guards
          # against both a None result and a NoCRFFilesError, so handle both
          try:
              submission_date = get_latest_date_for_country(country_code, submission_year)
          except NoCRFFilesError:
              submission_date = None
          if submission_date is None:
              print(f"No possible input files found for {country}, CRF{submission_year}. "
                    f"Are they already submitted and included in the repository?")
              continue

          if not re_read:
              # same completeness check as in read_crf_for_country
              output_folder = extracted_data_path / country_name.replace(" ", "_")
              output_filename = f"{country_code}_CRF{submission_year}_{submission_date}"
              existing_suffixes = set()
              if output_folder.exists():
                  existing_suffixes = {
                      file.suffix
                      for file in output_folder.glob(f"{output_filename}.*")
                  }
              if {".nc", ".yaml", ".csv"} <= existing_suffixes:
                  print(f"Data already available for {country_code}, "
                        f"CRF{submission_year}, version {submission_date}. "
                        "Skipping.")
                  continue

          # input / output file selection and the datalad run call are done
          # by the one country wrapper
          try:
              read_crf_for_country_datalad(
                  country,
                  submission_year,
                  submission_date=submission_date,
              )
          except NoCRFFilesError as ex:
              # a country without usable files must not stop the other countries
              print(ex)
  399. # function to read all available data (or list of countries?)
  400. # make sure it works when not all countries have submitted data
  401. # give option to only read new data (no output yet), but also option to
  402. # read all data, e.g. when specifications have changed
  def get_country_name(
      country_code: str,
  ) -> str:
      """
      Get country name from code

      Codes contained in custom_country_mapping take precedence, all other
      codes are looked up as ISO 3-letter codes using pycountry.

      Parameters
      __________

      country_code: str
          ISO 3-letter country code or a code from custom_country_mapping

      Returns
      _______
          str: name of the country

      Raises
      ______
          ValueError if the code can not be mapped to any country
      """
      if country_code in custom_country_mapping:
          # index the mapping; it was called like a function before
          # NOTE(review): assumes custom_country_mapping is a dict-like
          # code -> name mapping (it is used with `in` above) - confirm in util
          return custom_country_mapping[country_code]

      try:
          country = pycountry.countries.get(alpha_3=country_code)
      except LookupError:
          # covers a lookup that raises instead of returning None
          country = None

      if country is None:
          raise ValueError(f"Country code {country_code} can not be mapped to "
                           f"any country")
      return country.name
  416. return country_name