# functions.py

import copy
import pycountry
import json
import re
import xarray as xr
import pandas as pd
import numpy as np
from datetime import date
from copy import deepcopy
from typing import Dict, List, Optional
from pathlib import Path
from .definitions import custom_country_mapping, custom_folders
from .definitions import root_path, downloaded_data_path, extracted_data_path
from .definitions import legacy_data_path, code_path
from .definitions import GWP_factors


def process_data_for_country(
    data_country: xr.Dataset,
    entities_to_ignore: List[str],
    gas_baskets: Dict[str, List[str]],
    filter_dims: Optional[Dict[str, List[str]]] = None,
    cat_terminology_out: Optional[str] = None,
    category_conversion: Dict[str, Dict] = None,
    sectors_out: List[str] = None,
    processing_info_country: Dict = None,
) -> xr.Dataset:
    """
    Process data from DI interface (where necessary).

    * Downscaling including subtraction of time series
    * country specific sector aggregation
    * Conversion to IPCC2006 categories
    * general sector and gas basket aggregation (in new categories)
    """
    # 0: gather information
    countries = list(data_country.coords[data_country.attrs["area"]].values)
    if len(countries) > 1:
        raise ValueError(
            f"Found {len(countries)} countries. Only single country data "
            f"can be processed by this function. countries: {countries}"
        )
    else:
        country_code = countries[0]

    # get category terminology
    cat_col = data_country.attrs["cat"]
    temp = re.findall(r"\((.*)\)", cat_col)
    cat_terminology_in = temp[0]

    # get scenario
    scenarios = list(data_country.coords[data_country.attrs["scen"]].values)
    if len(scenarios) > 1:
        raise ValueError(
            f"Found {len(scenarios)} scenarios. Only single scenario data "
            f"can be processed by this function. Scenarios: {scenarios}"
        )
    scenario = scenarios[0]

    # get source
    sources = list(data_country.coords["source"].values)
    if len(sources) > 1:
        raise ValueError(
            f"Found {len(sources)} sources. Only single source data "
            f"can be processed by this function. Sources: {sources}"
        )
    source = sources[0]

    # check if category name column present
    # TODO: replace 'name' in config by 'additional_cols' dict that defines the cols
    #  and the values
    if "orig_cat_name" in data_country.coords:
        cat_name_present = True
    else:
        cat_name_present = False
    # 1: general processing
    # remove unused cats
    data_country = data_country.dropna(f"category ({cat_terminology_in})", how="all")
    # remove unused years
    data_country = data_country.dropna("time", how="all")
    # remove variables only containing nan
    nan_vars_country = [
        var
        for var in data_country.data_vars
        if bool(data_country[var].isnull().all().data) is True
    ]
    print(f"removing all-nan variables: {nan_vars_country}")
    data_country = data_country.drop_vars(nan_vars_country)

    # remove unnecessary variables
    entities_ignore_present = [
        entity for entity in entities_to_ignore if entity in data_country.data_vars
    ]
    data_country = data_country.drop_vars(entities_ignore_present)

    # filter
    if filter_dims is not None:
        data_country = data_country.pr.loc[filter_dims]
    # 2: country specific processing
    if processing_info_country is not None:
        if "tolerance" in processing_info_country:
            tolerance = processing_info_country["tolerance"]
        else:
            tolerance = 0.01

        # remove entities if needed
        if "ignore_entities" in processing_info_country:
            entities_to_ignore_country = processing_info_country["ignore_entities"]
            entities_ignore_present = [
                entity
                for entity in entities_to_ignore_country
                if entity in data_country.data_vars
            ]
            data_country = data_country.drop_vars(entities_ignore_present)

        # take only desired years
        if "years" in processing_info_country:
            data_country = data_country.pr.loc[
                {"time": processing_info_country["years"]}
            ]

        # remove timeseries if desired
        if "remove_ts" in processing_info_country:
            for case in processing_info_country["remove_ts"]:
                remove_info = copy.deepcopy(processing_info_country["remove_ts"][case])
                entities = remove_info.pop("entities")
                for entity in entities:
                    data_country[entity].pr.loc[remove_info] = (
                        data_country[entity].pr.loc[remove_info] * np.nan
                    )

        # remove all data for given years if necessary
        if "remove_years" in processing_info_country:
            data_country = data_country.drop_sel(
                time=processing_info_country["remove_years"]
            )
        # subtract categories
        if "subtract_cats" in processing_info_country:
            subtract_cats_current = processing_info_country["subtract_cats"]
            print(f"Subtracting categories for country {country_code}")
            for cat_to_generate in subtract_cats_current:
                if "entities" in subtract_cats_current[cat_to_generate].keys():
                    entities_current = subtract_cats_current[cat_to_generate][
                        "entities"
                    ]
                else:
                    entities_current = list(data_country.data_vars)

                cats_to_subtract = subtract_cats_current[cat_to_generate]["subtract"]
                data_sub = (
                    data_country[entities_current]
                    .pr.loc[{"category": cats_to_subtract}]
                    .pr.sum(dim="category", skipna=True, min_count=1)
                )
                data_parent = data_country[entities_current].pr.loc[
                    {"category": subtract_cats_current[cat_to_generate]["parent"]}
                ]
                data_agg = data_parent - data_sub
                nan_vars = [
                    var
                    for var in data_agg.data_vars
                    if bool(data_agg[var].isnull().all().data)
                ]
                data_agg = data_agg.drop_vars(nan_vars)
                if len(data_agg.data_vars) > 0:
                    print(f"Generating {cat_to_generate} through subtraction")
                    data_agg = data_agg.expand_dims(
                        [f"category ({cat_terminology_in})"]
                    )
                    data_agg = data_agg.assign_coords(
                        coords={
                            f"category ({cat_terminology_in})": (
                                f"category ({cat_terminology_in})",
                                [cat_to_generate],
                            )
                        }
                    )
                    if cat_name_present:
                        cat_name = subtract_cats_current[cat_to_generate]["name"]
                        data_agg = data_agg.assign_coords(
                            coords={
                                "orig_cat_name": (
                                    f"category ({cat_terminology_in})",
                                    [cat_name],
                                )
                            }
                        )
                    data_country = data_country.pr.merge(data_agg, tolerance=tolerance)
                else:
                    print(f"no data to generate category {cat_to_generate}")
        # downscaling
        if "downscale" in processing_info_country:
            if "sectors" in processing_info_country["downscale"]:
                sector_downscaling = processing_info_country["downscale"]["sectors"]
                for case in sector_downscaling.keys():
                    print(f"Downscaling for {case}.")
                    sector_downscaling_current = sector_downscaling[case]
                    entities = sector_downscaling_current.pop("entities")
                    for entity in entities:
                        data_country[entity] = data_country[
                            entity
                        ].pr.downscale_timeseries(**sector_downscaling_current)
                        # , skipna_evaluation_dims=None)

            if "entities" in processing_info_country["downscale"]:
                entity_downscaling = processing_info_country["downscale"]["entities"]
                for case in entity_downscaling.keys():
                    print(f"Downscaling for {case}.")
                    # print(data_country.coords[f'category ('
                    #       f'{cat_terminology_in})'].values)
                    data_country = data_country.pr.downscale_gas_timeseries(
                        **entity_downscaling[case],
                        skipna=True,
                        skipna_evaluation_dims=None,
                    )
        # aggregate categories
        if "aggregate_cats" in processing_info_country:
            if "agg_tolerance" in processing_info_country:
                agg_tolerance = processing_info_country["agg_tolerance"]
            else:
                agg_tolerance = tolerance
            aggregate_cats_current = processing_info_country["aggregate_cats"]
            print(
                f"Aggregating categories for country {country_code}, source {source}, "
                f"scenario {scenario}"
            )
            for cat_to_agg in aggregate_cats_current:
                print(f"Category: {cat_to_agg}")
                source_cats = aggregate_cats_current[cat_to_agg]["sources"]
                data_agg = data_country.pr.loc[{"category": source_cats}].pr.sum(
                    dim="category", skipna=True, min_count=1
                )
                nan_vars = [
                    var
                    for var in data_agg.data_vars
                    if bool(data_agg[var].isnull().all().data)
                ]
                data_agg = data_agg.drop_vars(nan_vars)
                if len(data_agg.data_vars) > 0:
                    data_agg = data_agg.expand_dims(
                        [f"category ({cat_terminology_in})"]
                    )
                    data_agg = data_agg.assign_coords(
                        coords={
                            f"category ({cat_terminology_in})": (
                                f"category ({cat_terminology_in})",
                                [cat_to_agg],
                            )
                        }
                    )
                    if cat_name_present:
                        cat_name = aggregate_cats_current[cat_to_agg]["name"]
                        data_agg = data_agg.assign_coords(
                            coords={
                                "orig_cat_name": (
                                    f"category ({cat_terminology_in})",
                                    [cat_name],
                                )
                            }
                        )
                    data_country = data_country.pr.merge(
                        data_agg, tolerance=agg_tolerance
                    )
                else:
                    print(f"no data to aggregate category {cat_to_agg}")
        # copy HFCs and PFCs with default factors
        if "basket_copy" in processing_info_country:
            GWPs_to_add = processing_info_country["basket_copy"]["GWPs_to_add"]
            entities = processing_info_country["basket_copy"]["entities"]
            source_GWP = processing_info_country["basket_copy"]["source_GWP"]
            for entity in entities:
                data_source = data_country[f"{entity} ({source_GWP})"]
                for GWP in GWPs_to_add:
                    data_GWP = (
                        data_source * GWP_factors[f"{source_GWP}_to_{GWP}"][entity]
                    )
                    data_GWP.attrs["entity"] = entity
                    data_GWP.attrs["gwp_context"] = GWP
                    data_country[f"{entity} ({GWP})"] = data_GWP

        # aggregate gases if desired
        if "aggregate_gases" in processing_info_country:
            # TODO: why use different code here than below. Can this fill
            #  non-existent gas baskets?
            for case in processing_info_country["aggregate_gases"].keys():
                case_info = processing_info_country["aggregate_gases"][case]
                data_country[
                    case_info["basket"]
                ] = data_country.pr.fill_na_gas_basket_from_contents(**case_info)
    # 3: map categories
    if category_conversion is not None:
        data_country = convert_categories(
            data_country,
            category_conversion,
            cat_terminology_out,
            debug=False,
            tolerance=0.01,
        )
    else:
        cat_terminology_out = cat_terminology_in

    # more general processing
    # reduce categories to output cats
    if sectors_out is not None:
        cats_to_keep = [
            cat
            for cat in data_country.coords[f"category ({cat_terminology_out})"].values
            if cat in sectors_out
        ]
        data_country = data_country.pr.loc[{"category": cats_to_keep}]
    # create gas baskets
    entities_present = set(data_country.data_vars)
    for basket in gas_baskets.keys():
        basket_contents_present = [
            gas for gas in gas_baskets[basket] if gas in entities_present
        ]
        if len(basket_contents_present) > 0:
            if basket in list(data_country.data_vars):
                data_country[basket] = data_country.pr.fill_na_gas_basket_from_contents(
                    basket=basket,
                    basket_contents=basket_contents_present,
                    skipna=True,
                    min_count=1,
                )
            else:
                try:
                    # print(data_country.data_vars)
                    data_country[basket] = xr.full_like(
                        data_country["CO2"], np.nan
                    ).pr.quantify(units="Gg CO2 / year")
                    data_country[basket].attrs = {
                        "entity": basket.split(" ")[0],
                        "gwp_context": basket.split(" ")[1][1:-1],
                    }
                    data_country[basket] = data_country.pr.gas_basket_contents_sum(
                        basket=basket,
                        basket_contents=basket_contents_present,
                        min_count=1,
                    )
                    entities_present.add(basket)
                except Exception as ex:
                    print(
                        f"No gas basket created for {country_code}, {source}, "
                        f"{scenario}: {ex}"
                    )

    # amend title and comment
    data_country.attrs["comment"] = (
        data_country.attrs["comment"] + f" Processed on {date.today()}"
    )
    data_country.attrs["title"] = (
        data_country.attrs["title"] + f" Processed on {date.today()}"
    )

    return data_country
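

# Illustrative call of process_data_for_country (hypothetical configuration
# values, not part of the module; a single-country, single-scenario primap2
# dataset is assumed):
#
#     processed = process_data_for_country(
#         data_country=data_country,
#         entities_to_ignore=["NMVOC"],
#         gas_baskets={"KYOTOGHG (AR4GWP100)": ["CO2", "CH4", "N2O"]},
#         processing_info_country={"tolerance": 0.01},
#     )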


def convert_categories(
    ds_input: xr.Dataset,
    conversion: Dict[str, Dict[str, str]],
    # terminology_from: str,
    terminology_to: str,
    debug: bool = False,
    tolerance: float = 0.01,
) -> xr.Dataset:
    """
    Convert data from one category terminology to another.
    """
    print(f"converting categories to {terminology_to}")
    if "orig_cat_name" in ds_input.coords:
        cat_name_present = True
    else:
        cat_name_present = False
    ds_converted = ds_input.copy(deep=True)
    ds_converted.attrs = deepcopy(ds_input.attrs)
    # TODO: change attrs for additional coordinates

    # change category terminology
    cat_dim = ds_converted.attrs["cat"]
    ds_converted.attrs["cat"] = f"category ({terminology_to})"
    ds_converted = ds_converted.rename({cat_dim: ds_converted.attrs["cat"]})

    # find categories present in dataset
    cats_present = list(ds_converted.coords[f"category ({terminology_to})"])

    # restrict categories and map category names
    if "mapping" in conversion.keys():
        mapping_cats_present = [
            cat for cat in list(conversion["mapping"].keys()) if cat in cats_present
        ]
        ds_converted = ds_converted.pr.loc[{"category": mapping_cats_present}]

        from_cats = ds_converted.coords[f"category ({terminology_to})"].values
        to_cats = pd.Series(from_cats).replace(conversion["mapping"])
        ds_converted = ds_converted.assign_coords(
            {f"category ({terminology_to})": (f"category ({terminology_to})", to_cats)}
        )

    # redo the list of present cats after mapping, as we have new categories in the
    # target terminology now
    cats_present_mapped = list(
        ds_converted.coords[f"category ({terminology_to})"].values
    )
    # aggregate categories
    if "aggregate" in conversion:
        aggregate_cats = conversion["aggregate"]
        for cat_to_agg in aggregate_cats:
            if debug:
                print(f"Category: {cat_to_agg}")
            source_cats = [
                cat
                for cat in aggregate_cats[cat_to_agg]["sources"]
                if cat in cats_present_mapped
            ]
            if debug:
                print(source_cats)
            data_agg = ds_converted.pr.loc[{"category": source_cats}].pr.sum(
                dim="category", skipna=True, min_count=1
            )
            nan_vars = [
                var
                for var in data_agg.data_vars
                if bool(data_agg[var].isnull().all().data)
            ]
            data_agg = data_agg.drop_vars(nan_vars)
            if len(data_agg.data_vars) > 0:
                data_agg = data_agg.expand_dims([f"category ({terminology_to})"])
                data_agg = data_agg.assign_coords(
                    coords={
                        f"category ({terminology_to})": (
                            f"category ({terminology_to})",
                            [cat_to_agg],
                        )
                    }
                )
                if cat_name_present:
                    data_agg = data_agg.assign_coords(
                        coords={
                            "orig_cat_name": (
                                f"category ({terminology_to})",
                                [aggregate_cats[cat_to_agg]["name"]],
                            )
                        }
                    )
                ds_converted = ds_converted.pr.merge(data_agg, tolerance=tolerance)
                cats_present_mapped.append(cat_to_agg)
            else:
                print(f"no data to aggregate category {cat_to_agg}")

    return ds_converted
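

# Sketch of a conversion specification as consumed by convert_categories
# (hypothetical category codes; the real mappings live in the per-country
# configuration):
#
#     conversion = {
#         "mapping": {"1A1": "1.A.1", "1A2": "1.A.2"},
#         "aggregate": {
#             "1.A": {"sources": ["1.A.1", "1.A.2"], "name": "Fuel Combustion"},
#         },
#     }
#     ds_ipcc2006 = convert_categories(
#         ds_input, conversion, terminology_to="IPCC2006_PRIMAP", debug=True
#     )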


def get_country_name(
    country_code: str,
) -> str:
    """Get the country name for a country code."""
    if country_code in custom_country_mapping:
        country_name = custom_country_mapping[country_code]
    else:
        try:
            country = pycountry.countries.get(alpha_3=country_code)
            country_name = country.name
        except Exception:
            raise ValueError(
                f"Country code {country_code} can not be mapped to any country"
            )
    return country_name


def get_country_code(
    country_name: str,
) -> str:
    """
    Obtain the country code. If the input already is a three-letter code it is
    returned; otherwise a fuzzy search for the country name is performed.

    Parameters
    ----------
    country_name: str
        Country code or name to get the three-letter code for.

    Returns
    -------
    country_code: str
    """
    # First check if it's in the list of custom codes
    if country_name in custom_country_mapping:
        country_code = country_name
    else:
        try:
            # check if it's a 3-letter code
            country = pycountry.countries.get(alpha_3=country_name)
            country_code = country.alpha_3
        except Exception:
            try:
                country = pycountry.countries.search_fuzzy(
                    country_name.replace("_", " ")
                )
            except Exception:
                raise ValueError(
                    f"Country name {country_name} can not be mapped to "
                    f"any country code. Try using the ISO3 code directly."
                )
            if len(country) > 1:
                country_code = None
                for current_country in country:
                    if current_country.name == country_name:
                        country_code = current_country.alpha_3
                if country_code is None:
                    raise ValueError(
                        f"Country name {country_name} has {len(country)} "
                        f"possible results for country codes."
                    )
            else:
                country_code = country[0].alpha_3
    return country_code
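

# Examples (illustrative only; assumes "Malaysia" is not in
# custom_country_mapping):
#
#     get_country_code("MYS")       # returned directly as a valid alpha-3 code
#     get_country_code("Malaysia")  # resolved to "MYS" via pycountry fuzzy search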


def create_folder_mapping(folder: str, extracted: bool = False) -> None:
    """
    Create a mapping from 3-letter ISO country codes to folders
    based on the subfolders of the given folder. The mapping is
    stored in 'folder_mapping.json' in the given folder. The folder
    must be given relative to the repository root.

    Parameters
    ----------
    folder: str
        folder to create the mapping for
    extracted: bool = False
        If true treat the folder as extracted data, where we
        only have one folder per country and no typos in the
        names

    Returns
    -------
    Nothing
    """
    folder = root_path / folder
    folder_mapping = {}
    # if not extracted:
    known_folders = custom_folders
    # else:
    #     known_folders = {}

    for item in folder.iterdir():
        if item.is_dir() and not item.match("__pycache__"):
            if item.name in known_folders:
                ISO3 = known_folders[item.name]
            else:
                try:
                    country = pycountry.countries.search_fuzzy(
                        item.name.replace("_", " ")
                    )
                    if len(country) > 1:
                        ISO3 = None
                        for current_country in country:
                            if current_country.name == item.name.replace("_", " "):
                                ISO3 = current_country.alpha_3
                    else:
                        ISO3 = country[0].alpha_3
                except Exception:
                    ISO3 = None

            if ISO3 is None:
                print(f"No match for {item.name}")
            else:
                if ISO3 in folder_mapping.keys():
                    folder_mapping[ISO3] = [folder_mapping[ISO3], item.name]
                else:
                    folder_mapping[ISO3] = item.name

    with open(folder / "folder_mapping.json", "w") as mapping_file:
        json.dump(folder_mapping, mapping_file, indent=4)
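

# Example (assumed folder layout with one subfolder per country, e.g.
# "downloaded_data/UNFCCC/Malaysia"):
#
#     create_folder_mapping("downloaded_data/UNFCCC")
#     # writes downloaded_data/UNFCCC/folder_mapping.json, e.g. {"MYS": "Malaysia"}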


# TODO add crf
def get_country_submissions(
    country_name: str,
    print_sub: bool = True,
) -> Dict[str, List[str]]:
    """
    Input is a three-letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    queries the folder mapping files for folders.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3-letter code
    print_sub: bool
        If True information on submissions will be written to stdout

    Returns
    -------
    returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
    Each value is a list of folders
    """
    data_folder = downloaded_data_path
    country_code = get_country_code(country_name)
    if print_sub:
        print(f"Country name {country_name} maps to ISO code {country_code}")
    country_submissions = {}
    if print_sub:
        print("#" * 80)
        print(f"The following submissions are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            if print_sub:
                print("")
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code in folder_mapping:
                country_folders = folder_mapping[country_code]
                if isinstance(country_folders, str):
                    # only one folder
                    country_folders = [country_folders]
                submission_folders = []
                for country_folder in country_folders:
                    current_folder = item / country_folder
                    if print_sub:
                        print(f"Submissions in folder {country_folder}:")
                    for submission_folder in current_folder.iterdir():
                        if submission_folder.is_dir():
                            if print_sub:
                                print(submission_folder.name)
                            submission_folders.append(submission_folder.name)
                country_submissions[item.name] = submission_folders
            else:
                print(f"No submissions available for {country_name}.")
    return country_submissions
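

# Example (hypothetical output; the result depends on the local
# downloaded_data folders):
#
#     submissions = get_country_submissions("Kenya", print_sub=False)
#     # e.g. {"UNFCCC": ["BUR1", "BUR2", "NC2"], "non-UNFCCC": []}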


def get_country_datasets(
    country_name: str,
    print_ds: bool = True,
) -> Dict[str, List[str]]:
    """
    Input is a three-letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    checks the code and data folders for content on the country.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3-letter code
    print_ds: bool
        If True information on submissions will be written to stdout

    Returns
    -------
    returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
    Each value is a list of folders
    """
    data_folder = extracted_data_path
    data_folder_legacy = legacy_data_path

    # obtain country code
    country_code = get_country_code(country_name)
    if print_ds:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    rep_data = {}
    # data
    if print_ds:
        print("#" * 80)
        print(f"The following datasets are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError(
                        "Wrong data type in folder mapping json file. Should be str."
                    )
                datasets_current_folder = {}
                current_folder = item / country_folder
                for data_file in current_folder.iterdir():
                    if data_file.suffix in [".nc", ".yaml", ".csv"]:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix
                            )
                        else:
                            datasets_current_folder[data_file.stem] = [data_file.suffix]
                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split("_")
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f"Wrong code: {parts[0]}"
                        ] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology})"
                        data_info = ""
                        if ".nc" in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if (".csv" in datasets_current_folder[dataset]) and (
                            ".yaml" in datasets_current_folder[dataset]
                        ):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif ".csv" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif ".yaml" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "
                        code_file = get_code_file(country_code, parts[1])
                        if code_file:
                            data_info = data_info + f"code: {code_file.name}"
                        else:
                            data_info = data_info + "code: not found"
                        cleaned_datasets_current_folder[key] = data_info
                if print_ds:
                    if cleaned_datasets_current_folder:
                        for country_ds in cleaned_datasets_current_folder:
                            print(
                                f"{country_ds}: "
                                f"{cleaned_datasets_current_folder[country_ds]}"
                            )
                    else:
                        print("No data available")
                    print("")
            rep_data[item.name] = cleaned_datasets_current_folder
    # legacy data
    if print_ds:
        print("#" * 80)
        print(f"The following legacy datasets are available for {country_name}")
    legacy_data = {}
    for item in data_folder_legacy.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError(
                        "Wrong data type in folder mapping json file. Should be str."
                    )
                datasets_current_folder = {}
                current_folder = item / country_folder
                for data_file in current_folder.iterdir():
                    if data_file.suffix in [".nc", ".yaml", ".csv"]:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix
                            )
                        else:
                            datasets_current_folder[data_file.stem] = [data_file.suffix]
                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split("_")
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f"Wrong code: {parts[0]}"
                        ] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology}, legacy)"
                        data_info = ""
                        if ".nc" in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if (".csv" in datasets_current_folder[dataset]) and (
                            ".yaml" in datasets_current_folder[dataset]
                        ):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif ".csv" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif ".yaml" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "
                        cleaned_datasets_current_folder[key] = data_info
                if print_ds:
                    if cleaned_datasets_current_folder:
                        for country_ds in cleaned_datasets_current_folder:
                            print(
                                f"{country_ds}: "
                                f"{cleaned_datasets_current_folder[country_ds]}"
                            )
                    else:
                        print("No data available")
                    print("")
            legacy_data[item.name] = cleaned_datasets_current_folder

    all_data = {
        "rep_data": rep_data,
        "legacy_data": legacy_data,
    }
    return all_data


def get_code_file(
    country_name: str,
    submission: str,
    print_info: bool = False,
) -> Optional[Path]:
    """
    For given country name and submission find the script that creates the data.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3-letter code
    submission: str
        String of the submission
    print_info: bool = False
        If True print information on the code files found

    Returns
    -------
    returns a pathlib Path object for the code file
    """
    code_file_path = None
    UNFCCC_reader_path = code_path / "UNFCCC_reader"

    # CRF is an exception as it's read using the UNFCCC_CRF_reader module
    # so we return the path to that.
    if submission[0:3] == "CRF":
        return root_path / "UNFCCC_CRF_reader"
    if submission[0:2] == "DI":
        return root_path / "UNFCCC_DI_reader"

    # obtain country code
    country_code = get_country_code(country_name)
    if print_info:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    with open(UNFCCC_reader_path / "folder_mapping.json", "r") as mapping_file:
        folder_mapping = json.load(mapping_file)

    if country_code not in folder_mapping:
        if print_info:
            print("No code file available")
            print("")
    else:
        country_folder = UNFCCC_reader_path / folder_mapping[country_code]
        code_file_name_candidate = "read_" + country_code + "_" + submission + "*"
        for file in country_folder.iterdir():
            if file.match(code_file_name_candidate):
                if code_file_path is not None:
                    raise ValueError(
                        f"Found multiple code file candidates: "
                        f"{code_file_path} and {file.name}. "
                        f"Please use only one file with name "
                        f"'read_ISO3_submission_XXX.YYY'."
                    )
                else:
                    if print_info:
                        print(f"Found code file {file.relative_to(root_path)}")
                    code_file_path = file

    if code_file_path is not None:
        return code_file_path.relative_to(root_path)
    else:
        return None
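

# Example (hypothetical submission; returns a path relative to the repository
# root, or None if no reading script is found):
#
#     script = get_code_file("MYS", "BUR3")
#     # e.g. a Path like .../UNFCCC_reader/Malaysia/read_MYS_BUR3_from_pdf.py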


def fix_rows(
    data: pd.DataFrame, rows_to_fix: list, col_to_use: str, n_rows: int
) -> pd.DataFrame:
    """
    Fix rows that have been split during reading from pdf.

    This is the version used for Malaysia BUR3,4. Adapt for other BURs if needed.

    :param data:
    :param rows_to_fix:
    :param col_to_use:
    :param n_rows:
    :return:
    """
    for row in rows_to_fix:
        # print(row)
        # find the row number and collect the rows to merge
        index = data.loc[data[col_to_use] == row].index
        # print(list(index))
        if not list(index):
            print(f"Can't merge split row {row}")
            print(data[col_to_use])
            continue
        # print(f"Merging split row {row} for table {page}")
        loc = data.index.get_loc(index[0])
        if n_rows == -2:
            locs_to_merge = list(range(loc - 1, loc + 1))
        elif n_rows == -3:
            locs_to_merge = list(range(loc - 1, loc + 2))
        elif n_rows == -5:
            locs_to_merge = list(range(loc - 1, loc + 4))
        else:
            locs_to_merge = list(range(loc, loc + n_rows))
        rows_to_merge = data.iloc[locs_to_merge]
        indices_to_merge = rows_to_merge.index
        # join the rows
        new_row = rows_to_merge.agg(" ".join)
        # replace the double spaces that are created
        # must be done here and not at the end as splits are not always
        # the same and join would produce different col values
        new_row = new_row.str.replace("  ", " ")
        new_row = new_row.str.replace("N O", "NO")
        new_row = new_row.str.replace(", N", ",N")
        new_row = new_row.str.replace("- ", "-")
        # replace spaces in numbers
        pat = r"^(?P<first>[0-9\.,]*)\s(?P<last>[0-9\.,]*)$"
        repl = lambda m: f"{m.group('first')}{m.group('last')}"
        new_row = new_row.str.replace(pat, repl, regex=True)
        data.loc[indices_to_merge[0]] = new_row
        data = data.drop(indices_to_merge[1:])
    return data
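

# Minimal sketch of fix_rows on hypothetical data (not part of the module): a
# label that the PDF reader split over two rows is merged back into one row.
#
#     df = pd.DataFrame(
#         {
#             "category": ["1.A.1 Energy", "Industries", "1.A.2 Other"],
#             "2019": ["123.4", "", "56.7"],
#         }
#     )
#     fixed = fix_rows(df, rows_to_fix=["1.A.1 Energy"], col_to_use="category", n_rows=2)
#     # -> rows 0 and 1 merged into "1.A.1 Energy Industries"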