  1. """common functions for unfccc_ghg_data
  2. Functions used by the different readers and downloaders in the unfccc_ghg_data package
  3. """
  4. import copy
  5. import json
  6. import re
  7. from copy import deepcopy
  8. from datetime import date
  9. from pathlib import Path
  10. from typing import Optional
  11. import numpy as np
  12. import pandas as pd
  13. import pycountry
  14. import xarray as xr
  15. from .definitions import (
  16. GWP_factors,
  17. code_path,
  18. custom_country_mapping,
  19. custom_folders,
  20. downloaded_data_path,
  21. extracted_data_path,
  22. legacy_data_path,
  23. root_path,
  24. )


def process_data_for_country(
    data_country: xr.Dataset,
    entities_to_ignore: list[str],
    gas_baskets: dict[str, list[str]],
    filter_dims: Optional[dict[str, list[str]]] = None,
    cat_terminology_out: Optional[str] = None,
    category_conversion: Optional[dict[str, dict]] = None,
    sectors_out: Optional[list[str]] = None,
    processing_info_country: Optional[dict] = None,
) -> xr.Dataset:
    """
    Process data from DI interface (where necessary).

    * Downscaling including subtraction of time series
    * country specific sector aggregation
    * Conversion to IPCC2006 categories
    * general sector and gas basket aggregation (in new categories)

    Parameters
    ----------
    data_country: xr.Dataset
        data to process
    entities_to_ignore: list[str]
        Which entities should be ignored. They will not be in the returned dataset
    gas_baskets: dict[str, list[str]]
        Gas baskets to create. Each entry consists of the basket as key and a list of
        gases that make up the basket as value
    filter_dims: Optional[dict[str, list[str]]] = None
        Filter data before processing. The filter is in the format taken by PRIMAP2's
        ds.pr.loc[] functionality
    cat_terminology_out: Optional[str] = None
        Category terminology for the output dataset
    category_conversion: Optional[dict[str, dict]] = None
        Definition of the category conversion. The dict has two possible fields:
        * "mapping": a dict[str, str] with a 1-to-1 category code mapping
          (key is the category to map from, value is the category to map to)
        * "aggregate": aggregation rules applied after the mapping
          (see convert_categories) TODO: explain format in detail
    sectors_out: Optional[list[str]] = None
        Categories to return
    processing_info_country: Optional[dict] = None
        More detailed, country-specific processing info. TODO: explain format

    Returns
    -------
    xr.Dataset: processed dataset
    """
    # 0: gather information
    countries = list(data_country.coords[data_country.attrs["area"]].values)
    if len(countries) > 1:
        raise ValueError(
            f"Found {len(countries)} countries. Only single country data "
            f"can be processed by this function. countries: {countries}"
        )
    else:
        country_code = countries[0]

    # get category terminology
    cat_col = data_country.attrs["cat"]
    temp = re.findall(r"\((.*)\)", cat_col)
    cat_terminology_in = temp[0]

    # get scenario
    scenarios = list(data_country.coords[data_country.attrs["scen"]].values)
    if len(scenarios) > 1:
        raise ValueError(
            f"Found {len(scenarios)} scenarios. Only single scenario data "
            f"can be processed by this function. Scenarios: {scenarios}"
        )
    scenario = scenarios[0]

    # get source
    sources = list(data_country.coords["source"].values)
    if len(sources) > 1:
        raise ValueError(
            f"Found {len(sources)} sources. Only single source data "
            f"can be processed by this function. Sources: {sources}"
        )
    source = sources[0]

    # check if category name column present
    # TODO: replace 'name' in config by 'additional_cols' dict that defines the cols
    #  and the values
    if "orig_cat_name" in data_country.coords:
        cat_name_present = True
    else:
        cat_name_present = False

    # 1: general processing
    # remove unused cats
    data_country = data_country.dropna(f"category ({cat_terminology_in})", how="all")
    # remove unused years
    data_country = data_country.dropna("time", how="all")
    # remove variables only containing nan
    nan_vars_country = [
        var
        for var in data_country.data_vars
        if bool(data_country[var].isnull().all().data) is True
    ]
    print(f"removing all-nan variables: {nan_vars_country}")
    data_country = data_country.drop_vars(nan_vars_country)

    # remove unnecessary variables
    entities_ignore_present = [
        entity for entity in entities_to_ignore if entity in data_country.data_vars
    ]
    data_country = data_country.drop_vars(entities_ignore_present)

    # filter
    if filter_dims is not None:
        data_country = data_country.pr.loc[filter_dims]
    # 2: country specific processing
    if processing_info_country is not None:
        if "tolerance" in processing_info_country:
            tolerance = processing_info_country["tolerance"]
        else:
            tolerance = 0.01

        # remove entities if needed
        if "ignore_entities" in processing_info_country:
            entities_to_ignore_country = processing_info_country["ignore_entities"]
            entities_ignore_present = [
                entity
                for entity in entities_to_ignore_country
                if entity in data_country.data_vars
            ]
            data_country = data_country.drop_vars(entities_ignore_present)

        # take only desired years
        if "years" in processing_info_country:
            data_country = data_country.pr.loc[
                {"time": processing_info_country["years"]}
            ]

        # remove timeseries if desired
        if "remove_ts" in processing_info_country:
            for case in processing_info_country["remove_ts"]:
                remove_info = copy.deepcopy(processing_info_country["remove_ts"][case])
                entities = remove_info.pop("entities")
                for entity in entities:
                    data_country[entity].pr.loc[remove_info] = (
                        data_country[entity].pr.loc[remove_info] * np.nan
                    )

        # remove all data for given years if necessary
        if "remove_years" in processing_info_country:
            data_country = data_country.drop_sel(
                time=processing_info_country["remove_years"]
            )

        # subtract categories
        if "subtract_cats" in processing_info_country:
            subtract_cats_current = processing_info_country["subtract_cats"]
            print(f"Subtracting categories for country {country_code}")
            for cat_to_generate in subtract_cats_current:
                if "entities" in subtract_cats_current[cat_to_generate].keys():
                    entities_current = subtract_cats_current[cat_to_generate][
                        "entities"
                    ]
                else:
                    entities_current = list(data_country.data_vars)

                cats_to_subtract = subtract_cats_current[cat_to_generate]["subtract"]
                data_sub = (
                    data_country[entities_current]
                    .pr.loc[{"category": cats_to_subtract}]
                    .pr.sum(dim="category", skipna=True, min_count=1)
                )
                data_parent = data_country[entities_current].pr.loc[
                    {"category": subtract_cats_current[cat_to_generate]["parent"]}
                ]
                data_agg = data_parent - data_sub
                nan_vars = [
                    var
                    for var in data_agg.data_vars
                    if data_agg[var].isnull().all().data is True
                ]
                data_agg = data_agg.drop(nan_vars)
                if len(data_agg.data_vars) > 0:
                    print(f"Generating {cat_to_generate} through subtraction")
                    data_agg = data_agg.expand_dims(
                        [f"category ({cat_terminology_in})"]
                    )
                    data_agg = data_agg.assign_coords(
                        coords={
                            f"category ({cat_terminology_in})": (
                                f"category ({cat_terminology_in})",
                                [cat_to_generate],
                            )
                        }
                    )
                    if cat_name_present:
                        cat_name = subtract_cats_current[cat_to_generate]["name"]
                        data_agg = data_agg.assign_coords(
                            coords={
                                "orig_cat_name": (
                                    f"category ({cat_terminology_in})",
                                    [cat_name],
                                )
                            }
                        )
                    data_country = data_country.pr.merge(data_agg, tolerance=tolerance)
                else:
                    print(f"no data to generate category {cat_to_generate}")
        # downscaling
        if "downscale" in processing_info_country:
            if "sectors" in processing_info_country["downscale"]:
                sector_downscaling = processing_info_country["downscale"]["sectors"]
                for case in sector_downscaling.keys():
                    print(f"Downscaling for {case}.")
                    sector_downscaling_current = sector_downscaling[case]
                    entities = sector_downscaling_current.pop("entities")
                    for entity in entities:
                        data_country[entity] = data_country[
                            entity
                        ].pr.downscale_timeseries(**sector_downscaling_current)
                        # , skipna_evaluation_dims=None)

            if "entities" in processing_info_country["downscale"]:
                entity_downscaling = processing_info_country["downscale"]["entities"]
                for case in entity_downscaling.keys():
                    print(f"Downscaling for {case}.")
                    # print(data_country.coords[f'category ('
                    #                           f'{cat_terminology_in})'].values)
                    data_country = data_country.pr.downscale_gas_timeseries(
                        **entity_downscaling[case],
                        skipna=True,
                        skipna_evaluation_dims=None,
                    )

        # aggregate categories
        if "aggregate_cats" in processing_info_country:
            if "agg_tolerance" in processing_info_country:
                agg_tolerance = processing_info_country["agg_tolerance"]
            else:
                agg_tolerance = tolerance
            aggregate_cats_current = processing_info_country["aggregate_cats"]
            print(
                f"Aggregating categories for country {country_code}, source {source}, "
                f"scenario {scenario}"
            )
            for cat_to_agg in aggregate_cats_current:
                print(f"Category: {cat_to_agg}")
                source_cats = aggregate_cats_current[cat_to_agg]["sources"]
                data_agg = data_country.pr.loc[{"category": source_cats}].pr.sum(
                    dim="category", skipna=True, min_count=1
                )
                nan_vars = [
                    var
                    for var in data_agg.data_vars
                    if data_agg[var].isnull().all().data is True
                ]
                data_agg = data_agg.drop(nan_vars)
                if len(data_agg.data_vars) > 0:
                    data_agg = data_agg.expand_dims(
                        [f"category ({cat_terminology_in})"]
                    )
                    data_agg = data_agg.assign_coords(
                        coords={
                            f"category ({cat_terminology_in})": (
                                f"category ({cat_terminology_in})",
                                [cat_to_agg],
                            )
                        }
                    )
                    if cat_name_present:
                        cat_name = aggregate_cats_current[cat_to_agg]["name"]
                        data_agg = data_agg.assign_coords(
                            coords={
                                "orig_cat_name": (
                                    f"category ({cat_terminology_in})",
                                    [cat_name],
                                )
                            }
                        )
                    data_country = data_country.pr.merge(
                        data_agg, tolerance=agg_tolerance
                    )
                else:
                    print(f"no data to aggregate category {cat_to_agg}")

        # copy HFCs and PFCs with default factors
        if "basket_copy" in processing_info_country:
            GWPs_to_add = processing_info_country["basket_copy"]["GWPs_to_add"]
            entities = processing_info_country["basket_copy"]["entities"]
            source_GWP = processing_info_country["basket_copy"]["source_GWP"]
            for entity in entities:
                data_source = data_country[f"{entity} ({source_GWP})"]
                for GWP in GWPs_to_add:
                    data_GWP = (
                        data_source * GWP_factors[f"{source_GWP}_to_{GWP}"][entity]
                    )
                    data_GWP.attrs["entity"] = entity
                    data_GWP.attrs["gwp_context"] = GWP
                    data_country[f"{entity} ({GWP})"] = data_GWP

        # aggregate gases if desired
        if "aggregate_gases" in processing_info_country:
            # TODO: why use different code here than below. Can this fill non-existent
            #  gas baskets?
            for case in processing_info_country["aggregate_gases"].keys():
                case_info = processing_info_country["aggregate_gases"][case]
                data_country[
                    case_info["basket"]
                ] = data_country.pr.fill_na_gas_basket_from_contents(**case_info)
    # 3: map categories
    if category_conversion is not None:
        data_country = convert_categories(
            data_country,
            category_conversion,
            cat_terminology_out,
            debug=False,
            tolerance=0.01,
        )
    else:
        cat_terminology_out = cat_terminology_in

    # more general processing
    # reduce categories to output cats
    if sectors_out is not None:
        cats_to_keep = [
            cat
            for cat in data_country.coords[f"category ({cat_terminology_out})"].values
            if cat in sectors_out
        ]
        data_country = data_country.pr.loc[{"category": cats_to_keep}]

    # create gas baskets
    entities_present = set(data_country.data_vars)
    for basket in gas_baskets.keys():
        basket_contents_present = [
            gas for gas in gas_baskets[basket] if gas in entities_present
        ]
        if len(basket_contents_present) > 0:
            if basket in list(data_country.data_vars):
                data_country[basket] = data_country.pr.fill_na_gas_basket_from_contents(
                    basket=basket,
                    basket_contents=basket_contents_present,
                    skipna=True,
                    min_count=1,
                )
            else:
                try:
                    # print(data_country.data_vars)
                    data_country[basket] = xr.full_like(
                        data_country["CO2"], np.nan
                    ).pr.quantify(units="Gg CO2 / year")
                    data_country[basket].attrs = {
                        "entity": basket.split(" ")[0],
                        "gwp_context": basket.split(" ")[1][1:-1],
                    }
                    data_country[basket] = data_country.pr.gas_basket_contents_sum(
                        basket=basket,
                        basket_contents=basket_contents_present,
                        min_count=1,
                    )
                    entities_present.add(basket)
                except Exception as ex:
                    print(
                        f"No gas basket created for {country_code}, {source}, "
                        f"{scenario}: {ex}"
                    )

    # amend title and comment
    data_country.attrs["comment"] = (
        data_country.attrs["comment"] + f" Processed on {date.today()}"
    )
    data_country.attrs["title"] = (
        data_country.attrs["title"] + f" Processed on {date.today()}"
    )

    return data_country
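
# Usage sketch for process_data_for_country (illustrative only: the dataset,
# gas baskets and processing configuration below are hypothetical placeholders,
# not values shipped with this module):
#
#     data_processed = process_data_for_country(
#         data_country=data_mys,  # single-country, single-scenario PRIMAP2 dataset
#         entities_to_ignore=["NMVOC"],
#         gas_baskets={
#             "KYOTOGHG (AR4GWP100)": ["CO2", "CH4", "N2O", "HFCS (AR4GWP100)"],
#         },
#         cat_terminology_out="IPCC2006_PRIMAP",
#         category_conversion={"mapping": {...}, "aggregate": {...}},
#         processing_info_country={"tolerance": 0.01},
#     )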


def convert_categories(
    ds_input: xr.Dataset,
    conversion: dict[str, dict[str, str]],
    # terminology_from: str,
    terminology_to: str,
    debug: bool = False,
    tolerance: float = 0.01,
) -> xr.Dataset:
    """
    Convert data from one category terminology to another

    Parameters
    ----------
    ds_input: xr.Dataset
        Dataset to convert
    conversion: dict[str, dict[str, str]]
        Conversion definition with the optional fields "mapping" (1-to-1 category
        code mapping) and "aggregate" (aggregation rules applied after the mapping)
    terminology_to: str
        Category terminology of the output dataset
    debug: bool = False
        If True print additional debugging output
    tolerance: float = 0.01
        Tolerance used when merging aggregated categories into the dataset

    Returns
    -------
    xr.Dataset: converted dataset
    """
    print(f"converting categories to {terminology_to}")

    if "orig_cat_name" in ds_input.coords:
        cat_name_present = True
    else:
        cat_name_present = False
    ds_converted = ds_input.copy(deep=True)
    ds_converted.attrs = deepcopy(ds_input.attrs)
    # TODO: change attrs for additional coordinates

    # change category terminology
    cat_dim = ds_converted.attrs["cat"]
    ds_converted.attrs["cat"] = f"category ({terminology_to})"
    ds_converted = ds_converted.rename({cat_dim: ds_converted.attrs["cat"]})

    # find categories present in dataset
    cats_present = list(ds_converted.coords[f"category ({terminology_to})"])

    # restrict categories and map category names
    if "mapping" in conversion.keys():
        mapping_cats_present = [
            cat for cat in list(conversion["mapping"].keys()) if cat in cats_present
        ]
        ds_converted = ds_converted.pr.loc[{"category": mapping_cats_present}]

        from_cats = ds_converted.coords[f"category ({terminology_to})"].values
        to_cats = pd.Series(from_cats).replace(conversion["mapping"])
        ds_converted = ds_converted.assign_coords(
            {f"category ({terminology_to})": (f"category ({terminology_to})", to_cats)}
        )

    # redo the list of present cats after mapping, as we have new categories in the
    # target terminology now
    cats_present_mapped = list(
        ds_converted.coords[f"category ({terminology_to})"].values
    )
    # aggregate categories
    if "aggregate" in conversion:
        aggregate_cats = conversion["aggregate"]
        for cat_to_agg in aggregate_cats:
            if debug:
                print(f"Category: {cat_to_agg}")
            source_cats = [
                cat
                for cat in aggregate_cats[cat_to_agg]["sources"]
                if cat in cats_present_mapped
            ]
            if debug:
                print(source_cats)
            data_agg = ds_converted.pr.loc[{"category": source_cats}].pr.sum(
                dim="category", skipna=True, min_count=1
            )
            nan_vars = [
                var
                for var in data_agg.data_vars
                if data_agg[var].isnull().all().data is True
            ]
            data_agg = data_agg.drop(nan_vars)
            if len(data_agg.data_vars) > 0:
                data_agg = data_agg.expand_dims([f"category ({terminology_to})"])
                data_agg = data_agg.assign_coords(
                    coords={
                        f"category ({terminology_to})": (
                            f"category ({terminology_to})",
                            [cat_to_agg],
                        )
                    }
                )
                if cat_name_present:
                    data_agg = data_agg.assign_coords(
                        coords={
                            "orig_cat_name": (
                                f"category ({terminology_to})",
                                [aggregate_cats[cat_to_agg]["name"]],
                            )
                        }
                    )
                ds_converted = ds_converted.pr.merge(data_agg, tolerance=tolerance)
                cats_present_mapped.append(cat_to_agg)
            else:
                print(f"no data to aggregate category {cat_to_agg}")

    return ds_converted
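
# Usage sketch for convert_categories (the mapping and aggregation rules below
# are hypothetical examples, not a complete terminology conversion):
#
#     conversion = {
#         "mapping": {"1A1": "1.A.1", "1A2": "1.A.2"},
#         "aggregate": {
#             "1.A": {"sources": ["1.A.1", "1.A.2"], "name": "Fuel Combustion"},
#         },
#     }
#     ds_converted = convert_categories(
#         ds_bur, conversion, terminology_to="IPCC2006_PRIMAP", debug=True
#     )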


def get_country_name(
    country_code: str,
) -> str:
    """Get the country name from its ISO 3 letter code"""
    if country_code in custom_country_mapping:
        country_name = custom_country_mapping[country_code]
    else:
        try:
            country = pycountry.countries.get(alpha_3=country_code)
            country_name = country.name
        except Exception as ex:
            raise ValueError(
                f"Country code {country_code} can not be mapped to any country"
            ) from ex

    return country_name


def get_country_code(
    country_name: str,
) -> str:
    """
    Obtain the country code. If the input already is a three-letter code it is
    returned unchanged, otherwise a fuzzy search for the country name is performed.

    Parameters
    ----------
    country_name: str
        Country code or name to get the three-letter code for.

    Returns
    -------
    country_code: str
    """
    # First check if it's in the list of custom codes
    if country_name in custom_country_mapping:
        country_code = country_name
    else:
        try:
            # check if it's a 3 letter code
            country = pycountry.countries.get(alpha_3=country_name)
            country_code = country.alpha_3
        except Exception:
            try:
                country = pycountry.countries.search_fuzzy(
                    country_name.replace("_", " ")
                )
            except Exception as ex:
                raise ValueError(
                    f"Country name {country_name} can not be mapped to "
                    f"any country code. Try using the ISO3 code directly."
                ) from ex
            if len(country) > 1:
                country_code = None
                for current_country in country:
                    if current_country.name == country_name:
                        country_code = current_country.alpha_3
                if country_code is None:
                    raise ValueError(
                        f"Country name {country_name} has {len(country)} "
                        f"possible results for country codes."
                    )
            else:
                country_code = country[0].alpha_3

    return country_code
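
# Usage sketch (illustrative; resolution relies on pycountry's database):
#
#     get_country_code("KEN")       # "KEN" - already a three-letter code
#     get_country_code("Kenya")     # "KEN" - fuzzy search on the name
#     get_country_code("Viet_Nam")  # "VNM" - underscores are replaced by spaces
#                                   # before the fuzzy search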


def create_folder_mapping(folder: str, extracted: bool = False) -> None:
    """
    Create a mapping from 3 letter ISO country codes to folders
    based on the subfolders of the given folder. The mapping is
    stored in 'folder_mapping.json' in the given folder. The folder
    must be given relative to the repository root.

    Parameters
    ----------
    folder: str
        folder to create the mapping for
    extracted: bool = False
        If true treat the folder as extracted data, where we
        only have one folder per country and no typos in the
        names

    Returns
    -------
    Nothing
    """
    folder = root_path / folder
    folder_mapping = {}
    # if not extracted:
    known_folders = custom_folders
    # else:
    #     known_folders = {}

    for item in folder.iterdir():
        if item.is_dir() and not item.match("__pycache__"):
            if item.name in known_folders:
                ISO3 = known_folders[item.name]
            else:
                try:
                    country = pycountry.countries.search_fuzzy(
                        item.name.replace("_", " ")
                    )
                    if len(country) > 1:
                        ISO3 = None
                        for current_country in country:
                            if current_country.name == item.name.replace("_", " "):
                                ISO3 = current_country.alpha_3
                    else:
                        ISO3 = country[0].alpha_3
                except Exception:
                    ISO3 = None

            if ISO3 is None:
                print(f"No match for {item.name}")
            else:
                if ISO3 in folder_mapping.keys():
                    folder_mapping[ISO3] = [folder_mapping[ISO3], item.name]
                else:
                    folder_mapping[ISO3] = item.name

    with open(folder / "folder_mapping.json", "w") as mapping_file:
        json.dump(folder_mapping, mapping_file, indent=4)
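
# Usage sketch (the folder is interpreted relative to the repository root; the
# path and resulting mapping are illustrative examples only):
#
#     create_folder_mapping("downloaded_data/UNFCCC")
#     # writes downloaded_data/UNFCCC/folder_mapping.json with content like
#     # {"MYS": "Malaysia", "VNM": "Viet_Nam"}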


# TODO add crf
def get_country_submissions(
    country_name: str,
    print_sub: bool = True,
) -> dict[str, list[str]]:
    """
    Input is a three letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    queries the folder mapping files for folders.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3 letter code
    print_sub: bool
        If True information on submissions will be written to stdout

    Returns
    -------
    returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
    Each value is a list of folders
    """
    data_folder = downloaded_data_path

    country_code = get_country_code(country_name)

    if print_sub:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    country_submissions = {}
    if print_sub:
        print("#" * 80)
        print(f"The following submissions are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            if print_sub:
                print("")
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code in folder_mapping:
                country_folders = folder_mapping[country_code]
                if isinstance(country_folders, str):
                    # only one folder
                    country_folders = [country_folders]

                submission_folders = []
                for country_folder in country_folders:
                    current_folder = item / country_folder
                    if print_sub:
                        print(f"Submissions in folder {country_folder}:")

                    for submission_folder in current_folder.iterdir():
                        if submission_folder.is_dir():
                            if print_sub:
                                print(submission_folder.name)
                            submission_folders.append(submission_folder.name)

                country_submissions[item.name] = submission_folders
            else:
                print(f"No submissions available for {country_name}.")

    return country_submissions
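
# Usage sketch (assumes folder_mapping.json files exist in the downloaded data
# folders, e.g. created with create_folder_mapping; the output is illustrative):
#
#     submissions = get_country_submissions("Malaysia", print_sub=False)
#     # e.g. {"UNFCCC": ["BUR3", "BUR4", "NC4"], "non-UNFCCC": []}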


def get_country_datasets(
    country_name: str,
    print_ds: bool = True,
) -> dict[str, list[str]]:
    """
    Input is a three letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    checks the code and data folders for content on the country.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3 letter code
    print_ds: bool
        If True information on submissions will be written to stdout

    Returns
    -------
    returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
    Each value is a list of folders
    """
    data_folder = extracted_data_path
    data_folder_legacy = legacy_data_path

    # obtain country code
    country_code = get_country_code(country_name)

    if print_ds:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    rep_data = {}
    # data
    if print_ds:
        print("#" * 80)
        print(f"The following datasets are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError(
                        "Wrong data type in folder mapping json file. Should be str."
                    )

                datasets_current_folder = {}
                current_folder = item / country_folder

                for data_file in current_folder.iterdir():
                    if data_file.suffix in [".nc", ".yaml", ".csv"]:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix
                            )
                        else:
                            datasets_current_folder[data_file.stem] = [data_file.suffix]

                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split("_")
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f"Wrong code: {parts[0]}"
                        ] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology})"
                        data_info = ""
                        if ".nc" in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if (".csv" in datasets_current_folder[dataset]) and (
                            ".yaml" in datasets_current_folder[dataset]
                        ):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif ".csv" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif ".yaml" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "

                        code_file = get_code_file(country_code, parts[1])
                        if code_file:
                            data_info = data_info + f"code: {code_file.name}"
                        else:
                            data_info = data_info + "code: not found"

                        cleaned_datasets_current_folder[key] = data_info

                if print_ds:
                    if cleaned_datasets_current_folder:
                        for country_ds in cleaned_datasets_current_folder:
                            print(
                                f"{country_ds}: "
                                f"{cleaned_datasets_current_folder[country_ds]}"
                            )
                    else:
                        print("No data available")
                    print("")

            rep_data[item.name] = cleaned_datasets_current_folder

    # legacy data
    if print_ds:
        print("#" * 80)
        print(f"The following legacy datasets are available for {country_name}")
    legacy_data = {}
    for item in data_folder_legacy.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError(
                        "Wrong data type in folder mapping json file. Should be str."
                    )

                datasets_current_folder = {}
                current_folder = item / country_folder

                for data_file in current_folder.iterdir():
                    if data_file.suffix in [".nc", ".yaml", ".csv"]:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix
                            )
                        else:
                            datasets_current_folder[data_file.stem] = [data_file.suffix]

                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split("_")
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f"Wrong code: {parts[0]}"
                        ] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology}, legacy)"
                        data_info = ""
                        if ".nc" in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if (".csv" in datasets_current_folder[dataset]) and (
                            ".yaml" in datasets_current_folder[dataset]
                        ):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif ".csv" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif ".yaml" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "

                        cleaned_datasets_current_folder[key] = data_info

                if print_ds:
                    if cleaned_datasets_current_folder:
                        for country_ds in cleaned_datasets_current_folder:
                            print(
                                f"{country_ds}: "
                                f"{cleaned_datasets_current_folder[country_ds]}"
                            )
                    else:
                        print("No data available")
                    print("")

            legacy_data[item.name] = cleaned_datasets_current_folder

    all_data = {
        "rep_data": rep_data,
        "legacy_data": legacy_data,
    }

    return all_data


def get_code_file(
    country_name: str,
    submission: str,
    print_info: bool = False,
) -> Optional[Path]:
    """
    For given country name and submission find the script that creates the data

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3 letter code
    submission: str
        String of the submission
    print_info: bool = False
        If True print information on the code found

    Returns
    -------
    returns a pathlib Path object for the code file, or None if no file is found
    """
    code_file_path = None
    UNFCCC_reader_path = code_path / "unfccc_reader"

    # CRF is an exception as it's read using the unfccc_crf_reader module
    # so we return the path to that.
    if submission[0:3] == "CRF":
        return root_path / "unfccc_crf_reader"

    if submission[0:2] == "DI":
        return root_path / "unfccc_di_reader"

    # obtain country code
    country_code = get_country_code(country_name)

    if print_info:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    with open(UNFCCC_reader_path / "folder_mapping.json") as mapping_file:
        folder_mapping = json.load(mapping_file)

    if country_code not in folder_mapping:
        if print_info:
            print("No code available")
            print("")
    else:
        country_folder = UNFCCC_reader_path / folder_mapping[country_code]
        code_file_name_candidate = "read_" + country_code + "_" + submission + "*"

        for file in country_folder.iterdir():
            if file.match(code_file_name_candidate):
                if code_file_path is not None:
                    raise ValueError(
                        f"Found multiple code candidates: "
                        f"{code_file_path} and {file.name}. "
                        f"Please use only one file with name "
                        f"'read_ISO3_submission_XXX.YYY'."
                    )
                else:
                    if print_info:
                        print(f"Found code file {file.relative_to(root_path)}")
                    code_file_path = file

    if code_file_path is not None:
        return code_file_path.relative_to(root_path)
    else:
        return None
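
# Usage sketch (assumes reader scripts follow the 'read_<ISO3>_<submission>_*'
# naming convention checked above; the file name shown is only an example):
#
#     code_file = get_code_file("Malaysia", "BUR3", print_info=True)
#     # e.g. Path("unfccc_reader/Malaysia/read_MYS_BUR3_from_pdf.py"),
#     # or None if no matching script exists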


def fix_rows(
    data: pd.DataFrame, rows_to_fix: list, col_to_use: str, n_rows: int
) -> pd.DataFrame:
    """
    Fix rows that have been split during reading from pdf

    This is the version used for Malaysia BUR3,4. Adapt for other BURs if needed.

    Parameters
    ----------
    data: pd.DataFrame
        Table in which split rows should be merged
    rows_to_fix: list
        Values of col_to_use that identify the rows to fix
    col_to_use: str
        Column used to identify the rows to fix
    n_rows: int
        Number of rows to merge. The special values -2, -3 and -5 merge 2, 3 and
        5 rows respectively, starting one row above the matched row.

    Returns
    -------
    pd.DataFrame with the split rows merged
    """
    for row in rows_to_fix:
        # print(row)
        # find the row number and collect the row and the next two rows
        index = data.loc[data[col_to_use] == row].index
        # print(list(index))
        if not list(index):
            print(f"Can't merge split row {row}")
            print(data[col_to_use])
            continue
        # print(f"Merging split row {row} for table {page}")
        loc = data.index.get_loc(index[0])
        if n_rows == -2:
            locs_to_merge = list(range(loc - 1, loc + 1))
        elif n_rows == -3:
            locs_to_merge = list(range(loc - 1, loc + 2))
        elif n_rows == -5:
            locs_to_merge = list(range(loc - 1, loc + 4))
        else:
            locs_to_merge = list(range(loc, loc + n_rows))
        rows_to_merge = data.iloc[locs_to_merge]
        indices_to_merge = rows_to_merge.index
        # join the rows
        new_row = rows_to_merge.agg(" ".join)
        # replace the double spaces that are created
        # must be done here and not at the end as splits are not always
        # the same and join would produce different col values
        new_row = new_row.str.replace("  ", " ")
        new_row = new_row.str.replace("N O", "NO")
        new_row = new_row.str.replace(", N", ",N")
        new_row = new_row.str.replace("- ", "-")

        # replace spaces in numbers
        pat = r"^(?P<first>[0-9\.,]*)\s(?P<last>[0-9\.,]*)$"

        def repl(m):
            return f"{m.group('first')}{m.group('last')}"

        new_row = new_row.str.replace(pat, repl, regex=True)

        data.loc[indices_to_merge[0]] = new_row
        data = data.drop(indices_to_merge[1:])

    return data
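
# Usage sketch for fix_rows with a small hypothetical table (in the readers the
# DataFrame comes from PDF extraction and col_to_use is the category column):
#
#     df = pd.DataFrame(
#         {
#             "category": ["1.A.1 Fuel", "Combustion", "1.A.2"],
#             "2019": ["123", "456", "789"],
#         }
#     )
#     df = fix_rows(df, rows_to_fix=["1.A.1 Fuel"], col_to_use="category", n_rows=2)
#     # the first two rows are merged into "1.A.1 Fuel Combustion" and the split
#     # value "123 456" is collapsed to "123456" by the number regex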