123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810 |
- import pycountry
- import json
- import re
- import xarray as xr
- import pandas as pd
- import numpy as np
- from datetime import date
- from copy import deepcopy
- from typing import Dict, List, Optional
- from pathlib import Path
- from .definitions import custom_country_mapping, custom_folders
- from .definitions import root_path, downloaded_data_path, extracted_data_path
- from .definitions import legacy_data_path, code_path
- def process_data_for_country(
- data_country: xr.Dataset,
- entities_to_ignore: List[str],
- gas_baskets: Dict[str, List[str]],
- filter_dims: Optional[Dict[str, List[str]]] = None,
- cat_terminology_out: Optional[str] = None,
- category_conversion: Dict[str, Dict] = None,
- sectors_out: List[str] = None,
- processing_info_country: Dict = None,
- ) -> xr.Dataset:
- """
- Process data from DI interface (where necessary).
- * Downscaling including subtraction of time series
- * country specific sector aggregation
- * Conversion to IPCC2006 categories
- * general sector and gas basket aggregation (in new categories)
- """
- # 0: gather information
- countries = list(data_country.coords[data_country.attrs['area']].values)
- if len(countries) > 1:
- raise ValueError(
- f"Found {len(countries)} countries. Only single country data "
- f"can be processed by this function. countries: {countries}")
- else:
- country_code = countries[0]
- # get category terminology
- cat_col = data_country.attrs['cat']
- temp = re.findall(r'\((.*)\)', cat_col)
- cat_terminology_in = temp[0]
- # get scenario
- scenarios = list(data_country.coords[data_country.attrs['scen']].values)
- if len(scenarios) > 1:
- raise ValueError(
- f"Found {len(scenarios)} scenarios. Only single scenario data "
- f"can be processed by this function. Scenarios: {scenarios}")
- scenario = scenarios[0]
- # get source
- sources = list(data_country.coords['source'].values)
- if len(sources) > 1:
- raise ValueError(
- f"Found {len(sources)} sources. Only single source data "
- f"can be processed by this function. Sources: {sources}")
- source = sources[0]
- # check if category name column present
- # TODO: replace 'name' in config by 'additional_cols' dict that defines the cols
- # and the values
- if 'orig_cat_name' in data_country.coords:
- cat_name_present = True
- else:
- cat_name_present = False
- # 1: general processing
- # remove unused cats
- data_country = data_country.dropna(f'category ({cat_terminology_in})', how='all')
- # remove unused years
- data_country = data_country.dropna(f'time', how='all')
- # remove variables only containing nan
- nan_vars_country = [var for var in data_country.data_vars if
- data_country[var].isnull().all().data is True]
- print(f"removing all-nan variables: {nan_vars_country}")
- data_country = data_country.drop_vars(nan_vars_country)
- # remove unnecessary variables
- entities_ignore_present = [entity for entity in entities_to_ignore if
- entity in data_country.data_vars]
- data_country = data_country.drop_vars(entities_ignore_present)
- # filter ()
- if filter_dims is not None:
- data_country = data_country.pr.loc[filter_dims]
- # 2: country specific processing
- if processing_info_country is not None:
- if 'tolerance' in processing_info_country:
- tolerance = processing_info_country["tolerance"]
- else:
- tolerance = 0.01
- # remove entities if needed
- if 'ignore_entities' in processing_info_country:
- entities_to_ignore_country = processing_info_country[
- 'ignore_entities']
- entities_ignore_present = \
- [entity for entity in entities_to_ignore_country if
- entity in data_country.data_vars]
- data_country = data_country.drop_vars(entities_ignore_present)
- # take only desired years
- if 'years' in processing_info_country:
- data_country = data_country.pr.loc[
- {'time': processing_info_country['years']}]
- # remove timeseries if desired
- if 'remove_ts' in processing_info_country:
- for case in processing_info_country['remove_ts']:
- remove_info = processing_info_country['remove_ts'][case]
- entities = remove_info.pop("entities")
- for entity in entities:
- data_country[entity].pr.loc[remove_info] = \
- data_country[entity].pr.loc[remove_info] * np.nan
- # remove all data for given years if necessary
- if 'remove_years' in processing_info_country:
- data_country = data_country.drop_sel(
- time=processing_info_country['remove_years'])
- # subtract categories
- if 'subtract_cats' in processing_info_country:
- subtract_cats_current = processing_info_country['subtract_cats']
- if 'entities' in subtract_cats_current.keys():
- entities_current = subtract_cats_current['entities']
- else:
- entities_current = list(data_country.data_vars)
- print(f"Subtracting categories for country {country_code}, entities "
- f"{entities_current}")
- for cat_to_generate in subtract_cats_current:
- cats_to_subtract = \
- subtract_cats_current[cat_to_generate]['subtract']
- data_sub = \
- data_country.pr.loc[{'category': cats_to_subtract}].pr.sum(
- dim='category', skipna=True, min_count=1)
- data_parent = data_country.pr.loc[
- {'category': subtract_cats_current[cat_to_generate]['parent']}]
- data_agg = data_parent - data_sub
- nan_vars = [var for var in data_agg.data_vars if
- data_agg[var].isnull().all().data is True]
- data_agg = data_agg.drop(nan_vars)
- if len(data_agg.data_vars) > 0:
- print(f"Generating {cat_to_generate} through subtraction")
- data_agg = data_agg.expand_dims([f'category ('
- f'{cat_terminology_in})'])
- data_agg = data_agg.assign_coords(
- coords={f'category ({cat_terminology_in})':
- (f'category ({cat_terminology_in})',
- [cat_to_generate])})
- if cat_name_present:
- cat_name = subtract_cats_current[cat_to_generate]['name']
- data_agg = data_agg.assign_coords(
- coords={'orig_cat_name':
- (f'category ({cat_terminology_in})',
- [cat_name])})
- data_country = data_country.pr.merge(data_agg,
- tolerance=tolerance)
- else:
- print(f"no data to generate category {cat_to_generate}")
- # downscaling
- if 'downscale' in processing_info_country:
- if 'sectors' in processing_info_country['downscale']:
- sector_downscaling = \
- processing_info_country['downscale']['sectors']
- for case in sector_downscaling.keys():
- print(f"Downscaling for {case}.")
- sector_downscaling_current = sector_downscaling[case]
- entities = sector_downscaling_current.pop('entities')
- for entity in entities:
- data_country[entity] = data_country[
- entity].pr.downscale_timeseries(
- **sector_downscaling_current)
- # , skipna_evaluation_dims=None)
- if 'entities' in processing_info_country['downscale']:
- entity_downscaling = \
- processing_info_country['downscale']['entities']
- for case in entity_downscaling.keys():
- print(f"Downscaling for {case}.")
- # print(data_country.coords[f'category ('
- # f'{cat_terminology_in})'].values)
- data_country = data_country.pr.downscale_gas_timeseries(
- **entity_downscaling[case], skipna=True,
- skipna_evaluation_dims=None)
- # aggregate categories
- if 'aggregate_cats' in processing_info_country:
- if 'agg_tolerance' in processing_info_country:
- agg_tolerance = processing_info_country['agg_tolerance']
- else:
- agg_tolerance = tolerance
- aggregate_cats_current = processing_info_country['aggregate_cats']
- print(
- f"Aggregating categories for country {country_code}, source {source}, "
- f"scenario {scenario}")
- for cat_to_agg in aggregate_cats_current:
- print(f"Category: {cat_to_agg}")
- source_cats = aggregate_cats_current[cat_to_agg]['sources']
- data_agg = data_country.pr.loc[{'category': source_cats}].pr.sum(
- dim='category', skipna=True, min_count=1)
- nan_vars = [var for var in data_agg.data_vars if
- data_agg[var].isnull().all().data is True]
- data_agg = data_agg.drop(nan_vars)
- if len(data_agg.data_vars) > 0:
- data_agg = data_agg.expand_dims([f'category ('
- f'{cat_terminology_in})'])
- data_agg = data_agg.assign_coords(
- coords={f'category ({cat_terminology_in})':
- (f'category ({cat_terminology_in})',
- [cat_to_agg])})
- if cat_name_present:
- cat_name = aggregate_cats_current[cat_to_agg]['name']
- data_agg = data_agg.assign_coords(
- coords={'orig_cat_name':
- (f'category ({cat_terminology_in})',
- [cat_name])})
- data_country = data_country.pr.merge(data_agg,
- tolerance=agg_tolerance)
- else:
- print(f"no data to aggregate category {cat_to_agg}")
- # aggregate gases if desired
- if 'aggregate_gases' in processing_info_country:
- # TODO: why use different code here than below. Can this fill non-existen
- # gas baskets?
- for case in processing_info_country['aggregate_gases'].keys():
- case_info = processing_info_country['aggregate_gases'][case]
- data_country[case_info['basket']] = \
- data_country.pr.fill_na_gas_basket_from_contents(
- **case_info)
- # 3: map categories
- if category_conversion is not None:
- data_country = convert_categories(
- data_country,
- category_conversion,
- cat_terminology_out,
- debug=False,
- tolerance=0.01,
- )
- else:
- cat_terminology_out = cat_terminology_in
- # more general processing
- # reduce categories to output cats
- if sectors_out is not None:
- cats_to_keep = [cat for cat in
- data_country.coords[f'category ({cat_terminology_out})'].values
- if cat in sectors_out]
- data_country = data_country.pr.loc[{'category': cats_to_keep}]
- # create gas baskets
- entities_present = set(data_country.data_vars)
- for basket in gas_baskets.keys():
- basket_contents_present = [gas for gas in gas_baskets[basket] if
- gas in entities_present]
- if len(basket_contents_present) > 0:
- if basket in list(data_country.data_vars):
- data_country[basket] = data_country.pr.fill_na_gas_basket_from_contents(
- basket=basket, basket_contents=basket_contents_present,
- skipna=True, min_count=1)
- else:
- try:
- #print(data_country.data_vars)
- data_country[basket] = xr.full_like(data_country["CO2"],
- np.nan).pr.quantify(
- units="Gg CO2 / year")
- data_country[basket].attrs = {"entity": basket.split(' ')[0],
- "gwp_context": basket.split(' ')[1][
- 1:-1]}
- data_country[basket] = data_country.pr.gas_basket_contents_sum(
- basket=basket, basket_contents=basket_contents_present,
- min_count=1)
- entities_present.add(basket)
- except Exception as ex:
- print(f"No gas basket created for {country_code}, {source}, "
- f"{scenario}: {ex}")
- # amend title and comment
- data_country.attrs["comment"] = data_country.attrs["comment"] + f" Processed on " \
- f"{date.today()}"
- data_country.attrs["title"] = data_country.attrs["title"] + f" Processed on " \
- f"{date.today()}"
- return data_country
- def convert_categories(
- ds_input: xr.Dataset,
- conversion: Dict[str, Dict[str, str]],
- #terminology_from: str,
- terminology_to: str,
- debug: bool=False,
- tolerance: float=0.01,
- )->xr.Dataset:
- """
- convert data from one category terminology to another
- """
- print(f"converting categories to {terminology_to}")
- if 'orig_cat_name' in ds_input.coords:
- cat_name_present = True
- else:
- cat_name_present = False
- ds_converted = ds_input.copy(deep=True)
- ds_converted.attrs = deepcopy(ds_input.attrs)
- # TODO: change attrs for additional coordinates
- # change category terminology
- cat_dim = ds_converted.attrs["cat"]
- ds_converted.attrs["cat"] = f"category ({terminology_to})"
- ds_converted = ds_converted.rename({cat_dim: ds_converted.attrs["cat"]})
- # find categories present in dataset
- cats_present = list(ds_converted.coords[f'category ({terminology_to})'])
- # restrict categories and map category names
- if 'mapping' in conversion.keys():
- mapping_cats_present = [cat for cat in list(conversion['mapping'].keys()) if
- cat in cats_present]
- ds_converted = ds_converted.pr.loc[
- {'category': mapping_cats_present}]
- from_cats = ds_converted.coords[f'category ({terminology_to})'].values
- to_cats = pd.Series(from_cats).replace(conversion['mapping'])
- ds_converted = ds_converted.assign_coords({f'category ({terminology_to})':
- (f'category ({terminology_to})',
- to_cats)})
- # redo the list of present cats after mapping, as we have new categories in the
- # target terminology now
- cats_present_mapped = list(ds_converted.coords[f'category ({terminology_to})'])
- # aggregate categories
- if 'aggregate' in conversion:
- aggregate_cats = conversion['aggregate']
- for cat_to_agg in aggregate_cats:
- if debug:
- print(f"Category: {cat_to_agg}")
- source_cats = [cat for cat in aggregate_cats[cat_to_agg]['sources'] if
- cat in cats_present_mapped]
- if debug:
- print(source_cats)
- data_agg = ds_converted.pr.loc[{'category': source_cats}].pr.sum(
- dim='category', skipna=True, min_count=1)
- nan_vars = [var for var in data_agg.data_vars if
- data_agg[var].isnull().all().data == True]
- data_agg = data_agg.drop(nan_vars)
- if len(data_agg.data_vars) > 0:
- data_agg = data_agg.expand_dims([f'category ({terminology_to})'])
- data_agg = data_agg.assign_coords(
- coords={f'category ({terminology_to})':
- (f'category ({terminology_to})', [cat_to_agg])})
- if cat_name_present:
- data_agg = data_agg.assign_coords(
- coords={'orig_cat_name':
- (f'category ({terminology_to})',
- [aggregate_cats[cat_to_agg]['name']])})
- ds_converted = ds_converted.pr.merge(data_agg, tolerance=tolerance)
- cats_present_mapped.append(cat_to_agg)
- else:
- print(f"no data to aggregate category {cat_to_agg}")
- return ds_converted
- def get_country_name(
- country_code: str,
- ) -> str:
- """get country name from code """
- if country_code in custom_country_mapping:
- country_name = custom_country_mapping[country_code]
- else:
- try:
- country = pycountry.countries.get(alpha_3=country_code)
- country_name = country.name
- except:
- raise ValueError(f"Country code {country_code} can not be mapped to "
- f"any country")
- return country_name
- def get_country_code(
- country_name: str,
- )->str:
- """
- obtain country code. If the input is a code it will be returned,
- if the input
- is not a three letter code a search will be performed
- Parameters
- __________
- country_name: str
- Country code or name to get the three-letter code for.
- Returns
- -------
- country_code: str
- """
- # First check if it's in the list of custom codes
- if country_name in custom_country_mapping:
- country_code = country_name
- else:
- try:
- # check if it's a 3 letter UNFCCC_GHG_data
- country = pycountry.countries.get(alpha_3=country_name)
- country_code = country.alpha_3
- except:
- try:
- country = pycountry.countries.search_fuzzy(country_name.replace("_", " "))
- except:
- raise ValueError(f"Country name {country_name} can not be mapped to "
- f"any country UNFCCC_GHG_data. Try using the ISO3 UNFCCC_GHG_data directly.")
- if len(country) > 1:
- country_code = None
- for current_country in country:
- if current_country.name == country_name:
- country_code = current_country.alpha_3
- if country_code is None:
- raise ValueError(f"Country name {country_name} has {len(country)} "
- f"possible results for country codes.")
- country_code = country[0].alpha_3
- return country_code
- def create_folder_mapping(
- folder: str,
- extracted: bool = False
- ) -> None:
- """
- Create a mapping from 3 letter ISO country codes to folders
- based on the subfolders of the given folder. The mapping is
- stored in 'folder_mapping.json' in the given folder. Folder
- must be given relative to the repository root
- Parameters
- ----------
- folder: str
- folder to create the mapping for
- extracted: bool = False
- If true treat the folder as extracted data, where we
- only have one folder per country and no typos in the
- names
- Returns
- -------
- Nothing
- """
- folder = root_path / folder
- folder_mapping = {}
- #if not extracted:
- known_folders = custom_folders
- #else:
- # known_folders = {}
- for item in folder.iterdir():
- if item.is_dir() and not item.match("__pycache__"):
- if item.name in known_folders:
- ISO3 = known_folders[item.name]
- else:
- try:
- country = pycountry.countries.search_fuzzy(item.name.replace("_", " "))
- if len(country) > 1:
- ISO3 = None
- for current_country in country:
- if current_country.name == item.name.replace("_", " "):
- ISO3 = current_country.alpha_3
- else:
- ISO3 = country[0].alpha_3
- except:
- ISO3 = None
- if ISO3 is None:
- print(f"No match for {item.name}")
- else:
- if ISO3 in folder_mapping.keys():
- folder_mapping[ISO3] = [folder_mapping[ISO3], item.name]
- else:
- folder_mapping[ISO3] = item.name
- with open(folder / "folder_mapping.json", "w") as mapping_file:
- json.dump(folder_mapping, mapping_file, indent=4)
- # TODO add crf
- def get_country_submissions(
- country_name: str,
- print_sub: bool = True,
- ) -> Dict[str, List[str]]:
- """
- Input is a three letter ISO UNFCCC_GHG_data for a country, or the countries name.
- The function tries to map the country name to an ISO UNFCCC_GHG_data and then
- queries the folder mapping files for folders.
- Parameters
- ----------
- country_name: str
- String containing the country name or ISO 3 letter UNFCCC_GHG_data
- print_sub: bool
- If True information on submissions will be written to stdout
- Returns
- -------
- returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
- Each value is a list of folders
- """
- data_folder = downloaded_data_path
- country_code = get_country_code(country_name)
- if print_sub:
- print(f"Country name {country_name} maps to ISO code {country_code}")
- country_submissions = {}
- if print_sub:
- print(f"#" * 80)
- print(f"The following submissions are available for {country_name}")
- for item in data_folder.iterdir():
- if item.is_dir():
- if print_sub:
- print("")
- print("-" * 80)
- print(f"Data folder {item.name}")
- print("-" * 80)
- with open(item / "folder_mapping.json", "r") as mapping_file:
- folder_mapping = json.load(mapping_file)
- if country_code in folder_mapping:
- country_folders = folder_mapping[country_code]
- if isinstance(country_folders, str):
- # only one folder
- country_folders = [country_folders]
- submission_folders = []
- for country_folder in country_folders:
- current_folder = item / country_folder
- if print_sub:
- print(f"Submissions in folder {country_folder}:")
- for submission_folder in current_folder.iterdir():
- if submission_folder.is_dir():
- if print_sub:
- print(submission_folder.name)
- submission_folders.append(submission_folder.name)
- country_submissions[item.name] = submission_folders
- else:
- print(f"No submissions available for {country_name}.")
- return country_submissions
- def get_country_datasets(
- country_name: str,
- print_ds: bool = True,
- ) -> Dict[str, List[str]]:
- """
- Input is a three letter ISO code for a country, or the country's name.
- The function tries to map the country name to an ISO UNFCCC_GHG_data and then
- checks the UNFCCC_GHG_data and data folders for content on the country.
- Parameters
- ----------
- country_name: str
- String containing the country name or ISO 3 letter code
- print_ds: bool
- If True information on submissions will be written to stdout
- Returns
- -------
- returns a dict with keys for the dataset classes (e.g. UNFCCC, non-UNFCCC)
- Each value is a list of folders
- """
- data_folder = extracted_data_path
- data_folder_legacy = legacy_data_path
- # obtain country UNFCCC_GHG_data
- country_code = get_country_code(country_name)
- if print_ds:
- print(f"Country name {country_name} maps to ISO code {country_code}")
- rep_data = {}
- # data
- if print_ds:
- print(f"#" * 80)
- print(f"The following datasets are available for {country_name}")
- for item in data_folder.iterdir():
- if item.is_dir():
- cleaned_datasets_current_folder = {}
- if print_ds:
- print("-" * 80)
- print(f"Data folder {item.name}")
- print("-" * 80)
- with open(item / "folder_mapping.json", "r") as mapping_file:
- folder_mapping = json.load(mapping_file)
- if country_code not in folder_mapping:
- if print_ds:
- print("No data available")
- print("")
- else:
- country_folder = folder_mapping[country_code]
- if not isinstance(country_folder, str):
- raise ValueError("Wrong data type in folder mapping json file. Should be str.")
- datasets_current_folder = {}
- current_folder = item / country_folder
- for data_file in current_folder.iterdir():
- if data_file.suffix in ['.nc', '.yaml', '.csv']:
- if data_file.stem in datasets_current_folder:
- datasets_current_folder[data_file.stem].append(data_file.suffix)
- else:
- datasets_current_folder[data_file.stem] = [data_file.suffix]
- for dataset in datasets_current_folder:
- # process filename to get submission
- parts = dataset.split('_')
- if parts[0] != country_code:
- cleaned_datasets_current_folder[f'Wrong code: {parts[0]}'] =\
- dataset
- else:
- terminology = "_".join(parts[3 : ])
- key = f"{parts[1]} ({parts[2]}, {terminology})"
- data_info = ""
- if '.nc' in datasets_current_folder[dataset]:
- data_info = data_info + "NF (.nc), "
- if ('.csv' in datasets_current_folder[dataset]) and ('.yaml' in datasets_current_folder[dataset]):
- data_info = data_info + "IF (.yaml + .csv), "
- elif '.csv' in datasets_current_folder[dataset]:
- data_info = data_info + "incomplete IF? (.csv), "
- elif '.yaml' in datasets_current_folder[dataset]:
- data_info = data_info + "incomplete IF (.yaml), "
- code_file = get_code_file(country_code, parts[1])
- if code_file:
- data_info = data_info + f"code: {code_file.name}"
- else:
- data_info = data_info + f"code: not found"
- cleaned_datasets_current_folder[key] = data_info
- if print_ds:
- if cleaned_datasets_current_folder:
- for country_ds in cleaned_datasets_current_folder:
- print(f"{country_ds}: {cleaned_datasets_current_folder[country_ds]}")
- else:
- print("No data available")
- print("")
- rep_data[item.name] = cleaned_datasets_current_folder
- # legacy data
- if print_ds:
- print(f"#" * 80)
- print(f"The following legacy datasets are available for {country_name}")
- legacy_data = {}
- for item in data_folder_legacy.iterdir():
- if item.is_dir():
- cleaned_datasets_current_folder = {}
- if print_ds:
- print("-" * 80)
- print(f"Data folder {item.name}")
- print("-" * 80)
- with open(item / "folder_mapping.json", "r") as mapping_file:
- folder_mapping = json.load(mapping_file)
- if country_code not in folder_mapping:
- if print_ds:
- print("No data available")
- print("")
- else:
- country_folder = folder_mapping[country_code]
- if not isinstance(country_folder, str):
- raise ValueError("Wrong data type in folder mapping json file. Should be str.")
- datasets_current_folder = {}
- current_folder = item / country_folder
- for data_file in current_folder.iterdir():
- if data_file.suffix in ['.nc', '.yaml', '.csv']:
- if data_file.stem in datasets_current_folder:
- datasets_current_folder[data_file.stem].append(data_file.suffix)
- else:
- datasets_current_folder[data_file.stem] = [data_file.suffix]
- for dataset in datasets_current_folder:
- # process filename to get submission
- parts = dataset.split('_')
- if parts[0] != country_code:
- cleaned_datasets_current_folder[f'Wrong UNFCCC_GHG_data: {parts[0]}'] = dataset
- else:
- terminology = "_".join(parts[3 : ])
- key = f"{parts[1]} ({parts[2]}, {terminology}, legacy)"
- data_info = ""
- if '.nc' in datasets_current_folder[dataset]:
- data_info = data_info + "NF (.nc), "
- if ('.csv' in datasets_current_folder[dataset]) and ('.yaml' in datasets_current_folder[dataset]):
- data_info = data_info + "IF (.yaml + .csv), "
- elif '.csv' in datasets_current_folder[dataset]:
- data_info = data_info + "incomplete IF? (.csv), "
- elif '.yaml' in datasets_current_folder[dataset]:
- data_info = data_info + "incomplete IF (.yaml), "
- cleaned_datasets_current_folder[key] = data_info
- if print_ds:
- if cleaned_datasets_current_folder:
- for country_ds in cleaned_datasets_current_folder:
- print(f"{country_ds}: {cleaned_datasets_current_folder[country_ds]}")
- else:
- print("No data available")
- print("")
- legacy_data[item.name] = cleaned_datasets_current_folder
- all_data = {
- "rep_data": rep_data,
- "legacy_data": legacy_data,
- }
- return all_data
- def get_code_file(
- country_name: str,
- submission: str,
- print_info: bool = False,
- ) -> Path:
- """
- For given country name and submission find the script that creates the data
- Parameters
- ----------
- country_name: str
- String containing the country name or ISO 3 letter UNFCCC_GHG_data
- submission: str
- String of the submission
- print_info: bool = False
- If True print information on UNFCCC_GHG_data found
- Returns
- -------
- returns a pathlib Path object for the UNFCCC_GHG_data file
- """
- code_file_path = None
- UNFCCC_reader_path = code_path / "UNFCCC_reader"
- # CRF is an exception as it's read using the UNFCCC_CRF_reader module
- # so we return the path to that.
- if submission[0:3] == "CRF":
- return root_path / "UNFCCC_CRF_reader"
- if submission[0:2] == "DI":
- return root_path / "UNFCCC_DI_reader"
- # obtain country UNFCCC_GHG_data
- country_code = get_country_code(country_name)
- if print_info:
- print(f"Country name {country_name} maps to ISO UNFCCC_GHG_data {country_code}")
- with open(UNFCCC_reader_path / "folder_mapping.json", "r") as mapping_file:
- folder_mapping = json.load(mapping_file)
- if country_code not in folder_mapping:
- if print_info:
- print("No UNFCCC_GHG_data available")
- print("")
- else:
- country_folder = UNFCCC_reader_path / folder_mapping[country_code]
- code_file_name_candidate = "read_" + country_code + "_" + submission + "*"
- for file in country_folder.iterdir():
- if file.match(code_file_name_candidate):
- if code_file_path is not None:
- raise ValueError(f"Found multiple UNFCCC_GHG_data candidates: "
- f"{code_file_path} and file.name. "
- f"Please use only one file with name "
- f"'read_ISO3_submission_XXX.YYY'.")
- else:
- if print_info:
- print(f"Found UNFCCC_GHG_data file {file.relative_to(root_path)}")
- code_file_path = file
- if code_file_path is not None:
- return code_file_path.relative_to(root_path)
- else:
- return None
|