  1. """read data set"""
  2. import os
  3. import pathlib
  4. import climate_categories as cc
  5. import pandas as pd
  6. import primap2 as pm2 # type: ignore
  7. import xarray
  8. import xarray as xr
  9. from faostat_data_primap.helper.category_aggregation import (
  10. agg_info_fao,
  11. agg_info_ipcc2006_primap_CH4,
  12. agg_info_ipcc2006_primap_CO2,
  13. agg_info_ipcc2006_primap_N2O,
  14. )
  15. from faostat_data_primap.helper.country_mapping import country_to_iso3_mapping
  16. from faostat_data_primap.helper.definitions import (
  17. config_to_if,
  18. read_config_all,
  19. )
  20. from faostat_data_primap.helper.paths import (
  21. downloaded_data_path,
  22. extracted_data_path,
  23. root_path,
  24. )


def get_all_domains(downloaded_data_path: pathlib.Path) -> list[str]:
    """
    Get a list of all available domains.

    Parameters
    ----------
    downloaded_data_path
        The path to the downloaded data sets.

    Returns
    -------
        All domains in the downloaded data directory.
    """
    return [
        domain
        for domain in os.listdir(downloaded_data_path)
        if (downloaded_data_path / domain).is_dir()
    ]


def get_latest_release(domain_path: pathlib.Path) -> str:
    """
    Get the latest release in a domain directory.

    Parameters
    ----------
    domain_path
        The path to the domain

    Returns
    -------
        Name of the directory with latest data.
    """
    all_releases = [
        release_name
        for release_name in os.listdir(domain_path)
        if (domain_path / release_name).is_dir()
    ]
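    # release directory names are dates (e.g. "2024-11-14", hypothetical value),
    # so a reverse lexicographic sort puts the most recent release first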
    return sorted(all_releases, reverse=True)[0]


# TODO split out functions to avoid PLR0915
def read_data(  # noqa: PLR0915 PLR0912
    read_path: pathlib.Path,
    domains_and_releases_to_read: list[tuple[str, str]],
    save_path: pathlib.Path,
) -> None:
    """
    Read specified domains and releases and save output files.

    Parameters
    ----------
    read_path
        Where to look for the downloaded data
    domains_and_releases_to_read
        The domains and releases to use
    save_path
        The path to save the data to
    """
    df_list = []

    for domain, release in domains_and_releases_to_read:
        read_config = read_config_all[domain][release]

        print(f"Read {read_config['filename']}")

        dataset_path = read_path / domain / release / read_config["filename"]

        # There are some non-utf8 characters
        df_domain = pd.read_csv(dataset_path, encoding="ISO-8859-1")
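
        # remove rows by item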
  82. if "items_to_remove" in read_config.keys():
  83. df_domain = df_domain[
  84. ~df_domain["Item"].isin(read_config["items_to_remove"])
  85. ]
  86. # remove rows by element
  87. if "elements_to_remove" in read_config.keys():
  88. df_domain = df_domain[
  89. ~df_domain["Element"].isin(read_config["elements_to_remove"])
  90. ]
  91. # remove rows by area
  92. if "areas_to_remove" in read_config.keys():
  93. df_domain = df_domain[
  94. ~df_domain["Area"].isin(read_config["areas_to_remove"])
  95. ]
  96. # create country columns
  97. df_domain["country (ISO3)"] = df_domain["Area"].map(country_to_iso3_mapping)
  98. # check all countries are converted into iso3 codes
  99. if any(df_domain["country (ISO3)"].isna()):
  100. msg = f"Not all countries are converted into ISO3 codes for {domain}"
  101. raise ValueError(msg)
  102. # create entity column
  103. df_domain["entity"] = df_domain["Element"].map(read_config["entity_mapping"])
  104. # check all entities are mapped
  105. if any(df_domain["entity"].isna()):
  106. msg = f"Not all entities are mapped for {domain}"
  107. raise ValueError(msg)
  108. # create category column (combination of Item and Element works best)
  109. df_domain["Item - Element"] = df_domain["Item"] + " - " + df_domain["Element"]
  110. if "category_mapping_item_element" in read_config.keys():
  111. df_domain["category"] = df_domain["Item - Element"].map(
  112. read_config["category_mapping_item_element"]
  113. )
  114. # sometimes there are too many categories per domain to write
  115. # everything in the config file
  116. # TODO we could do this for crops as well, but it's not necessary
  117. if ("category_mapping_element" in read_config.keys()) and (
  118. "category_mapping_item" in read_config.keys()
  119. ):
  120. # split steps for easier debugging
  121. df_domain["mapped_item"] = df_domain["Item"].map(
  122. read_config["category_mapping_item"]
  123. )
  124. df_domain["mapped_element"] = df_domain["Element"].map(
  125. read_config["category_mapping_element"]
  126. )
  127. if "category" in df_domain.columns:
  128. df_domain["category_1"] = (
  129. df_domain["mapped_item"] + df_domain["mapped_element"]
  130. )
  131. df_domain["category"] = df_domain["category"].fillna(
  132. df_domain["category_1"]
  133. )
  134. df_domain = df_domain.drop(
  135. labels=["category_1"],
  136. axis=1,
  137. )
  138. else:
  139. df_domain["category"] = (
  140. df_domain["mapped_item"] + df_domain["mapped_element"]
  141. )
  142. df_domain = df_domain.drop(
  143. labels=[
  144. "mapped_item",
  145. "mapped_element",
  146. ],
  147. axis=1,
  148. )
  149. # some rows can only be removed by Item - Element column
  150. if "items-elements_to_remove" in read_config.keys():
  151. df_domain = df_domain[
  152. ~df_domain["Item - Element"].isin(
  153. read_config["items-elements_to_remove"]
  154. )
  155. ]
  156. # else:
  157. # msg = f"Could not find mapping for {domain=}."
  158. # raise ValueError(msg)
  159. # drop combined item - element columns
  160. df_domain = df_domain.drop(
  161. labels=[
  162. "Item - Element",
  163. ],
  164. axis=1,
  165. )
  166. # check if all Item-Element combinations are now converted to category codes
  167. fao_categories = list(cc.FAO.df.index)
  168. unknown_categories = [
  169. i for i in df_domain.category.unique() if i not in fao_categories
  170. ]
  171. if unknown_categories:
  172. msg = (
  173. f"Not all categories are part of FAO categorisation. "
  174. f"Check mapping for {unknown_categories} in domain {domain}"
  175. )
  176. raise ValueError(msg)
  177. # drop columns we don't need
  178. df_domain = df_domain.drop(
  179. read_config["columns_to_drop"],
  180. axis=1,
  181. )
  182. df_list.append(df_domain)
  183. df_all = pd.concat(df_list, axis=0, join="outer", ignore_index=True)
  184. # some domains don't have Source column or values are empty
  185. # we assume these values come from FAO
  186. # TODO Better not to hard-code this in case the label changes
  187. if "Source" not in df_all.columns:
  188. df_all["Source"] = "FAO TIER 1"
  189. else:
  190. df_all["Source"] = df_all["Source"].fillna("FAO TIER 1")
  191. # Remove the "Y" prefix for the years columns
    df_all = df_all.rename(columns=lambda x: x.lstrip("Y") if x.startswith("Y") else x)

    # Make sure the units are correct
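    # e.g. entity "N2O" with Unit "kt" becomes "N2O * kt/ year";
    # read_config_all["replace_units"] then maps such strings to units that
    # primap2 can parse (assumed purpose of the mapping)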
  194. df_all["Unit"] = df_all["entity"] + " * " + df_all["Unit"] + "/ year"
  195. df_all["Unit"] = df_all["Unit"].replace(read_config_all["replace_units"])
    date_last_updated = sorted(
        [i[1] for i in domains_and_releases_to_read], reverse=True
    )[0]
    release_name = f"v{date_last_updated}"

    data_if = pm2.pm2io.convert_wide_dataframe_if(
        df_all,
        coords_cols=config_to_if["coords_cols"],
        coords_defaults={
            "scenario": release_name,
        },
        coords_terminologies=config_to_if["coords_terminologies"],
        coords_value_mapping=config_to_if["coords_value_mapping"],
        filter_keep=config_to_if["filter_keep"],
        filter_remove=config_to_if["filter_remove"],
        meta_data=config_to_if["meta_data"],
    )

    # convert to PRIMAP2 native format
    data_pm2 = pm2.pm2io.from_interchange_format(data_if, data_if.attrs)

    # convert back to IF for standardized units
    data_if = data_pm2.pr.to_interchange_format()

    # save raw data
    output_filename = f"FAOSTAT_Agrifood_system_emissions_{release_name}_raw"

    if not save_path.exists():
        save_path.mkdir()

    output_folder = save_path / release_name
    if not output_folder.exists():
        output_folder.mkdir()

    filepath = output_folder / (output_filename + ".csv")
    print(f"Writing raw primap2 file to {filepath}")
    pm2.pm2io.write_interchange_format(
        filepath,
        data_if,
    )
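
    # compress all data variables with zlib when writing the netcdf file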
    compression = dict(zlib=True, complevel=9)
    encoding = {var: compression for var in data_pm2.data_vars}
    filepath = output_folder / (output_filename + ".nc")
    print(f"Writing netcdf file to {filepath}")
    data_pm2.pr.to_netcdf(filepath, encoding=encoding)

    # process data - conversion and category aggregation
    # todo variable naming
    result_proc = process(data_pm2)

    # save processed data
    result_proc_if = result_proc.pr.to_interchange_format()

    output_filename = f"FAOSTAT_Agrifood_system_emissions_{release_name}"
    # output_folder = extracted_data_path / release_name
    if not output_folder.exists():
        output_folder.mkdir()

    filepath = output_folder / (output_filename + ".csv")
    print(f"Writing processed primap2 file to {filepath}")
    pm2.pm2io.write_interchange_format(
        filepath,
        result_proc_if,
    )

    compression = dict(zlib=True, complevel=9)
    encoding = {var: compression for var in result_proc.data_vars}
    filepath = output_folder / (output_filename + ".nc")
    print(f"Writing netcdf file to {filepath}")
    result_proc.pr.to_netcdf(filepath, encoding=encoding)


def process(ds: xarray.Dataset) -> xarray.Dataset:
    """
    Process dataset.

    Perform the conversion from FAO to IPCC2006_PRIMAP categories
    and aggregate categories.

    Parameters
    ----------
    ds
        The data set to process.

    Returns
    -------
        The processed dataset
    """
    # make categorisation A from yaml
    categorisation_a = cc.FAO

    # make categorisation B from yaml
    categorisation_b = cc.IPCC2006_PRIMAP

    # category FAOSTAT not yet part of climate categories, so we need to add it manually
    cats = {
        "FAO": categorisation_a,
        "IPCC2006_PRIMAP": categorisation_b,
    }

    # drop UNFCCC data
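    # (FAOSTAT reports some values sourced from UNFCCC inventories;
    # only the FAO TIER 1 estimates are processed further)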
    ds = ds.drop_sel(source="UNFCCC")

    # consistency check in original categorisation
    ds_checked = ds.pr.add_aggregates_coordinates(agg_info=agg_info_fao)  # noqa: F841

    # We need a conversion CSV file for each entity
    # That's a temporary workaround until the filter function in climate categories works
    conv = {}
    gases = ["CO2", "CH4", "N2O"]
    for var in gases:
        conversion_path = root_path / f"conv_FAO_IPPCC2006_PRIMAP_{var}.csv"
        conv[var] = cc.Conversion.from_csv(
            conversion_path,
            cats=cats,  # type: ignore
        )

    # convert for each entity
    da_dict = {}
    for var in gases:
        da_dict[var] = ds[var].pr.convert(
            dim="category (FAO)",
            conversion=conv[var],
        )
    result = xr.Dataset(da_dict)
    result.attrs = ds.attrs
    result.attrs["cat"] = "category (IPCC2006_PRIMAP)"

    # convert to interchange format and back to get rid of empty categories
    # TODO there may be a better way to do this
    result_if = result.pr.to_interchange_format()
    result = pm2.pm2io.from_interchange_format(result_if)

    # aggregation for each gas for better understanding
    # TODO creates some duplicate code, we can combine again later
    result_proc = result.pr.add_aggregates_coordinates(
        agg_info=agg_info_ipcc2006_primap_N2O
    )
    result_proc = result_proc.pr.add_aggregates_coordinates(
        agg_info=agg_info_ipcc2006_primap_CO2
    )
    result_proc = result_proc.pr.add_aggregates_coordinates(
        agg_info=agg_info_ipcc2006_primap_CH4
    )

    return result_proc  # type: ignore


def read_latest_data(
    downloaded_data_path_custom: pathlib.Path = downloaded_data_path,
    save_path: pathlib.Path = extracted_data_path,
) -> None:
    """
    Read and save the latest data

    Converts downloaded data into interchange format and primap2 native format
    and saves the files in the extracted_data directory.
    """
    domains = get_all_domains(downloaded_data_path_custom)

    domains_and_releases_to_read = []
    for domain in domains:
        domain_path = downloaded_data_path_custom / domain
        domains_and_releases_to_read.append((domain, get_latest_release(domain_path)))

    read_data(
        read_path=downloaded_data_path_custom,
        domains_and_releases_to_read=domains_and_releases_to_read,
        save_path=save_path,
    )
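

# Minimal usage sketch (assumption: running this module directly is meaningful;
# in the repository the entry point may instead be a task runner that calls
# read_latest_data()).
if __name__ == "__main__":
    read_latest_data()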