@@ -24,11 +24,14 @@ from .UNFCCC_DI_reader_config import di_query_filters
 from .UNFCCC_DI_reader_config import di_processing_info
 from .UNFCCC_DI_reader_config import cat_conversion
 from .UNFCCC_DI_reader_config import gas_baskets
-from .util import NoDIDataError, get_country_name, get_country_code
-from .util import nAI_countries, AI_countries, custom_country_mapping
-from .util import code_path, root_path, extracted_data_path
+from .util import NoDIDataError, nAI_countries, AI_countries
 from .util import DI_date_format, regex_date
+from UNFCCC_GHG_data.helper import custom_country_mapping
+from UNFCCC_GHG_data.helper import get_country_code, get_country_name
+from UNFCCC_GHG_data.helper import extracted_data_path_UNFCCC, root_path, code_path
+from UNFCCC_GHG_data.helper import dataset_path_UNFCCC
+


 def read_UNFCCC_DI_for_country(
     country_code: str,
@@ -612,7 +615,8 @@ def convert_DI_data_to_pm2_if(
     if date_str == "country":
         pm2if_specifications["coords_defaults"]["scenario"] = f"DIrolling"
     elif date_str is None:
-        date_str = str(date.today())
+        today = date.today()
+        date_str = today.strftime(DI_date_format)
     pm2if_specifications["coords_defaults"]["scenario"] = f"DI{date_str}"

     # set metadata
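The scenario label now follows the shared `DI_date_format` pattern instead of whatever `str(date.today())` happens to produce. A minimal sketch of the difference, assuming `DI_date_format` is a strftime pattern such as `"%Y-%m-%d"` (the actual value is defined in `UNFCCC_DI_reader_config`):

```python
from datetime import date

DI_date_format = "%Y-%m-%d"  # assumed value, for illustration only

today = date.today()
print(str(today))                      # always ISO format, e.g. '2023-02-15'
print(today.strftime(DI_date_format))  # follows the configured pattern
```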
@@ -737,7 +741,7 @@ def save_DI_country_data(

     # get the filename with the hash and check if it exists (separate for pm2 format
     # and IF to fix broken datasets if necessary)
-    filename_hash = determine_filename(country_code, token, raw, hash=True)
+    filename_hash = root_path / determine_filename(country_code, token, raw, hash=True)

     # primap2 native format
     filename_hash_nc = filename_hash.parent / (filename_hash.name + '.nc')
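`determine_filename` returns a path relative to `root_path` (see its `return filename.relative_to(root_path)` further down), so code that touches the filesystem has to prefix `root_path` again. A small sketch of the round trip, with hypothetical paths:

```python
from pathlib import Path

root_path = Path("/data/UNFCCC_GHG_data")  # hypothetical repository root
# what determine_filename returns: relative to root_path, no suffix
rel = Path("extracted_data/UNFCCC/Germany/DEU_DI_abc123_raw_hash")

filename_hash = root_path / rel  # absolute again, safe for file operations
print(filename_hash.parent / (filename_hash.name + ".nc"))
```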
@@ -761,7 +765,79 @@ def save_DI_country_data(
         print(f"Data unchanged for {country_code}. Create symlinks.")

     # get the filename with the date
-    filename_date = determine_filename(country_code, date_str, raw)
+    filename_date = root_path / determine_filename(country_code, date_str, raw)
+
+    # create the symlinks to the actual data (with the hash)
+    suffixes = ['.nc', '.csv', '.yaml']
+    for suffix in suffixes:
+        file_date = filename_date.parent / (filename_date.name + suffix)
+        file_hash = filename_hash.name + suffix
+        if file_date.exists():
+            file_date.unlink()
+        file_date.symlink_to(file_hash)
+
+
+def save_DI_dataset(
+    data_pm2: xr.Dataset,
+    raw: bool=True,
+    non_AnnexI: bool=True,
+):
+    '''
+    Save primap2 native and interchange format data to the dataset folder.
+    Can be used for raw and processed data, but not to save to country folders.
+    '''
+
+    # preparations
+    data_if = data_pm2.pr.to_interchange_format()
+    if non_AnnexI:
+        country_group = "non-AnnexI"
+    else:
+        country_group = "AnnexI"
+
+    ## get timestamp
+    scenario_col = data_pm2.attrs['scen']
+    scenarios = data_if[scenario_col].unique()
+    if len(scenarios) > 1:
+        raise ValueError(f"More than one scenario in input data. This function can only "
+                         f"handle single scenario data. Scenarios: {scenarios}")
+    else:
+        scenario = scenarios[0]
+
+    date_str = scenario[2:]
+
+    # calculate the hash of the data to see if it's identical to present data
+    data_for_token = data_if.drop(columns=[scenario_col])
+    token = tokenize(data_for_token)
+
+    # get the filename with the hash and check if it exists (separate for pm2 format
+    # and IF to fix broken datasets if necessary)
+    filename_hash = determine_dataset_filename(token, raw, non_AnnexI=non_AnnexI,
+                                               hash=True)
+    # primap2 native format
+    filename_hash_nc = filename_hash.parent / (filename_hash.name + '.nc')
+    if not filename_hash_nc.exists():
+        # if parent dir does not exist create it
+        # TODO double, also in determine_dataset_filename. same for country data
+        if not filename_hash.parent.exists():
+            filename_hash.parent.mkdir()
+        # save the data
+        print(f"Data has changed. Save to {filename_hash_nc.name}")
+        compression = dict(zlib=True, complevel=9)
+        encoding = {var: compression for var in data_pm2.data_vars}
+        data_pm2.pr.to_netcdf(filename_hash_nc, encoding=encoding)
+
+    # primap2 IF
+    filename_hash_csv = filename_hash.parent / (filename_hash.name + '.csv')
+    if not filename_hash_csv.exists():
+        # save the data
+        print(f"Data has changed. Save to {filename_hash.name + '.csv/.yaml'}")
+        pm2.pm2io.write_interchange_format(filename_hash, data_if)
+    else:
+        print(f"Data unchanged for {country_group}. Create symlinks.")
+
+    # get the filename with the date
+    filename_date = determine_dataset_filename(date_str, raw=raw,
+                                               non_AnnexI=non_AnnexI, hash=False)

     # create the symlinks to the actual data (with the hash)
     suffixes = ['.nc', '.csv', '.yaml']
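`save_DI_dataset` mirrors the per-country change detection: it hashes the interchange-format data (minus the scenario column, which embeds the date) with dask's `tokenize` and only writes new files when the token changes. A minimal sketch of why that works, assuming `tokenize` is imported from `dask.base` as in the per-country code:

```python
import pandas as pd
from dask.base import tokenize

df = pd.DataFrame({"area (ISO3)": ["DEU", "FRA"], "2020": [1.0, 2.0]})

# the token is a pure function of the content, not the object identity,
# so re-downloaded but unchanged data produces the same hash-based file name
assert tokenize(df) == tokenize(df.copy())
```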
@@ -773,6 +849,59 @@ def save_DI_country_data(
         file_date.symlink_to(file_hash)


+## functions for multiple country reading
+def read_UNFCCC_DI_for_all_countries(
+    non_AnnexI: bool=True,
+) -> xr.Dataset:
+    '''
+    This function reads DI data for all countries in a group (AnnexI or
+    non-AnnexI). TODO: currently only non-AnnexI is implemented.
+    The function reads all data in one go using datalad run. As the output
+    data file names are unknown beforehand, datalad run uses explicit=False.
+    TODO: decide if dataset creation goes in here as well. Makes sense, I
+    think. Then the function can return the xarray dataset.
+    '''
+
+    today = date.today()
+    date_str = today.strftime(DI_date_format)
+
+    if non_AnnexI:
+        countries = nAI_countries
+    else:
+        raise ValueError("Bulk reading for AnnexI countries not implemented yet")
+
+    # read the data
+    data_all = None
+    for country in countries[0:5]:
+        print(f"reading DI data for country {country}")
+
+        try:
+            data_country = read_UNFCCC_DI_for_country(
+                country_code=country,
+                category_groups=None,  # read all categories
+                read_subsectors=False,  # not applicable as we read all categories
+                date_str=date_str,
+                pm2if_specifications=None,
+                # automatically use the right specs for AI and NAI
+                default_gwp=None,  # automatically uses right default GWP for AI and NAI
+                debug=False)
+
+            if data_all is None:
+                data_all = data_country
+            else:
+                data_all = data_all.pr.merge(data_country)
+        except unfccc_di_api.NoDataError as err:
+            print(f"No data for {country}.")
+            print(err)
+
+    # TODO: write metadata
+
+    # save the data
+    #save_DI_dataset(data_all, raw=True, non_AnnexI=non_AnnexI)
+
+    return data_all
+
+
 ## datalad and pydoit interface functions
 def read_DI_for_country_datalad(
     country: str,
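The symlink block at the top of this hunk passes a bare file name (not a path) to `Path.symlink_to`, which creates a relative link that resolves inside the same directory and therefore survives moving or cloning the repository. A small demonstration with hypothetical file names:

```python
from pathlib import Path

folder = Path("extracted_data/UNFCCC/Germany")  # hypothetical country folder
folder.mkdir(parents=True, exist_ok=True)
(folder / "DEU_DI_abc123_hash.nc").touch()      # hash-named data file

link = folder / "DEU_DI_2023-02-15.nc"          # date-named alias
if link.exists():
    link.unlink()                                # drop a stale link first
link.symlink_to("DEU_DI_abc123_hash.nc")         # bare name => relative link
print(link.resolve())                            # ends in DEU_DI_abc123_hash.nc
```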
@@ -790,7 +919,8 @@ def read_DI_for_country_datalad(
     """

     # get date to determine output filename
-    date_str = str(date.today())
+    today = date.today()
+    date_str = today.strftime(DI_date_format)

     # get all the info for the country
     country_info = get_input_and_output_files_for_country_DI(country, date_str,
@@ -815,7 +945,7 @@ def read_DI_for_country_datalad(
             inputs=country_info["input"],
             outputs=country_info["output"],
             dry_run=None,
-            explicit=True,
+            explicit=False,
         )
     except IncompleteResultsError as IRE:
         print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
@@ -865,7 +995,7 @@ def process_DI_for_country_datalad(
             inputs=country_info["input"],
             outputs=country_info["output"],
             dry_run=None,
-            explicit=True,
+            explicit=False,
         )
     except IncompleteResultsError as IRE:
         print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
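Both datalad wrappers switch from `explicit=True` to `explicit=False`: the saved file names contain a content hash that is only known after the command has run, so the declared `outputs` cannot cover them and datalad has to save everything the command modified. A hedged sketch of the call pattern (the command string is hypothetical; the keyword arguments match those used above):

```python
import datalad.api as dl

dl.run(
    cmd="python read_UNFCCC_DI.py --country=DEU",  # hypothetical command
    dataset=".",
    message="Read DI data for DEU",
    inputs=[],
    outputs=[],      # hash-named outputs cannot be listed beforehand
    dry_run=None,
    explicit=False,  # save every change the command produced
)
```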
@@ -874,15 +1004,21 @@ def process_DI_for_country_datalad(
         print(ex.message)


-## helper functions
-
+def read_DI_for_all_countries_datalad(
+    non_AnnexI: bool=True,
+):
+    '''
+    This function calls datalad run to read all data in one go. As the output
+    data file names are unknown beforehand, datalad run uses explicit=False.
+    '''

+## helper functions
 def determine_filename(
     country_code: str,
     date_or_hash: str,
     raw: bool=False,
     hash: bool=False,
-)->Path:
+) -> Path:
     """
     Determine the filename for a dataset from given country code and date string.

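`read_DI_for_all_countries_datalad` lands as a docstring-only stub. Judging from `read_DI_for_country_datalad` above, the eventual body would presumably wrap the bulk reader in a `datalad run` call; a speculative sketch (script name and interpreter path hypothetical, not part of the patch):

```python
import datalad.api as dl

def read_DI_for_all_countries_datalad(non_AnnexI: bool = True):
    # hypothetical script name, mirroring the per-country wrapper
    cmd = "./venv/bin/python3 UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_all.py"
    dl.run(
        cmd=cmd,
        dataset=".",
        message="Read DI data for all countries",
        explicit=False,  # hash-named outputs are unknown beforehand
    )
```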
@@ -891,10 +1027,11 @@ def determine_filename(
     ----------
     country_code: str
         ISO 3 letter code of the country
-    date_str:
+    date_or_hash:
         formatted date string
-    raw:
+    raw: bool
         bool specifying if filename for raw or processed data should be returned
+    hash: bool

     Returns
     _______
@@ -903,7 +1040,7 @@ def determine_filename(
     """

     # get the country folder
-    with open(extracted_data_path / "folder_mapping.json", "r") as mapping_file:
+    with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
         folder_mapping = json.load(mapping_file)

     if country_code in folder_mapping:
@@ -912,14 +1049,14 @@ def determine_filename(
         country_folders = folder_mapping[country_code]
         if isinstance(country_folders, str):
             # only one folder
-            country_folder = extracted_data_path / country_folders
+            country_folder = extracted_data_path_UNFCCC / country_folders
         else:
             raise ValueError("More than one output folder for country "
                              f"{country_code}. This should not happen.")
     else:
         # folder not in mapping. It will be created if not present yet
         country_name = get_country_name(country_code)
-        country_folder = extracted_data_path / country_name.replace(" ", "_")
+        country_folder = extracted_data_path_UNFCCC / country_name.replace(" ", "_")

         if country_folder.exists():
             print(f"Output folder {country_name.replace(' ', '_')} for country "
@@ -938,6 +1075,50 @@ def determine_filename(
     return filename.relative_to(root_path)


+def determine_dataset_filename(
+    date_or_hash: str,
+    raw: bool=False,
+    non_AnnexI: bool=True,
+    hash: bool = False,
+) -> Path:
+    """
+    Determine the filename for a dataset from given country group and date string.
+
+    Parameters
+    ----------
+    date_or_hash:
+        formatted date string
+    raw: bool
+        bool specifying if filename for raw or processed data should be returned
+    non_AnnexI: bool
+        True if non-AnnexI, False if AnnexI
+    hash: bool
+
+    Returns
+    _______
+        pathlib Path object for the file name (without suffix)
+    """
+
+    # get the dataset folder
+    if non_AnnexI:
+        current_dataset_path = dataset_path_UNFCCC / "DI_non_AnnexI"
+        filename = f"DI_non_AnnexI_{date_or_hash}"
+    else:
+        current_dataset_path = dataset_path_UNFCCC / "DI_AnnexI"
+        filename = f"DI_AnnexI_{date_or_hash}"
+
+    if not current_dataset_path.exists():
+        current_dataset_path.mkdir()
+
+    if raw:
+        filename = f"{filename}_raw"
+    if hash:
+        filename = f"{filename}_hash"
+    filename = current_dataset_path / filename
+
+    return filename.relative_to(root_path)
+
+
 def convert_categories(
     ds_input: xr.Dataset,
     conversion: Dict[str, Dict[str, str]],
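For reference, a hypothetical call to the new helper (the exact path shown depends on `dataset_path_UNFCCC`, so it is an assumption):

```python
# returns a Path relative to root_path, without suffix
fname = determine_dataset_filename("2023-02-15", raw=True, non_AnnexI=True)
# e.g. datasets/UNFCCC/DI_non_AnnexI/DI_non_AnnexI_2023-02-15_raw  (assumed
# layout); callers append '.nc', '.csv' or '.yaml' as in save_DI_dataset
```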
@@ -1090,7 +1271,7 @@ def get_present_hashes_for_country_DI(
     regex_hash = regex_hash + "hash\.nc"

     # get the country folder
-    with open(extracted_data_path / "folder_mapping.json", "r") as mapping_file:
+    with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
         folder_mapping = json.load(mapping_file)

     if country_code in folder_mapping:
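The remaining hunks only repoint the `folder_mapping.json` lookups from `extracted_data_path` to `extracted_data_path_UNFCCC`. For orientation, an illustrative sketch of the mapping's expected shape (entries are hypothetical; a string value is the normal case, while a list of folders makes these helpers raise the `ValueError` shown below):

```python
# illustrative contents of extracted_data_path_UNFCCC / "folder_mapping.json"
folder_mapping = {
    "DEU": "Germany",                            # usual case: one folder
    "GBR": ["United_Kingdom", "Great_Britain"],  # would trigger the ValueError
}
```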
@@ -1099,7 +1280,7 @@ def get_present_hashes_for_country_DI(
         country_folders = folder_mapping[country_code]
         if isinstance(country_folders, str):
             # only one folder
-            country_folder = extracted_data_path / country_folders
+            country_folder = extracted_data_path_UNFCCC / country_folders
         else:
             raise ValueError("More than one output folder for country "
                              f"{country_code}. This should not happen.")
@@ -1135,7 +1316,7 @@ def find_latest_DI_data(
     regex = regex_date + r"\.nc"

     # get the country folder
-    with open(extracted_data_path / "folder_mapping.json", "r") as mapping_file:
+    with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
         folder_mapping = json.load(mapping_file)

     if country_code in folder_mapping:
@@ -1144,7 +1325,7 @@ def find_latest_DI_data(
         country_folders = folder_mapping[country_code]
         if isinstance(country_folders, str):
             # only one folder
-            country_folder = extracted_data_path / country_folders
+            country_folder = extracted_data_path_UNFCCC / country_folders
         else:
             raise ValueError("More than one output folder for country "
                              f"{country_code}. This should not happen.")