read.py

  1. """read data set"""
  2. import os
  3. import pathlib
  4. import pandas as pd
  5. import primap2 as pm2 # type: ignore
  6. from faostat_data_primap.helper.country_mapping import country_to_iso3_mapping
  7. from faostat_data_primap.helper.definitions import (
  8. config_to_if,
  9. read_config_all,
  10. )
  11. from faostat_data_primap.helper.paths import (
  12. downloaded_data_path,
  13. extracted_data_path,
  14. )
  15. def get_all_domains(downloaded_data_path: pathlib.Path) -> list[str]:
  16. """
  17. Get a list of all available domains.
  18. Parameters
  19. ----------
  20. downloaded_data_path
  21. The path to the downloaded data sets.
  22. Returns
  23. -------
  24. All domains in the downloaded data directory.
  25. """
  26. return [
  27. domain
  28. for domain in os.listdir(downloaded_data_path)
  29. if (downloaded_data_path / domain).is_dir()
  30. ]
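# Example (hypothetical directory names): given downloaded_data/farm_gate_emissions_crops
# and downloaded_data/land_use_forests, this returns
# ["farm_gate_emissions_crops", "land_use_forests"].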


def get_latest_release(domain_path: pathlib.Path) -> str:
    """
    Get the latest release in a domain directory.

    Parameters
    ----------
    domain_path
        The path to the domain.

    Returns
    -------
        Name of the directory with latest data.
    """
    all_releases = [
        release_name
        for release_name in os.listdir(domain_path)
        if (domain_path / release_name).is_dir()
    ]
    return sorted(all_releases, reverse=True)[0]
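# Note: this relies on release directory names sorting lexicographically in
# chronological order (e.g. ISO-dated names like "2023-11-09", an assumption
# about the naming scheme), so the first element of the reverse-sorted list
# is the newest release.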


def read_data(
    domains_and_releases_to_read: list[tuple[str, str]], save_path: pathlib.Path
) -> None:
    """
    Read specified domains and releases and save output files.

    Parameters
    ----------
    domains_and_releases_to_read
        The domains and releases to use.
    save_path
        The path to save the data to.
    """
    df_list = []
    for domain, release in domains_and_releases_to_read:
        read_config = read_config_all[domain][release]
        print(f"Read {read_config['filename']}")
        dataset_path = downloaded_data_path / domain / release / read_config["filename"]
        # There are some non-utf8 characters
        df_domain = pd.read_csv(dataset_path, encoding="ISO-8859-1")
        # remove rows by element
        if "elements_to_remove" in read_config:
            df_domain = df_domain[
                ~df_domain["Element"].isin(read_config["elements_to_remove"])
            ]
        # remove rows by area
        if "areas_to_remove" in read_config:
            df_domain = df_domain[
                ~df_domain["Area"].isin(read_config["areas_to_remove"])
            ]
        # create country column
        df_domain["country (ISO3)"] = df_domain["Area"].map(country_to_iso3_mapping)
        # check all countries are converted into ISO3 codes
        if any(df_domain["country (ISO3)"].isna()):
            msg = f"Not all countries are converted into ISO3 codes for {domain}"
            raise ValueError(msg)
        # create entity column
        df_domain["entity"] = df_domain["Element"].map(read_config["entity_mapping"])
        # check all entities are mapped
        if any(df_domain["entity"].isna()):
            msg = f"Not all entities are mapped for {domain}"
            raise ValueError(msg)
        # create category column (combination of Item and Element works best)
        df_domain["category"] = df_domain["Item"] + "-" + df_domain["Element"]
        # drop columns we don't need
        df_domain = df_domain.drop(
            read_config["columns_to_drop"],
            axis=1,
        )
        df_list.append(df_domain)

    df_all = pd.concat(df_list, axis=0, join="outer", ignore_index=True)

    # some domains don't have a Source column or the values are empty
    if "Source" not in df_all.columns:
        df_all["Source"] = "unknown"
    else:
        df_all["Source"] = df_all["Source"].fillna("unknown")

    # Remove the "Y" prefix from the year columns
    df_all = df_all.rename(columns=lambda x: x.lstrip("Y") if x.startswith("Y") else x)

    # Make sure the units are correct
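    # e.g. an entity "N2O" reported in Unit "kt" becomes "N2O * kt/ year",
    # which the replace_units mapping then rewrites (illustrative values;
    # the exact mapping lives in read_config_all["replace_units"])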
  106. df_all["Unit"] = df_all["entity"] + " * " + df_all["Unit"] + "/ year"
  107. df_all["Unit"] = df_all["Unit"].replace(read_config_all["replace_units"])
    date_last_updated = sorted(
        [i[1] for i in domains_and_releases_to_read], reverse=True
    )[0]
    release_name = f"v{date_last_updated}"

    data_if = pm2.pm2io.convert_wide_dataframe_if(
        df_all,
        coords_cols=config_to_if["coords_cols"],
        coords_defaults={
            "scenario": release_name,
        },
        coords_terminologies=config_to_if["coords_terminologies"],
        coords_value_mapping=config_to_if["coords_value_mapping"],
        filter_keep=config_to_if["filter_keep"],
        filter_remove=config_to_if["filter_remove"],
        meta_data=config_to_if["meta_data"],
    )

    # convert to PRIMAP2 native format
    data_pm2 = pm2.pm2io.from_interchange_format(data_if, data_if.attrs)

    # convert back to IF for standardized units
    data_if = data_pm2.pr.to_interchange_format()

    # save raw data
    output_filename = f"FAOSTAT_Agrifood_system_emissions_{release_name}"

    if not save_path.exists():
        save_path.mkdir()

    output_folder = save_path / release_name
    if not output_folder.exists():
        output_folder.mkdir()

    filepath = output_folder / (output_filename + ".csv")
    print(f"Writing primap2 file to {filepath}")
    pm2.pm2io.write_interchange_format(
        filepath,
        data_if,
    )

    compression = dict(zlib=True, complevel=9)
    encoding = {var: compression for var in data_pm2.data_vars}
    filepath = output_folder / (output_filename + ".nc")
    print(f"Writing netcdf file to {filepath}")
    data_pm2.pr.to_netcdf(filepath, encoding=encoding)

    # next steps:
    # convert to IPCC2006_PRIMAP categories
    # save final version


def read_latest_data(
    downloaded_data_path_custom: pathlib.Path = downloaded_data_path,
    save_path: pathlib.Path = extracted_data_path,
) -> None:
    """
    Read and save the latest data.

    Converts downloaded data into interchange format and PRIMAP2 native format
    and saves the files in the extracted_data directory.

    Parameters
    ----------
    downloaded_data_path_custom
        The path to the downloaded data sets.
    save_path
        The path to save the data to.
    """
    domains = get_all_domains(downloaded_data_path_custom)

    domains_and_releases_to_read = []
    for domain in domains:
        domain_path = downloaded_data_path_custom / domain
        domains_and_releases_to_read.append((domain, get_latest_release(domain_path)))

    read_data(
        domains_and_releases_to_read=domains_and_releases_to_read, save_path=save_path
    )
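

# Minimal usage sketch: run the module directly to process whatever has been
# downloaded (assumes the default paths from faostat_data_primap.helper.paths
# are populated).
if __name__ == "__main__":
    read_latest_data()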