import pycountry
import json
import re
import xarray as xr
import pandas as pd
import numpy as np
from datetime import date
from copy import deepcopy
from typing import Dict, List, Optional
from pathlib import Path

from .definitions import custom_country_mapping, custom_folders
from .definitions import root_path, downloaded_data_path, extracted_data_path
from .definitions import legacy_data_path, code_path
from .definitions import GWP_factors


def process_data_for_country(
    data_country: xr.Dataset,
    entities_to_ignore: List[str],
    gas_baskets: Dict[str, List[str]],
    filter_dims: Optional[Dict[str, List[str]]] = None,
    cat_terminology_out: Optional[str] = None,
    category_conversion: Optional[Dict[str, Dict]] = None,
    sectors_out: Optional[List[str]] = None,
    processing_info_country: Optional[Dict] = None,
) -> xr.Dataset:
    """
    Process data from the DI interface (where necessary).

    * downscaling, including subtraction of time series
    * country-specific sector aggregation
    * conversion to IPCC2006 categories
    * general sector and gas basket aggregation (in the new categories)
    """
    # 0: gather information
    countries = list(data_country.coords[data_country.attrs['area']].values)
    if len(countries) > 1:
        raise ValueError(
            f"Found {len(countries)} countries. Only single country data "
            f"can be processed by this function. countries: {countries}")
    else:
        country_code = countries[0]

    # get category terminology
    cat_col = data_country.attrs['cat']
    temp = re.findall(r'\((.*)\)', cat_col)
    cat_terminology_in = temp[0]

    # get scenario
    scenarios = list(data_country.coords[data_country.attrs['scen']].values)
    if len(scenarios) > 1:
        raise ValueError(
            f"Found {len(scenarios)} scenarios. Only single scenario data "
            f"can be processed by this function. Scenarios: {scenarios}")
    scenario = scenarios[0]

    # get source
    sources = list(data_country.coords['source'].values)
    if len(sources) > 1:
        raise ValueError(
            f"Found {len(sources)} sources. Only single source data "
            f"can be processed by this function. Sources: {sources}")
    source = sources[0]

    # check if category name column present
    # TODO: replace 'name' in config by 'additional_cols' dict that defines
    #  the cols and the values
    if 'orig_cat_name' in data_country.coords:
        cat_name_present = True
    else:
        cat_name_present = False

    # 1: general processing
    # remove unused categories
    data_country = data_country.dropna(
        f'category ({cat_terminology_in})', how='all')
    # remove unused years
    data_country = data_country.dropna('time', how='all')
    # remove variables only containing nan
    nan_vars_country = [var for var in data_country.data_vars if
                        bool(data_country[var].isnull().all().data) is True]
    print(f"removing all-nan variables: {nan_vars_country}")
    data_country = data_country.drop_vars(nan_vars_country)

    # remove unnecessary variables
    entities_ignore_present = [entity for entity in entities_to_ignore if
                               entity in data_country.data_vars]
    data_country = data_country.drop_vars(entities_ignore_present)

    # filter
    if filter_dims is not None:
        data_country = data_country.pr.loc[filter_dims]

    # 2: country specific processing
    if processing_info_country is not None:
        if 'tolerance' in processing_info_country:
            tolerance = processing_info_country["tolerance"]
        else:
            tolerance = 0.01

        # remove entities if needed
        if 'ignore_entities' in processing_info_country:
            entities_to_ignore_country = processing_info_country['ignore_entities']
            entities_ignore_present = \
                [entity for entity in entities_to_ignore_country if
                 entity in data_country.data_vars]
            data_country = data_country.drop_vars(entities_ignore_present)

        # take only desired years
        if 'years' in processing_info_country:
            data_country = data_country.pr.loc[
                {'time': processing_info_country['years']}]

        # remove timeseries if desired
        if 'remove_ts' in processing_info_country:
            for case in processing_info_country['remove_ts']:
                remove_info = processing_info_country['remove_ts'][case]
                entities = remove_info.pop("entities")
                for entity in entities:
                    data_country[entity].pr.loc[remove_info] = \
                        data_country[entity].pr.loc[remove_info] * np.nan

        # remove all data for given years if necessary
        if 'remove_years' in processing_info_country:
            data_country = data_country.drop_sel(
                time=processing_info_country['remove_years'])

        # subtract categories
        if 'subtract_cats' in processing_info_country:
            subtract_cats_current = processing_info_country['subtract_cats']
            if 'entities' in subtract_cats_current.keys():
                entities_current = subtract_cats_current['entities']
            else:
                entities_current = list(data_country.data_vars)
            print(f"Subtracting categories for country {country_code}, entities "
                  f"{entities_current}")
            for cat_to_generate in subtract_cats_current:
                cats_to_subtract = \
                    subtract_cats_current[cat_to_generate]['subtract']
                data_sub = \
                    data_country.pr.loc[{'category': cats_to_subtract}].pr.sum(
                        dim='category', skipna=True, min_count=1)
                data_parent = data_country.pr.loc[
                    {'category': subtract_cats_current[cat_to_generate]['parent']}]
                data_agg = data_parent - data_sub
                nan_vars = [var for var in data_agg.data_vars if
                            data_agg[var].isnull().all().data is True]
                data_agg = data_agg.drop(nan_vars)
                if len(data_agg.data_vars) > 0:
                    print(f"Generating {cat_to_generate} through subtraction")
                    data_agg = data_agg.expand_dims(
                        [f'category ({cat_terminology_in})'])
                    data_agg = data_agg.assign_coords(
                        coords={f'category ({cat_terminology_in})':
                                    (f'category ({cat_terminology_in})',
                                     [cat_to_generate])})
                    if cat_name_present:
                        cat_name = subtract_cats_current[cat_to_generate]['name']
                        data_agg = data_agg.assign_coords(
                            coords={'orig_cat_name':
                                        (f'category ({cat_terminology_in})',
                                         [cat_name])})
                    data_country = data_country.pr.merge(data_agg,
                                                         tolerance=tolerance)
                else:
                    print(f"no data to generate category {cat_to_generate}")

        # downscaling
        if 'downscale' in processing_info_country:
            if 'sectors' in processing_info_country['downscale']:
                sector_downscaling = \
                    processing_info_country['downscale']['sectors']
                for case in sector_downscaling.keys():
                    print(f"Downscaling for {case}.")
                    sector_downscaling_current = sector_downscaling[case]
                    entities = sector_downscaling_current.pop('entities')
                    for entity in entities:
                        data_country[entity] = data_country[
                            entity].pr.downscale_timeseries(
                            **sector_downscaling_current)
                        # , skipna_evaluation_dims=None)

            if 'entities' in processing_info_country['downscale']:
                entity_downscaling = \
                    processing_info_country['downscale']['entities']
                for case in entity_downscaling.keys():
                    print(f"Downscaling for {case}.")
                    # print(data_country.coords[
                    #     f'category ({cat_terminology_in})'].values)
                    data_country = data_country.pr.downscale_gas_timeseries(
                        **entity_downscaling[case], skipna=True,
                        skipna_evaluation_dims=None)

        # aggregate categories
        if 'aggregate_cats' in processing_info_country:
            if 'agg_tolerance' in processing_info_country:
                agg_tolerance = processing_info_country['agg_tolerance']
            else:
                agg_tolerance = tolerance
            aggregate_cats_current = processing_info_country['aggregate_cats']
            print(
                f"Aggregating categories for country {country_code}, "
                f"source {source}, scenario {scenario}")
            for cat_to_agg in aggregate_cats_current:
                print(f"Category: {cat_to_agg}")
                source_cats = aggregate_cats_current[cat_to_agg]['sources']
                data_agg = data_country.pr.loc[{'category': source_cats}].pr.sum(
                    dim='category', skipna=True, min_count=1)
                nan_vars = [var for var in data_agg.data_vars if
                            data_agg[var].isnull().all().data is True]
                data_agg = data_agg.drop(nan_vars)
                if len(data_agg.data_vars) > 0:
                    data_agg = data_agg.expand_dims(
                        [f'category ({cat_terminology_in})'])
                    data_agg = data_agg.assign_coords(
                        coords={f'category ({cat_terminology_in})':
                                    (f'category ({cat_terminology_in})',
                                     [cat_to_agg])})
                    if cat_name_present:
                        cat_name = aggregate_cats_current[cat_to_agg]['name']
                        data_agg = data_agg.assign_coords(
                            coords={'orig_cat_name':
                                        (f'category ({cat_terminology_in})',
                                         [cat_name])})
                    data_country = data_country.pr.merge(data_agg,
                                                         tolerance=agg_tolerance)
                else:
                    print(f"no data to aggregate category {cat_to_agg}")

        # copy HFCs and PFCs with default factors
        if 'basket_copy' in processing_info_country:
            GWPs_to_add = processing_info_country["basket_copy"]["GWPs_to_add"]
            entities = processing_info_country["basket_copy"]["entities"]
            source_GWP = processing_info_country["basket_copy"]["source_GWP"]
            for entity in entities:
                data_source = data_country[f'{entity} ({source_GWP})']
                for GWP in GWPs_to_add:
                    data_GWP = data_source * \
                        GWP_factors[f"{source_GWP}_to_{GWP}"][entity]
                    data_GWP.attrs["entity"] = entity
                    data_GWP.attrs["gwp_context"] = GWP
                    data_country[f"{entity} ({GWP})"] = data_GWP

        # aggregate gases if desired
        if 'aggregate_gases' in processing_info_country:
            # TODO: why use different code here than below? Can this fill
            #  non-existent gas baskets?
            for case in processing_info_country['aggregate_gases'].keys():
                case_info = processing_info_country['aggregate_gases'][case]
                data_country[case_info['basket']] = \
                    data_country.pr.fill_na_gas_basket_from_contents(**case_info)

    # 3: map categories
    if category_conversion is not None:
        data_country = convert_categories(
            data_country,
            category_conversion,
            cat_terminology_out,
            debug=False,
            tolerance=0.01,
        )
    else:
        cat_terminology_out = cat_terminology_in

    # more general processing
    # reduce categories to output cats
    if sectors_out is not None:
        cats_to_keep = [
            cat for cat in
            data_country.coords[f'category ({cat_terminology_out})'].values
            if cat in sectors_out]
        data_country = data_country.pr.loc[{'category': cats_to_keep}]

    # create gas baskets
    entities_present = set(data_country.data_vars)
    for basket in gas_baskets.keys():
        basket_contents_present = [gas for gas in gas_baskets[basket] if
                                   gas in entities_present]
        if len(basket_contents_present) > 0:
            if basket in list(data_country.data_vars):
                data_country[basket] = \
                    data_country.pr.fill_na_gas_basket_from_contents(
                        basket=basket,
                        basket_contents=basket_contents_present,
                        skipna=True, min_count=1)
            else:
                try:
                    # print(data_country.data_vars)
                    data_country[basket] = xr.full_like(
                        data_country["CO2"], np.nan).pr.quantify(
                        units="Gg CO2 / year")
                    data_country[basket].attrs = {
                        "entity": basket.split(' ')[0],
                        "gwp_context": basket.split(' ')[1][1:-1]}
                    data_country[basket] = data_country.pr.gas_basket_contents_sum(
                        basket=basket,
                        basket_contents=basket_contents_present,
                        min_count=1)
                    entities_present.add(basket)
                except Exception as ex:
                    print(f"No gas basket created for {country_code}, {source}, "
                          f"{scenario}: {ex}")

    # amend title and comment
    data_country.attrs["comment"] = \
        data_country.attrs["comment"] + f" Processed on {date.today()}"
    data_country.attrs["title"] = \
        data_country.attrs["title"] + f" Processed on {date.today()}"

    return data_country
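
# Illustrative usage sketch (kept as a comment so importing this module has no
# side effects). The dataset ``data_if`` and the basket/terminology names below
# are hypothetical examples, not values defined in this module.
# data_proc = process_data_for_country(
#     data_country=data_if,
#     entities_to_ignore=[],
#     gas_baskets={"KYOTOGHG (AR4GWP100)": ["CO2", "CH4", "N2O"]},
#     cat_terminology_out="IPCC2006_PRIMAP",
#     category_conversion=None,
#     sectors_out=None,
#     processing_info_country=None,
# )
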

def convert_categories(
    ds_input: xr.Dataset,
    conversion: Dict[str, Dict[str, str]],
    # terminology_from: str,
    terminology_to: str,
    debug: bool = False,
    tolerance: float = 0.01,
) -> xr.Dataset:
    """
    Convert data from one category terminology to another.
    """
    print(f"converting categories to {terminology_to}")

    if 'orig_cat_name' in ds_input.coords:
        cat_name_present = True
    else:
        cat_name_present = False
    ds_converted = ds_input.copy(deep=True)
    ds_converted.attrs = deepcopy(ds_input.attrs)
    # TODO: change attrs for additional coordinates

    # change category terminology
    cat_dim = ds_converted.attrs["cat"]
    ds_converted.attrs["cat"] = f"category ({terminology_to})"
    ds_converted = ds_converted.rename({cat_dim: ds_converted.attrs["cat"]})

    # find categories present in dataset
    cats_present = list(ds_converted.coords[f'category ({terminology_to})'])

    # restrict categories and map category names
    if 'mapping' in conversion.keys():
        mapping_cats_present = [cat for cat in list(conversion['mapping'].keys()) if
                                cat in cats_present]
        ds_converted = ds_converted.pr.loc[{'category': mapping_cats_present}]

        from_cats = ds_converted.coords[f'category ({terminology_to})'].values
        to_cats = pd.Series(from_cats).replace(conversion['mapping'])
        ds_converted = ds_converted.assign_coords(
            {f'category ({terminology_to})':
                 (f'category ({terminology_to})', to_cats)})

    # redo the list of present cats after mapping, as we have new categories
    # in the target terminology now
    cats_present_mapped = list(
        ds_converted.coords[f'category ({terminology_to})'].values)

    # aggregate categories
    if 'aggregate' in conversion:
        aggregate_cats = conversion['aggregate']
        for cat_to_agg in aggregate_cats:
            if debug:
                print(f"Category: {cat_to_agg}")
            source_cats = [cat for cat in aggregate_cats[cat_to_agg]['sources'] if
                           cat in cats_present_mapped]
            if debug:
                print(source_cats)
            data_agg = ds_converted.pr.loc[{'category': source_cats}].pr.sum(
                dim='category', skipna=True, min_count=1)
            nan_vars = [var for var in data_agg.data_vars if
                        data_agg[var].isnull().all().data == True]
            data_agg = data_agg.drop(nan_vars)
            if len(data_agg.data_vars) > 0:
                data_agg = data_agg.expand_dims([f'category ({terminology_to})'])
                data_agg = data_agg.assign_coords(
                    coords={f'category ({terminology_to})':
                                (f'category ({terminology_to})', [cat_to_agg])})
                if cat_name_present:
                    data_agg = data_agg.assign_coords(
                        coords={'orig_cat_name':
                                    (f'category ({terminology_to})',
                                     [aggregate_cats[cat_to_agg]['name']])})
                ds_converted = ds_converted.pr.merge(data_agg, tolerance=tolerance)
                cats_present_mapped.append(cat_to_agg)
            else:
                print(f"no data to aggregate category {cat_to_agg}")

    return ds_converted
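
# Illustrative conversion dict (comment-only sketch). It follows the
# 'mapping'/'aggregate' structure read above; the concrete category codes and
# the target terminology are hypothetical examples.
# conversion = {
#     'mapping': {'1': '1', '2': '2', '4': 'M.AG'},
#     'aggregate': {
#         '3': {'sources': ['M.AG', 'M.LULUCF'], 'name': 'AFOLU'},
#     },
# }
# ds_converted = convert_categories(
#     ds_input, conversion, terminology_to='IPCC2006_PRIMAP', debug=True)
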

def get_country_name(
    country_code: str,
) -> str:
    """Get country name from code."""
    if country_code in custom_country_mapping:
        country_name = custom_country_mapping[country_code]
    else:
        try:
            country = pycountry.countries.get(alpha_3=country_code)
            country_name = country.name
        except:
            raise ValueError(f"Country code {country_code} can not be mapped to "
                             f"any country")

    return country_name
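
# Illustrative usage (comment-only sketch), assuming a standard ISO3 code:
# get_country_name("MYS")  # -> "Malaysia"
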

def get_country_code(
    country_name: str,
) -> str:
    """
    Obtain a country code. If the input already is a three-letter code it is
    returned unchanged; otherwise a search for the name is performed.

    Parameters
    ----------
    country_name: str
        Country code or name to get the three-letter code for.

    Returns
    -------
    country_code: str
    """
    # First check if it's in the list of custom codes
    if country_name in custom_country_mapping:
        country_code = country_name
    else:
        try:
            # check if it's a three-letter code
            country = pycountry.countries.get(alpha_3=country_name)
            country_code = country.alpha_3
        except:
            try:
                country = pycountry.countries.search_fuzzy(
                    country_name.replace("_", " "))
            except:
                raise ValueError(f"Country name {country_name} can not be mapped "
                                 f"to any country code. Try using the ISO3 code "
                                 f"directly.")
            if len(country) > 1:
                country_code = None
                for current_country in country:
                    if current_country.name == country_name:
                        country_code = current_country.alpha_3
                if country_code is None:
                    raise ValueError(f"Country name {country_name} has "
                                     f"{len(country)} possible results for "
                                     f"country codes.")
            else:
                country_code = country[0].alpha_3

    return country_code
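
# Illustrative usage (comment-only sketch): both a country name and an ISO3
# code should resolve to the same code, e.g.
# get_country_code("Malaysia")  # -> "MYS"
# get_country_code("MYS")       # -> "MYS"
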

def create_folder_mapping(
    folder: str,
    extracted: bool = False,
) -> None:
    """
    Create a mapping from 3-letter ISO country codes to folders
    based on the subfolders of the given folder. The mapping is
    stored in 'folder_mapping.json' in the given folder. Folder
    must be given relative to the repository root.

    Parameters
    ----------
    folder: str
        folder to create the mapping for
    extracted: bool = False
        If true treat the folder as extracted data, where we
        only have one folder per country and no typos in the
        names

    Returns
    -------
    Nothing
    """
    folder = root_path / folder
    folder_mapping = {}
    # if not extracted:
    known_folders = custom_folders
    # else:
    #     known_folders = {}

    for item in folder.iterdir():
        if item.is_dir() and not item.match("__pycache__"):
            if item.name in known_folders:
                ISO3 = known_folders[item.name]
            else:
                try:
                    country = pycountry.countries.search_fuzzy(
                        item.name.replace("_", " "))
                    if len(country) > 1:
                        ISO3 = None
                        for current_country in country:
                            if current_country.name == item.name.replace("_", " "):
                                ISO3 = current_country.alpha_3
                    else:
                        ISO3 = country[0].alpha_3
                except:
                    ISO3 = None

            if ISO3 is None:
                print(f"No match for {item.name}")
            else:
                if ISO3 in folder_mapping.keys():
                    folder_mapping[ISO3] = [folder_mapping[ISO3], item.name]
                else:
                    folder_mapping[ISO3] = item.name

    with open(folder / "folder_mapping.json", "w") as mapping_file:
        json.dump(folder_mapping, mapping_file, indent=4)
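
# Illustrative usage (comment-only sketch): rebuild the mapping for a data
# folder; the folder name below is a hypothetical example relative to the
# repository root.
# create_folder_mapping("downloaded_data/UNFCCC")
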

# TODO add crf
def get_country_submissions(
    country_name: str,
    print_sub: bool = True,
) -> Dict[str, List[str]]:
    """
    Input is a three-letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    queries the folder mapping files for folders.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3-letter code
    print_sub: bool
        If True information on submissions will be written to stdout

    Returns
    -------
    returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
    Each value is a list of folders
    """
    data_folder = downloaded_data_path

    country_code = get_country_code(country_name)
    if print_sub:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    country_submissions = {}
    if print_sub:
        print("#" * 80)
        print(f"The following submissions are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            if print_sub:
                print("")
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code in folder_mapping:
                country_folders = folder_mapping[country_code]
                if isinstance(country_folders, str):
                    # only one folder
                    country_folders = [country_folders]

                submission_folders = []
                for country_folder in country_folders:
                    current_folder = item / country_folder
                    if print_sub:
                        print(f"Submissions in folder {country_folder}:")

                    for submission_folder in current_folder.iterdir():
                        if submission_folder.is_dir():
                            if print_sub:
                                print(submission_folder.name)
                            submission_folders.append(submission_folder.name)

                country_submissions[item.name] = submission_folders
            else:
                print(f"No submissions available for {country_name}.")

    return country_submissions
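
# Illustrative usage (comment-only sketch): collect the available submission
# folders without printing details. The result depends on the local data.
# submissions = get_country_submissions("Malaysia", print_sub=False)
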

def get_country_datasets(
    country_name: str,
    print_ds: bool = True,
) -> Dict[str, List[str]]:
    """
    Input is a three-letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    checks the code and data folders for content on the country.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3-letter code
    print_ds: bool
        If True information on submissions will be written to stdout

    Returns
    -------
    returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
    Each value is a list of folders
    """
    data_folder = extracted_data_path
    data_folder_legacy = legacy_data_path

    # obtain country code
    country_code = get_country_code(country_name)
    if print_ds:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    rep_data = {}
    # data
    if print_ds:
        print("#" * 80)
        print(f"The following datasets are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError("Wrong data type in folder mapping json file. "
                                     "Should be str.")

                datasets_current_folder = {}
                current_folder = item / country_folder

                for data_file in current_folder.iterdir():
                    if data_file.suffix in ['.nc', '.yaml', '.csv']:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix)
                        else:
                            datasets_current_folder[data_file.stem] = \
                                [data_file.suffix]

                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split('_')
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f'Wrong code: {parts[0]}'] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology})"
                        data_info = ""
                        if '.nc' in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if ('.csv' in datasets_current_folder[dataset]) and (
                                '.yaml' in datasets_current_folder[dataset]):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif '.csv' in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif '.yaml' in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "

                        code_file = get_code_file(country_code, parts[1])
                        if code_file:
                            data_info = data_info + f"code: {code_file.name}"
                        else:
                            data_info = data_info + "code: not found"

                        cleaned_datasets_current_folder[key] = data_info

            if print_ds:
                if cleaned_datasets_current_folder:
                    for country_ds in cleaned_datasets_current_folder:
                        print(f"{country_ds}: "
                              f"{cleaned_datasets_current_folder[country_ds]}")
                else:
                    print("No data available")
                print("")

            rep_data[item.name] = cleaned_datasets_current_folder

    # legacy data
    if print_ds:
        print("#" * 80)
        print(f"The following legacy datasets are available for {country_name}")
    legacy_data = {}
    for item in data_folder_legacy.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError("Wrong data type in folder mapping json file. "
                                     "Should be str.")

                datasets_current_folder = {}
                current_folder = item / country_folder

                for data_file in current_folder.iterdir():
                    if data_file.suffix in ['.nc', '.yaml', '.csv']:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix)
                        else:
                            datasets_current_folder[data_file.stem] = \
                                [data_file.suffix]

                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split('_')
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f'Wrong code: {parts[0]}'] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology}, legacy)"
                        data_info = ""
                        if '.nc' in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if ('.csv' in datasets_current_folder[dataset]) and (
                                '.yaml' in datasets_current_folder[dataset]):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif '.csv' in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif '.yaml' in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "

                        cleaned_datasets_current_folder[key] = data_info

            if print_ds:
                if cleaned_datasets_current_folder:
                    for country_ds in cleaned_datasets_current_folder:
                        print(f"{country_ds}: "
                              f"{cleaned_datasets_current_folder[country_ds]}")
                else:
                    print("No data available")
                print("")

            legacy_data[item.name] = cleaned_datasets_current_folder

    all_data = {
        "rep_data": rep_data,
        "legacy_data": legacy_data,
    }

    return all_data
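
# Illustrative usage (comment-only sketch): overview of extracted and legacy
# datasets for one country; returns a dict with 'rep_data' and 'legacy_data'.
# datasets = get_country_datasets("MYS", print_ds=False)
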

def get_code_file(
    country_name: str,
    submission: str,
    print_info: bool = False,
) -> Path:
    """
    For a given country name and submission, find the script that creates the data.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3-letter code
    submission: str
        String of the submission
    print_info: bool = False
        If True print information on the code file found

    Returns
    -------
    returns a pathlib Path object for the code file
    """
    code_file_path = None
    UNFCCC_reader_path = code_path / "UNFCCC_reader"

    # CRF is an exception as it's read using the UNFCCC_CRF_reader module
    # so we return the path to that.
    if submission[0:3] == "CRF":
        return root_path / "UNFCCC_CRF_reader"

    if submission[0:2] == "DI":
        return root_path / "UNFCCC_DI_reader"

    # obtain country code
    country_code = get_country_code(country_name)
    if print_info:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    with open(UNFCCC_reader_path / "folder_mapping.json", "r") as mapping_file:
        folder_mapping = json.load(mapping_file)

    if country_code not in folder_mapping:
        if print_info:
            print("No code file available")
            print("")
    else:
        country_folder = UNFCCC_reader_path / folder_mapping[country_code]
        code_file_name_candidate = "read_" + country_code + "_" + submission + "*"

        for file in country_folder.iterdir():
            if file.match(code_file_name_candidate):
                if code_file_path is not None:
                    raise ValueError(f"Found multiple code file candidates: "
                                     f"{code_file_path} and {file.name}. "
                                     f"Please use only one file with name "
                                     f"'read_ISO3_submission_XXX.YYY'.")
                else:
                    if print_info:
                        print(f"Found code file {file.relative_to(root_path)}")
                    code_file_path = file

    if code_file_path is not None:
        return code_file_path.relative_to(root_path)
    else:
        return None
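
# Illustrative usage (comment-only sketch): locate the reading script for a
# submission; the submission string below is a hypothetical example and is
# matched against files named 'read_<ISO3>_<submission>*'.
# script_path = get_code_file("MYS", "BUR3", print_info=True)
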

def fix_rows(
    data: pd.DataFrame,
    rows_to_fix: list,
    col_to_use: str,
    n_rows: int,
) -> pd.DataFrame:
    """
    Fix rows that have been split during reading from pdf.
    This is the version used for Malaysia BUR3 and BUR4; adapt for other
    BURs if needed.

    :param data:
    :param rows_to_fix:
    :param col_to_use:
    :param n_rows:
    :return:
    """
    for row in rows_to_fix:
        # print(row)
        # find the row number and collect the row and the next rows to merge
        index = data.loc[data[col_to_use] == row].index
        # print(list(index))
        if not list(index):
            print(f"Can't merge split row {row}")
            print(data[col_to_use])
            continue
        # print(f"Merging split row {row} for table {page}")
        loc = data.index.get_loc(index[0])
        if n_rows == -3:
            locs_to_merge = list(range(loc - 1, loc + 2))
        elif n_rows == -5:
            locs_to_merge = list(range(loc - 1, loc + 4))
        else:
            locs_to_merge = list(range(loc, loc + n_rows))
        rows_to_merge = data.iloc[locs_to_merge]
        indices_to_merge = rows_to_merge.index
        # join the rows
        new_row = rows_to_merge.agg(' '.join)
        # replace the double spaces that are created
        # must be done here and not at the end as splits are not always
        # the same and join would produce different col values
        new_row = new_row.str.replace("  ", " ")
        new_row = new_row.str.replace("N O", "NO")
        new_row = new_row.str.replace(", N", ",N")
        new_row = new_row.str.replace("- ", "-")
        data.loc[indices_to_merge[0]] = new_row
        data = data.drop(indices_to_merge[1:])

    return data
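
# Illustrative usage (comment-only sketch): repair a category label that a PDF
# reader split over two rows. The DataFrame content is a hypothetical example;
# all cells must be strings because the rows are joined with ' '.join.
# df = pd.DataFrame({"category": ["1A1 Energy", "Industries", "1A2 Other"],
#                    "2019": ["10", "", "5"]})
# df = fix_rows(df, rows_to_fix=["1A1 Energy"], col_to_use="category", n_rows=2)
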