- """read data set"""
- import os
- import pathlib
- import pandas as pd
- import primap2 as pm2 # type: ignore
- from faostat_data_primap.helper.country_mapping import country_to_iso3_mapping
- from faostat_data_primap.helper.definitions import (
- config_to_if,
- read_config_all,
- )
- from faostat_data_primap.helper.paths import (
- downloaded_data_path,
- extracted_data_path,
- )


def get_all_domains(downloaded_data_path: pathlib.Path) -> list[str]:
    """
    Get a list of all available domains.

    Parameters
    ----------
    downloaded_data_path
        The path to the downloaded data sets.

    Returns
    -------
        All domains in the downloaded data directory.
    """
    return [
        domain
        for domain in os.listdir(downloaded_data_path)
        if (downloaded_data_path / domain).is_dir()
    ]
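

# Illustrative usage (the domain names shown are hypothetical; the real ones
# are the directories created by the download step):
#
#     >>> get_all_domains(pathlib.Path("downloaded_data"))  # doctest: +SKIP
#     ['farm_gate_emissions_crops', 'farm_gate_livestock']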


def get_latest_release(domain_path: pathlib.Path) -> str:
    """
    Get the latest release in a domain directory.

    Parameters
    ----------
    domain_path
        The path to the domain.

    Returns
    -------
        Name of the directory with the latest data.
    """
    all_releases = [
        release_name
        for release_name in os.listdir(domain_path)
        if (domain_path / release_name).is_dir()
    ]
    # release directories are assumed to be named by date (e.g. YYYY-MM-DD),
    # so the lexicographically largest name is the most recent release
    return sorted(all_releases, reverse=True)[0]
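

# Illustrative usage (the release date shown is hypothetical):
#
#     >>> get_latest_release(pathlib.Path("downloaded_data") / "farm_gate_livestock")  # doctest: +SKIP
#     '2023-11-09'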


def read_data(
    domains_and_releases_to_read: list[tuple[str, str]], save_path: pathlib.Path
) -> None:
    """
    Read specified domains and releases and save output files.

    Parameters
    ----------
    domains_and_releases_to_read
        The domains and releases to use.
    save_path
        The path to save the data to.
    """
    df_list = []
    for domain, release in domains_and_releases_to_read:
        read_config = read_config_all[domain][release]

        print(f"Read {read_config['filename']}")
        dataset_path = downloaded_data_path / domain / release / read_config["filename"]

        # the files contain some non-UTF-8 characters
        df_domain = pd.read_csv(dataset_path, encoding="ISO-8859-1")

        # remove rows by element
        if "elements_to_remove" in read_config:
            df_domain = df_domain[
                ~df_domain["Element"].isin(read_config["elements_to_remove"])
            ]

        # remove rows by area
        if "areas_to_remove" in read_config:
            df_domain = df_domain[
                ~df_domain["Area"].isin(read_config["areas_to_remove"])
            ]

        # create country column
        df_domain["country (ISO3)"] = df_domain["Area"].map(country_to_iso3_mapping)

        # check that all countries are converted into ISO3 codes
        if any(df_domain["country (ISO3)"].isna()):
            msg = f"Not all countries are converted into ISO3 codes for {domain}"
            raise ValueError(msg)

        # create entity column
        df_domain["entity"] = df_domain["Element"].map(read_config["entity_mapping"])

        # check that all entities are mapped
        if any(df_domain["entity"].isna()):
            msg = f"Not all entities are mapped for {domain}"
            raise ValueError(msg)

        # create category column (the combination of Item and Element works best,
        # e.g. "Rice Cultivation" + "-" + "Emissions (CH4)")
        df_domain["category"] = df_domain["Item"] + "-" + df_domain["Element"]

        # drop columns we don't need
        df_domain = df_domain.drop(
            read_config["columns_to_drop"],
            axis=1,
        )

        df_list.append(df_domain)

    df_all = pd.concat(df_list, axis=0, join="outer", ignore_index=True)

    # some domains have no Source column, or its values are empty
    if "Source" not in df_all.columns:
        df_all["Source"] = "unknown"
    else:
        df_all["Source"] = df_all["Source"].fillna("unknown")

    # remove the "Y" prefix from the year columns
    df_all = df_all.rename(columns=lambda x: x.lstrip("Y") if x.startswith("Y") else x)

    # make sure the units are correct
    df_all["Unit"] = df_all["entity"] + " * " + df_all["Unit"] + "/ year"
    df_all["Unit"] = df_all["Unit"].replace(read_config_all["replace_units"])

    date_last_updated = sorted(
        [i[1] for i in domains_and_releases_to_read], reverse=True
    )[0]
    release_name = f"v{date_last_updated}"

    data_if = pm2.pm2io.convert_wide_dataframe_if(
        df_all,
        coords_cols=config_to_if["coords_cols"],
        coords_defaults={
            "scenario": release_name,
        },
        coords_terminologies=config_to_if["coords_terminologies"],
        coords_value_mapping=config_to_if["coords_value_mapping"],
        filter_keep=config_to_if["filter_keep"],
        filter_remove=config_to_if["filter_remove"],
        meta_data=config_to_if["meta_data"],
    )

    # convert to PRIMAP2 native format
    data_pm2 = pm2.pm2io.from_interchange_format(data_if, data_if.attrs)

    # convert back to IF for standardized units
    data_if = data_pm2.pr.to_interchange_format()

    # save raw data
    output_filename = f"FAOSTAT_Agrifood_system_emissions_{release_name}"
    if not save_path.exists():
        save_path.mkdir()
    output_folder = save_path / release_name
    if not output_folder.exists():
        output_folder.mkdir()

    filepath = output_folder / (output_filename + ".csv")
    print(f"Writing primap2 file to {filepath}")
    pm2.pm2io.write_interchange_format(
        filepath,
        data_if,
    )

    compression = dict(zlib=True, complevel=9)
    encoding = {var: compression for var in data_pm2.data_vars}
    filepath = output_folder / (output_filename + ".nc")
    print(f"Writing netcdf file to {filepath}")
    data_pm2.pr.to_netcdf(filepath, encoding=encoding)

    # next steps:
    # - convert to IPCC2006_PRIMAP categories
    # - save final version
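

# A minimal sketch of calling read_data directly for a pinned release (the
# domain and release names here are hypothetical; the configured ones live in
# read_config_all):
#
#     read_data(
#         domains_and_releases_to_read=[("farm_gate_emissions_crops", "2023-11-09")],
#         save_path=extracted_data_path,
#     )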


def read_latest_data(
    downloaded_data_path_custom: pathlib.Path = downloaded_data_path,
    save_path: pathlib.Path = extracted_data_path,
) -> None:
    """
    Read and save the latest data.

    Converts downloaded data into interchange format and primap2 native format
    and saves the files in the extracted_data directory.

    Parameters
    ----------
    downloaded_data_path_custom
        The path to the downloaded data sets.
    save_path
        The path to save the data to.
    """
    domains = get_all_domains(downloaded_data_path_custom)

    domains_and_releases_to_read = []
    for domain in domains:
        domain_path = downloaded_data_path_custom / domain
        domains_and_releases_to_read.append((domain, get_latest_release(domain_path)))

    read_data(
        domains_and_releases_to_read=domains_and_releases_to_read, save_path=save_path
    )
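

# Hedged usage sketch: running the module as a script reads the latest release
# of every downloaded domain with the defaults above. The __main__ guard is an
# illustrative addition, not part of the package interface.
if __name__ == "__main__":
    read_latest_data()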