import copy
import pycountry
import json
import re
import xarray as xr
import pandas as pd
import numpy as np
from datetime import date
from copy import deepcopy
from typing import Dict, List, Optional, Union
from pathlib import Path

from .definitions import custom_country_mapping, custom_folders
from .definitions import root_path, downloaded_data_path, extracted_data_path
from .definitions import legacy_data_path, code_path
from .definitions import GWP_factors


def process_data_for_country(
    data_country: xr.Dataset,
    entities_to_ignore: List[str],
    gas_baskets: Dict[str, List[str]],
    filter_dims: Optional[Dict[str, List[str]]] = None,
    cat_terminology_out: Optional[str] = None,
    category_conversion: Optional[Dict[str, Dict]] = None,
    sectors_out: Optional[List[str]] = None,
    processing_info_country: Optional[Dict] = None,
) -> xr.Dataset:
    """
    Process data from the DI interface (where necessary).

    * Downscaling including subtraction of time series
    * country specific sector aggregation
    * Conversion to IPCC2006 categories
    * general sector and gas basket aggregation (in new categories)
    """
    # 0: gather information
    countries = list(data_country.coords[data_country.attrs["area"]].values)
    if len(countries) > 1:
        raise ValueError(
            f"Found {len(countries)} countries. Only single country data "
            f"can be processed by this function. Countries: {countries}"
        )
    else:
        country_code = countries[0]

    # get category terminology
    cat_col = data_country.attrs["cat"]
    temp = re.findall(r"\((.*)\)", cat_col)
    cat_terminology_in = temp[0]

    # get scenario
    scenarios = list(data_country.coords[data_country.attrs["scen"]].values)
    if len(scenarios) > 1:
        raise ValueError(
            f"Found {len(scenarios)} scenarios. Only single scenario data "
            f"can be processed by this function. Scenarios: {scenarios}"
        )
    scenario = scenarios[0]

    # get source
    sources = list(data_country.coords["source"].values)
    if len(sources) > 1:
        raise ValueError(
            f"Found {len(sources)} sources. Only single source data "
            f"can be processed by this function. Sources: {sources}"
        )
    source = sources[0]

    # check if category name column is present
    # TODO: replace 'name' in config by 'additional_cols' dict that defines the cols
    #  and the values
    if "orig_cat_name" in data_country.coords:
        cat_name_present = True
    else:
        cat_name_present = False

    # 1: general processing
    # remove unused categories
    data_country = data_country.dropna(f"category ({cat_terminology_in})", how="all")
    # remove unused years
    data_country = data_country.dropna("time", how="all")
    # remove variables only containing nan
    nan_vars_country = [
        var
        for var in data_country.data_vars
        if bool(data_country[var].isnull().all().data)
    ]
    print(f"removing all-nan variables: {nan_vars_country}")
    data_country = data_country.drop_vars(nan_vars_country)

    # remove unnecessary variables
    entities_ignore_present = [
        entity for entity in entities_to_ignore if entity in data_country.data_vars
    ]
    data_country = data_country.drop_vars(entities_ignore_present)

    # filter data if requested
    if filter_dims is not None:
        data_country = data_country.pr.loc[filter_dims]

    # 2: country specific processing
    if processing_info_country is not None:
        if "tolerance" in processing_info_country:
            tolerance = processing_info_country["tolerance"]
        else:
            tolerance = 0.01

        # remove entities if needed
        if "ignore_entities" in processing_info_country:
            entities_to_ignore_country = processing_info_country["ignore_entities"]
            entities_ignore_present = [
                entity
                for entity in entities_to_ignore_country
                if entity in data_country.data_vars
            ]
            data_country = data_country.drop_vars(entities_ignore_present)

        # take only desired years
        if "years" in processing_info_country:
            data_country = data_country.pr.loc[
                {"time": processing_info_country["years"]}
            ]

        # remove timeseries if desired
        if "remove_ts" in processing_info_country:
            for case in processing_info_country["remove_ts"]:
                remove_info = copy.deepcopy(processing_info_country["remove_ts"][case])
                entities = remove_info.pop("entities")
                for entity in entities:
                    data_country[entity].pr.loc[remove_info] = (
                        data_country[entity].pr.loc[remove_info] * np.nan
                    )

        # remove all data for given years if necessary
        if "remove_years" in processing_info_country:
            data_country = data_country.drop_sel(
                time=processing_info_country["remove_years"]
            )

        # subtract categories
        if "subtract_cats" in processing_info_country:
            subtract_cats_current = processing_info_country["subtract_cats"]
            print(f"Subtracting categories for country {country_code}")
            for cat_to_generate in subtract_cats_current:
                if "entities" in subtract_cats_current[cat_to_generate].keys():
                    entities_current = subtract_cats_current[cat_to_generate][
                        "entities"
                    ]
                else:
                    entities_current = list(data_country.data_vars)

                cats_to_subtract = subtract_cats_current[cat_to_generate]["subtract"]
                data_sub = (
                    data_country[entities_current]
                    .pr.loc[{"category": cats_to_subtract}]
                    .pr.sum(dim="category", skipna=True, min_count=1)
                )
                data_parent = data_country[entities_current].pr.loc[
                    {"category": subtract_cats_current[cat_to_generate]["parent"]}
                ]
                data_agg = data_parent - data_sub
                nan_vars = [
                    var
                    for var in data_agg.data_vars
                    if bool(data_agg[var].isnull().all().data)
                ]
                data_agg = data_agg.drop_vars(nan_vars)
                if len(data_agg.data_vars) > 0:
                    print(f"Generating {cat_to_generate} through subtraction")
                    data_agg = data_agg.expand_dims(
                        [f"category ({cat_terminology_in})"]
                    )
                    data_agg = data_agg.assign_coords(
                        coords={
                            f"category ({cat_terminology_in})": (
                                f"category ({cat_terminology_in})",
                                [cat_to_generate],
                            )
                        }
                    )
                    if cat_name_present:
                        cat_name = subtract_cats_current[cat_to_generate]["name"]
                        data_agg = data_agg.assign_coords(
                            coords={
                                "orig_cat_name": (
                                    f"category ({cat_terminology_in})",
                                    [cat_name],
                                )
                            }
                        )
                    data_country = data_country.pr.merge(data_agg, tolerance=tolerance)
                else:
                    print(f"no data to generate category {cat_to_generate}")
        # downscaling
        if "downscale" in processing_info_country:
            if "sectors" in processing_info_country["downscale"]:
                sector_downscaling = processing_info_country["downscale"]["sectors"]
                for case in sector_downscaling.keys():
                    print(f"Downscaling for {case}.")
                    sector_downscaling_current = sector_downscaling[case]
                    entities = sector_downscaling_current.pop("entities")
                    for entity in entities:
                        data_country[entity] = data_country[
                            entity
                        ].pr.downscale_timeseries(**sector_downscaling_current)
                        # , skipna_evaluation_dims=None)

            if "entities" in processing_info_country["downscale"]:
                entity_downscaling = processing_info_country["downscale"]["entities"]
                for case in entity_downscaling.keys():
                    print(f"Downscaling for {case}.")
                    # print(data_country.coords[f'category ('
                    #                           f'{cat_terminology_in})'].values)
                    data_country = data_country.pr.downscale_gas_timeseries(
                        **entity_downscaling[case],
                        skipna=True,
                        skipna_evaluation_dims=None,
                    )

        # aggregate categories
        # TODO: replace by primap2 function once it is in primap2 stable
        if "aggregate_cats" in processing_info_country:
            data_country = data_country.pr.dequantify()
            if "agg_tolerance" in processing_info_country:
                agg_tolerance = processing_info_country["agg_tolerance"]
            else:
                agg_tolerance = tolerance
            aggregate_cats_current = processing_info_country["aggregate_cats"]
            print(
                f"Aggregating categories for country {country_code}, source {source}, "
                f"scenario {scenario}"
            )
            for cat_to_agg in aggregate_cats_current:
                print(f"Category: {cat_to_agg}")
                source_cats = aggregate_cats_current[cat_to_agg]["sources"]
                data_agg = data_country.pr.loc[{"category": source_cats}].pr.sum(
                    dim="category", skipna=True, min_count=1
                )
                nan_vars = [
                    var
                    for var in data_agg.data_vars
                    if bool(data_agg[var].isnull().all().data)
                ]
                data_agg = data_agg.drop_vars(nan_vars)
                if len(data_agg.data_vars) > 0:
                    data_agg = data_agg.expand_dims(
                        [f"category ({cat_terminology_in})"]
                    )
                    data_agg = data_agg.assign_coords(
                        coords={
                            f"category ({cat_terminology_in})": (
                                f"category ({cat_terminology_in})",
                                [cat_to_agg],
                            )
                        }
                    )
                    if cat_name_present:
                        cat_name = aggregate_cats_current[cat_to_agg]["name"]
                        data_agg = data_agg.assign_coords(
                            coords={
                                "orig_cat_name": (
                                    f"category ({cat_terminology_in})",
                                    [cat_name],
                                )
                            }
                        )
                    data_country = data_country.pr.merge(
                        data_agg, tolerance=agg_tolerance
                    )
                else:
                    print(f"no data to aggregate category {cat_to_agg}")
            data_country = data_country.pr.quantify()

        # copy HFCs and PFCs with default factors
        if "basket_copy" in processing_info_country:
            GWPs_to_add = processing_info_country["basket_copy"]["GWPs_to_add"]
            entities = processing_info_country["basket_copy"]["entities"]
            source_GWP = processing_info_country["basket_copy"]["source_GWP"]
            for entity in entities:
                data_source = data_country[f"{entity} ({source_GWP})"]
                for GWP in GWPs_to_add:
                    data_GWP = (
                        data_source * GWP_factors[f"{source_GWP}_to_{GWP}"][entity]
                    )
                    data_GWP.attrs["entity"] = entity
                    data_GWP.attrs["gwp_context"] = GWP
                    data_country[f"{entity} ({GWP})"] = data_GWP

        # aggregate gases if desired
        if "aggregate_gases" in processing_info_country:
            # TODO: why use different code here than below. Can this fill
            #  non-existent gas baskets?
            for case in processing_info_country["aggregate_gases"].keys():
                case_info = processing_info_country["aggregate_gases"][case]
                data_country[
                    case_info["basket"]
                ] = data_country.pr.fill_na_gas_basket_from_contents(**case_info)

    # 3: map categories
    if category_conversion is not None:
        data_country = convert_categories(
            data_country,
            category_conversion,
            cat_terminology_out,
            debug=False,
            tolerance=0.01,
        )
    else:
        cat_terminology_out = cat_terminology_in

    # more general processing
    # reduce categories to output categories
    if sectors_out is not None:
        cats_to_keep = [
            cat
            for cat in data_country.coords[f"category ({cat_terminology_out})"].values
            if cat in sectors_out
        ]
        data_country = data_country.pr.loc[{"category": cats_to_keep}]

    # create gas baskets
    entities_present = set(data_country.data_vars)
    for basket in gas_baskets.keys():
        basket_contents_present = [
            gas for gas in gas_baskets[basket] if gas in entities_present
        ]
        if len(basket_contents_present) > 0:
            if basket in list(data_country.data_vars):
                data_country[basket] = data_country.pr.fill_na_gas_basket_from_contents(
                    basket=basket,
                    basket_contents=basket_contents_present,
                    skipna=True,
                    min_count=1,
                )
            else:
                try:
                    # print(data_country.data_vars)
                    data_country[basket] = xr.full_like(
                        data_country["CO2"], np.nan
                    ).pr.quantify(units="Gg CO2 / year")
                    data_country[basket].attrs = {
                        "entity": basket.split(" ")[0],
                        "gwp_context": basket.split(" ")[1][1:-1],
                    }
                    data_country[basket] = data_country.pr.gas_basket_contents_sum(
                        basket=basket,
                        basket_contents=basket_contents_present,
                        min_count=1,
                    )
                    entities_present.add(basket)
                except Exception as ex:
                    print(
                        f"No gas basket created for {country_code}, {source}, "
                        f"{scenario}: {ex}"
                    )

    # amend title and comment
    data_country.attrs["comment"] = (
        data_country.attrs["comment"] + f" Processed on {date.today()}"
    )
    data_country.attrs["title"] = (
        data_country.attrs["title"] + f" Processed on {date.today()}"
    )

    return data_country
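
# Illustrative call (a sketch, not part of the module's API surface): the dataset
# `data_di`, the gas basket definition and the processing options below are
# assumptions chosen for demonstration only.
#
#     processed = process_data_for_country(
#         data_di,
#         entities_to_ignore=["NMVOC"],
#         gas_baskets={"KYOTOGHG (SARGWP100)": ["CO2", "CH4", "N2O"]},
#         cat_terminology_out="IPCC2006_PRIMAP",
#         category_conversion=conversion,  # see convert_categories below
#         processing_info_country={
#             "tolerance": 0.01,
#             "aggregate_cats": {
#                 "2": {"sources": ["2.A", "2.B"], "name": "IPPU"},
#             },
#         },
#     )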


def convert_categories(
    ds_input: xr.Dataset,
    conversion: Dict[str, Dict[str, str]],
    # terminology_from: str,
    terminology_to: str,
    debug: bool = False,
    tolerance: float = 0.01,
) -> xr.Dataset:
    """
    Convert data from one category terminology to another.

    # TODO: rewrite to use aggregate_coordinates functions
    """
    print(f"converting categories to {terminology_to}")

    if "orig_cat_name" in ds_input.coords:
        cat_name_present = True
    else:
        cat_name_present = False
    ds_converted = ds_input.copy(deep=True)
    ds_converted.attrs = deepcopy(ds_input.attrs)
    # TODO: change attrs for additional coordinates

    # change category terminology
    cat_dim = ds_converted.attrs["cat"]
    ds_converted.attrs["cat"] = f"category ({terminology_to})"
    ds_converted = ds_converted.rename({cat_dim: ds_converted.attrs["cat"]})

    # find categories present in dataset
    cats_present = list(ds_converted.coords[f"category ({terminology_to})"])

    # restrict categories and map category names
    if "mapping" in conversion.keys():
        mapping_cats_present = [
            cat for cat in list(conversion["mapping"].keys()) if cat in cats_present
        ]
        ds_converted = ds_converted.pr.loc[{"category": mapping_cats_present}]

        from_cats = ds_converted.coords[f"category ({terminology_to})"].values
        to_cats = pd.Series(from_cats).replace(conversion["mapping"])
        ds_converted = ds_converted.assign_coords(
            {f"category ({terminology_to})": (f"category ({terminology_to})", to_cats)}
        )

    # redo the list of present cats after mapping, as we have new categories in the
    # target terminology now
    cats_present_mapped = list(
        ds_converted.coords[f"category ({terminology_to})"].values
    )
    # aggregate categories
    if "aggregate" in conversion:
        aggregate_cats = conversion["aggregate"]
        for cat_to_agg in aggregate_cats:
            if debug:
                print(f"Category: {cat_to_agg}")
            source_cats = [
                cat
                for cat in aggregate_cats[cat_to_agg]["sources"]
                if cat in cats_present_mapped
            ]
            if debug:
                print(source_cats)
            data_agg = ds_converted.pr.loc[{"category": source_cats}].pr.sum(
                dim="category", skipna=True, min_count=1
            )
            nan_vars = [
                var
                for var in data_agg.data_vars
                if bool(data_agg[var].isnull().all().data)
            ]
            data_agg = data_agg.drop_vars(nan_vars)
            if len(data_agg.data_vars) > 0:
                data_agg = data_agg.expand_dims([f"category ({terminology_to})"])
                data_agg = data_agg.assign_coords(
                    coords={
                        f"category ({terminology_to})": (
                            f"category ({terminology_to})",
                            [cat_to_agg],
                        )
                    }
                )
                if cat_name_present:
                    data_agg = data_agg.assign_coords(
                        coords={
                            "orig_cat_name": (
                                f"category ({terminology_to})",
                                [aggregate_cats[cat_to_agg]["name"]],
                            )
                        }
                    )
                ds_converted = ds_converted.pr.merge(data_agg, tolerance=tolerance)
                cats_present_mapped.append(cat_to_agg)
            else:
                print(f"no data to aggregate category {cat_to_agg}")

    return ds_converted
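
# Illustrative conversion specification (a sketch; the category codes and the
# target terminology are made-up examples, not taken from a real mapping table):
#
#     conversion = {
#         "mapping": {"1": "1", "4": "M.AG", "5": "M.LULUCF"},
#         "aggregate": {
#             "3": {"sources": ["M.AG", "M.LULUCF"], "name": "AFOLU"},
#         },
#     }
#     ds_ipcc2006 = convert_categories(
#         ds_bur, conversion, terminology_to="IPCC2006_PRIMAP", tolerance=0.01
#     )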


def get_country_name(
    country_code: str,
) -> str:
    """Get the country name from a three-letter ISO code."""
    if country_code in custom_country_mapping:
        country_name = custom_country_mapping[country_code]
    else:
        try:
            country = pycountry.countries.get(alpha_3=country_code)
            country_name = country.name
        except Exception:
            raise ValueError(
                f"Country code {country_code} can not be mapped to any country"
            )
    return country_name


def get_country_code(
    country_name: str,
) -> str:
    """
    Obtain a country code. If the input already is a three-letter code it is
    returned unchanged; otherwise a (fuzzy) search for the country name is
    performed.

    Parameters
    ----------
    country_name: str
        Country code or name to get the three-letter code for.

    Returns
    -------
    country_code: str
    """
    # First check if it's in the list of custom codes
    if country_name in custom_country_mapping:
        country_code = country_name
    else:
        try:
            # check if it's a three-letter code
            country = pycountry.countries.get(alpha_3=country_name)
            country_code = country.alpha_3
        except Exception:
            try:
                country = pycountry.countries.search_fuzzy(
                    country_name.replace("_", " ")
                )
            except Exception:
                raise ValueError(
                    f"Country name {country_name} can not be mapped to "
                    f"any country code. Try using the ISO3 code directly."
                )
            if len(country) > 1:
                country_code = None
                for current_country in country:
                    if current_country.name == country_name:
                        country_code = current_country.alpha_3
                if country_code is None:
                    raise ValueError(
                        f"Country name {country_name} has {len(country)} "
                        f"possible results for country codes."
                    )
            else:
                country_code = country[0].alpha_3
    return country_code
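
# Illustrative lookups (assuming neither value is overridden in
# custom_country_mapping; the results are what pycountry returns):
#
#     get_country_code("Kenya")  # -> "KEN" (via fuzzy search)
#     get_country_name("KEN")    # -> "Kenya"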


def create_folder_mapping(folder: str, extracted: bool = False) -> None:
    """
    Create a mapping from three-letter ISO country codes to folders
    based on the subfolders of the given folder. The mapping is
    stored in 'folder_mapping.json' in the given folder. The folder
    must be given relative to the repository root.

    Parameters
    ----------
    folder: str
        folder to create the mapping for
    extracted: bool = False
        If True treat the folder as extracted data, where we
        only have one folder per country and no typos in the
        names

    Returns
    -------
    Nothing
    """
    folder = root_path / folder
    folder_mapping = {}
    # if not extracted:
    known_folders = custom_folders
    # else:
    #     known_folders = {}

    for item in folder.iterdir():
        if item.is_dir() and not item.match("__pycache__"):
            if item.name in known_folders:
                ISO3 = known_folders[item.name]
            else:
                try:
                    country = pycountry.countries.search_fuzzy(
                        item.name.replace("_", " ")
                    )
                    if len(country) > 1:
                        ISO3 = None
                        for current_country in country:
                            if current_country.name == item.name.replace("_", " "):
                                ISO3 = current_country.alpha_3
                    else:
                        ISO3 = country[0].alpha_3
                except Exception:
                    ISO3 = None

            if ISO3 is None:
                print(f"No match for {item.name}")
            else:
                if ISO3 in folder_mapping.keys():
                    folder_mapping[ISO3] = [folder_mapping[ISO3], item.name]
                else:
                    folder_mapping[ISO3] = item.name

    with open(folder / "folder_mapping.json", "w") as mapping_file:
        json.dump(dict(sorted(folder_mapping.items())), mapping_file, indent=4)
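
# Illustrative call (a sketch; the folder name is an assumption and must exist
# below the repository root):
#
#     create_folder_mapping("downloaded_data/UNFCCC")
#     # writes downloaded_data/UNFCCC/folder_mapping.json, e.g. {"KEN": "Kenya", ...}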


# TODO: add CRF
def get_country_submissions(
    country_name: str,
    print_sub: bool = True,
) -> Dict[str, List[str]]:
    """
    Input is a three-letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    queries the folder mapping files for folders.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3 letter code
    print_sub: bool
        If True information on submissions will be written to stdout

    Returns
    -------
    returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
    Each value is a list of folders
    """
    data_folder = downloaded_data_path

    country_code = get_country_code(country_name)
    if print_sub:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    country_submissions = {}
    if print_sub:
        print("#" * 80)
        print(f"The following submissions are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            if print_sub:
                print("")
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code in folder_mapping:
                country_folders = folder_mapping[country_code]
                if isinstance(country_folders, str):
                    # only one folder
                    country_folders = [country_folders]

                submission_folders = []
                for country_folder in country_folders:
                    current_folder = item / country_folder
                    if print_sub:
                        print(f"Submissions in folder {country_folder}:")

                    for submission_folder in current_folder.iterdir():
                        if submission_folder.is_dir():
                            if print_sub:
                                print(submission_folder.name)
                            submission_folders.append(submission_folder.name)

                country_submissions[item.name] = submission_folders
            else:
                print(f"No submissions available for {country_name}.")

    return country_submissions
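
# Illustrative call (a sketch; the returned folder names depend on the local
# data collection):
#
#     submissions = get_country_submissions("Kenya", print_sub=False)
#     # e.g. {"UNFCCC": ["BUR1", "BUR2"], "non-UNFCCC": []}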


def get_country_datasets(
    country_name: str,
    print_ds: bool = True,
) -> Dict[str, List[str]]:
    """
    Input is a three-letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    checks the code and data folders for content on the country.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3 letter code
    print_ds: bool
        If True information on datasets will be written to stdout

    Returns
    -------
    returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
    Each value is a list of folders
    """
    data_folder = extracted_data_path
    data_folder_legacy = legacy_data_path

    # obtain country code
    country_code = get_country_code(country_name)
    if print_ds:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    rep_data = {}
    # data
    if print_ds:
        print("#" * 80)
        print(f"The following datasets are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError(
                        "Wrong data type in folder mapping json file. Should be str."
                    )

                datasets_current_folder = {}
                current_folder = item / country_folder

                for data_file in current_folder.iterdir():
                    if data_file.suffix in [".nc", ".yaml", ".csv"]:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix
                            )
                        else:
                            datasets_current_folder[data_file.stem] = [data_file.suffix]

                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split("_")
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f"Wrong code: {parts[0]}"
                        ] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology})"
                        data_info = ""
                        if ".nc" in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if (".csv" in datasets_current_folder[dataset]) and (
                            ".yaml" in datasets_current_folder[dataset]
                        ):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif ".csv" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif ".yaml" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "

                        code_file = get_code_file(country_code, parts[1])
                        if code_file:
                            data_info = data_info + f"code: {code_file.name}"
                        else:
                            data_info = data_info + "code: not found"

                        cleaned_datasets_current_folder[key] = data_info

                if print_ds:
                    if cleaned_datasets_current_folder:
                        for country_ds in cleaned_datasets_current_folder:
                            print(
                                f"{country_ds}: "
                                f"{cleaned_datasets_current_folder[country_ds]}"
                            )
                    else:
                        print("No data available")
                    print("")

            rep_data[item.name] = cleaned_datasets_current_folder

    # legacy data
    if print_ds:
        print("#" * 80)
        print(f"The following legacy datasets are available for {country_name}")
    legacy_data = {}
    for item in data_folder_legacy.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError(
                        "Wrong data type in folder mapping json file. Should be str."
                    )

                datasets_current_folder = {}
                current_folder = item / country_folder

                for data_file in current_folder.iterdir():
                    if data_file.suffix in [".nc", ".yaml", ".csv"]:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix
                            )
                        else:
                            datasets_current_folder[data_file.stem] = [data_file.suffix]

                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split("_")
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f"Wrong code: {parts[0]}"
                        ] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology}, legacy)"
                        data_info = ""
                        if ".nc" in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if (".csv" in datasets_current_folder[dataset]) and (
                            ".yaml" in datasets_current_folder[dataset]
                        ):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif ".csv" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif ".yaml" in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "

                        cleaned_datasets_current_folder[key] = data_info

                if print_ds:
                    if cleaned_datasets_current_folder:
                        for country_ds in cleaned_datasets_current_folder:
                            print(
                                f"{country_ds}: "
                                f"{cleaned_datasets_current_folder[country_ds]}"
                            )
                    else:
                        print("No data available")
                    print("")

            legacy_data[item.name] = cleaned_datasets_current_folder

    all_data = {
        "rep_data": rep_data,
        "legacy_data": legacy_data,
    }

    return all_data


def get_code_file(
    country_name: str,
    submission: str,
    print_info: bool = False,
) -> Optional[Path]:
    """
    For a given country name and submission find the script that creates the data.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3 letter code
    submission: str
        String of the submission
    print_info: bool = False
        If True print information on the code file found

    Returns
    -------
    returns a pathlib Path object for the code file
    """
    code_file_path = None
    UNFCCC_reader_path = code_path / "UNFCCC_reader"

    # CRF is an exception as it's read using the UNFCCC_CRF_reader module
    # so we return the path to that.
    if submission[0:3] == "CRF":
        return root_path / "UNFCCC_CRF_reader"

    if submission[0:2] == "DI":
        return root_path / "UNFCCC_DI_reader"

    # obtain country code
    country_code = get_country_code(country_name)
    if print_info:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    with open(UNFCCC_reader_path / "folder_mapping.json", "r") as mapping_file:
        folder_mapping = json.load(mapping_file)

    if country_code not in folder_mapping:
        if print_info:
            print("No code file available")
            print("")
    else:
        country_folder = UNFCCC_reader_path / folder_mapping[country_code]
        code_file_name_candidate = "read_" + country_code + "_" + submission + "*"

        for file in country_folder.iterdir():
            if file.match(code_file_name_candidate):
                if code_file_path is not None:
                    raise ValueError(
                        f"Found multiple code file candidates: "
                        f"{code_file_path} and {file.name}. "
                        f"Please use only one file with name "
                        f"'read_ISO3_submission_XXX.YYY'."
                    )
                else:
                    if print_info:
                        print(f"Found code file {file.relative_to(root_path)}")
                code_file_path = file

    if code_file_path is not None:
        return code_file_path.relative_to(root_path)
    else:
        return None


def fix_rows(
    data: pd.DataFrame, rows_to_fix: list, col_to_use: str, n_rows: int
) -> pd.DataFrame:
    """
    Fix rows that have been split during reading from pdf.
    This is the version used for Malaysia BUR3,4. Adapt for other BURs if needed.

    :param data: DataFrame with the split rows
    :param rows_to_fix: values of `col_to_use` that mark the rows to merge
    :param col_to_use: column used to find the rows to merge
    :param n_rows: number of rows to merge. The special values -2, -3 and -5
        merge 2, 3 or 5 rows starting one row above the matched row; positive
        values merge `n_rows` rows starting at the matched row.
    :return: DataFrame with the split rows merged
    """
    for row in rows_to_fix:
        # print(row)
        # find the row number and collect the row and the next rows
        index = data.loc[data[col_to_use] == row].index
        # print(list(index))
        if not list(index):
            print(f"Can't merge split row {row}")
            print(data[col_to_use])
            continue
        # print(f"Merging split row {row} for table {page}")
        loc = data.index.get_loc(index[0])
        if n_rows == -2:
            locs_to_merge = list(range(loc - 1, loc + 1))
        elif n_rows == -3:
            locs_to_merge = list(range(loc - 1, loc + 2))
        elif n_rows == -5:
            locs_to_merge = list(range(loc - 1, loc + 4))
        else:
            locs_to_merge = list(range(loc, loc + n_rows))
        rows_to_merge = data.iloc[locs_to_merge]
        indices_to_merge = rows_to_merge.index
        # join the rows
        new_row = rows_to_merge.agg(" ".join)
        # replace the double spaces that are created
        # must be done here and not at the end as splits are not always
        # the same and join would produce different col values
        new_row = new_row.str.replace("  ", " ")
        new_row = new_row.str.replace("N O", "NO")
        new_row = new_row.str.replace(", N", ",N")
        new_row = new_row.str.replace("- ", "-")
        # replace spaces in numbers
        pat = r"^(?P<first>[0-9\.,]*)\s(?P<last>[0-9\.,]*)$"
        repl = lambda m: f"{m.group('first')}{m.group('last')}"
        new_row = new_row.str.replace(pat, repl, regex=True)
        data.loc[indices_to_merge[0]] = new_row
        data = data.drop(indices_to_merge[1:])
    return data
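
# Illustrative example (a sketch with made-up table content): a category label
# split over two rows by the PDF reader is merged back into a single row.
#
#     df = pd.DataFrame(
#         {0: ["1.A.1 Energy", "Industries", "1.A.2"], 1: ["123", "", "456"]}
#     )
#     df = fix_rows(df, rows_to_fix=["1.A.1 Energy"], col_to_use=0, n_rows=2)
#     # row 0 now reads "1.A.1 Energy Industries"; the continuation row is dropped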


def make_wide_table(
    data: pd.DataFrame,
    keyword: str,
    col: Union[int, str],
    index_cols: List[Union[int, str]],
) -> pd.DataFrame:
    """
    Transform a long table in which the header row (marked by `keyword` in
    column `col`) repeats for every block into one wide table, concatenating
    the individual blocks along the columns.
    """
    index = data.loc[data[col] == keyword].index
    if not list(index):
        print("Keyword for table transformation not found")
        return data
    elif len(index) == 1:
        print("Keyword for table transformation found only once")
        return data
    else:
        df_all = None
        for i, item in enumerate(index):
            loc = data.index.get_loc(item)
            if i < len(index) - 1:
                next_loc = data.index.get_loc(index[i + 1])
            else:
                next_loc = data.index[-1] + 1
            df_to_add = data.loc[list(range(loc, next_loc))]
            # select only cols which don't have NaN, Null, or '' as header
            filter_nan = (
                (~df_to_add.iloc[0].isnull())
                & (df_to_add.iloc[0] != "NaN")
                & (df_to_add.iloc[0] != "")
            )
            df_to_add = df_to_add.loc[:, filter_nan]
            df_to_add.columns = df_to_add.iloc[0]
            # print(df_to_add.columns)
            df_to_add = df_to_add.drop(loc)
            df_to_add = df_to_add.set_index(index_cols)

            if df_all is None:
                df_all = df_to_add
            else:
                df_all = pd.concat([df_all, df_to_add], axis=1, join="outer")
        return df_all
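
# Illustrative example (a sketch with synthetic data): a long table whose header
# row ("Categories", year) repeats for every block is rearranged into one wide
# table with one column per year.
#
#     df = pd.DataFrame(
#         {
#             0: ["Categories", "1.A", "1.B", "Categories", "1.A", "1.B"],
#             1: ["2019", "1.0", "2.0", "2020", "1.1", "2.1"],
#         }
#     )
#     wide = make_wide_table(
#         df, keyword="Categories", col=0, index_cols=["Categories"]
#     )
#     # wide has index ["1.A", "1.B"] and columns ["2019", "2020"]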