UNFCCC_CRF_reader.py

import re
from pathlib import Path
from treelib import Tree
import numpy as np
import pandas as pd
import xarray as xr
import primap2 as pm2
import pycountry

import crf_specifications as crf

from typing import Dict, List, Optional, Tuple, Union
from datetime import date

from .UNFCCC_CRF_reader_core import read_crf_table
from .UNFCCC_CRF_reader_core import convert_crf_table_to_pm2if
from .UNFCCC_CRF_reader_devel import save_unknown_categories_info
from .UNFCCC_CRF_reader_devel import save_last_row_info
from . import log_path, custom_country_mapping, extracted_data_path
# functions:
# * production functions
# ** read one table for a country
# ** read a list of tables for one country
# ** convert to IF and NC and save
# * testing functions
# ** read one or more table(s) for all countries
#    (and, if desired, only a single year) and write
#    output files with missing sectors etc.
# **

# TODO: add saving to read_crf_for_country
# TODO: add function to read several / all countries

# general approach:
# main code in a function that reads one table from one file.
# return raw pandas DF for use in different functions
# wrappers around this function to read for a whole country or for test
# reading where we also write files with missing sectors etc.
# merging functions use native pm2 format


def read_crf_for_country(
    country_code: str,
    submission_year: int,
) -> xr.Dataset:
  38. """
  39. Read CRF data for given submission year and country. All tables
  40. available in the specification will be read for all years. Result
  41. will be written to appropriate country folder.
  42. If you want to read data for more countries of from a different folder
  43. use the test_read_crf_data function.
  44. IMPORTANT NOTE:
  45. Currently there is no consistency check between data for the same category
  46. read from different tables
  47. The folder can either be given explicitly or if not given folders are determined
  48. from the submission_year and country_code variables.
  49. The output is a primap2 dataset (xarray based).
  50. We only save the data in the country folder if there were no messages like
  51. unknown rows to make sure that data that goes into the repository is complete.
  52. The result dataframe is returned in any case. In case log messages appeared
  53. they are saved in the folder 'log' under the file name
  54. 'country_reading_<country_code>_<date>_X.csv'.
  55. Parameters
  56. __________
  57. country_codes: str
  58. ISO 3-letter country code
  59. submission_year: int
  60. Year of the submission of the data
  61. Returns
  62. _______
  63. first return value is a Pandas DataFrame with the read data in long format
  64. second return value
  65. third return value TODO
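
    Examples
    --------
    Minimal usage sketch (assumes the package's data folders and the
    CRF2021 specification are set up as described above)::

        ds = read_crf_for_country("DEU", submission_year=2021)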
  66. """
    # get country name
    if country_code in custom_country_mapping:
        country_name = custom_country_mapping[country_code]
    else:
        try:
            country = pycountry.countries.get(alpha_3=country_code)
            country_name = country.name
        except AttributeError:
            raise ValueError(f"Country code {country_code} can not be mapped to "
                             f"any country")
    # get specification and available tables
    try:
        crf_spec = getattr(crf, f"CRF{submission_year}")
    except AttributeError:
        raise ValueError(f"No terminology exists for submission year "
                         f"{submission_year}")

    # only use tables that have been tested against the specification
    tables = [table for table in crf_spec.keys()
              if crf_spec[table]["status"] == "tested"]
    print(f"The following tables are available in the "
          f"CRF{submission_year} specification: {tables}")
    # TODO: get available dates (first get folders for country, then dates,
    #       select the latest date and pass it on)
    #       dates need to be determined here.
    ds_all = None
    unknown_categories = []
    last_row_info = []
    for table in tables:
        # read the table for all years
        ds_table, new_unknown_categories, new_last_row_info = read_crf_table(
            country_code, table, submission_year,
            folder=f"CRF{submission_year}")  # , data_year=[1990]
        # collect messages on unknown rows etc.
        unknown_categories = unknown_categories + new_unknown_categories
        last_row_info = last_row_info + new_last_row_info
        # convert to PRIMAP2 IF
        # first drop the orig_cat_name col as it can have multiple values for
        # one category
        ds_table = ds_table.drop(columns=["orig_cat_name"])

        # if we need to map entities pass this info to the conversion function
        if "entity_mapping" in crf_spec[table]:
            entity_mapping = crf_spec[table]["entity_mapping"]
        else:
            entity_mapping = None
        ds_table_if = convert_crf_table_to_pm2if(
            ds_table,
            submission_year,
            meta_data_input={"title": country_code},
            entity_mapping=entity_mapping,
        )
        # now convert to native PRIMAP2 format
        ds_table_pm2 = pm2.pm2io.from_interchange_format(ds_table_if)

        # combine the per-table datasets
        if ds_all is None:
            ds_all = ds_table_pm2
        else:
            ds_all = xr.combine_by_coords(data_objects=[ds_all, ds_table_pm2],
                                          compat='override',
                                          data_vars='all',
                                          coords='all',
                                          fill_value=np.nan,
                                          #join='outer',
                                          combine_attrs='drop_conflicts',
                                          )
    # check if there were log messages; if yes, save them and don't save the data
    save_data = True
    if len(unknown_categories) > 0:
        save_data = False
        today = date.today()
        log_location = log_path / f"CRF{submission_year}" \
            / f"{country_code}_unknown_categories_{today.strftime('%Y-%m-%d')}.csv"
        print(f"Unknown rows found for {country_code}. Not saving data. Saving log to "
              f"{log_location}")
        save_unknown_categories_info(unknown_categories, log_location)

    if len(last_row_info) > 0:
        save_data = False
        today = date.today()
        log_location = log_path / f"CRF{submission_year}" \
            / f"{country_code}_last_row_info_{today.strftime('%Y-%m-%d')}.csv"
        print(f"Data found in the last row for {country_code}. Not saving data. "
              f"Saving log to {log_location}")
        save_last_row_info(last_row_info, log_location)
    if save_data:
        output_folder = extracted_data_path / country_name.replace(" ", "_")
        output_filename = f"{country_code}_CRF{submission_year}"
        # TODO: need to consider the date when reading, there might be multiple submissions...

        if not output_folder.exists():
            output_folder.mkdir()

        # write data in interchange format
        data_if = ds_all.pr.to_interchange_format()
        pm2.pm2io.write_interchange_format(output_folder / output_filename, data_if)

        # write data in native PRIMAP2 format (netcdf with compression)
        compression = dict(zlib=True, complevel=9)
        encoding = {var: compression for var in ds_all.data_vars}
        ds_all.pr.to_netcdf(output_folder / (output_filename + ".nc"),
                            encoding=encoding)

    return ds_all