  1. """
  2. This file holds functions that are used in CRF reading development like
  3. adding new tables or new submission years (and according country specific
  4. categories). Thue functions are tailored towards debug output and reading
  5. of single years in contrast to the production functions which are tailored
  6. towards the creation of full datasets including storage in the
  7. """
import pandas as pd
import xarray as xr
import primap2 as pm2
from typing import List, Optional
from pathlib import Path
from datetime import date
from util import all_crf_countries, log_path
import crf_specifications as crf
from UNFCCC_CRF_reader_core import get_country_name
from UNFCCC_CRF_reader_core import get_latest_date_for_country, read_crf_table
from UNFCCC_CRF_reader_core import convert_crf_table_to_pm2if

def read_year_to_test_specs(
        submission_year: int,
        data_year: Optional[int] = None,
) -> xr.Dataset:
    """
    Read one xlsx file (so one data year) for each country for a submission year
    to create log files and extend the specifications.
    """
    if data_year is None:
        data_year = 2000

    unknown_categories = []
    last_row_info = []
    ds_all = None
    print(f"CRF test reading for CRF{submission_year}. Using data year {data_year}")
    print("#" * 80)
    try:
        crf_spec = getattr(crf, f"CRF{submission_year}")
    except AttributeError:
        raise ValueError(f"No terminology exists for submission years {submission_year}, "
                         f"{submission_year - 1}")

    tables = [table for table in crf_spec.keys()
              if crf_spec[table]["status"] == "tested"]
    print(f"The following tables are available in the "
          f"CRF{submission_year} specification: {tables}")
    print("#" * 80)
    for country_code in all_crf_countries:
        # get country name
        country_name = get_country_name(country_code)
        print(f"Reading for {country_name}")
        # get specification and available tables
        try:
            submission_date = get_latest_date_for_country(country_code, submission_year)
        except Exception:
            print(f"No submissions for country {country_name}, CRF{submission_year}")
            submission_date = None

        if submission_date is not None:
            for table in tables:
                # read the table for the given data year
                ds_table, new_unknown_categories, new_last_row_info = read_crf_table(
                    country_code, table, submission_year,
                    date=submission_date, data_year=[data_year])

                # collect messages on unknown rows etc.
                unknown_categories = unknown_categories + new_unknown_categories
                last_row_info = last_row_info + new_last_row_info

                # convert to PRIMAP2 IF
                # first drop the orig_cat_name col as it can have multiple values
                # for one category
                ds_table = ds_table.drop(columns=["orig_cat_name"])

                # TODO: catch entity conversion errors and make a list of error entities
                # if we need to map entities, pass this info to the conversion function
                if "entity_mapping" in crf_spec[table]:
                    entity_mapping = crf_spec[table]["entity_mapping"]
                else:
                    entity_mapping = None
                try:
                    ds_table_if = convert_crf_table_to_pm2if(
                        ds_table,
                        submission_year,
                        meta_data_input={
                            "title": f"Data submitted in {submission_year} to the UNFCCC "
                                     f"in the common reporting format (CRF) "
                                     f"by {country_name}. "
                                     f"Submission date: {submission_date}"},
                        entity_mapping=entity_mapping,
                    )

                    # now convert to native PRIMAP2 format
                    ds_table_pm2 = pm2.pm2io.from_interchange_format(ds_table_if)

                    # combine per-table datasets
                    if ds_all is None:
                        ds_all = ds_table_pm2
                    else:
                        ds_all = ds_all.combine_first(ds_table_pm2)
                except Exception:
                    print(f"Error occurred when converting table {table} for "
                          f"{country_name} to PRIMAP2 IF.")
                    # TODO: error handling and logging
    # process log messages
    today = date.today()
    if len(unknown_categories) > 0:
        log_location = log_path / f"CRF{submission_year}" \
                       / f"{data_year}_unknown_categories_{today.strftime('%Y-%m-%d')}.csv"
        print(f"Unknown rows found. Saving log to {log_location}")
        save_unknown_categories_info(unknown_categories, log_location)

    if len(last_row_info) > 0:
        log_location = log_path / f"CRF{submission_year}" \
                       / f"{data_year}_last_row_info_{today.strftime('%Y-%m-%d')}.csv"
        print(f"Data found in the last row. Saving log to {log_location}")
        save_last_row_info(last_row_info, log_location)
    # save the data
    compression = dict(zlib=True, complevel=9)
    output_folder = log_path / f"test_read_CRF{submission_year}"
    output_filename = f"CRF{submission_year}_{today.strftime('%Y-%m-%d')}"
    if not output_folder.exists():
        output_folder.mkdir()

    # write data in interchange format
    pm2.pm2io.write_interchange_format(output_folder / output_filename,
                                       ds_all.pr.to_interchange_format())

    # write data in native PRIMAP2 format
    encoding = {var: compression for var in ds_all.data_vars}
    ds_all.pr.to_netcdf(output_folder / (output_filename + ".nc"),
                        encoding=encoding)

    return ds_all
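
# A minimal usage sketch (submission and data years are hypothetical; assumes
# the matching CRF specification and downloaded submission files are available
# in this repository's standard locations). Log CSVs and the combined dataset
# are written below `log_path`:
#
#     ds = read_year_to_test_specs(submission_year=2021, data_year=2010)
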
def save_unknown_categories_info(
        unknown_categories: List[List],
        file: Path,
) -> None:
    """
    Save information on unknown categories to a csv file.

    Parameters
    ----------
    unknown_categories: List[List]
        List of lists with information on the unknown categories
        (which table, country and year, and which categories).
    file: pathlib.Path
        File including path where the data should be stored.
    """
    # process unknown categories
    df_unknown_cats = pd.DataFrame(
        unknown_categories, columns=["Table", "Country", "Category", "Year"])

    processed_cats = []
    all_tables = df_unknown_cats["Table"].unique()
    all_years = set(df_unknown_cats["Year"].unique())
    all_years = set(year for year in all_years
                    if isinstance(year, int) and year > 1989)
    for table in all_tables:
        df_cats_current_table = df_unknown_cats[df_unknown_cats["Table"] == table]
        cats_current_table = list(df_cats_current_table["Category"].unique())
        for cat in cats_current_table:
            df_current_cat_table = \
                df_cats_current_table[df_cats_current_table["Category"] == cat]
            all_countries = df_current_cat_table["Country"].unique()
            countries_cat = ""
            for country in all_countries:
                years_country = df_current_cat_table[
                    df_current_cat_table["Country"] == country]["Year"].unique()
                if set(years_country) == all_years:
                    countries_cat = f"{countries_cat}; {country}"
                else:
                    countries_cat = f"{countries_cat}; {country} ({years_country})"
            processed_cats.append([table, cat, countries_cat])

    # make sure the output folder exists
    file.parent.mkdir(parents=True, exist_ok=True)

    df_processed_cats = pd.DataFrame(
        processed_cats, columns=["Table", "Category", "Countries"])
    df_processed_cats.to_csv(file, index=False)
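
# A hedged example of the expected input format (all rows are hypothetical):
# each entry is [table, country, category, year], matching the DataFrame
# columns used above. Categories reported by a country for all years appear
# without a year list in the output csv:
#
#     save_unknown_categories_info(
#         [["Table1s1", "DEU", "1.A.1.c.ii", 2000],
#          ["Table1s1", "FRA", "1.A.1.c.ii", 2000]],
#         log_path / "CRF2021" / "2000_unknown_categories_example.csv",
#     )
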
def save_last_row_info(
        last_row_info: List[List],
        file: Path,
) -> None:
    """
    Save information on data found in the last row read for a table.

    The last row read should not contain data. If it does contain data,
    it is a hint that the table is larger for some countries than given
    in the specification, and thus we might not read the full table.

    Parameters
    ----------
    last_row_info: List[List]
        List of lists with information on data found in the last row
        (which table, country and year, and which categories).
    file: pathlib.Path
        File including path where the data should be stored.
    """
    # process the last row information messages
    df_last_row_info = pd.DataFrame(
        last_row_info, columns=["Table", "Country", "Category", "Year"])

    processed_last_row_info = []
    all_tables = df_last_row_info["Table"].unique()
    all_years = set(df_last_row_info["Year"].unique())
    all_years = set(year for year in all_years
                    if isinstance(year, int) and year > 1989)
    for table in all_tables:
        df_last_row_current_table = df_last_row_info[df_last_row_info["Table"] == table]
        all_countries = df_last_row_current_table["Country"].unique()
        for country in all_countries:
            df_current_country_table = df_last_row_current_table[
                df_last_row_current_table["Country"] == country]
            all_categories = df_current_country_table["Category"].unique()
            cats_country = ""
            for cat in all_categories:
                years_category = df_current_country_table[
                    df_current_country_table["Category"] == cat]["Year"].unique()
                if set(years_category) == all_years:
                    cats_country = f"{cats_country}; {cat}"
                else:
                    cats_country = f"{cats_country}; {cat} ({years_category})"
            processed_last_row_info.append([table, country, cats_country])

    # make sure the output folder exists
    file.parent.mkdir(parents=True, exist_ok=True)

    df_processed_last_row_info = pd.DataFrame(
        processed_last_row_info, columns=["Table", "Country", "Categories"])
    df_processed_last_row_info.to_csv(file, index=False)
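
# A usage sketch mirroring the call in `read_year_to_test_specs` (the entries
# and the output path are hypothetical):
#
#     save_last_row_info(
#         [["Table1s1", "DEU", "1.A.1.c.ii", 2000]],
#         log_path / "CRF2021" / "2000_last_row_info_example.csv",
#     )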