
refactor DI reader and work on DI processing config

Johannes Gütschow 1 year ago
parent commit 3ce118207d

+ 46 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/DI_AI_parties.conf

@@ -0,0 +1,46 @@
+id,code,name
+3,AUS,Australia
+4,AUT,Austria
+5,BEL,Belgium
+6,BGR,Bulgaria
+7,BLR,Belarus
+9,CAN,Canada
+10,CHE,Switzerland
+11,CYP,Cyprus
+12,CZE,Czechia
+13,DEU,Germany
+15,DNK,Denmark
+17,ESP,Spain
+18,EST,Estonia
+19,EUA,European Union (Convention)
+20,EUC,European Union (KP)
+21,FIN,Finland
+23,FRA,France
+27,GBR,United Kingdom of Great Britain and Northern Ireland
+28,GRC,Greece
+30,HRV,Croatia
+31,HUN,Hungary
+32,IRL,Ireland
+33,ISL,Iceland
+34,ITA,Italy
+35,JPN,Japan
+36,KAZ,Kazakhstan
+38,LIE,Liechtenstein
+39,LTU,Lithuania
+40,LUX,Luxembourg
+41,LVA,Latvia
+42,MCO,Monaco
+43,MLT,Malta
+44,NLD,Netherlands
+45,NOR,Norway
+46,NZL,New Zealand
+47,POL,Poland
+48,PRT,Portugal
+49,ROU,Romania
+50,RUS,Russian Federation
+51,SVK,Slovakia
+52,SVN,Slovenia
+53,SWE,Sweden
+54,TUR,Türkiye
+55,UKR,Ukraine
+56,USA,United States of America

+ 154 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/DI_NAI_parties.conf

@@ -0,0 +1,154 @@
+id,code,name,noData
+100064,AFG,Afghanistan,
+100065,ALB,Albania,
+100066,DZA,Algeria,
+100067,AND,Andorra,True
+100068,AGO,Angola,
+100069,ATG,Antigua and Barbuda,
+100070,ARG,Argentina,
+100071,ARM,Armenia,
+100072,AZE,Azerbaijan,
+100073,BHS,Bahamas,
+100074,BHR,Bahrain,
+100075,BGD,Bangladesh,
+100076,BRB,Barbados,
+100077,BLZ,Belize,
+100078,BEN,Benin,
+100079,BTN,Bhutan,
+100080,BOL,Bolivia (Plurinational State of),
+100081,BIH,Bosnia and Herzegovina,
+100082,BWA,Botswana,
+100083,BRA,Brazil,
+100084,BRN,Brunei Darussalam,
+100085,BFA,Burkina Faso,
+100086,BDI,Burundi,
+100087,CPV,Cabo Verde,
+100088,KHM,Cambodia,
+100089,CMR,Cameroon,
+100090,CAF,Central African Republic,
+100091,TCD,Chad,
+100092,CHL,Chile,
+100093,CHN,China,
+100094,COL,Colombia,
+100095,COM,Comoros,
+100096,COG,Congo,
+100097,COK,Cook Islands,
+100098,CRI,Costa Rica,
+100099,CIV,Cote d'Ivoire,
+100100,CUB,Cuba,
+100101,PRK,Democratic People's Republic of Korea,
+100102,COD,Democratic Republic of the Congo,
+100103,DJI,Djibouti,
+100104,DMA,Dominica,
+100105,DOM,Dominican Republic,
+100106,ECU,Ecuador,
+100107,EGY,Egypt,
+100108,SLV,El Salvador,
+100109,GNQ,Equatorial Guinea,True
+100110,ERI,Eritrea,
+100111,ETH,Ethiopia,
+100112,FJI,Fiji,
+100113,GAB,Gabon,
+100114,GMB,Gambia,
+100115,GEO,Georgia,
+100116,GHA,Ghana,
+100117,GRD,Grenada,
+100118,GTM,Guatemala,
+100119,GIN,Guinea,
+100120,GNB,Guinea-Bissau,
+100121,GUY,Guyana,
+100122,HTI,Haiti,
+100123,HND,Honduras,
+100124,IND,India,
+100125,IDN,Indonesia,
+100126,IRN,Iran (Islamic Republic of),
+100127,IRQ,Iraq,
+100128,ISR,Israel,
+100129,JAM,Jamaica,
+100130,JOR,Jordan,
+100132,KEN,Kenya,
+100133,KIR,Kiribati,
+100134,KWT,Kuwait,
+100135,KGZ,Kyrgyzstan,
+100136,LAO,Lao People's Democratic Republic,
+100137,LBN,Lebanon,
+100138,LSO,Lesotho,
+100139,LBR,Liberia,
+100140,LBY,Libya,True
+100141,MDG,Madagascar,
+100142,MWI,Malawi,
+100143,MYS,Malaysia,
+100144,MDV,Maldives,
+100145,MLI,Mali,
+100146,MHL,Marshall Islands,
+100147,MRT,Mauritania,
+100148,MUS,Mauritius,
+100149,MEX,Mexico,
+100150,FSM,Micronesia (Federated States of),
+100152,MNG,Mongolia,
+100153,MNE,Montenegro,
+100154,MAR,Morocco,
+100155,MOZ,Mozambique,
+100156,MMR,Myanmar,
+100157,NAM,Namibia,
+100158,NRU,Nauru,
+100159,NPL,Nepal,
+100160,NIC,Nicaragua,
+100161,NER,Niger,
+100162,NGA,Nigeria,
+100163,NIU,Niue,
+100164,OMN,Oman,
+100165,PAK,Pakistan,
+100166,PLW,Palau,
+100167,PAN,Panama,
+100168,PNG,Papua New Guinea,
+100169,PRY,Paraguay,
+100170,PER,Peru,
+100171,PHL,Philippines,
+100172,QAT,Qatar,
+100173,KOR,Republic of Korea,
+100174,MDA,Republic of Moldova,
+100175,RWA,Rwanda,
+100176,KNA,Saint Kitts and Nevis,
+100177,LCA,Saint Lucia,
+100178,VCT,Saint Vincent and the Grenadines,
+100179,WSM,Samoa,
+100180,SMR,San Marino,
+100181,STP,Sao Tome and Principe,
+100182,SAU,Saudi Arabia,
+100183,SEN,Senegal,
+100184,SRB,Serbia,
+100185,SYC,Seychelles,
+100186,SLE,Sierra Leone,True
+100187,SGP,Singapore,
+100188,SLB,Solomon Islands,
+100189,SOM,Somalia,True
+100190,ZAF,South Africa,
+100191,SSD,South Sudan,
+100192,LKA,Sri Lanka,
+100193,SDN,Sudan,
+100194,SUR,Suriname,
+100195,SWZ,Eswatini,
+100196,SYR,Syrian Arab Republic,
+100197,TJK,Tajikistan,
+100198,THA,Thailand,
+100199,MKD,North Macedonia,
+100200,TLS,Timor-Leste,
+100201,TGO,Togo,
+100202,TON,Tonga,
+100203,TTO,Trinidad and Tobago,
+100204,TUN,Tunisia,
+100205,TKM,Turkmenistan,
+100206,TUV,Tuvalu,
+100207,UGA,Uganda,
+100208,ARE,United Arab Emirates,
+100209,TZA,United Republic of Tanzania,
+100210,URY,Uruguay,
+100211,UZB,Uzbekistan,
+100212,VUT,Vanuatu,
+100213,VEN,Venezuela (Bolivarian Republic of),
+100214,VNM,Viet Nam,
+100215,YEM,Yemen,
+100216,ZMB,Zambia,
+100217,ZWE,Zimbabwe,
+100229,PSE,State of Palestine,
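
The party lists above are plain CSV files with id, code and name columns (the non-Annex I list adds a noData flag for parties without DI data). As a minimal sketch of how such a list might be loaded and filtered, assuming pandas; the file path is taken from this commit, but the loading code itself is illustrative and not part of the changed files:

    import pandas as pd

    # load the non-Annex I party list added in this commit
    nai_parties = pd.read_csv(
        "UNFCCC_GHG_data/UNFCCC_DI_reader/DI_NAI_parties.conf"
    )
    # parties flagged in the 'noData' column have no DI data and can be skipped
    parties_with_data = nai_parties[nai_parties["noData"].isna()]
    print(parties_with_data[["code", "name"]].head())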

+ 774 - 9
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_config.py

@@ -327,14 +327,86 @@ cat_conversion = {
 di_processing_templates = {
     # templates for the DI processing. Most processing rules will apply to several
     # versions. So we store them here and refer to them in the processing info dict
+    # AFG: not needed (newer data in BUR1), 2005, 2013 only
+    # AGO: 2000, 2005 only (external key needed for some gases / sectors)
+    # TODO ALB: pre 2000 downscaling
+    # AND: no data
+    'ARE': { # 1990, 2000, 2005, 2014. Some aggregation for F-gases (PFCs) needed
+        'DI2023-05-24': {
+            'agg_tolerance': 0.015,
+            'ignore_entities': ["NMVOC"], #errors when aggregating cats
+            'aggregate_cats': {
+                '2': {'sources': ['2.A', '2.B', '2.C'],
+                     'name': '2.  Industrial Processes'},
+                '15163': {'sources': ['1', '2', '4', '6'],
+                          'name': 'Total GHG emissions excluding LULUCF/LUCF'},
+                '24540': {'sources': ['1', '2', '5', '4', '6'],
+                          'name': 'Total GHG emissions including LULUCF/LUCF'},
+            },
+        },
+    },
+    # ARG newer data in BUR
+    # ARM 1990, 2000, 2006, 2010, no processing needed
+    # ATG 1990, 2000, no processing needed
+    'AZE': {
+        # 1990-2013, but from different submissions and not completely consistent
+        # including different sector coverage
+        # for FGASES emissions are in HFCs for some years and in PFCs for others.
+        # waste data has inconsistent subsectors
+        'DI2023-05-24': {
+            'remove_ts': {
+                '1.A.1': { #contains data for all subsectors
+                    'category': ['1.A.1'],
+                    'entities': ['CH4', 'KYOTOGHG (SARGWP100)'],
+                    'time': ['1990', '2000', '2005', '2006', '2007', '2008', '2009',
+                             '2010', '2011', '2012'],
+                },
+            },
+            'downscale': { # needed for 1990, 2000, 2005-2012
+                'sectors': {
+                    '1.A': {
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.1', '1.A.2', '1.A.3', '1.A.4',
+                                            '1.A.5'],
+                        'entities': ['CH4'],
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                },
+            },
+        }
+    },
+    # BDI 1998, 2005, 2010, 2015 # data coverage is a bit inconsistent
+    # BEN 1995, 2000 # data coverage a bit inconsistent
     'BFA': {
-        'DI2022-08-22': { # remove 2007, seems to have summed sectors (Agri and LULUCF)
+        'DI2023-05-24': {  # remove 2007, seems to have summed sectors (Agri and LULUCF)
             # and missing sectors (e.g. 1,2 for CH4, N2O)
             'remove_years': ['2007'],
         },
     },
+    # BGD 1994, 2001, 2005; coverage mostly consistent but not fully
+    # BHR 1994, 2000 (with some gaps in 2000)
+    'BHS': { # 1990, 1994, 2000 (differing coverage, might be unusable for some sectors)
+        # TODO: check e.g. 4 and 5
+        'DI2023-05-24': {
+            'downscale': {
+                'sectors': {
+                    '4': { # 1994
+                        'basket': '4',
+                        'basket_contents': ['4.A', '4.B', '4.D', '4.G'],
+                        'entities': ['CH4', 'CO2', 'KYOTOGHG (SARGWP100)'], # no N2O but
+                        # CO2 is unusual
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                },
+            },
+        }
+    },
     'BIH': {
-        'DI2022-08-22': {
+        'DI2023-05-24': {
             # downscaling in two steps
             # 1990-2001 has different coverage than 2002-2012 and 2013-2014
             # do not downscale KyotoGHG for 1990-2001 as that's aggregated
@@ -409,12 +481,12 @@ di_processing_templates = {
                         'skipna': True,
                     },
                 },
-                'entities': { # 2002-2014
+                'entities': {  # 2002-2014
                     'KYOTO': {
                         'basket': 'KYOTOGHG (SARGWP100)',
                         'basket_contents': ['CH4', 'CO2', 'N2O'],
                         'sel': {'category (BURDI)':
-                                    ['1' ,'1.A' ,'1.A.1', '1.A.2', '1.A.3', '1.A.4',
+                                    ['1', '1.A', '1.A.1', '1.A.2', '1.A.3', '1.A.4',
                                      '1.A.5', '1.B', '1.B.1', '1.B.2', '2', '2.A',
                                      '2.B', '2.C', '2.D', '2.E', '4', '4.A', '4.B',
                                      '4.C', '4.D', '4.E', '5', '5.A', '6', '6.A',
@@ -430,21 +502,714 @@ di_processing_templates = {
             },
         },
     },
+    # BLZ 1994, 2000, 2003, 2006, 2009 (energy sector missing in 200X)
+    # BOL 1990, 1994, 1998, 2000, 2002 (energy sectors missing for CH4, N2O), 2004 (sm),
+    # BRA 1990-2016 (BUR4)
+    'BRB': {
+        'DI2023-05-24': {
+            'aggregate_cats': {
+                '14637': {'sources': ['14423', '14424'],
+                     'name': 'International Bunkers'},
+            },
+            # downscaling in two steps
+            # 2000 - 2012 LULUCF KYOTOGHG
+            # later KYOTOGHG to gases using 1997 shares (not ideal)
+            'downscale': {
+                'sectors': {
+                    '5_2000': {
+                        'basket': '5',
+                        'basket_contents': ['5.A', '5.B', '5.C', '5.D'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                        'sel': {'time': ['2000', '2001', '2002', '2003', '2004',
+                                         '2005', '2006', '2007', '2009', '2010']},
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                },
+                'entities': {  # 2000-2010 (1997 as key)
+                    'KYOTO': {
+                        'basket': 'KYOTOGHG (SARGWP100)',
+                        'basket_contents': ['CO2', 'CH4', 'N2O'],
+                        'sel': {'category (BURDI)':
+                                    ['1', '1.A', '1.A.1', '1.A.2', '1.A.3', '1.A.4',
+                                     '2', '2.A', '5', '14423', '14424',
+                                     '14637', '4', '4.A', '4.B', '4.D',
+                                     '6', '6.A', '6.B', '15163', '24540',
+                                     ],
+                                'time': ['1997', '2000', '2001', '2002', '2003', '2004',
+                                         '2005', '2006', '2007', '2008', '2009',
+                                         '2010'],
+                                },
+                    },
+                },
+            },
+        },
+    },
+    # BRN 2010 only (though with full sectors)
+    # BTN 1994, 2000, 2015. patchy coverage but no downscaling needed / possible
+    # BWA 1994, 2000, 2015. inconsistent coverage
+    # TODO CAF 1994, 2003-2010. 1994 has different coverage and might be inconsistent
+    # CHL: more data in BUR4/5
+    'CHN': {
+        'DI2023-05-24': { #1994 (gaps), 2005 (needs downscaling), 2010, 2012, 2014
+            # (relatively complete and consistent)
+            'remove_ts': {
+                '1.A.1': { #contains data for all subsectors
+                    'category': ['1.A.1'],
+                    'entities': ['N2O'],
+                    'time': ['1994'],
+                },
+            },
+            'downscale': { # needed for 2005
+                'sectors': {
+                    '1': { # 2005
+                        'basket': '1',
+                        'basket_contents': ['1.A', '1.B'],
+                        'entities': ['CH4', 'CO2', 'N2O'],
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                    '1.B': { # 2005
+                        'basket': '1.B',
+                        'basket_contents': ['1.B.1', '1.B.2'],
+                        'entities': ['CH4'],
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                    '1.A': { # 2005
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.1', '1.A.2', '1.A.3', '1.A.4',
+                                            '1.A.5'],
+                        'entities': ['CO2'],
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                    # with current functionality we can't downscale 1.A further for
+                    # non-CO2 as it needs several steps and CO2 is present
+                    '2': { # 2005
+                        'basket': '2',
+                        'basket_contents': ['2.A', '2.B', '2.C'],
+                        'entities': ['CO2', 'N2O'],
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                    '4': { # 2005
+                        'basket': '4',
+                        'basket_contents': ['4.A', '4.B', '4.C', '4.D', '4.E', '4.F'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                    '5': { # several years
+                        'basket': '5',
+                        'basket_contents': ['5.A', '5.B'],
+                        'entities': ['CO2', 'CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                    '6': { # 2005
+                        'basket': '6',
+                        'basket_contents': ['6.A', '6.B', '6.C', '6.D'],
+                        'entities': ['CO2', 'CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                },
+            },
+        }
+    },
+    'CIV': {
+        'DI2023-05-24': { #1994 (needs some downscaling), 2000
+            'downscale': { # needed for 1994
+                'sectors': {
+                    '1.A': { # 1994
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.1', '1.A.2', '1.A.3', '1.A.4'],
+                        'entities': ['CO2', 'CH4', 'N2O', 'KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                        'skipna_evaluation_dims': None,
+                        'skipna': True,
+                    },
+                },
+            },
+        },
+    },
+    # CMR: 1994, 2000, not fully consistent
+    # COD: 1994, 1999-2003, coverage not fully consistent, downscaling complicated
+    # COG: 1994, 2000, not fully consistent
+    # COK: 1994, limited coverage
+    # COL: not needed, more data in BUR3, 1990, 1994, 2000, 2004,
+    # COM: 1994, 2000
+    # CPV: more data in NC3
+    # CRI: more data in NIR
+    'CUB': { # 1990, (1992, 1994, 1996, 1998 dwn needed), 2000, 2002
+        'DI2023-05-24': {
+            # calculate LULUCF from 0 and M.0.EL
+            'subtract_cats': {
+                '5': {'parent': '24540', 'subtract': ['15163'],
+                      'name': '5.  Land-Use Change and Forestry'},
+            },
+            'downscale': { # not tested yet
+                'sectors': {
+                    '0': {
+                        'basket': '24540',
+                        'basket_contents': ['15163', '5'],
+                        'entities': ['CO', 'NMVOC', 'NOx', 'SO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    'M.0.EL': {
+                        'basket': '15163',
+                        'basket_contents': ['1', '2', '3', '4', '6'],
+                        'entities': ['CH4', 'CO2', 'N2O', 'C2F6', 'CF4', 'HFC134',
+                                     'HFC23', 'SF6', 'CO', 'NMVOC', 'NOx', 'SO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '1': {
+                        'basket': '1',
+                        'basket_contents': ['1.A', '1.B'],
+                        'entities': ['CH4', 'CO2', 'N2O', 'CO', 'NMVOC', 'NOx', 'SO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '1.A': {
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.1', '1.A.2', '1.A.3', '1.A.4', '1.A.5'],
+                        'entities': ['CH4', 'CO2', 'N2O', 'CO', 'NMVOC', 'NOx', 'SO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '1.B': {
+                        'basket': '1.B',
+                        'basket_contents': ['1.B.1', '1.B.2'],
+                        'entities': ['CH4', 'CO2', 'CO', 'NMVOC', 'NOx', 'SO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '2': {
+                        'basket': '2',
+                        'basket_contents': ['2.A', '2.B', '2.C', '2.D', '2.E', '2.G'],
+                        'entities': ['CH4', 'CO2', 'N2O', 'C2F6', 'CF4', 'HFC134',
+                                     'HFC23', 'SF6', 'CO', 'NMVOC', 'NOx', 'SO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '4': {
+                        'basket': '4',
+                        'basket_contents': ['4.A', '4.B', '4.C', '4.D', '4.E', '4.F'],
+                        'entities': ['CH4', 'CO2', 'N2O', 'CO', 'NMVOC', 'NOx', 'SO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '5': {
+                        'basket': '5',
+                        'basket_contents': ['5.A', '5.B', '5.C', '5.D'],
+                        'entities': ['CH4', 'CO2', 'N2O', 'CO', 'NOx'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '6': {
+                        'basket': '6',
+                        'basket_contents': ['6.A', '6.B', '6.C'],
+                        'entities': ['CH4', 'CO2', 'N2O', 'CO', 'NMVOC', 'NOx', 'SO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                },
+            },
+        },
+    },
+    # DJI: 1994, 2000
+    'DMA': {
+        'DI2023-05-24': {  # 1994, 2000, (2001-2017, some dwn)
+            # LULUCF has gaps, cat 0 assumes 0 for LULUCF in these years
+            # we omit aerosols and GHG precursors as only SO2 can be downscaled
+            'downscale': {
+                'sectors': {
+                    '1_CH4': {
+                        'basket': '1',
+                        'basket_contents': ['1.A', '1.B'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                        'sel': {'time': ['1994', '2000', '2001', '2002', '2003',
+                                         '2004', '2005']},
+                    },
+                    '1_CO2': {
+                        'basket': '1',
+                        'basket_contents': ['1.A', '1.B'],
+                        'entities': ['CO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '1.A': {
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.1', '1.A.2', '1.A.3', '1.A.4',
+                                            '1.A.5'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                        'sel': {'time': ['1994', '2000', '2001', '2002', '2003',
+                                         '2004', '2005']},
+                    },
+                    '2': {
+                        'basket': '2',
+                        'basket_contents': ['2.A', '2.F'],
+                        'entities': ['CO2', 'HFCS (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                    'bunkers': {
+                        'basket': '14637',
+                        'basket_contents': ['14423', '14424'],
+                        'entities': ['CO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                },
+            },
+        },
+    },
+    # DOM: # 1990, 1994, 1998, 2000, 2010
+    # DZA: 1994, 2000
+    'ECU': {
+        'DI2023-05-24': { # 1990 (1994, 2000), 2010, 2012
+            # omit aerosols / GHG precursors in downscaling
+            'downscale': {
+                'sectors': {
+                    '1': {
+                        'basket': '1',
+                        'basket_contents': ['1.A', '1.B'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '1.A': {
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.1', '1.A.2', '1.A.3', '1.A.4',
+                                            '1.A.5'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '1.B': {
+                        'basket': '1.B',
+                        'basket_contents': ['1.B.1', '1.B.2'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '2': {
+                        'basket': '2',
+                        'basket_contents': ['2.A', '2.B', '2.C', '2.D', '2.G'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '4': {
+                        'basket': '4',
+                        'basket_contents': ['4.A', '4.B', '4.C', '4.D', '4.E',
+                                            '4.F', '4.G'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '5': {
+                        'basket': '5',
+                        'basket_contents': ['5.A', '5.B', '5.C', '5.D'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '6': {
+                        'basket': '6',
+                        'basket_contents': ['6.A', '6.B', '6.D'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                },
+                'entities': {
+                    'KYOTO': {
+                        'basket': 'KYOTOGHG (SARGWP100)',
+                        'basket_contents': ['CH4', 'CO2', 'N2O'],
+                        'sel': {'category (BURDI)':
+                                    ['15163', '24540',
+                                     '1', '1.A', '1.A.1', '1.A.2', '1.A.3', '1.A.4',
+                                     '1.A.5', '1.B',  '1.B.1',  '1.B.2',
+                                     '2', '2.A', '2.B', '2.C', '2.D', '2.G',
+                                     '4', '4.A', '4.B', '4.C', '4.D', '4.E', '4.F',
+                                     '4.G',
+                                     '5', '5.A', '5.B', '5.C', '5.D',
+                                     '6', '6.A', '6.B', '6.D', '7']}
+                    },
+                },
+            },
+        },
+    },
+    # EGY: 1990, 2000, 2005
+    # ERI: 1994, 1995-1999 (partial coverage, KYOTOGHG and total are incomplete), 2000
+    'ETH': {
+        'DI2023-05-24': { # 1990-1993 (downscaling needed), 1994-2013
+            'downscale': {
+                # omit aerosols / ghg precursors as missing for most years
+                'sectors': { # for 1990-1994
+                    '1.A': {
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.1', '1.A.2', '1.A.3', '1.A.4'],
+                        'entities': ['CO2', 'CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '2': {
+                        'basket': '2',
+                        'basket_contents': ['2.A', '2.B', '2.C'],
+                        'entities': ['CO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '4': {
+                        'basket': '4',
+                        'basket_contents': ['4.A', '4.B', '4.C', '4.D', '4.E'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '6': {
+                        'basket': '6',
+                        'basket_contents': ['6.A', '6.B', '6.C'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                    'bunkers': {
+                        'basket': '14637',
+                        'basket_contents': ['14424'],
+                        'entities': ['CO2', 'KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                },
+                'entities': {
+                    'bunkers': {
+                        'basket': 'KYOTOGHG (SARGWP100)',
+                        'basket_contents': ['CH4', 'CO2', 'N2O'],
+                        'sel': {'category (BURDI)': ['14637', '14424']}
+                    },
+                },
+            },
+        },
+    },
+    # FJI: 1994, 2000
+    # FSM: 1994, 2000
+    # GAB: 1994, 2000 (more data in NIR)
+    # from here down aerosols and GHG precursors are always omitted in downscaling
+    # GEO:
+    'GEO': {
+        'DI2023-05-24': { # 1990-1997, 2000, 2000-2013 (more data in NC4)
+            'downscale': {
+                'sectors': { # for 1991-1997
+                    '1.A': {
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.1', '1.A.2', '1.A.3', '1.A.4',
+                                            '1.A.5'],
+                        'entities': ['CO2', 'CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '2': {
+                        'basket': '2',
+                        'basket_contents': ['2.A', '2.B', '2.C', '2.D', '2.E', '2.F',
+                                            '2.G'],
+                        'entities': ['CO2', 'CH4', 'N2O', 'C2F6', 'CF4', 'HFC125',
+                                     'HFC134', 'HFC134a', 'HFC32', 'SF6'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '4': {
+                        'basket': '4',
+                        'basket_contents': ['4.A', '4.B', '4.C', '4.D', '4.E', '4.F',
+                                            '4.G'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                    # 5 subsectors are chaotic
+                    '6': {
+                        'basket': '6',
+                        'basket_contents': ['6.A', '6.B', '6.D'],
+                        'entities': ['CH4'],
+                        'dim': 'category (BURDI)',
+                    },
+                },
+            },
+        },
+    },
+    # GHA: 1990-2006
+    # GIN: 1994, 2000
+    # GMB: 1993, 2000
+    'GNB': {
+        'DI2023-05-24': { # 1994, 2006 (dwn), 2010 coverage and subsectors inconsistent
+            'downscale': {
+                'sectors': { # for 2006
+                    '1': { # no further downscaling as inconsistent subcategories
+                        'basket': '1',
+                        'basket_contents': ['1.A', '1.B'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                },
+            },
+        },
+    },
+    # GRD: 1994, limited coverage
+    # GTM: 1990, 1994, 2000, 2005,
+    # GUY: 1990-2004
+    # HND: 1995, 2000, 2005, 2015
+    # HTI: 1994-2000
+    'IDN': {
+        'DI2023-05-24': { # 1990-1994, 2000
+            'downscale': {
+                'sectors': { # for 1990-1993
+                    '1.B': {
+                        'basket': '1.B',
+                        'basket_contents': ['1.B.1', '1.B.2'],
+                        'entities': ['CH4', 'CO2'],
+                        'dim': 'category (BURDI)',
+                        'sel': {'time': ['1990', '1991', '1992', '1993', '1994']},
+                    },
+                    '4': {
+                        'basket': '4',
+                        'basket_contents': ['4.A', '4.B', '4.C', '4.D', '4.E', '4.F',
+                                            '4.G'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                        'sel': {'time': ['1990', '1991', '1992', '1993', '1994']},
+                    },
+                },
+            },
+        },
+    },
+    'IND': {
+        'DI2023-05-24': { # 1994, 2000, 2010, 2016. Subsectors differ a bit, especially
+            # for 1994 and for LULUCF data
+            'downscale': {
+                'sectors': { # for 1994
+                    '1.A': {
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.1', '1.A.2', '1.A.3', '1.A.4'],
+                        'entities': ['CH4'],
+                        'dim': 'category (BURDI)',
+                        'sel': {'time': ['1994', '2000']},
+                    },
+                },
+            },
+        },
+    },
+    'KEN': {
+        'DI2023-05-24': { # 1994, 1995, 2000, 2005, 2010. Subsectors differ a bit,
+            # especially for 1994
+            # 1994 data is inconsistent with 1995 and following years and has
+            # unrealistically high N2O emissions from the energy sector
+            'remove_years': ['1994'],
+            'aggregate_cats': {
+                '1.B': {'sources': ['1.B.2'],
+                     'name': '1.B  Fugitive Emissions from Fuels'},
+            },
+        },
+    },
+    # KGZ: 1990-2010
+    # KHM: 1994, 2000 (more data in BUR1)
+    # KIR: 1994, (2004,2005 partial coverage), 2006-2008
+    # KNA: 1994
+    # KOR: 1990-2018 (more data in 2022 inventory)
+    # KWT: 1994, 2016
+    # LAO: 1990, 2000 (1990 data may be inconsistent)
+    # LBN: 1994, 2000, 2011-2013
+    # LBR: 2000, 2014 (2000 misses some sectors, e.g. IPPU)
+    # LCA: 1994, 2000, 2005, 2010, sectors a bit inconsistent for 1994
+    # LKA: 1994, 2000. A bit inconsistent in subsectors (all emissions in "other" in
+    # 1994 for some sectors)
+    'LSO': {
+        'DI2023-05-24': { # 1994, 2000; 2000 needs downscaling
+            'downscale': {
+                'sectors': { # for 2000
+                    '1': {
+                        'basket': '1',
+                        'basket_contents': ['1.A'],
+                        'entities': ['CH4', 'CO2', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '1.A': {
+                        'basket': '1.A',
+                        'basket_contents': ['1.A.2', '1.A.3', '1.A.4', '1.A.5'],
+                        'entities': ['CH4', 'CO2', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '4': {
+                        'basket': '4',
+                        'basket_contents': ['4.A', '4.B', '4.D', '4.E'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '5': {
+                        'basket': '5',
+                        'basket_contents': ['5.A', '5.B', '5.C', '5.D'],
+                        'entities': ['CO2'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '6': {
+                        'basket': '6',
+                        'basket_contents': ['6.A', '6.B'],
+                        'entities': ['CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                    },
+                },
+            },
+        },
+    },
+    'MAR': { # TODO IPPU subsectors chaotic (swap between other and metal production)
+        'DI2023-05-24': { # 1994, 2000, (2000-2006, 2007 needs downscaling), 2010, 2012
+            'downscale': {
+                'sectors': {
+                    '1.B': {
+                        'basket': '1.B',
+                        'basket_contents': ['1.B.1', '1.B.2'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                    },
+                    '5': {
+                        'basket': '5',
+                        'basket_contents': ['5.A', '5.B'],
+                        'entities': ['KYOTOGHG (SARGWP100)'],
+                        'dim': 'category (BURDI)',
+                        'tolerance': 0.018,  # LULUCF data inconsistent in 2012
+                    },
+                },
+                'entities': {
+                    'all': {
+                        'basket': 'KYOTOGHG (SARGWP100)',
+                        'basket_contents': ['CH4', 'CO2', 'N2O'],
+                        'sel': {'category (BURDI)': [
+                            '1', '2', '4', '5', '6', '15163', '24540',
+                            '1.A', '1.A.1', '1.A.2', '1.A.3', '1.A.4',
+                            '1.B', '1.B.1', '1.B.2',
+                            '2.A', '2.C', '2.D',
+                            '4.A', '4.B', '4.C', '4.D',
+                            '5.A', '5.B',
+                            '6.A', '6.B', '6.D',
+                        ]}
+                    },
+                },
+            },
+        },
+    },
+    # MDA: 1990-2013 (more data in NIR / NC5)
+    'MDG': {
+        'DI2023-05-24': { # 1994, 2000, 2005-2010 (2006-2010 needs downscaling)
+            'downscale': {
+                'sectors': {
+                    '1': {
+                        'basket': '1',
+                        'basket_contents': ['1.A'],
+                        'entities': ['CO2', 'CH4', 'N2O'],
+                        'dim': 'category (BURDI)',
+                        'sel': {'time': ['2005', '2006', '2007', '2008', '2009',
+                                         '2010']},
+                    },
+                    # further downscaling is not possible in a consistent manner with
+                    # current code (and not necessary for primap-hist). Using the
+                    # 2005 subsector information would lead to individual gas
+                    # timeseries which are inconsistent with given kyotoghg subsector
+                    # timeseries while using the kyotoghg subsector information will
+                    # not give individual gas subsector timeseries which add up to
+                    # the individual gas main sector timeseries
+                    # same for 6
+                },
+                'entities': {
+                    'kyotoghg_4': { # in general similar problem to 1.A, but most sectors have
+                        # only one gas and we need the data for PRIMAP-hist,
+                        # so we have to do it anyway
+                        'basket': 'KYOTOGHG (SARGWP100)',
+                        'basket_contents': ['CH4', 'N2O'],
+                        'sel': {
+                            'category (BURDI)': [
+                                '4.A', '4.B', '4.C', '4.D', '4.E', '4.F'],
+                            'time': [
+                                '2005', '2006', '2007', '2008', '2009', '2010'],
+                        }
+                    },
+                },
+            },
+        },
+    },
 }
 
 di_processing_info = {
-    # only countries with special processing listet
+    # only countries with special processing listed
     # category conversion is defined on a country group level
     # the 'default' option is used if no specific option is found such that
     # processing of new versions can be done before creating a configuration for the
     # version.
+    'ARE': {
+        'default': di_processing_templates['ARE']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['ARE']['DI2023-05-24'],
+    },
+    'AZE': {
+        'default': di_processing_templates['AZE']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['AZE']['DI2023-05-24'],
+    },
     'BFA': {
-        'default': di_processing_templates['BFA']['DI2022-08-22'],
-        'DI2022-08-22': di_processing_templates['BFA']['DI2022-08-22'],
+        'default': di_processing_templates['BFA']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['BFA']['DI2023-05-24'],
+    },
+    'BHS': {
+        'default': di_processing_templates['BHS']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['BHS']['DI2023-05-24'],
     },
     'BIH': {
-        'default': di_processing_templates['BIH']['DI2022-08-22'],
-        'DI2022-08-22': di_processing_templates['BIH']['DI2022-08-22'],
+        'default': di_processing_templates['BIH']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['BIH']['DI2023-05-24'],
+    },
+    'BRB': {
+        'default': di_processing_templates['BRB']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['BRB']['DI2023-05-24'],
+    },
+    'CHN': {
+        'default': di_processing_templates['CHN']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['CHN']['DI2023-05-24'],
+    },
+    'CIV': {
+        'default': di_processing_templates['CIV']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['CIV']['DI2023-05-24'],
+    },
+    'CUB': {
+        'default': di_processing_templates['CUB']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['CUB']['DI2023-05-24'],
+    },
+    'DMA': {
+        'default': di_processing_templates['DMA']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['DMA']['DI2023-05-24'],
+    },
+    'ECU': {
+        'default': di_processing_templates['ECU']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['ECU']['DI2023-05-24'],
+    },
+    'ETH': {
+        'default': di_processing_templates['ETH']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['ETH']['DI2023-05-24'],
+    },
+    'GEO': {
+        'default': di_processing_templates['GEO']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['GEO']['DI2023-05-24'],
+    },
+    'GNB': {
+        'default': di_processing_templates['GNB']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['GNB']['DI2023-05-24'],
+    },
+    'IDN': {
+        'default': di_processing_templates['IDN']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['IDN']['DI2023-05-24'],
+    },
+    'IND': {
+        'default': di_processing_templates['IND']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['IND']['DI2023-05-24'],
+    },
+    'KEN': {
+        'default': di_processing_templates['KEN']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['KEN']['DI2023-05-24'],
+    },
+    'LSO': {
+        'default': di_processing_templates['LSO']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['LSO']['DI2023-05-24'],
+    },
+    'MAR': {
+        'default': di_processing_templates['MAR']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['MAR']['DI2023-05-24'],
+    },
+    'MDG': {
+        'default': di_processing_templates['MDG']['DI2023-05-24'],
+        'DI2023-05-24': di_processing_templates['MDG']['DI2023-05-24'],
     },
 }
 

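The comments at the top of di_processing_info describe a fallback: if no entry exists for a specific DI version, the 'default' entry is used, so new versions can be processed before a dedicated configuration exists. A minimal sketch of that lookup, consistent with the version handling in the (removed) processing code further below; the helper name select_processing_info is hypothetical and not part of this commit:

    from UNFCCC_GHG_data.UNFCCC_DI_reader.UNFCCC_DI_reader_config import (
        di_processing_info,
    )

    def select_processing_info(country_code: str, version: str):
        # country-specific processing config; only countries with special
        # processing are listed
        processing_info_country = di_processing_info.get(country_code)
        if processing_info_country is None:
            return None
        # use the version-specific entry if present, otherwise fall back to
        # the 'default' entry
        return processing_info_country.get(
            version, processing_info_country['default']
        )

    # e.g. select_processing_info('BFA', 'DI2023-05-24') returns the BFA template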
+ 5 - 955
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_core.py

@@ -1,38 +1,24 @@
 import primap2 as pm2
 import unfccc_di_api
 import pandas as pd
-import numpy as np
 import pycountry
 import itertools
-import json
 import copy
 import xarray as xr
-import datalad.api
-import re
-from datalad.support.exceptions import IncompleteResultsError
+
 from datetime import date
-from typing import Optional, Dict, List, Union
-from pathlib import Path
+from typing import Optional, Dict
 from copy import deepcopy
-from dask.base import tokenize
-
-from UNFCCC_GHG_data.UNFCCC_CRF_reader.UNFCCC_CRF_reader_core import find_latest_date
 
 from .UNFCCC_DI_reader_config import di_to_pm2if_template_nai
 from .UNFCCC_DI_reader_config import di_to_pm2if_template_ai
 from .UNFCCC_DI_reader_config import di_query_filters
-from .UNFCCC_DI_reader_config import di_processing_info
-from .UNFCCC_DI_reader_config import cat_conversion
-from .UNFCCC_DI_reader_config import gas_baskets
+
 from .UNFCCC_DI_reader_config import cat_code_regexp
 from .util import NoDIDataError, nAI_countries, AI_countries
-from .util import DI_date_format, regex_date
+from .util import DI_date_format
 
-from UNFCCC_GHG_data.helper import custom_country_mapping
-from UNFCCC_GHG_data.helper import get_country_code, get_country_name
-from UNFCCC_GHG_data.helper import extracted_data_path_UNFCCC, root_path, code_path
-from UNFCCC_GHG_data.helper import dataset_path_UNFCCC
-from UNFCCC_GHG_data.helper import convert_categories
+from .UNFCCC_DI_reader_io import save_DI_dataset, save_DI_country_data
 
 
 def read_UNFCCC_DI_for_country(
@@ -84,292 +70,6 @@ def read_UNFCCC_DI_for_country(
     return data_pm2
 
 
-def process_and_save_UNFCCC_DI_for_country(
-        country_code: str,
-        date_str: Union[str, None]=None,
-) -> xr.Dataset:
-    '''
-    process data and save them to disk using default parameters
-    '''
-
-    # get latest dataset if no date given
-    if date_str is None:
-        # get the latest date
-        raw_data_file = find_latest_DI_data(country_code, raw=True)
-    else:
-        raw_data_file = determine_filename(country_code, date_str, raw=True,
-                                           hash=False)
-
-        raw_data_file = raw_data_file.parent / (raw_data_file.name + '.nc')
-        print(f"process {raw_data_file.name}")
-        if not raw_data_file.exists():
-            raise ValueError(f"File {raw_data_file.name} does not exist. Check if it "
-                             "has been read.")
-
-    # load the data
-    data_to_process = pm2.open_dataset(raw_data_file)
-
-    # get parameters
-    countries = list(data_to_process.coords[data_to_process.attrs['area']].values)
-    if len(countries) > 1:
-        raise ValueError(
-            f"Found {len(countries)} countries. Only single country data "
-            f"can be processed by this function. countries: {countries}")
-    else:
-        country_code = countries[0]
-    processing_info_country = di_processing_info[country_code]
-    entities_to_ignore = [] # TODO: check and make default list
-
-    # process
-    data_processed = process_UNFCCC_DI_for_country(
-        data_country=data_to_process,
-        entities_to_ignore=entities_to_ignore,
-        gas_baskets=gas_baskets,
-        cat_conversion=cat_conversion,
-        sectors=None,
-        processing_info_country=processing_info_country,
-    )
-
-    # save
-    save_DI_country_data(data_processed, raw=False)
-
-    return data_processed
-
-
-def process_UNFCCC_DI_for_country(
-        data_country: xr.Dataset,
-        entities_to_ignore: List[str],
-        gas_baskets: Dict[str, List[str]],
-        cat_conversion: Dict[str, Dict] = None,
-        sectors: List[str] = None,
-        processing_info_country: Dict = None,
-) -> xr.Dataset:
-    """
-        Process data from DI interface (where necessary).
-        * Downscaling including subtraction of time series
-        * country specific sector aggregation
-        * Conversion to IPCC2006 categories
-        * general sector and gas basket aggregation (in new categories)
-    """
-    #### 0: gather information
-    countries = list(data_country.coords[data_country.attrs['area']].values)
-    if len(countries) > 1:
-        raise ValueError(
-            f"Found {len(countries)} countries. Only single country data "
-            f"can be processed by this function. countries: {countries}")
-    else:
-        country_code = countries[0]
-
-    cat_col = data_country.attrs['cat']
-    temp = re.findall(r'\((.*)\)', cat_col)
-    cat_terminology_in = temp[0]
-
-    #### 1: general processing
-    # remove unused cats
-    data_country = data_country.dropna(f'category ({cat_terminology_in})', how='all')
-    # remove unused years
-    data_country = data_country.dropna(f'time', how='all')
-    # remove variables only containing nan
-    nan_vars_country = [var for var in data_country.data_vars if
-                        data_country[var].isnull().all().data == True]
-    data_country = data_country.drop_vars(nan_vars_country)
-
-    # remove unnecessary variables
-    entities_ignore_present = [entity for entity in entities_to_ignore if
-                               entity in data_country.data_vars]
-    data_country = data_country.drop_vars(entities_ignore_present)
-
-    #### 2: country specific processing
-
-
-    if processing_info_country is not None:
-        # get scenario
-        scenarios = list(data_country.coords[data_country.attrs['scen']].values)
-        if len(scenarios) > 1:
-            raise ValueError(
-                f"Found {len(scenarios)} scenarios. Only single scenario data "
-                f"can be processed by this function. Scenarios: {scenarios}")
-        else:
-            scenario = scenarios[0]
-            if scenario in processing_info_country.keys():
-                processing_info_country_scen = processing_info_country[scenario]
-            else:
-                processing_info_country_scen = processing_info_country['default']
-
-
-            if 'tolerance' in processing_info_country_scen:
-                tolerance = processing_info_country_scen["tolerance"]
-            else:
-                tolerance = 0.01
-
-            # take only desired years
-            if 'years' in processing_info_country_scen:
-                data_country = data_country.pr.loc[
-                    {'time': processing_info_country_scen['years']}]
-
-            # remove timeseries if desired
-            if 'remove_ts' in processing_info_country_scen:
-                for case in processing_info_country_scen['remove_ts']:
-                    remove_info = processing_info_country_scen['remove_ts'][case]
-                    entities = remove_info.pop("entities")
-                    for entity in entities:
-                        data_country[entity].pr.loc[remove_info] = \
-                            data_country[entity].pr.loc[remove_info] * np.nan
-
-            # remove all data for given years if necessary
-            if 'remove_years' in processing_info_country_scen:
-                data_country.pr.loc[{'time': processing_info_country_scen[
-                    'remove_years']}] = \
-                    data_country.pr.loc[{'time': processing_info_country_scen[
-                        'remove_years']}] * np.nan
-
-            # subtract categories
-            if 'subtract_cats' in processing_info_country_scen:
-                subtract_cats_current = processing_info_country_scen['subtract_cats']
-                if 'entities' in subtract_cats_current.keys():
-                    entities_current = subtract_cats_current['entities']
-                else:
-                    entities_current = list(data_country.data_vars)
-                print(f"Subtracting categories for country {country_code}, entities "
-                      f"{entities_current}")
-                for cat_to_generate in subtract_cats_current:
-                    cats_to_subtract = \
-                        subtract_cats_current[cat_to_generate]['subtract']
-                    data_sub = \
-                        data_country.pr.loc[{'category': cats_to_subtract}].pr.sum(
-                        dim='category', skipna=True, min_count=1)
-                    data_parent = data_country.pr.loc[
-                        {'category': subtract_cats_current[cat_to_generate]['parent']}]
-                    data_agg = data_parent - data_sub
-                    nan_vars = [var for var in data_agg.data_vars if
-                                data_agg[var].isnull().all().data == True]
-                    data_agg = data_agg.drop(nan_vars)
-                    if len(data_agg.data_vars) > 0:
-                        print(f"Generating {cat_to_generate} through subtraction")
-                        data_agg = data_agg.expand_dims([f'category ('
-                                                         f'{cat_terminology_in})'])
-                        data_agg = data_agg.assign_coords(
-                            coords={f'category ({cat_terminology_in})':
-                                        (f'category ({cat_terminology_in})',
-                                         [cat_to_generate])})
-                        data_country = data_country.pr.merge(data_agg,
-                                                             tolerance=tolerance)
-                    else:
-                        print(f"no data to generate category {cat_to_generate}")
-
-            # downscaling
-            if 'downscale' in processing_info_country_scen:
-                if 'sectors' in processing_info_country_scen['downscale']:
-                    sector_downscaling = \
-                        processing_info_country_scen['downscale']['sectors']
-                    for case in sector_downscaling.keys():
-                        print(f"Downscaling for {case}.")
-                        sector_downscaling_current = sector_downscaling[case]
-                        entities = sector_downscaling_current.pop('entities')
-                        for entity in entities:
-                            data_country[entity] = data_country[
-                                entity].pr.downscale_timeseries(
-                                **sector_downscaling_current)
-                            # , skipna_evaluation_dims=None)
-
-                if 'entities' in processing_info_country_scen['downscale']:
-                    entity_downscaling = \
-                        processing_info_country_scen['downscale']['entities']
-                    for case in entity_downscaling.keys():
-                        #print(case)
-                        print(data_country.coords[f'category ('
-                                                  f'{cat_terminology_in})'].values)
-                        data_country = data_country.pr.downscale_gas_timeseries(
-                            **entity_downscaling[case], skipna=True,
-                            skipna_evaluation_dims=None)
-
-            # aggregate categories
-            if 'aggregate_cats' in processing_info_country_scen:
-                aggregate_cats_current = processing_info_country_scen['aggregate_cats']
-                print(
-                    f"Aggregating categories for country {country_code}")
-                for cat_to_agg in aggregate_cats_current:
-                    print(f"Category: {cat_to_agg}")
-                    source_cats = aggregate_cats_current[cat_to_agg]['sources']
-                    data_agg = data_country.pr.loc[{'category': source_cats}].pr.sum(
-                        dim='category', skipna=True, min_count=1)
-                    nan_vars = [var for var in data_agg.data_vars if
-                                data_agg[var].isnull().all().data == True]
-                    data_agg = data_agg.drop(nan_vars)
-                    if len(data_agg.data_vars) > 0:
-                        data_agg = data_agg.expand_dims([f'category ('
-                                                         f'{cat_terminology_in})'])
-                        data_agg = data_agg.assign_coords(
-                            coords={f'category ({cat_terminology_in})':
-                                        (f'category ({cat_terminology_in})',
-                                         [cat_to_agg])})
-                        data_country = data_country.pr.merge(data_agg,
-                                                             tolerance=tolerance)
-                    else:
-                        print(f"no data to aggregate category {cat_to_agg}")
-
-            # aggregate gases if desired
-            if 'aggregate_gases' in processing_info_country_scen:
-                for case in processing_info_country_scen['aggregate_gases'].keys():
-                    case_info = processing_info_country_scen['aggregate_gases'][case]
-                    data_country[case_info['basket']] = \
-                        data_country.pr.fill_na_gas_basket_from_contents(
-                            **case_info)
-
-    #### 3: map categories
-    if country_code in nAI_countries:
-        # conversion from BURDI to IPCC2006_PRIMAP needed
-        cat_terminology_out = 'IPCC2006_PRIMAP'
-        data_country = convert_categories(
-            data_country,
-            cat_conversion[f"{cat_terminology_in}_to_{cat_terminology_out}"],
-            cat_terminology_out,
-            debug=False,
-            tolerance=0.01,
-        )
-    else:
-        cat_terminology_out = cat_terminology_in
-
-    # more general processing
-    # reduce categories to output cats
-    if sectors is not None:
-        cats_to_keep = [cat for cat in
-                        data_country.coords[f'category ({cat_terminology_out})'].values if
-                        cat in sectors]
-        data_country = data_country.pr.loc[{'category': cats_to_keep}]
-
-    # create gas baskets
-    entities_present = set(data_country.data_vars)
-    for basket in gas_baskets.keys():
-        basket_contents_present = [gas for gas in gas_baskets[basket] if
-                                   gas in entities_present]
-        if len(basket_contents_present) > 0:
-            if basket in list(data_country.data_vars):
-                data_country[basket] = data_country.pr.fill_na_gas_basket_from_contents(
-                    basket=basket, basket_contents=basket_contents_present, min_count=1)
-            else:
-                try:
-                    data_country[basket] = xr.full_like(data_country["CO2"],
-                                                        np.nan).pr.quantify(
-                        units="Gg CO2 / year")
-                    data_country[basket].attrs = {"entity": basket.split(' ')[0],
-                                                  "gwp_context": basket.split(' ')[1][
-                                                                 1:-1]}
-                    data_country[basket] = data_country.pr.gas_basket_contents_sum(
-                        basket=basket, basket_contents=basket_contents_present,
-                        min_count=1)
-                except:
-                    print(f"No gas basket created for {country_code}")
-
-    # amend title and comment
-    data_country.attrs["comment"] = data_country.attrs["comment"] + f" Processed on " \
-                                                                    f"{date.today()}"
-    data_country.attrs["title"] = data_country.attrs["title"] + f" Processed on " \
-                                                                    f"{date.today()}"
-
-    return data_country
-
-
 def read_UNFCCC_DI_for_country_df(
         country_code: str,
         category_groups: Optional[Dict]=None,
@@ -683,152 +383,6 @@ def convert_DI_IF_data_to_pm2(
     return data_pm2
 
 
 
 
-def save_DI_country_data(
-        data_pm2: xr.Dataset,
-        raw: bool=True,
-):
-    '''
-    save primap2 and IF data to country folder
-    can be used for raw and processed data but for a single country only
-    '''
-
-    # preparations
-    data_if = data_pm2.pr.to_interchange_format()
-
-    ## get country
-    countries = data_if[data_pm2.attrs['area']].unique()
-    if len(countries) > 1:
-        raise ValueError(f"More than one country in input data. This function can only"
-                         f"handle single country data. Countries: {countries}")
-    else:
-        country_code = countries[0]
-
-    ## get timestamp
-    scenario_col = data_pm2.attrs['scen']
-    scenarios = data_if[scenario_col].unique()
-    if len(scenarios) > 1:
-        raise ValueError(f"More than one scenario in input data. This function can only"
-                         f"handle single scenario data. Scenarios: {scenarios}")
-    else:
-        scenario = scenarios[0]
-
-    date_str = scenario[2:]
-
-    # calculate the hash of the data to see if it's identical to present data
-    data_for_token = data_if.drop(columns=[scenario_col])
-    token = tokenize(data_for_token)
-
-    # get the filename with the hash and check if it exists (separate for pm2 format
-    # and IF to fix broken datasets if necessary)
-    filename_hash = root_path / determine_filename(country_code, token, raw, hash=True)
-
-    # primap2 native format
-    filename_hash_nc = filename_hash.parent / (filename_hash.name + '.nc')
-    if not filename_hash_nc.exists():
-        # if parent dir does not exist create it
-        if not filename_hash.parent.exists():
-            filename_hash.parent.mkdir()
-        # save the data
-        print(f"Data has changed. Save to {filename_hash_nc.name}")
-        compression = dict(zlib=True, complevel=9)
-        encoding = {var: compression for var in data_pm2.data_vars}
-        data_pm2.pr.to_netcdf(filename_hash_nc, encoding=encoding)
-
-    # primap2 IF
-    filename_hash_csv = filename_hash.parent / (filename_hash.name + '.csv')
-    if not filename_hash_csv.exists():
-        # save the data
-        print(f"Data has changed. Save to {filename_hash.name + '.csv/.yaml'}")
-        pm2.pm2io.write_interchange_format(filename_hash, data_if)
-    else:
-        print(f"Data unchanged for {country_code}. Create symlinks.")
-
-    # get the filename with the date
-    filename_date = root_path / determine_filename(country_code, date_str, raw)
-
-    # create the symlinks to the actual data (with the hash)
-    suffixes = ['.nc', '.csv', '.yaml']
-    for suffix in suffixes:
-        file_date = filename_date.parent / (filename_date.name + suffix)
-        file_hash = filename_hash.name + suffix
-        if file_date.exists():
-            file_date.unlink()
-        file_date.symlink_to(file_hash)
-
-
-def save_DI_dataset(
-        data_pm2: xr.Dataset,
-        raw: bool=True,
-        annexI: bool=False,
-):
-    '''
-    save primap2 and IF data to dataset folder
-    can be used for raw and processed data but not to save to country folders
-    '''
-
-    # preparations
-    data_if = data_pm2.pr.to_interchange_format()
-    if annexI:
-        country_group = "AnnexI"
-    else:
-        country_group = "non-AnnexI"
-
-
-    ## get timestamp
-    scenario_col = data_pm2.attrs['scen']
-    scenarios = data_if[scenario_col].unique()
-    if len(scenarios) > 1:
-        raise ValueError(f"More than one scenario in input data. This function can only"
-                         f"handle single scenario data. Scenarios: {scenarios}")
-    else:
-        scenario = scenarios[0]
-
-    date_str = scenario[2:]
-
-    # calculate the hash of the data to see if it's identical to present data
-    data_for_token = data_if.drop(columns=[scenario_col])
-    token = tokenize(data_for_token)
-
-    # get the filename with the hash and check if it exists (separate for pm2 format
-    # and IF to fix broken datasets if necessary)
-    filename_hash = root_path / determine_dataset_filename(token, raw, annexI=annexI,
-                                               hash=True)
-    # primap2 native format
-    filename_hash_nc = filename_hash.parent / (filename_hash.name + '.nc')
-    if not filename_hash_nc.exists():
-        # if parent dir does not exist create it
-        # TODO double, also in determine_dataset_filename. same for country data
-        if not filename_hash.parent.exists():
-            filename_hash.parent.mkdir()
-        # save the data
-        print(f"Data has changed. Save to {filename_hash_nc.name}")
-        compression = dict(zlib=True, complevel=9)
-        encoding = {var: compression for var in data_pm2.data_vars}
-        data_pm2.pr.to_netcdf(filename_hash_nc, encoding=encoding)
-
-    # primap2 IF
-    filename_hash_csv = filename_hash.parent / (filename_hash.name + '.csv')
-    if not filename_hash_csv.exists():
-        # save the data
-        print(f"Data has changed. Save to {filename_hash.name + '.csv/.yaml'}")
-        pm2.pm2io.write_interchange_format(filename_hash, data_if)
-    else:
-        print(f"Data unchanged for {country_group}. Create symlinks.")
-
-    # get the filename with the date
-    filename_date = root_path / determine_dataset_filename(date_str, raw=raw,
-                                               annexI=annexI, hash=False)
-
-    # create the symlinks to the actual data (with the hash)
-    suffixes = ['.nc', '.csv', '.yaml']
-    for suffix in suffixes:
-        file_date = filename_date.parent / (filename_date.name + suffix)
-        file_hash = filename_hash.name + suffix
-        if file_date.exists():
-            file_date.unlink()
-        file_date.symlink_to(file_hash)
-
-
 ## functions for multiple country reading
 def read_UNFCCC_DI_for_country_group(
         annexI: bool=False,
@@ -901,507 +455,3 @@ def read_UNFCCC_DI_for_country_group(
     return data_all
 
 
 
 
-def process_UNFCCC_DI_for_country_group(
-        annexI: bool=False,
-) -> xr.Dataset:
-    '''
-    This function processes DI data for all countries in a group (annexI or non-AnnexI)
-    TODO: currently only non-annexI is implemented
-    The function processes all data in one go using datalad run. as the output data file
-    names are unknown beforehand datalad run uses explicit=false
-
-    TODO: use the latest
-
-
-    '''
-
-    today = date.today()
-    date_str = today.strftime(DI_date_format)
-
-    if annexI:
-        raise ValueError("Bulk reading for AnnexI countries not implemented yet")
-    else:
-        countries = nAI_countries
-
-    # read the data
-    data_all = None
-    for country in countries[0:5]:
-        print(f"reading DI data for country {country}")
-
-        try:
-            data_country = read_UNFCCC_DI_for_country(
-                country_code=country,
-                category_groups=None,  # read all categories
-                read_subsectors=False,  # not applicable as we read all categories
-                date_str=date_str,
-                pm2if_specifications=None,
-                # automatically use the right specs for AI and NAI
-                default_gwp=None,  # automatically uses right default GWP for AI and NAI
-                debug=False)
-
-            if data_all is None:
-                data_all = data_country
-            else:
-                data_all = data_all.pr.merge(data_country)
-        except unfccc_di_api.NoDataError as err:
-            print(f"No data for {country}.")
-            print(err)
-
-    # TODO: write metadata
-
-    # save the data
-    save_DI_dataset(data_all, raw=True, annexI=annexI)
-
-    return data_all
-
-# TODO: add interface functions and script for reading all data
-# add process-all functions and scripts
-# merge into main
-# run reading procedure
-# config for all DI data
-# re-run crf etc
-
-
-## datalad and pydoit interface functions
-def read_DI_for_country_datalad(
-        country: str,
-) -> None:
-    """
-    Wrapper around read_UNFCCC_DI_for_country which takes care of selecting input
-    and output files and using datalad run to trigger the data reading
-
-    Parameters
-    __________
-
-    country: str
-        country name or ISO 3-letter country code
-
-    """
-
-    # get date to determine output filename
-    today = date.today()
-    date_str = today.strftime(DI_date_format)
-
-    # get all the info for the country
-    country_info = get_input_and_output_files_for_country_DI(country, date_str,
-                                                             raw=True, verbose=True)
-
-    print(f"Attempting to read DI data for {country_info['name']}.")
-    print("#"*80)
-    print("")
-    print(f"Using the UNFCCC_DI_reader")
-    print("")
-    print(f"Run the script using datalad run via the python api")
-    script = code_path / "UNFCCC_DI_reader" / "read_UNFCCC_DI_for_country.py"
-    script = script.relative_to(root_path)
-
-    cmd = f"./venv/bin/python3 {script.as_posix()} --country={country_info['code']} " \
-          f"--date={date_str}"
-    try:
-        datalad.api.run(
-            cmd=cmd,
-            dataset=root_path,
-            message=f"Read DI data for {country_info['name']}.",
-            inputs=country_info["input"],
-            outputs=country_info["output"],
-            dry_run=None,
-            explicit=False,
-        )
-    except IncompleteResultsError as IRE:
-        print(f"IncompleteResultsError occured when running {cmd}: {IRE}")
-    except Exception as ex:
-        print(f"Exception occurred when running {cmd}")
-        print(ex.message)
-
-
-def process_DI_for_country_datalad(
-        country: str,
-        date_str: Union[str, None],
-) -> None:
-    """
-    Wrapper around process_UNFCCC_DI_for_country which takes care of selecting input
-    and output files and using datalad run to trigger the data processing
-
-    Parameters
-    __________
-
-    country: str
-        country name or ISO 3-letter country code
-    date_str: str
-        Date of the data to be processed in the format %Y-%m-%d (e.g. 2023-01-30). If
-        no date is given the last data read will be processed.
-    """
-
-    # get all the info for the country
-    country_info = get_input_and_output_files_for_country_DI(country, date_str,
-                                                             raw=True, verbose=True)
-
-    print(f"Attempting to process DI data for {country_info['name']}.")
-    print("#"*80)
-    print("")
-    print(f"Using the UNFCCC_DI_reader")
-    print("")
-    print(f"Run the script using datalad run via the python api")
-    script = code_path / "UNFCCC_DI_reader" / "process_UNFCCC_DI_for_country.py"
-    script = script.relative_to(root_path)
-
-    cmd = f"./venv/bin/python3 {script.as_posix()} --country={country_info['code']} " \
-          f"--date={date_str}"
-    try:
-        datalad.api.run(
-            cmd=cmd,
-            dataset=root_path,
-            message=f"Read DI data for {country_info['name']}.",
-            inputs=country_info["input"],
-            outputs=country_info["output"],
-            dry_run=None,
-            explicit=False,
-        )
-    except IncompleteResultsError as IRE:
-        print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
-    except Exception as ex:
-        print(f"Exception occurred when running {cmd}")
-        print(ex.message)
-
-
-def read_DI_for_country_group_datalad(
-        annexI: bool=False,
-) -> None:
-    """
-    Wrapper around read_UNFCCC_DI_for_country_group which takes care of selecting input
-    and output files and using datalad run to trigger the data processing
-
-    Parameters
-    __________
-
-    country: str
-        country name or ISO 3-letter country code
-    date_str: str
-        Date of the data to be processed in the format %Y-%m-%d (e.g. 2023-01-30). If
-        no date is given the last data read will be processed.
-    """
-
-    if annexI:
-        country_group = "AnnexI"
-    else:
-        country_group = "non-AnnexI"
-
-    print(f"Attempting to read DI data for {country_group}.")
-    print("#"*80)
-    print("")
-    print(f"Using the UNFCCC_DI_reader")
-    print("")
-    print(f"Run the script using datalad run via the python api")
-    script = code_path / "UNFCCC_DI_reader" / "read_UNFCCC_DI_for_country_group.py"
-    script = script.relative_to(root_path)
-
-    cmd = f"./venv/bin/python3 {script.as_posix()} "
-    if annexI:
-        cmd = cmd + f" --annexI"
-
-    try:
-        datalad.api.run(
-            cmd=cmd,
-            dataset=root_path,
-            message=f"Read DI data for {country_group}.",
-            inputs=[],
-            outputs=[],
-            dry_run=None,
-            explicit=False,
-        )
-    except IncompleteResultsError as IRE:
-        print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
-    except Exception as ex:
-        print(f"Exception occurred when running {cmd}")
-        print(ex.message)
-
-
-## helper functions
-def determine_filename(
-        country_code: str,
-        date_or_hash: str,
-        raw: bool=False,
-        hash: bool=False,
-) -> Path:
-    """
-    Determine the filename for a dataset from given country code and date string.
-
-
-    Parameters
-    ----------
-    country_code: str
-        ISO 3 letter code of the country
-    date_or_hash:
-        formatted date string
-    raw: bool
-        bool specifying if filename fow raw or processed data should be returned
-    hash: str
-
-    Returns
-    _______
-        pathlib Path object for the file name (without suffix)
-
-    """
-
-    # get the country folder
-    with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
-        folder_mapping = json.load(mapping_file)
-
-    if country_code in folder_mapping:
-        file_filter = {}
-        file_filter["party"] = country_code
-        country_folders = folder_mapping[country_code]
-        if isinstance(country_folders, str):
-            # only one folder
-            country_folder = extracted_data_path_UNFCCC / country_folders
-        else:
-            raise ValueError("More than one output folder for country "
-                             f"{country_code}. This should not happen.")
-    else:
-        # folder not in mapping. It will be created if not present yet
-        country_name = get_country_name(country_code)
-        country_folder = extracted_data_path_UNFCCC / country_name.replace(" ", "_")
-
-        if country_folder.exists():
-           print(f"Output folder {country_name.replace(' ', '_')} for country "
-                 f"{country_code} exists but is not in folder mapping. Update "
-                 "folder mapping")
-        else:
-            country_folder.mkdir()
-
-    filename = f"{country_code}_DI_{date_or_hash}"
-    if raw:
-        filename = f"{filename}_raw"
-    if hash:
-        filename = f"{filename}_hash"
-    filename = country_folder / filename
-
-    return filename.relative_to(root_path)
-
-
-def determine_dataset_filename(
-        date_or_hash: str,
-        raw: bool=False,
-        annexI: bool=False,
-        hash: bool = False,
-) -> Path:
-    """
-    Determine the filename for a dataset from given country group and date string.
-
-    Parameters
-    ----------
-    date_or_hash:
-        formatted date string
-    raw: bool
-        bool specifying if filename fow raw or processed data should be returned
-    annexI: bool, default False
-        True if AnnexI data, False if non-AnnexI data
-    hash: str
-
-    Returns
-    _______
-        pathlib Path object for the file name (without suffix)
-    """
-
-    # get the country folder
-    if annexI:
-        current_dataset_path = dataset_path_UNFCCC / "DI_AnnexI"
-        filename = f"DI_AnnexI_{date_or_hash}"
-    else:
-        current_dataset_path = dataset_path_UNFCCC / "DI_non_AnnexI"
-        filename = f"DI_non_AnnexI_{date_or_hash}"
-
-    if not current_dataset_path.exists():
-        current_dataset_path.mkdir()
-
-    if raw:
-        filename = f"{filename}_raw"
-    if hash:
-        filename = f"{filename}_hash"
-    filename = current_dataset_path / filename
-
-    return filename.relative_to(root_path)
-
-
-def get_input_and_output_files_for_country_DI(
-        country: str,
-        date_str: str,
-        raw: bool,
-        verbose: Optional[bool]=True,
-) -> Dict[str, Union[List, str]]:
-    """
-    Get input and output files for a given country
-    """
-
-    country_info = {}
-
-    if country in custom_country_mapping:
-        country_code = country
-    else:
-        country_code = get_country_code(country)
-    # now get the country name
-    country_name = get_country_name(country_code)
-    country_info["code"] = country_code
-    country_info["name"] = country_name
-    # now get the country name
-    country_name = get_country_name(country_code)
-    country_info["code"] = country_code
-    country_info["name"] = country_name
-
-    # determine latest data
-    print(f"Determining output files for {country_name}")
-
-    # get input files (only for processing)
-    if raw:
-        input_files = []
-    else:
-        # get latest dataset if no date given
-        if date_str is None:
-            # get the latest date
-            input_file = [find_latest_DI_data(country_code, raw=True)]
-        else:
-            input_file = [determine_filename(country_code, date_str, raw=False,
-                                               hash=False)]
-            if input_file[0].is_symlink():
-                # also get the file with the actual data
-                input_file.append(input_file[0].readlink())
-            else:
-                # DI processing input files wit date labels should always be symlinks
-                # to the files with hashes holding the actual data.
-                raise(ValueError, f"Input file {input_file[0].name} is not a symlink "
-                                  f" or not existent. Check if the data you want to "
-                                  f"process exists and if your repository is ")
-
-        input_files = [f"{input_file.as_posix()}.{suffix}" for
-                        suffix in ['yaml', 'csv', 'nc']]
-
-        if verbose:
-            print(f"The following files are considered as input_files:")
-            for file in input_files:
-                print(file)
-            print("")
-
-    # get output files
-    output_file = determine_filename(country_code, date_str, raw=raw)
-    output_files = [f"{output_file.as_posix()}.{suffix}" for
-                    suffix in ['yaml', 'csv', 'nc']]
-
-    if verbose:
-        print(f"The following files are considered as output_files:")
-        for file in output_files:
-            print(file)
-        print("")
-
-    # add to country info
-    country_info["input"] = input_files
-    country_info["output"] = [] #output_files # not used because we don't know the
-    # hash in advance
-
-    return country_info
-
-
-def get_present_hashes_for_country_DI(
-        country_code: str,
-        raw: bool,
-) -> List:
-    '''
-    Get the hashes of outputs
-    '''
-
-    regex_hash = r"_([a-f0-9]*)_"
-    if raw:
-        regex_hash = regex_hash + "raw_hash\.nc"
-    else:
-        regex_hash = regex_hash + "hash\.nc"
-
-    # get the country folder
-    with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
-        folder_mapping = json.load(mapping_file)
-
-    if country_code in folder_mapping:
-        file_filter = {}
-        file_filter["party"] = country_code
-        country_folders = folder_mapping[country_code]
-        if isinstance(country_folders, str):
-            # only one folder
-            country_folder = extracted_data_path_UNFCCC / country_folders
-        else:
-            raise ValueError("More than one output folder for country "
-                             f"{country_code}. This should not happen.")
-
-        files_list = list(country_folder.glob("*_hash.nc"))
-        # filter according to raw flag
-        if raw:
-            files_list = [file.name for file in files_list if
-                          re.search(r'_raw_hash', file.name)]
-        else:
-            files_list = [file.name for file in files_list if
-                          not re.search(r'_raw_hash', file.name)]
-
-        hash_list = [re.findall(regex_hash, file)[0] for file in files_list]
-        return hash_list
-
-    else:
-        # folder not in mapping.
-        return []
-
-
-def find_latest_DI_data(
-        country_code: str,
-        raw: bool=True,
-)->Union[Path, None]:
-    '''
-    Find the path to the nc file with the latest DI data for a given country
-    '''
-
-    if raw:
-        regex = regex_date + r"_raw\.nc"
-    else:
-        regex = regex_date + r"\.nc"
-
-    # get the country folder
-    with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
-        folder_mapping = json.load(mapping_file)
-
-    if country_code in folder_mapping:
-        file_filter = {}
-        file_filter["party"] = country_code
-        country_folders = folder_mapping[country_code]
-        if isinstance(country_folders, str):
-            # only one folder
-            country_folder = extracted_data_path_UNFCCC / country_folders
-        else:
-            raise ValueError("More than one output folder for country "
-                             f"{country_code}. This should not happen.")
-
-        files_path_list = list(country_folder.glob("*.nc"))
-        # remove files with hash
-        files_list = [file.name for file in files_path_list
-                      if not re.search(r'_hash\.nc', file.name)]
-        # filter according to raw flag
-        if raw:
-            files_list = [file for file in files_list if
-                          re.search(r'_raw\.nc', file)]
-        else:
-            files_list = [file for file in files_list if
-                          not re.search(r'_raw\.nc', file)]
-
-        if len(files_list) > 0:
-            date_list = [re.findall(regex, file)[0] for file in files_list]
-            latest_date = find_latest_date(date_list, '%Y-%m-%d')
-            latest_file = [file for file in files_path_list if re.search(latest_date,
-                                                                         file.name)][0]
-            return latest_file
-        else:
-            return None
-
-    else:
-        # folder not in mapping.
-        return None
-
-# TODO
-
-# functions
-
-# def compare_with_existing
-# def

+ 161 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_datalad.py

@@ -0,0 +1,161 @@
+from datetime import date
+from typing import Union
+import datalad.api
+from datalad.support.exceptions import IncompleteResultsError
+from UNFCCC_GHG_data.helper import root_path, code_path
+
+from .UNFCCC_DI_reader_helper import get_input_and_output_files_for_country_DI
+from .util import DI_date_format
+
+## datalad and pydoit interface functions
+def read_DI_for_country_datalad(
+        country: str,
+) -> None:
+    """
+    Wrapper around read_UNFCCC_DI_for_country which takes care of selecting input
+    and output files and using datalad run to trigger the data reading
+
+    Parameters
+    __________
+
+    country: str
+        country name or ISO 3-letter country code
+
+    """
+
+    # get date to determine output filename
+    today = date.today()
+    date_str = today.strftime(DI_date_format)
+
+    # get all the info for the country
+    country_info = get_input_and_output_files_for_country_DI(country, date_str,
+                                                             raw=True, verbose=True)
+
+    print(f"Attempting to read DI data for {country_info['name']}.")
+    print("#"*80)
+    print("")
+    print(f"Using the UNFCCC_DI_reader")
+    print("")
+    print(f"Run the script using datalad run via the python api")
+    script = code_path / "UNFCCC_DI_reader" / "read_UNFCCC_DI_for_country.py"
+    script = script.relative_to(root_path)
+
+    cmd = f"./venv/bin/python3 {script.as_posix()} --country={country_info['code']} " \
+          f"--date={date_str}"
+    try:
+        datalad.api.run(
+            cmd=cmd,
+            dataset=root_path,
+            message=f"Read DI data for {country_info['name']}.",
+            inputs=country_info["input"],
+            outputs=country_info["output"],
+            dry_run=None,
+            explicit=False,
+        )
+    except IncompleteResultsError as IRE:
+        print(f"IncompleteResultsError occured when running {cmd}: {IRE}")
+    except Exception as ex:
+        print(f"Exception occurred when running {cmd}")
+        print(ex)
+
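+# Hypothetical illustration (not part of the module; country code and date are made
+# up): with the repository layout assumed above, the command handed to
+# datalad.api.run would look like
+#
+#     ./venv/bin/python3 UNFCCC_GHG_data/UNFCCC_DI_reader/read_UNFCCC_DI_for_country.py \
+#         --country=KEN --date=2023-01-30
+#
+# i.e. the reading itself happens in the standalone script and is tracked by datalad.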
+
+def process_DI_for_country_datalad(
+        country: str,
+        date_str: Union[str, None],
+) -> None:
+    """
+    Wrapper around process_UNFCCC_DI_for_country which takes care of selecting input
+    and output files and using datalad run to trigger the data processing
+
+    Parameters
+    __________
+
+    country: str
+        country name or ISO 3-letter country code
+    date_str: str
+        Date of the data to be processed in the format %Y-%m-%d (e.g. 2023-01-30). If
+        no date is given the last data read will be processed.
+    """
+
+    # get all the info for the country
+    country_info = get_input_and_output_files_for_country_DI(country, date_str,
+                                                             raw=True, verbose=True)
+
+    print(f"Attempting to process DI data for {country_info['name']}.")
+    print("#"*80)
+    print("")
+    print(f"Using the UNFCCC_DI_reader")
+    print("")
+    print(f"Run the script using datalad run via the python api")
+    script = code_path / "UNFCCC_DI_reader" / "process_UNFCCC_DI_for_country.py"
+    script = script.relative_to(root_path)
+
+    cmd = f"./venv/bin/python3 {script.as_posix()} --country={country_info['code']} " \
+          f"--date={date_str}"
+    try:
+        datalad.api.run(
+            cmd=cmd,
+            dataset=root_path,
+            message=f"Read DI data for {country_info['name']}.",
+            inputs=country_info["input"],
+            outputs=country_info["output"],
+            dry_run=None,
+            explicit=False,
+        )
+    except IncompleteResultsError as IRE:
+        print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
+    except Exception as ex:
+        print(f"Exception occurred when running {cmd}")
+        print(ex)
+
+
+def read_DI_for_country_group_datalad(
+        annexI: bool=False,
+) -> None:
+    """
+    Wrapper around read_UNFCCC_DI_for_country_group which takes care of selecting input
+    and output files and using datalad run to trigger the data reading
+
+    Parameters
+    __________
+
+    annexI: bool, default False
+        If True, read DI data for the AnnexI country group; otherwise read data
+        for the non-AnnexI country group.
+    """
+
+    if annexI:
+        country_group = "AnnexI"
+    else:
+        country_group = "non-AnnexI"
+
+    print(f"Attempting to read DI data for {country_group}.")
+    print("#"*80)
+    print("")
+    print(f"Using the UNFCCC_DI_reader")
+    print("")
+    print(f"Run the script using datalad run via the python api")
+    script = code_path / "UNFCCC_DI_reader" / "read_UNFCCC_DI_for_country_group.py"
+    script = script.relative_to(root_path)
+
+    cmd = f"./venv/bin/python3 {script.as_posix()} "
+    if annexI:
+        cmd = cmd + f" --annexI"
+
+    try:
+        datalad.api.run(
+            cmd=cmd,
+            dataset=root_path,
+            message=f"Read DI data for {country_group}.",
+            inputs=[],
+            outputs=[],
+            dry_run=None,
+            explicit=False,
+        )
+    except IncompleteResultsError as IRE:
+        print(f"IncompleteResultsError occurred when running {cmd}: {IRE}")
+    except Exception as ex:
+        print(f"Exception occurred when running {cmd}")
+        print(ex)
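+
+# Hypothetical usage sketch (illustration only; the import path and country code are
+# assumptions, the wrapper functions themselves are defined above). The wrappers are
+# meant to be called from doit tasks or small scripts:
+#
+#     from UNFCCC_GHG_data.UNFCCC_DI_reader.UNFCCC_DI_reader_datalad import (
+#         read_DI_for_country_datalad, process_DI_for_country_datalad)
+#
+#     read_DI_for_country_datalad("KEN")           # read today's DI data for Kenya
+#     process_DI_for_country_datalad("KEN", None)  # process the latest data read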

+ 294 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_helper.py

@@ -0,0 +1,294 @@
+import json
+import re
+from typing import Optional, Dict, List, Union
+from pathlib import Path
+from UNFCCC_GHG_data.UNFCCC_CRF_reader.UNFCCC_CRF_reader_core import find_latest_date
+from .util import regex_date
+from UNFCCC_GHG_data.helper import custom_country_mapping
+from UNFCCC_GHG_data.helper import get_country_code, get_country_name
+from UNFCCC_GHG_data.helper import extracted_data_path_UNFCCC, root_path, code_path
+from UNFCCC_GHG_data.helper import dataset_path_UNFCCC
+
+
+## helper functions
+def determine_filename(
+        country_code: str,
+        date_or_hash: str,
+        raw: bool=False,
+        hash: bool=False,
+) -> Path:
+    """
+    Determine the filename for a dataset from given country code and date string.
+
+
+    Parameters
+    ----------
+    country_code: str
+        ISO 3 letter code of the country
+    date_or_hash:
+        formatted date string
+    raw: bool
+        bool specifying if the filename for raw or processed data should be returned
+    hash: bool
+        bool specifying if the file is named by content hash ('_hash' is appended)
+        instead of by date
+
+    Returns
+    _______
+        pathlib Path object for the file name (without suffix)
+
+    """
+
+    # get the country folder
+    with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
+        folder_mapping = json.load(mapping_file)
+
+    if country_code in folder_mapping:
+        file_filter = {}
+        file_filter["party"] = country_code
+        country_folders = folder_mapping[country_code]
+        if isinstance(country_folders, str):
+            # only one folder
+            country_folder = extracted_data_path_UNFCCC / country_folders
+        else:
+            raise ValueError("More than one output folder for country "
+                             f"{country_code}. This should not happen.")
+    else:
+        # folder not in mapping. It will be created if not present yet
+        country_name = get_country_name(country_code)
+        country_folder = extracted_data_path_UNFCCC / country_name.replace(" ", "_")
+
+        if country_folder.exists():
+            print(f"Output folder {country_name.replace(' ', '_')} for country "
+                  f"{country_code} exists but is not in folder mapping. Update "
+                  "folder mapping.")
+        else:
+            country_folder.mkdir()
+
+    filename = f"{country_code}_DI_{date_or_hash}"
+    if raw:
+        filename = f"{filename}_raw"
+    if hash:
+        filename = f"{filename}_hash"
+    filename = country_folder / filename
+
+    return filename.relative_to(root_path)
+
+
+def determine_dataset_filename(
+        date_or_hash: str,
+        raw: bool=False,
+        annexI: bool=False,
+        hash: bool = False,
+) -> Path:
+    """
+    Determine the filename for a dataset from given country group and date string.
+
+    Parameters
+    ----------
+    date_or_hash:
+        formatted date string
+    raw: bool
+        bool specifying if filename fow raw or processed data should be returned
+    annexI: bool, default False
+        True if AnnexI data, False if non-AnnexI data
+    hash: str
+
+    Returns
+    _______
+        pathlib Path object for the file name (without suffix)
+    """
+
+    # get the country folder
+    if annexI:
+        current_dataset_path = dataset_path_UNFCCC / "DI_AnnexI"
+        filename = f"DI_AnnexI_{date_or_hash}"
+    else:
+        current_dataset_path = dataset_path_UNFCCC / "DI_non_AnnexI"
+        filename = f"DI_non_AnnexI_{date_or_hash}"
+
+    if not current_dataset_path.exists():
+        current_dataset_path.mkdir()
+
+    if raw:
+        filename = f"{filename}_raw"
+    if hash:
+        filename = f"{filename}_hash"
+    filename = current_dataset_path / filename
+
+    return filename.relative_to(root_path)
+
+
+def get_input_and_output_files_for_country_DI(
+        country: str,
+        date_str: str,
+        raw: bool,
+        verbose: Optional[bool]=True,
+) -> Dict[str, Union[List, str]]:
+    """
+    Get input and output files for a given country
+    """
+
+    country_info = {}
+
+    if country in custom_country_mapping:
+        country_code = country
+    else:
+        country_code = get_country_code(country)
+    # now get the country name
+    country_name = get_country_name(country_code)
+    country_info["code"] = country_code
+    country_info["name"] = country_name
+
+    # determine latest data
+    print(f"Determining output files for {country_name}")
+
+    # get input files (only for processing)
+    if raw:
+        input_files = []
+    else:
+        # get latest dataset if no date given
+        if date_str is None:
+            # get the latest date
+            input_file = [find_latest_DI_data(country_code, raw=True)]
+        else:
+            input_file = [determine_filename(country_code, date_str, raw=False,
+                                               hash=False)]
+            if input_file[0].is_symlink():
+                # also get the file with the actual data
+                input_file.append(input_file[0].readlink())
+            else:
+                # DI processing input files with date labels should always be symlinks
+                # to the files with hashes holding the actual data.
+                raise ValueError(f"Input file {input_file[0].name} is not a symlink "
+                                 f"or does not exist. Check if the data you want to "
+                                 f"process exists and if your repository is up to date.")
+
+        input_files = [f"{input_file.as_posix()}.{suffix}" for
+                        suffix in ['yaml', 'csv', 'nc']]
+
+        if verbose:
+            print(f"The following files are considered as input_files:")
+            for file in input_files:
+                print(file)
+            print("")
+
+    # get output files
+    output_file = determine_filename(country_code, date_str, raw=raw)
+    output_files = [f"{output_file.as_posix()}.{suffix}" for
+                    suffix in ['yaml', 'csv', 'nc']]
+
+    if verbose:
+        print(f"The following files are considered as output_files:")
+        for file in output_files:
+            print(file)
+        print("")
+
+    # add to country info
+    country_info["input"] = input_files
+    country_info["output"] = [] #output_files # not used because we don't know the
+    # hash in advance
+
+    return country_info
+
+
+def get_present_hashes_for_country_DI(
+        country_code: str,
+        raw: bool,
+) -> List:
+    '''
+    Get the hashes of outputs
+    '''
+
+    regex_hash = r"_([a-f0-9]*)_"
+    if raw:
+        regex_hash = regex_hash + "raw_hash\.nc"
+    else:
+        regex_hash = regex_hash + "hash\.nc"
+
+    # get the country folder
+    with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
+        folder_mapping = json.load(mapping_file)
+
+    if country_code in folder_mapping:
+        file_filter = {}
+        file_filter["party"] = country_code
+        country_folders = folder_mapping[country_code]
+        if isinstance(country_folders, str):
+            # only one folder
+            country_folder = extracted_data_path_UNFCCC / country_folders
+        else:
+            raise ValueError("More than one output folder for country "
+                             f"{country_code}. This should not happen.")
+
+        files_list = list(country_folder.glob("*_hash.nc"))
+        # filter according to raw flag
+        if raw:
+            files_list = [file.name for file in files_list if
+                          re.search(r'_raw_hash', file.name)]
+        else:
+            files_list = [file.name for file in files_list if
+                          not re.search(r'_raw_hash', file.name)]
+
+        hash_list = [re.findall(regex_hash, file)[0] for file in files_list]
+        return hash_list
+
+    else:
+        # folder not in mapping.
+        return []
+
+
+def find_latest_DI_data(
+        country_code: str,
+        raw: bool=True,
+)->Union[Path, None]:
+    '''
+    Find the path to the nc file with the latest DI data for a given country
+    '''
+
+    if raw:
+        regex = regex_date + r"_raw\.nc"
+    else:
+        regex = regex_date + r"\.nc"
+
+    # get the country folder
+    with open(extracted_data_path_UNFCCC / "folder_mapping.json", "r") as mapping_file:
+        folder_mapping = json.load(mapping_file)
+
+    if country_code in folder_mapping:
+        file_filter = {}
+        file_filter["party"] = country_code
+        country_folders = folder_mapping[country_code]
+        if isinstance(country_folders, str):
+            # only one folder
+            country_folder = extracted_data_path_UNFCCC / country_folders
+        else:
+            raise ValueError("More than one output folder for country "
+                             f"{country_code}. This should not happen.")
+
+        files_path_list = list(country_folder.glob("*.nc"))
+        # remove files with hash
+        files_list = [file.name for file in files_path_list
+                      if not re.search(r'_hash\.nc', file.name)]
+        # filter according to raw flag
+        if raw:
+            files_list = [file for file in files_list if
+                          re.search(r'_raw\.nc', file)]
+        else:
+            files_list = [file for file in files_list if
+                          not re.search(r'_raw\.nc', file)]
+
+        if len(files_list) > 0:
+            date_list = [re.findall(regex, file)[0] for file in files_list]
+            latest_date = find_latest_date(date_list, '%Y-%m-%d')
+            latest_file = [file for file in files_path_list if re.search(latest_date,
+                                                                         file.name)][0]
+            return latest_file
+        else:
+            return None
+
+    else:
+        # folder not in mapping.
+        return None
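+
+# Hypothetical usage sketch (illustration only; the country code, date and folder
+# layout are assumptions). determine_filename returns a path relative to root_path
+# without a suffix, find_latest_DI_data the newest dated raw or processed .nc file:
+#
+#     path = determine_filename("KEN", "2023-01-30", raw=True)
+#     # -> <country folder>/KEN_DI_2023-01-30_raw
+#     latest = find_latest_DI_data("KEN", raw=True)
+#     # -> Path to the newest KEN_DI_<date>_raw.nc, or None if nothing has been read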

+ 153 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_io.py

@@ -0,0 +1,153 @@
+import primap2 as pm2
+import xarray as xr
+from dask.base import tokenize
+
+from UNFCCC_GHG_data.helper import root_path
+
+from .UNFCCC_DI_reader_helper import determine_filename, determine_dataset_filename
+
+
+def save_DI_country_data(
+        data_pm2: xr.Dataset,
+        raw: bool=True,
+):
+    '''
+    save primap2 and IF data to country folder
+    can be used for raw and processed data but for a single country only
+    '''
+
+    # preparations
+    data_if = data_pm2.pr.to_interchange_format()
+
+    ## get country
+    countries = data_if[data_pm2.attrs['area']].unique()
+    if len(countries) > 1:
+        raise ValueError(f"More than one country in input data. This function can only"
+                         f"handle single country data. Countries: {countries}")
+    else:
+        country_code = countries[0]
+
+    ## get timestamp
+    scenario_col = data_pm2.attrs['scen']
+    scenarios = data_if[scenario_col].unique()
+    if len(scenarios) > 1:
+        raise ValueError(f"More than one scenario in input data. This function can only"
+                         f"handle single scenario data. Scenarios: {scenarios}")
+    else:
+        scenario = scenarios[0]
+
+    date_str = scenario[2:]
+
+    # calculate the hash of the data to see if it's identical to present data
+    data_for_token = data_if.drop(columns=[scenario_col])
+    token = tokenize(data_for_token)
+
+    # get the filename with the hash and check if it exists (separate for pm2 format
+    # and IF to fix broken datasets if necessary)
+    filename_hash = root_path / determine_filename(country_code, token, raw, hash=True)
+
+    # primap2 native format
+    filename_hash_nc = filename_hash.parent / (filename_hash.name + '.nc')
+    if not filename_hash_nc.exists():
+        # if parent dir does not exist create it
+        if not filename_hash.parent.exists():
+            filename_hash.parent.mkdir()
+        # save the data
+        print(f"Data has changed. Save to {filename_hash_nc.name}")
+        compression = dict(zlib=True, complevel=9)
+        encoding = {var: compression for var in data_pm2.data_vars}
+        data_pm2.pr.to_netcdf(filename_hash_nc, encoding=encoding)
+
+    # primap2 IF
+    filename_hash_csv = filename_hash.parent / (filename_hash.name + '.csv')
+    if not filename_hash_csv.exists():
+        # save the data
+        print(f"Data has changed. Save to {filename_hash.name + '.csv/.yaml'}")
+        pm2.pm2io.write_interchange_format(filename_hash, data_if)
+    else:
+        print(f"Data unchanged for {country_code}. Create symlinks.")
+
+    # get the filename with the date
+    filename_date = root_path / determine_filename(country_code, date_str, raw)
+
+    # create the symlinks to the actual data (with the hash)
+    suffixes = ['.nc', '.csv', '.yaml']
+    for suffix in suffixes:
+        file_date = filename_date.parent / (filename_date.name + suffix)
+        file_hash = filename_hash.name + suffix
+        if file_date.exists():
+            file_date.unlink()
+        file_date.symlink_to(file_hash)
+
+
+def save_DI_dataset(
+        data_pm2: xr.Dataset,
+        raw: bool=True,
+        annexI: bool=False,
+):
+    '''
+    save primap2 and IF data to dataset folder
+    can be used for raw and processed data but not to save to country folders
+    '''
+
+    # preparations
+    data_if = data_pm2.pr.to_interchange_format()
+    if annexI:
+        country_group = "AnnexI"
+    else:
+        country_group = "non-AnnexI"
+
+
+    ## get timestamp
+    scenario_col = data_pm2.attrs['scen']
+    scenarios = data_if[scenario_col].unique()
+    if len(scenarios) > 1:
+        raise ValueError(f"More than one scenario in input data. This function can only"
+                         f"handle single scenario data. Scenarios: {scenarios}")
+    else:
+        scenario = scenarios[0]
+
+    date_str = scenario[2:]
+
+    # calculate the hash of the data to see if it's identical to present data
+    data_for_token = data_if.drop(columns=[scenario_col])
+    token = tokenize(data_for_token)
+
+    # get the filename with the hash and check if it exists (separate for pm2 format
+    # and IF to fix broken datasets if necessary)
+    filename_hash = root_path / determine_dataset_filename(token, raw, annexI=annexI,
+                                               hash=True)
+    # primap2 native format
+    filename_hash_nc = filename_hash.parent / (filename_hash.name + '.nc')
+    if not filename_hash_nc.exists():
+        # if parent dir does not exist create it
+        # TODO double, also in determine_dataset_filename. same for country data
+        if not filename_hash.parent.exists():
+            filename_hash.parent.mkdir()
+        # save the data
+        print(f"Data has changed. Save to {filename_hash_nc.name}")
+        compression = dict(zlib=True, complevel=9)
+        encoding = {var: compression for var in data_pm2.data_vars}
+        data_pm2.pr.to_netcdf(filename_hash_nc, encoding=encoding)
+
+    # primap2 IF
+    filename_hash_csv = filename_hash.parent / (filename_hash.name + '.csv')
+    if not filename_hash_csv.exists():
+        # save the data
+        print(f"Data has changed. Save to {filename_hash.name + '.csv/.yaml'}")
+        pm2.pm2io.write_interchange_format(filename_hash, data_if)
+    else:
+        print(f"Data unchanged for {country_group}. Create symlinks.")
+
+    # get the filename with the date
+    filename_date = root_path / determine_dataset_filename(date_str, raw=raw,
+                                               annexI=annexI, hash=False)
+
+    # create the symlinks to the actual data (with the hash)
+    suffixes = ['.nc', '.csv', '.yaml']
+    for suffix in suffixes:
+        file_date = filename_date.parent / (filename_date.name + suffix)
+        file_hash = filename_hash.name + suffix
+        if file_date.exists():
+            file_date.unlink()
+        file_date.symlink_to(file_hash)
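+
+# Hypothetical sketch of the resulting files (names are illustrative): both save
+# functions write the data once under a content hash (dask tokenize) and then point
+# dated symlinks at it, so re-saving unchanged data only refreshes the symlinks:
+#
+#     KEN_DI_<hash>_raw_hash.nc                       (actual data)
+#     KEN_DI_2023-01-30_raw.nc -> KEN_DI_<hash>_raw_hash.nc
+#     ... and the same pattern for the .csv/.yaml interchange-format files.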

+ 398 - 0
UNFCCC_GHG_data/UNFCCC_DI_reader/UNFCCC_DI_reader_proc.py

@@ -0,0 +1,398 @@
+import primap2 as pm2
+import unfccc_di_api
+import numpy as np
+import xarray as xr
+import re
+from datetime import date
+from typing import Optional, Dict, List, Union
+
+from .UNFCCC_DI_reader_config import di_processing_info
+from .UNFCCC_DI_reader_config import cat_conversion
+from .UNFCCC_DI_reader_config import gas_baskets
+from .util import NoDIDataError, nAI_countries
+from .util import DI_date_format
+
+from UNFCCC_GHG_data.helper import convert_categories
+
+from .UNFCCC_DI_reader_core import read_UNFCCC_DI_for_country
+
+from .UNFCCC_DI_reader_helper import find_latest_DI_data
+from .UNFCCC_DI_reader_helper import determine_filename
+
+from .UNFCCC_DI_reader_io import save_DI_dataset, save_DI_country_data
+
+
+def process_and_save_UNFCCC_DI_for_country(
+        country_code: str,
+        date_str: Union[str, None] = None,
+) -> xr.Dataset:
+    """
+    process data and save them to disk using default parameters
+    """
+
+    # get latest dataset if no date given
+    if date_str is None:
+        # get the latest date
+        raw_data_file = find_latest_DI_data(country_code, raw=True)
+    else:
+        raw_data_file = determine_filename(country_code, date_str, raw=True,
+                                           hash=False)
+
+        raw_data_file = raw_data_file.parent / (raw_data_file.name + '.nc')
+        print(f"process {raw_data_file.name}")
+        if not raw_data_file.exists():
+            raise ValueError(f"File {raw_data_file.name} does not exist. Check if it "
+                             "has been read.")
+
+    # load the data
+    data_to_process = pm2.open_dataset(raw_data_file)
+
+    # get parameters
+    countries = list(data_to_process.coords[data_to_process.attrs['area']].values)
+    if len(countries) > 1:
+        raise ValueError(
+            f"Found {len(countries)} countries. Only single country data "
+            f"can be processed by this function. countries: {countries}")
+    else:
+        country_code = countries[0]
+    processing_info_country = di_processing_info[country_code]
+    entities_to_ignore = []  # TODO: check and make default list
+
+    # process
+    data_processed = process_UNFCCC_DI_for_country(
+        data_country=data_to_process,
+        entities_to_ignore=entities_to_ignore,
+        gas_baskets=gas_baskets,
+        cat_conversion=cat_conversion,
+        sectors_out=None,
+        processing_info_country=processing_info_country,
+    )
+
+    # save
+    save_DI_country_data(data_processed, raw=False)
+
+    return data_processed
+
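+# Hypothetical usage sketch (country code and date are illustrative): process raw DI
+# data read on a given date, or the latest raw file if no date is given:
+#
+#     data_proc = process_and_save_UNFCCC_DI_for_country("KEN", date_str="2023-01-30")
+#     data_proc = process_and_save_UNFCCC_DI_for_country("KEN")   # latest raw file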
+
+def process_UNFCCC_DI_for_country(
+        data_country: xr.Dataset,
+        entities_to_ignore: List[str],
+        gas_baskets: Dict[str, List[str]],
+        filter_dims: Optional[Dict[str, List[str]]] = None,
+        cat_conversion: Dict[str, Dict] = None,
+        sectors_out: List[str] = None,
+        processing_info_country: Dict = None,
+) -> xr.Dataset:
+    """
+        Process data from DI interface (where necessary).
+        * Downscaling including subtraction of time series
+        * country specific sector aggregation
+        * Conversion to IPCC2006 categories
+        * general sector and gas basket aggregation (in new categories)
+    """
+    # 0: gather information
+    countries = list(data_country.coords[data_country.attrs['area']].values)
+    if len(countries) > 1:
+        raise ValueError(
+            f"Found {len(countries)} countries. Only single country data "
+            f"can be processed by this function. countries: {countries}")
+    else:
+        country_code = countries[0]
+
+    cat_col = data_country.attrs['cat']
+    temp = re.findall(r'\((.*)\)', cat_col)
+    cat_terminology_in = temp[0]
+
+    # 1: general processing
+    # remove unused cats
+    data_country = data_country.dropna(f'category ({cat_terminology_in})', how='all')
+    # remove unused years
+    data_country = data_country.dropna(f'time', how='all')
+    # remove variables only containing nan
+    nan_vars_country = [var for var in data_country.data_vars if
+                        bool(data_country[var].isnull().all())]
+    print(f"removing all-nan variables: {nan_vars_country}")
+    data_country = data_country.drop_vars(nan_vars_country)
+
+    # remove unnecessary variables
+    entities_ignore_present = [entity for entity in entities_to_ignore if
+                               entity in data_country.data_vars]
+    data_country = data_country.drop_vars(entities_ignore_present)
+
+    # filter data along the given dimensions if requested
+    if filter_dims is not None:
+        data_country = data_country.pr.loc[filter_dims]
+
+    # 2: country specific processing
+    if processing_info_country is not None:
+        # get scenario
+        scenarios = list(data_country.coords[data_country.attrs['scen']].values)
+        if len(scenarios) > 1:
+            raise ValueError(
+                f"Found {len(scenarios)} scenarios. Only single scenario data "
+                f"can be processed by this function. Scenarios: {scenarios}")
+        else:
+            scenario = scenarios[0]
+            if scenario in processing_info_country.keys():
+                processing_info_country_scen = processing_info_country[scenario]
+            else:
+                processing_info_country_scen = processing_info_country['default']
+
+            if 'tolerance' in processing_info_country_scen:
+                tolerance = processing_info_country_scen["tolerance"]
+            else:
+                tolerance = 0.01
+
+            # remove entities if needed
+            if 'ignore_entities' in processing_info_country_scen:
+                entities_to_ignore_country = processing_info_country_scen[
+                    'ignore_entities']
+                entities_ignore_present = \
+                    [entity for entity in entities_to_ignore_country if
+                     entity in data_country.data_vars]
+                data_country = data_country.drop_vars(entities_ignore_present)
+
+            # take only desired years
+            if 'years' in processing_info_country_scen:
+                data_country = data_country.pr.loc[
+                    {'time': processing_info_country_scen['years']}]
+
+            # remove timeseries if desired
+            if 'remove_ts' in processing_info_country_scen:
+                for case in processing_info_country_scen['remove_ts']:
+                    remove_info = processing_info_country_scen['remove_ts'][case]
+                    entities = remove_info.pop("entities")
+                    for entity in entities:
+                        data_country[entity].pr.loc[remove_info] = \
+                            data_country[entity].pr.loc[remove_info] * np.nan
+
+            # remove all data for given years if necessary
+            if 'remove_years' in processing_info_country_scen:
+                data_country = data_country.drop_sel(
+                    time=processing_info_country_scen['remove_years'])
+                # entities = data_country.data_vars
+                # for entity in entities:
+                #     data_country[entity].pr.loc[{'time': processing_info_country_scen[
+                #         'remove_years']}] = data_country[entity].pr.loc[\
+                #             {'time': processing_info_country_scen['remove_years']}] *\
+                #             np.nan
+
+            # subtract categories
+            if 'subtract_cats' in processing_info_country_scen:
+                subtract_cats_current = processing_info_country_scen['subtract_cats']
+                if 'entities' in subtract_cats_current.keys():
+                    entities_current = subtract_cats_current['entities']
+                else:
+                    entities_current = list(data_country.data_vars)
+                print(f"Subtracting categories for country {country_code}, entities "
+                      f"{entities_current}")
+                for cat_to_generate in subtract_cats_current:
+                    cats_to_subtract = \
+                        subtract_cats_current[cat_to_generate]['subtract']
+                    cat_name = subtract_cats_current[cat_to_generate]['name']
+                    data_sub = \
+                        data_country.pr.loc[{'category': cats_to_subtract}].pr.sum(
+                            dim='category', skipna=True, min_count=1)
+                    data_parent = data_country.pr.loc[
+                        {'category': subtract_cats_current[cat_to_generate]['parent']}]
+                    data_agg = data_parent - data_sub
+                    nan_vars = [var for var in data_agg.data_vars if
+                                bool(data_agg[var].isnull().all())]
+                    data_agg = data_agg.drop(nan_vars)
+                    if len(data_agg.data_vars) > 0:
+                        print(f"Generating {cat_to_generate} through subtraction")
+                        data_agg = data_agg.expand_dims([f'category ('
+                                                         f'{cat_terminology_in})'])
+                        data_agg = data_agg.assign_coords(
+                            coords={f'category ({cat_terminology_in})':
+                                        (f'category ({cat_terminology_in})',
+                                         [cat_to_generate])})
+                        data_agg = data_agg.assign_coords(
+                            coords={'orig_cat_name':
+                                        (f'category ({cat_terminology_in})',
+                                         [cat_name])})
+                        data_country = data_country.pr.merge(data_agg,
+                                                             tolerance=tolerance)
+                    else:
+                        print(f"no data to generate category {cat_to_generate}")
+
+            # downscaling
+            if 'downscale' in processing_info_country_scen:
+                if 'sectors' in processing_info_country_scen['downscale']:
+                    sector_downscaling = \
+                        processing_info_country_scen['downscale']['sectors']
+                    for case in sector_downscaling.keys():
+                        print(f"Downscaling for {case}.")
+                        sector_downscaling_current = sector_downscaling[case]
+                        entities = sector_downscaling_current.pop('entities')
+                        for entity in entities:
+                            data_country[entity] = data_country[
+                                entity].pr.downscale_timeseries(
+                                **sector_downscaling_current)
+                            # , skipna_evaluation_dims=None)
+
+                if 'entities' in processing_info_country_scen['downscale']:
+                    entity_downscaling = \
+                        processing_info_country_scen['downscale']['entities']
+                    for case in entity_downscaling.keys():
+                        print(f"Downscaling for {case}.")
+                        # print(data_country.coords[f'category ('
+                        #                          f'{cat_terminology_in})'].values)
+                        data_country = data_country.pr.downscale_gas_timeseries(
+                            **entity_downscaling[case], skipna=True,
+                            skipna_evaluation_dims=None)
+
+            # aggregate categories
+            if 'aggregate_cats' in processing_info_country_scen:
+                if 'agg_tolerance' in processing_info_country_scen:
+                    agg_tolerance = processing_info_country_scen['agg_tolerance']
+                else:
+                    agg_tolerance = tolerance
+                aggregate_cats_current = processing_info_country_scen['aggregate_cats']
+                print(
+                    f"Aggregating categories for country {country_code}")
+                for cat_to_agg in aggregate_cats_current:
+                    print(f"Category: {cat_to_agg}")
+                    cat_name = aggregate_cats_current[cat_to_agg]['name']
+                    source_cats = aggregate_cats_current[cat_to_agg]['sources']
+                    data_agg = data_country.pr.loc[{'category': source_cats}].pr.sum(
+                        dim='category', skipna=True, min_count=1)
+                    # drop entities that are all NaN after the aggregation
+                    nan_vars = [var for var in data_agg.data_vars if
+                                bool(data_agg[var].isnull().all())]
+                    data_agg = data_agg.drop_vars(nan_vars)
+                    if len(data_agg.data_vars) > 0:
+                        data_agg = data_agg.expand_dims([f'category ('
+                                                         f'{cat_terminology_in})'])
+                        data_agg = data_agg.assign_coords(
+                            coords={f'category ({cat_terminology_in})':
+                                        (f'category ({cat_terminology_in})',
+                                         [cat_to_agg])})
+
+                        data_agg = data_agg.assign_coords(
+                            coords={'orig_cat_name':
+                                        (f'category ({cat_terminology_in})',
+                                         [cat_name])})
+                        data_country = data_country.pr.merge(data_agg,
+                                                             tolerance=agg_tolerance)
+                    else:
+                        print(f"no data to aggregate category {cat_to_agg}")
+
+            # aggregate gases if desired
+            if 'aggregate_gases' in processing_info_country_scen:
+                for case in processing_info_country_scen['aggregate_gases'].keys():
+                    case_info = processing_info_country_scen['aggregate_gases'][case]
+                    data_country[case_info['basket']] = \
+                        data_country.pr.fill_na_gas_basket_from_contents(
+                            **case_info)
+
+    # 3: map categories
+    if country_code in nAI_countries:
+        # conversion from BURDI to IPCC2006_PRIMAP needed
+        cat_terminology_out = 'IPCC2006_PRIMAP'
+        data_country = convert_categories(
+            data_country,
+            cat_conversion[f"{cat_terminology_in}_to_{cat_terminology_out}"],
+            cat_terminology_out,
+            debug=False,
+            tolerance=0.01,
+        )
+    else:
+        cat_terminology_out = cat_terminology_in
+
+    # more general processing
+    # reduce categories to output cats
+    if sectors_out is not None:
+        cats_to_keep = [cat for cat in
+                        data_country.coords[f'category ({cat_terminology_out})'].values
+                        if cat in sectors_out]
+        data_country = data_country.pr.loc[{'category': cats_to_keep}]
+
+    # create gas baskets
+    entities_present = set(data_country.data_vars)
+    for basket in gas_baskets.keys():
+        basket_contents_present = [gas for gas in gas_baskets[basket] if
+                                   gas in entities_present]
+        if len(basket_contents_present) > 0:
+            if basket in list(data_country.data_vars):
+                data_country[basket] = data_country.pr.fill_na_gas_basket_from_contents(
+                    basket=basket, basket_contents=basket_contents_present,
+                    skipna=True, min_count=1)
+            else:
+                try:
+                    data_country[basket] = xr.full_like(data_country["CO2"],
+                                                        np.nan).pr.quantify(
+                        units="Gg CO2 / year")
+                    data_country[basket].attrs = {"entity": basket.split(' ')[0],
+                                                  "gwp_context": basket.split(' ')[1][
+                                                                 1:-1]}
+                    data_country[basket] = data_country.pr.gas_basket_contents_sum(
+                        basket=basket, basket_contents=basket_contents_present,
+                        min_count=1)
+                except Exception as ex:
+                    print(f"No gas basket created for {country_code}: {ex}")
+
+    # amend title and comment
+    data_country.attrs["comment"] = data_country.attrs["comment"] + f" Processed on " \
+                                                                    f"{date.today()}"
+    data_country.attrs["title"] = data_country.attrs["title"] + f" Processed on " \
+                                                                    f"{date.today()}"
+
+    return data_country
+
+
+def process_UNFCCC_DI_for_country_group(
+        annexI: bool = False,
+) -> xr.Dataset:
+    """
+    This function processes DI data for all countries in a group (annexI or non-AnnexI)
+    TODO: currently only non-annexI is implemented
+    The function processes all data in one go using datalad run. as the output data file
+    names are unknown beforehand datalad run uses explicit=false
+
+    TODO: use the latest
+
+
+    """
+
+    today = date.today()
+    date_str = today.strftime(DI_date_format)
+
+    if annexI:
+        raise ValueError("Bulk reading for AnnexI countries not implemented yet")
+    else:
+        countries = nAI_countries
+
+    # read the data
+    data_all = None
+    for country in countries:
+        print(f"reading DI data for country {country}")
+
+        try:
+            data_country = read_UNFCCC_DI_for_country(
+                country_code=country,
+                category_groups=None,  # read all categories
+                read_subsectors=False,  # not applicable as we read all categories
+                date_str=date_str,
+                pm2if_specifications=None,
+                # automatically use the right specs for AI and NAI
+                default_gwp=None,  # automatically uses right default GWP for AI and NAI
+                debug=False)
+
+            if data_all is None:
+                data_all = data_country
+            else:
+                data_all = data_all.pr.merge(data_country)
+        except unfccc_di_api.NoDataError as err:
+            print(f"No data for {country}.")
+            print(err)
+
+    # TODO: write metadata
+
+    # save the data
+    save_DI_dataset(data_all, raw=True, annexI=annexI)
+
+    return data_all
+
+# TODO:
+# add 'process all' functions and scripts
+# config for all DI data
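
The category, year, and gas manipulations above are all driven by a per-country, per-scenario processing configuration (the DI processing config this commit starts to build). A minimal sketch of what one entry could look like, based only on the keys that process_UNFCCC_DI_for_country reads; the party code, scenario name, category codes, and entities below are purely illustrative:

# illustrative sketch, not part of this commit: the keys mirror what the
# processing function reads from processing_info_country_scen
di_processing_example = {
    'XYZ': {  # hypothetical party code
        'default': {  # scenario name
            'years': ['1994', '2000'],      # keep only these years
            'remove_years': ['1990'],       # drop these years completely
            'subtract_cats': {              # generate parent minus listed cats
                '14423': {'parent': '14424', 'subtract': ['14425'],
                          'name': '4.D.1 Direct N2O Emissions'},
            },
            'aggregate_cats': {             # generate sum of source categories
                '24540': {'sources': ['1', '2', '4', '5', '6'],
                          'name': 'Total GHG emissions without LULUCF'},
            },
            'downscale': {
                'entities': {               # downscale gas baskets to gases
                    'FGASES': {'basket': 'FGASES (SARGWP100)',
                               'basket_contents': ['HFCS (SARGWP100)',
                                                   'PFCS (SARGWP100)', 'SF6']},
                },
            },
        },
    },
}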

+ 16 - 11
UNFCCC_GHG_data/UNFCCC_DI_reader/__init__.py

@@ -1,23 +1,28 @@
 # submodule to read data from UNFCCC DI API using the unfccc_di_api package

 #import unfccc_di_api
-from .UNFCCC_DI_reader_core import \
-    read_UNFCCC_DI_for_country, read_DI_for_country_datalad, \
-    process_UNFCCC_DI_for_country, process_and_save_UNFCCC_DI_for_country, \
-    process_DI_for_country_datalad, \
-    convert_DI_data_to_pm2_if, convert_DI_IF_data_to_pm2, determine_filename, \
-    read_UNFCCC_DI_for_country_group, read_DI_for_country_group_datalad
+from .UNFCCC_DI_reader_core import read_UNFCCC_DI_for_country, \
+    convert_DI_data_to_pm2_if, convert_DI_IF_data_to_pm2, \
+    read_UNFCCC_DI_for_country_group

+from .UNFCCC_DI_reader_proc import process_UNFCCC_DI_for_country, \
+    process_and_save_UNFCCC_DI_for_country, process_UNFCCC_DI_for_country_group
+
+from .UNFCCC_DI_reader_datalad import read_DI_for_country_datalad, \
+    read_DI_for_country_group_datalad, process_DI_for_country_datalad
+
+from .UNFCCC_DI_reader_helper import determine_filename

 __all__ = [
     "read_UNFCCC_DI_for_country",
-    "read_DI_for_country_datalad",
-    "process_UNFCCC_DI_for_country",
-    "process_and_save_UNFCCC_DI_for_country",
-    "process_DI_for_country_datalad",
     "convert_DI_data_to_pm2_if",
     "convert_DI_IF_data_to_pm2",
-    "determine_filename",
     "read_UNFCCC_DI_for_country_group",
+    "process_UNFCCC_DI_for_country",
+    "process_and_save_UNFCCC_DI_for_country",
+    "process_UNFCCC_DI_for_country_group",
+    "read_DI_for_country_datalad",
+    "process_DI_for_country_datalad",
     "read_DI_for_country_group_datalad",
+    "determine_filename",
 ]
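
After this reorganization the public names stay the same, only their home modules change, so downstream code keeps importing from the package root. A short usage sketch mirroring the call made in process_UNFCCC_DI_for_country_group above; "KEN" and the date string are arbitrary examples:

from UNFCCC_GHG_data.UNFCCC_DI_reader import read_UNFCCC_DI_for_country

# example only: read all DI categories for one party with default settings
data_country = read_UNFCCC_DI_for_country(
    country_code="KEN",
    category_groups=None,        # read all categories
    read_subsectors=False,
    date_str="2023-05-01",       # any date in DI_date_format
    pm2if_specifications=None,   # use the default specs for AI and NAI
    default_gwp=None,            # use the default GWP for AI and NAI
    debug=False,
)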

+ 7 - 3
UNFCCC_GHG_data/UNFCCC_DI_reader/util.py

@@ -1,8 +1,12 @@
 import unfccc_di_api
+import pandas as pd
+from UNFCCC_GHG_data.helper import code_path

-reader = unfccc_di_api.UNFCCCApiReader()
-nAI_countries = list(reader.non_annex_one_reader.parties["code"])
-AI_countries = list(reader.annex_one_reader.parties["code"])
+#reader = unfccc_di_api.UNFCCCApiReader()
+#nAI_countries = list(reader.non_annex_one_reader.parties["code"])
+nAI_countries = list(pd.read_csv(code_path / 'UNFCCC_DI_reader' / 'DI_NAI_parties.conf')["code"])
+#AI_countries = list(reader.annex_one_reader.parties["code"])
+AI_countries = list(pd.read_csv(code_path / 'UNFCCC_DI_reader' / 'DI_AI_parties.conf')["code"])

 DI_date_format = '%Y-%m-%d'
 regex_date = r"([0-9]{4}-[0-9]{2}-[0-9]{2})"