import pycountry
import json
import pandas as pd
import xarray as xr
from copy import deepcopy
from typing import Dict, List, Optional
from pathlib import Path

from .definitions import custom_country_mapping, custom_folders
from .definitions import root_path, downloaded_data_path, extracted_data_path
from .definitions import legacy_data_path, code_path

def convert_categories(
    ds_input: xr.Dataset,
    conversion: Dict[str, Dict[str, str]],
    #terminology_from: str,
    terminology_to: str,
    debug: bool = False,
    tolerance: float = 0.01,
) -> xr.Dataset:
    """
    Convert data from one category terminology to another.
    """
    ds_converted = ds_input.copy(deep=True)
    ds_converted.attrs = deepcopy(ds_input.attrs)

    # change category terminology
    cat_dim = ds_converted.attrs["cat"]
    ds_converted.attrs["cat"] = f"category ({terminology_to})"
    ds_converted = ds_converted.rename({cat_dim: ds_converted.attrs["cat"]})

    # find categories present in dataset
    cats_present = list(ds_converted.coords[f'category ({terminology_to})'])

    # restrict categories and map category names
    if 'mapping' in conversion.keys():
        mapping_cats_present = [cat for cat in list(conversion['mapping'].keys()) if
                                cat in cats_present]
        ds_converted = ds_converted.pr.loc[
            {'category': mapping_cats_present}]

        from_cats = ds_converted.coords[f'category ({terminology_to})'].values
        to_cats = pd.Series(from_cats).replace(conversion['mapping'])
        ds_converted = ds_converted.assign_coords({f'category ({terminology_to})':
                                                   (f'category ({terminology_to})',
                                                    to_cats)})

    # redo the list of present cats after mapping, as we have new categories in the
    # target terminology now
    cats_present_mapped = list(ds_converted.coords[f'category ({terminology_to})'])

    # aggregate categories
    if 'aggregate' in conversion:
        aggregate_cats = conversion['aggregate']
        for cat_to_agg in aggregate_cats:
            if debug:
                print(f"Category: {cat_to_agg}")
            source_cats = [cat for cat in aggregate_cats[cat_to_agg]['sources'] if
                           cat in cats_present_mapped]
            data_agg = ds_converted.pr.loc[{'category': source_cats}].pr.sum(
                dim='category', skipna=True, min_count=1)
            # drop variables that are all-NaN after the aggregation
            nan_vars = [var for var in data_agg.data_vars if
                        data_agg[var].isnull().all().data]
            data_agg = data_agg.drop(nan_vars)
            if len(data_agg.data_vars) > 0:
                data_agg = data_agg.expand_dims([f'category ({terminology_to})'])
                data_agg = data_agg.assign_coords(
                    coords={f'category ({terminology_to})':
                            (f'category ({terminology_to})', [cat_to_agg])})
                ds_converted = ds_converted.pr.merge(data_agg, tolerance=tolerance)
            else:
                print(f"no data to aggregate for category {cat_to_agg}")

    return ds_converted
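
# Usage sketch for convert_categories (illustrative only: the category codes,
# the target terminology and the conversion dict below are hypothetical, not
# taken from this module). 'mapping' renames source categories; 'aggregate'
# sums mapped categories into new ones.
#
#     conversion = {
#         "mapping": {"1A": "1.A", "1B": "1.B"},
#         "aggregate": {"1": {"sources": ["1.A", "1.B"]}},
#     }
#     ds_new = convert_categories(ds, conversion, terminology_to="IPCC2006")
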
def get_country_name(
        country_code: str,
) -> str:
    """get country name from code"""
    if country_code in custom_country_mapping:
        country_name = custom_country_mapping[country_code]
    else:
        try:
            country = pycountry.countries.get(alpha_3=country_code)
            # pycountry returns None for unknown codes, so .name raises
            country_name = country.name
        except AttributeError:
            raise ValueError(f"Country code {country_code} cannot be mapped to "
                             f"any country")

    return country_name
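
# Example (relies on pycountry's standard data; "ARG" is an illustrative input):
#     get_country_name("ARG")  # -> "Argentina"
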
def get_country_code(
        country_name: str,
) -> str:
    """
    Obtain a country code. If the input already is a three-letter code it is
    returned unchanged; otherwise a fuzzy search for the country name is
    performed.

    Parameters
    ----------
    country_name: str
        Country code or name to get the three-letter code for.

    Returns
    -------
    country_code: str
    """
    # First check if it's in the list of custom codes
    if country_name in custom_country_mapping:
        country_code = country_name
    else:
        try:
            # check if it's a 3 letter code
            country = pycountry.countries.get(alpha_3=country_name)
            country_code = country.alpha_3
        except AttributeError:
            try:
                country = pycountry.countries.search_fuzzy(
                    country_name.replace("_", " "))
            except LookupError:
                raise ValueError(f"Country name {country_name} cannot be mapped to "
                                 f"any country code. Try using the ISO3 code "
                                 f"directly.")
            if len(country) > 1:
                country_code = None
                for current_country in country:
                    if current_country.name == country_name:
                        country_code = current_country.alpha_3
                if country_code is None:
                    raise ValueError(f"Country name {country_name} has {len(country)} "
                                     f"possible results for country codes.")
            else:
                country_code = country[0].alpha_3

    return country_code
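
# Examples (illustrative inputs; name resolution uses pycountry's fuzzy search):
#     get_country_code("ARG")        # valid ISO3 code, returned as-is -> "ARG"
#     get_country_code("Argentina")  # country name, resolved -> "ARG"
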
def create_folder_mapping(
        folder: str,
        extracted: bool = False,
) -> None:
    """
    Create a mapping from three-letter ISO country codes to folders
    based on the subfolders of the given folder. The mapping is
    stored in 'folder_mapping.json' in the given folder. The folder
    must be given relative to the repository root.

    Parameters
    ----------
    folder: str
        folder to create the mapping for
    extracted: bool = False
        If True, treat the folder as extracted data, where we
        only have one folder per country and no typos in the
        names

    Returns
    -------
    Nothing
    """
    folder = root_path / folder
    folder_mapping = {}
    #if not extracted:
    known_folders = custom_folders
    #else:
    #    known_folders = {}

    for item in folder.iterdir():
        if item.is_dir() and not item.match("__pycache__"):
            if item.name in known_folders:
                ISO3 = known_folders[item.name]
            else:
                try:
                    country = pycountry.countries.search_fuzzy(
                        item.name.replace("_", " "))
                    if len(country) > 1:
                        ISO3 = None
                        for current_country in country:
                            if current_country.name == item.name.replace("_", " "):
                                ISO3 = current_country.alpha_3
                    else:
                        ISO3 = country[0].alpha_3
                except LookupError:
                    ISO3 = None

            if ISO3 is None:
                print(f"No match for {item.name}")
            else:
                if ISO3 in folder_mapping.keys():
                    # more than one folder for this country: store a list
                    folder_mapping[ISO3] = [folder_mapping[ISO3], item.name]
                else:
                    folder_mapping[ISO3] = item.name

    with open(folder / "folder_mapping.json", "w") as mapping_file:
        json.dump(folder_mapping, mapping_file, indent=4)
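
# Usage sketch (the folder argument is a hypothetical example; it must be given
# relative to the repository root):
#     create_folder_mapping("downloaded_data/UNFCCC")
# This writes downloaded_data/UNFCCC/folder_mapping.json with entries such as
# "ARG": "Argentina" (one folder), or a list when several folders map to a country.
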
# TODO add crf
def get_country_submissions(
        country_name: str,
        print_sub: bool = True,
) -> Dict[str, List[str]]:
    """
    Input is a three-letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    queries the folder mapping files for folders.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3 letter code
    print_sub: bool
        If True information on submissions will be written to stdout

    Returns
    -------
    returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
    Each value is a list of folders
    """
    data_folder = downloaded_data_path

    country_code = get_country_code(country_name)

    if print_sub:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    country_submissions = {}
    if print_sub:
        print("#" * 80)
        print(f"The following submissions are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            if print_sub:
                print("")
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code in folder_mapping:
                country_folders = folder_mapping[country_code]
                if isinstance(country_folders, str):
                    # only one folder
                    country_folders = [country_folders]

                submission_folders = []
                for country_folder in country_folders:
                    current_folder = item / country_folder
                    if print_sub:
                        print(f"Submissions in folder {country_folder}:")

                    for submission_folder in current_folder.iterdir():
                        if submission_folder.is_dir():
                            if print_sub:
                                print(submission_folder.name)
                            submission_folders.append(submission_folder.name)

                country_submissions[item.name] = submission_folders
            else:
                print(f"No submissions available for {country_name}.")

    return country_submissions
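
# Usage sketch (illustrative country; expects a folder_mapping.json in each data
# folder, e.g. created with create_folder_mapping; the submission names shown
# are hypothetical):
#     submissions = get_country_submissions("Argentina", print_sub=False)
#     # -> e.g. {"UNFCCC": ["BUR1", "NC3"], ...} depending on the downloaded data
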
def get_country_datasets(
        country_name: str,
        print_ds: bool = True,
) -> Dict[str, Dict]:
    """
    Input is a three-letter ISO code for a country, or the country's name.
    The function tries to map the country name to an ISO code and then
    checks the code and data folders for content on the country.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3 letter code
    print_ds: bool
        If True information on datasets will be written to stdout

    Returns
    -------
    returns a dict with the keys "rep_data" and "legacy_data". Each value is a
    dict mapping data folders to the datasets found for the country.
    """
    data_folder = extracted_data_path
    data_folder_legacy = legacy_data_path

    # obtain country code
    country_code = get_country_code(country_name)

    if print_ds:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    rep_data = {}
    # data
    if print_ds:
        print("#" * 80)
        print(f"The following datasets are available for {country_name}")
    for item in data_folder.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError("Wrong data type in folder mapping json file. "
                                     "Should be str.")

                datasets_current_folder = {}
                current_folder = item / country_folder

                # collect the file suffixes present for each dataset stem
                for data_file in current_folder.iterdir():
                    if data_file.suffix in ['.nc', '.yaml', '.csv']:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix)
                        else:
                            datasets_current_folder[data_file.stem] = \
                                [data_file.suffix]

                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split('_')
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f'Wrong code: {parts[0]}'] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology})"
                        data_info = ""
                        if '.nc' in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if ('.csv' in datasets_current_folder[dataset]) and (
                                '.yaml' in datasets_current_folder[dataset]):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif '.csv' in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif '.yaml' in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "

                        code_file = get_code_file(country_code, parts[1])
                        if code_file:
                            data_info = data_info + f"code: {code_file.name}"
                        else:
                            data_info = data_info + "code: not found"

                        cleaned_datasets_current_folder[key] = data_info

                if print_ds:
                    if cleaned_datasets_current_folder:
                        for country_ds in cleaned_datasets_current_folder:
                            print(f"{country_ds}: "
                                  f"{cleaned_datasets_current_folder[country_ds]}")
                    else:
                        print("No data available")
                    print("")

            rep_data[item.name] = cleaned_datasets_current_folder

    # legacy data
    if print_ds:
        print("#" * 80)
        print(f"The following legacy datasets are available for {country_name}")
    legacy_data = {}
    for item in data_folder_legacy.iterdir():
        if item.is_dir():
            cleaned_datasets_current_folder = {}
            if print_ds:
                print("-" * 80)
                print(f"Data folder {item.name}")
                print("-" * 80)
            with open(item / "folder_mapping.json", "r") as mapping_file:
                folder_mapping = json.load(mapping_file)
            if country_code not in folder_mapping:
                if print_ds:
                    print("No data available")
                    print("")
            else:
                country_folder = folder_mapping[country_code]
                if not isinstance(country_folder, str):
                    raise ValueError("Wrong data type in folder mapping json file. "
                                     "Should be str.")

                datasets_current_folder = {}
                current_folder = item / country_folder

                for data_file in current_folder.iterdir():
                    if data_file.suffix in ['.nc', '.yaml', '.csv']:
                        if data_file.stem in datasets_current_folder:
                            datasets_current_folder[data_file.stem].append(
                                data_file.suffix)
                        else:
                            datasets_current_folder[data_file.stem] = \
                                [data_file.suffix]

                for dataset in datasets_current_folder:
                    # process filename to get submission
                    parts = dataset.split('_')
                    if parts[0] != country_code:
                        cleaned_datasets_current_folder[
                            f'Wrong code: {parts[0]}'] = dataset
                    else:
                        terminology = "_".join(parts[3:])
                        key = f"{parts[1]} ({parts[2]}, {terminology}, legacy)"
                        data_info = ""
                        if '.nc' in datasets_current_folder[dataset]:
                            data_info = data_info + "NF (.nc), "
                        if ('.csv' in datasets_current_folder[dataset]) and (
                                '.yaml' in datasets_current_folder[dataset]):
                            data_info = data_info + "IF (.yaml + .csv), "
                        elif '.csv' in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF? (.csv), "
                        elif '.yaml' in datasets_current_folder[dataset]:
                            data_info = data_info + "incomplete IF (.yaml), "

                        cleaned_datasets_current_folder[key] = data_info

                if print_ds:
                    if cleaned_datasets_current_folder:
                        for country_ds in cleaned_datasets_current_folder:
                            print(f"{country_ds}: "
                                  f"{cleaned_datasets_current_folder[country_ds]}")
                    else:
                        print("No data available")
                    print("")

            legacy_data[item.name] = cleaned_datasets_current_folder

    all_data = {
        "rep_data": rep_data,
        "legacy_data": legacy_data,
    }

    return all_data
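
# Usage sketch (illustrative; the actual keys and values depend on the extracted
# and legacy data present on disk):
#     datasets = get_country_datasets("ARG", print_ds=False)
#     datasets["rep_data"]     # per data folder: dataset key -> format/code info
#     datasets["legacy_data"]  # same structure for the legacy data tree
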
def get_code_file(
        country_name: str,
        submission: str,
        print_info: bool = False,
) -> Optional[Path]:
    """
    For a given country name and submission, find the script that creates the data.

    Parameters
    ----------
    country_name: str
        String containing the country name or ISO 3 letter code
    submission: str
        String of the submission
    print_info: bool = False
        If True print information on the code files found

    Returns
    -------
    returns a pathlib Path object for the code file, or None if none is found
    """
    code_file_path = None
    UNFCCC_reader_path = code_path / "UNFCCC_reader"

    # CRF is an exception as it's read using the UNFCCC_CRF_reader module
    # so we return the path to that.
    if submission[0:3] == "CRF":
        return root_path / "UNFCCC_CRF_reader"

    if submission[0:2] == "DI":
        return root_path / "UNFCCC_DI_reader"

    # obtain country code
    country_code = get_country_code(country_name)

    if print_info:
        print(f"Country name {country_name} maps to ISO code {country_code}")

    with open(UNFCCC_reader_path / "folder_mapping.json", "r") as mapping_file:
        folder_mapping = json.load(mapping_file)

    if country_code not in folder_mapping:
        if print_info:
            print("No code file available")
            print("")
    else:
        country_folder = UNFCCC_reader_path / folder_mapping[country_code]
        code_file_name_candidate = "read_" + country_code + "_" + submission + "*"

        for file in country_folder.iterdir():
            if file.match(code_file_name_candidate):
                if code_file_path is not None:
                    raise ValueError(f"Found multiple code file candidates: "
                                     f"{code_file_path} and {file.name}. "
                                     f"Please use only one file with name "
                                     f"'read_ISO3_submission_XXX.YYY'.")
                else:
                    if print_info:
                        print(f"Found code file {file.relative_to(root_path)}")
                    code_file_path = file

    if code_file_path is not None:
        return code_file_path.relative_to(root_path)
    else:
        return None
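
# Usage sketch (illustrative arguments; returns a Path relative to root_path,
# or None when no matching code file is found):
#     script = get_code_file("ARG", "BUR1")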