@@ -1,19 +1,23 @@
import primap2 as pm2
import unfccc_di_api
import pandas as pd
+import numpy as np
import pycountry
import itertools
import json
import copy
import xarray as xr
from datetime import date
-from typing import Optional, Dict
+from typing import Optional, Dict, List
from pathlib import Path
+from copy import deepcopy
from UNFCCC_DI_reader_config import di_to_pm2if_template_nai
from UNFCCC_DI_reader_config import di_to_pm2if_template_ai
from UNFCCC_DI_reader_config import di_query_filters
+from UNFCCC_DI_reader_config import cat_conversion
from util import NoDIDataError, extracted_data_path, get_country_name
+from util import nAI_countries, AI_countries
def read_UNFCCC_DI_for_party(
@@ -27,7 +31,8 @@ def read_UNFCCC_DI_for_party(
debug: Optional[bool]=False,
- # TODO
+ reads data for a party from the UNFCCC DI interface and saves to native and
+ interchange format
# read the data
@@ -44,11 +49,11 @@ def read_UNFCCC_DI_for_party(
# determine filename
if save_data:
- filename = determine_filename(party_code, date_str)
+ filename = determine_filename(party_code, date_str, True)
filename = None
- # convert it to pm2 interchange format and save
+ # convert raw data to pm2 interchange format and save
data_if = convert_DI_data_to_pm2_if(
@@ -58,7 +63,7 @@ def read_UNFCCC_DI_for_party(
- # convert to native pm2 format and save that
+ # convert raw data to native pm2 format and save that
data_pm2 = convert_DI_IF_data_to_pm2(
@@ -67,6 +72,202 @@ def read_UNFCCC_DI_for_party(
return data_pm2
+def process_UNFCCC_DI_for_party(
+ data_country: xr.Dataset,
+ country: str,
+ cat_terminology_in: str,
+ entities_to_ignore: List[str],
+ sectors: List[str],
+ gas_baskets: Dict[str, List[str]],
+ processing_info_country: Dict = None,
+) -> xr.Dataset:
+ """
+ Process data from DI interface (where necessary).
+ * Downscaling including subtraction of time series
+ * country specific sector aggregation
+ * Conversion to IPCC2006 categories
+ * general sector and gas basket aggregation (in new categories)
+ """
+ #### 1: general processing
+ # remove unused cats
+ data_country = data_country.dropna(f'category ({cat_terminology_in})', how='all')
+ # remove unused years
+ data_country = data_country.dropna(f'time', how='all')
+ # remove variables only containing nan
+ nan_vars_country = [var for var in data_country.data_vars if
+ data_country[var].isnull().all().data == True]
+ data_country = data_country.drop_vars(nan_vars_country)
+ # remove unnecessary variables
+ entities_ignore_present = [entity for entity in entities_to_ignore if
+ entity in data_country.data_vars]
+ data_country = data_country.drop_vars(entities_ignore_present)
+ #### 2: country specific processing
+ if processing_info_country is not None:
+ if 'tolerance' in processing_info_country:
+ tolerance = processing_info_country["tolerance"]
+ else:
+ tolerance = 0.01
+ # take only desired years
+ if 'years' in processing_info_country:
+ data_country = data_country.pr.loc[
+ {'time': processing_info_country['years']}]
+ # remove timeseries if desired
+ if 'remove_ts' in processing_info_country:
+ for case in processing_info_country['remove_ts']:
+ remove_info = processing_info_country['remove_ts'][case]
+ entities = remove_info.pop("entities")
+ for entity in entities:
+ data_country[entity].pr.loc[remove_info] = \
+ data_country[entity].pr.loc[remove_info] * np.nan
+ # remove all data for given years if necessary
+ if 'remove_years' in processing_info_country:
+ data_country.pr.loc[{'time': processing_info_country['remove_years']}] = \
+ data_country.pr.loc[{'time': processing_info_country[
+ 'remove_years']}] * np.nan
+ # subtract categories
+ if 'subtract_cats' in processing_info_country:
+ subtract_cats_current = processing_info_country['subtract_cats']
+ if 'entities' in subtract_cats_current.keys():
+ entities_current = subtract_cats_current['entities']
+ else:
+ entities_current = list(data_country.data_vars)
+ print(f"Subtracting categories for country {country}, entities "
+ f"{entities_current}")
+ for cat_to_generate in subtract_cats_current:
+ cats_to_subtract = subtract_cats_current[cat_to_generate]['subtract']
+ data_sub = data_country.pr.loc[{'category': cats_to_subtract}].pr.sum(
+ dim='category', skipna=True, min_count=1)
+ data_parent = data_country.pr.loc[
+ {'category': subtract_cats_current[cat_to_generate]['parent']}]
+ data_agg = data_parent - data_sub
+ nan_vars = [var for var in data_agg.data_vars if
+ data_agg[var].isnull().all().data == True]
+ data_agg = data_agg.drop(nan_vars)
+ if len(data_agg.data_vars) > 0:
+ print(f"Generating {cat_to_generate} through subtraction")
+ data_agg = data_agg.expand_dims([f'category ('
+ f'{cat_terminology_in})'])
+ data_agg = data_agg.assign_coords(
+ coords={f'category ({cat_terminology_in})':
+ (f'category ({cat_terminology_in})',
+ [cat_to_generate])})
+ data_country = data_country.pr.merge(data_agg, tolerance=tolerance)
+ else:
+ print(f"no data to generate category {cat_to_generate}")
+ # downscaling
+ if 'downscale' in processing_info_country:
+ if 'sectors' in processing_info_country['downscale']:
+ sector_downscaling = processing_info_country['downscale']['sectors']
+ for case in sector_downscaling.keys():
+ print(f"Downscaling for {case}.")
+ sector_downscaling_current = sector_downscaling[case]
+ entities = sector_downscaling_current.pop('entities')
+ for entity in entities:
+ data_country[entity] = data_country[
+ entity].pr.downscale_timeseries(
+ **sector_downscaling_current) # , skipna_evaluation_dims=None)
+ if 'entities' in processing_info_country['downscale']:
+ entity_downscaling = processing_info_country['downscale']['entities']
+ for case in entity_downscaling.keys():
+ #print(case)
+ print(data_country.coords[f'category ('
+ f'{cat_terminology_in})'].values)
+ data_country = data_country.pr.downscale_gas_timeseries(
+ **entity_downscaling[case], skipna=True,
+ skipna_evaluation_dims=None)
+ # aggregate categories
+ if 'aggregate_cats' in processing_info_country:
+ aggregate_cats_current = processing_info_country['aggregate_cats']
+ print(
+ f"Aggregating categories for country {country}")
+ for cat_to_agg in aggregate_cats_current:
+ print(f"Category: {cat_to_agg}")
+ source_cats = aggregate_cats_current[cat_to_agg]['sources']
+ data_agg = data_country.pr.loc[{'category': source_cats}].pr.sum(
+ dim='category', skipna=True, min_count=1)
+ nan_vars = [var for var in data_agg.data_vars if
+ data_agg[var].isnull().all().data == True]
+ data_agg = data_agg.drop(nan_vars)
+ if len(data_agg.data_vars) > 0:
+ data_agg = data_agg.expand_dims([f'category ('
+ f'{cat_terminology_in})'])
+ data_agg = data_agg.assign_coords(
+ coords={f'category ({cat_terminology_in})':
+ (f'category ({cat_terminology_in})', [cat_to_agg])})
+ data_country = data_country.pr.merge(data_agg, tolerance=tolerance)
+ else:
+ print(f"no data to aggregate category {cat_to_agg}")
+ # aggregate gases if desired
+ if 'aggregate_gases' in processing_info_country:
+ for case in processing_info_country['aggregate_gases'].keys():
+ case_info = processing_info_country['aggregate_gases'][case]
+ data_country[case_info['basket']] = \
+ data_country.pr.fill_na_gas_basket_from_contents(
+ **case_info)
+ #### 3: map categories
+ if country in nAI_countries:
+ # conversion from BURDI to IPCC2006_PRIMAP needed
+ cat_terminology_out = 'IPCC2006_PRIMAP'
+ data_country = convert_categories(
+ data_country,
+ cat_conversion[f"{cat_terminology_in}_to_{cat_terminology_out}"],
+ cat_terminology_out,
+ debug=False,
+ tolerance=0.01,
+ )
+ else:
+ cat_terminology_out = cat_terminology_in
+ # more general processing
+ # reduce categories to output cats
+ cats_to_keep = [cat for cat in
+ data_country.coords[f'category ({cat_terminology_out})'].values if
+ cat in sectors]
+ data_country = data_country.pr.loc[{'category': cats_to_keep}]
+ # create gas baskets
+ entities_present = set(data_country.data_vars)
+ for basket in gas_baskets.keys():
+ basket_contents_present = [gas for gas in gas_baskets[basket] if
+ gas in entities_present]
+ if len(basket_contents_present) > 0:
+ if basket in list(data_country.data_vars):
+ data_country[basket] = data_country.pr.fill_na_gas_basket_from_contents(
+ basket=basket, basket_contents=basket_contents_present, min_count=1)
+ else:
+ try:
+ data_country[basket] = xr.full_like(data_country["CO2"],
+ np.nan).pr.quantify(
+ units="Gg CO2 / year")
+ data_country[basket].attrs = {"entity": basket.split(' ')[0],
+ "gwp_context": basket.split(' ')[1][
+ 1:-1]}
+ data_country[basket] = data_country.pr.gas_basket_contents_sum(
+ basket=basket, basket_contents=basket_contents_present,
+ min_count=1)
+ except:
+ print(f"No gas basket created for {country}")
+ # amend title and comment
+ data_country.attrs["comment"] = data_country.attrs["comment"] + f" Processed on " \
+ f"{date.today()}"
+ data_country.attrs["title"] = data_country.attrs["title"] + f" Processed on " \
+ f"{date.today()}"
+ return data_country
def read_UNFCCC_DI_for_party_df(
party_code: str,
category_groups: Optional[Dict]=None,
@@ -420,7 +621,8 @@ def convert_DI_IF_data_to_pm2(
def determine_filename(
party_code: str,
- date_str: str
+ date_str: str,
+ raw: bool=False,
Determine the filename for a dataset from given country code and date string.
@@ -432,6 +634,8 @@ def determine_filename(
ISO 3 letter code of the country
formatted date string
+ raw:
+ bool specifying if filename fow raw or processed data should be returned
@@ -465,10 +669,73 @@ def determine_filename(
- filename = Path(country_folder) / f"{party_code}_DI_{date_str}"
+ if raw:
+ filename = Path(country_folder) / f"{party_code}_DI_{date_str}_raw"
+ else:
+ filename = Path(country_folder) / f"{party_code}_DI_{date_str}"
return filename
+def convert_categories(
+ ds_input: xr.Dataset,
+ conversion: Dict[str, Dict[str, str]],
+ #terminology_from: str,
+ terminology_to: str,
+ debug: bool=False,
+ tolerance: float=0.01,
+ ds_converted = ds_input.copy(deep=True)
+ ds_converted.attrs = deepcopy(ds_input.attrs)
+ # change category terminology
+ cat_dim = ds_converted.attrs["cat"]
+ ds_converted.attrs["cat"] = f"category ({terminology_to})"
+ ds_converted = ds_converted.rename({cat_dim: ds_converted.attrs["cat"]})
+ # find categories present in dataset
+ cats_present = list(ds_converted.coords[f'category ({terminology_to})'])
+ # restrict categories and map category names
+ if 'mapping' in conversion.keys():
+ mapping_cats_present = [cat for cat in list(conversion['mapping'].keys()) if
+ cat in cats_present]
+ ds_converted = ds_converted.pr.loc[
+ {'category': mapping_cats_present}]
+ from_cats = ds_converted.coords[f'category ({terminology_to})'].values
+ to_cats = pd.Series(from_cats).replace(conversion['mapping'])
+ ds_converted = ds_converted.assign_coords({f'category ({terminology_to})':
+ (f'category ({terminology_to})',
+ to_cats)})
+ # redo the list of present cats after mapping, as we have new categories in the
+ # target terminology now
+ cats_present_mapped = list(ds_converted.coords[f'category ({terminology_to})'])
+ # aggregate categories
+ if 'aggregate' in conversion:
+ aggregate_cats = conversion['aggregate']
+ for cat_to_agg in aggregate_cats:
+ if debug:
+ print(f"Category: {cat_to_agg}")
+ source_cats = [cat for cat in aggregate_cats[cat_to_agg]['sources'] if
+ cat in cats_present_mapped]
+ data_agg = ds_converted.pr.loc[{'category': source_cats}].pr.sum(
+ dim='category', skipna=True, min_count=1)
+ nan_vars = [var for var in data_agg.data_vars if
+ data_agg[var].isnull().all().data == True]
+ data_agg = data_agg.drop(nan_vars)
+ if len(data_agg.data_vars) > 0:
+ data_agg = data_agg.expand_dims([f'category ({terminology_to})'])
+ data_agg = data_agg.assign_coords(
+ coords={f'category ({terminology_to})':
+ (f'category ({terminology_to})', [cat_to_agg])})
+ ds_converted = ds_converted.pr.merge(data_agg, tolerance=tolerance)
+ else:
+ print(f"no data to aggregate category {cat_to_agg}")
+ return ds_converted
# functions