Source code for ise.data.process

"""End-to-end ISMIP6 data processing pipeline for ISEFlow training data.

This module converts raw ISMIP6 forcing and projection files into the
sector-level, analysis-ready ``dataset.csv`` consumed by ``FeatureEngineer``
and ultimately by ``ISEFlow.fit()``.

Public entry points
-------------------
``process_sectors`` (main entry point):
    End-to-end pipeline.  Reads raw ISMIP6 forcing NetCDFs from the GHub
    directory layout and pre-computed IVAF scalar projection files from
    Zenodo, aggregates both to sector-level annual time series (86 years,
    2015-2100), joins them on (aogcm, year, sector), and returns a single
    ``pandas.DataFrame``::

        from ise.data.process import process_sectors

        dataset = process_sectors(
            ice_sheet="AIS",
            forcing_directory="/path/to/GHub/AIS/",
            grid_file="/path/to/AIS_sectors_8km.nc",
            zenodo_directory="/path/to/zenodo_download/",
            export_directory="outputs/",
        )

    Intermediate CSVs (``AIS_atmospheric.csv``, ``AIS_oceanic.csv``,
    ``forcings.csv``, ``projections.csv``, ``dataset.csv``) are written to
    ``export_directory`` so individual stages are skipped on re-runs
    (controlled by ``overwrite=False``).

``ProjectionProcessor``:
    Only needed when starting from **raw 3-D ISMIP6 NetCDF output files**
    rather than the pre-computed Zenodo scalar files.  Computes Ice Volume
    Above Flotation (IVAF) from bed topography, ice thickness, and ice/grounded
    fraction at each grid cell, subtracts the matched control-run IVAF, and
    writes ``ivaf_<ice_sheet>_<group>_<model>_<exp>.nc`` files::

        from ise.data.process import ProjectionProcessor

        processor = ProjectionProcessor(
            ice_sheet="AIS",
            forcings_directory="/path/to/forcing/",
            projections_directory="/path/to/projections/",
            scalefac_path="af2_scalefac.nc",
            densities_path="AIS_densities.csv",
        )
        processor.process()

``DatasetMerger``:
    Lower-level alternative to ``process_sectors()`` for when intermediate
    per-run CSV files already exist on disk.  Performs only the join step
    (forcing ↔ projection matched by CMIP model and pathway).

Supporting functions
--------------------
``process_AIS_atmospheric_sectors`` / ``process_GrIS_atmospheric_sectors``
    Aggregate atmospheric forcing NetCDFs to sector-level annual means.
``process_AIS_oceanic_sectors`` / ``process_GrIS_oceanic_sectors``
    Aggregate oceanic forcing NetCDFs to sector-level annual means.
``process_AIS_outputs`` / ``process_GrIS_outputs``
    Load pre-computed IVAF scalar projections from Zenodo and convert to SLE.
``merge_datasets``
    Join sector-level forcings and projections DataFrames on (aogcm, year, sector).
``get_model_densities``
    Extract ice/water density values (rhoi, rhow) from raw ISMIP6 NetCDFs.
``combine_gris_forcings``
    Concatenate annual GrIS atmospheric NetCDF files into per-AOGCM combined files.
"""

import os
import time
import warnings

import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm

from ise.data.forcings import ForcingFile
from ise.data.grids import GridFile
from ise.data.utils import convert_and_subset_times
from ise.utils.functions import get_all_filepaths


[docs] class ProjectionProcessor: """ A class for processing ISMIP6 projections (outputs) for ice sheet models, specifically for calculating Ice Volume Above Flotation (IVAF), handling control projections, and processing experimental projections. Args: ice_sheet (str): The ice sheet being analyzed ('AIS' or 'GIS'). forcings_directory (str): Path to the directory containing forcing datasets. projections_directory (str): Path to the directory containing projection datasets. scalefac_path (str, optional): Path to the NetCDF file containing scaling factors for each grid cell. Defaults to None. densities_path (str, optional): Path to the CSV file containing density data for models. Defaults to None. Attributes: forcings_directory (str): Path to forcing data. projections_directory (str): Path to projection data. densities_path (str): Path to density dataset. scalefac_path (str): Path to scaling factor dataset. ice_sheet (str): Ice sheet identifier ('AIS' or 'GIS'). resolution (int): Resolution of the dataset (5 for GIS, 8 for AIS). Methods: process(): Processes ISMIP6 projections by calculating IVAF and subtracting control projections. Note: This class is only needed when starting from raw 3-D ISMIP6 NetCDF output files. If you are using the pre-computed scalar files from Zenodo (``ComputedScalarsPaper/`` for AIS, ``v7_CMIP5_pub/`` for GrIS), call ``process_sectors()`` directly instead. """ def __init__( self, ice_sheet, forcings_directory, projections_directory, scalefac_path=None, densities_path=None, ): self.forcings_directory = forcings_directory self.projections_directory = projections_directory self.densities_path = densities_path self.scalefac_path = scalefac_path self.ice_sheet = ice_sheet.upper() if self.ice_sheet.lower() in ("gris", "gis"): self.ice_sheet = "GIS" self.resolution = 5 if self.ice_sheet == "GIS" else 8
[docs] def process(self): """ Process ISMIP6 projections by calculating IVAF for control and experiment projections, subtracting out control IVAF from experiments, and exporting IVAF files. For each model run the method: 1. Loads bed topography, ice thickness, ice fraction, and grounded fraction. 2. Computes IVAF at every grid cell and time step. 3. Subtracts the matched control-run IVAF to isolate the forced signal. 4. Writes ``ivaf_<ice_sheet>_<group>_<model>_<exp>.nc`` next to the input files. Returns: int: 1 if processing is successful. Raises: ValueError: If projections_directory is not specified. """ if self.projections_directory is None: raise ValueError("Projections path must be specified") self._calculate_ivaf_minus_control( self.projections_directory, self.densities_path, self.scalefac_path ) return 1
def _calculate_ivaf_minus_control( self, data_directory: str, densities_fp: str, scalefac_path: str ): """ Calculates the Ice Volume Above Flotation (IVAF) for each model run within the given directory. This method processes control projections first and then processes experimental projections by subtracting the control IVAF. Args: data_directory (str): Path to the directory containing projection data files. densities_fp (str or pd.DataFrame): Path to a CSV file containing density values or a preloaded pandas DataFrame. scalefac_path (str): Path to a NetCDF file containing scaling factors for each grid cell. Returns: int: 1 indicating successful IVAF calculation. Raises: ValueError: If densities_fp is not provided or is of an invalid type. """ # error handling for densities argument (must be str filepath or dataframe) if densities_fp is None: raise ValueError( "densities_fp must be specified. Run get_model_densities() to get density data." ) if isinstance(densities_fp, str): densities = pd.read_csv(densities_fp) elif isinstance(densities_fp, pd.DataFrame): pass else: raise ValueError("densities argument must be a string or a pandas DataFrame.") # open scaling model scalefac_ds = xr.open_dataset(scalefac_path) scalefac_model = np.transpose(scalefac_ds.af2.values, (1, 0)) # adjust scaling model based on desired resolution if self.ice_sheet == "AIS": scalefac_model = scalefac_model[:: self.resolution, :: self.resolution] elif self.ice_sheet == "GIS" and scalefac_model.shape != (337, 577): if scalefac_model.shape[0] == 6081: raise ValueError( f"Scalefac model must be 337x577 for GIS, received {scalefac_model.shape}. Make sure you are using the GIS scaling model and not the AIS." ) raise ValueError( f"Scalefac model must be 337x577 for GIS, received {scalefac_model.shape}." ) # get all files in directory with "ctrl_proj" and "exp" in them and store separately ctrl_proj_dirs = [] exp_dirs = [] for root, dirs, _ in os.walk(data_directory): for directory in dirs: if "ctrl_proj" in directory: ctrl_proj_dirs.append(os.path.join(root, directory)) elif "exp" in directory: exp_dirs.append(os.path.join(root, directory)) else: pass # Control projections must be processed first so their ivaf files exist # when the experimental runs try to open them for subtraction. for directory in ctrl_proj_dirs: self._calculate_ivaf_single_file(directory, densities, scalefac_model, ctrl_proj=True) for directory in exp_dirs: self._calculate_ivaf_single_file(directory, densities, scalefac_model, ctrl_proj=False) return 1 def _calculate_ivaf_single_file(self, directory, densities, scalefac_model, ctrl_proj=False): """ Calculates the Ice Volume Above Flotation (IVAF) for a single model run. Args: directory (str): The directory containing the model run data. densities (pd.DataFrame): DataFrame containing density values for different groups and models. scalefac_model (numpy.ndarray): Scaling factor matrix for the ice sheet model grid. ctrl_proj (bool, optional): If True, processes a control projection. Defaults to False. Returns: int: 1 if successful, -1 if the run is skipped due to missing or corrupted data. """ # directory = r"/gpfs/data/kbergen/pvankatw/pvankatw-bfoxkemp/GHub-ISMIP6-Projection-GrIS/AWI/ISSM1/exp09" # get metadata from path path = directory.split("/") exp = path[-1] model = path[-2] group = path[-3] # Determine which control to use based on experiment (only applies to AIS) per Nowicki, 2020 if not ctrl_proj: if self.ice_sheet == "AIS": if exp in ( "exp01", "exp02", "exp03", "exp04", "exp11", "expA1", "expA2", "expA3", "expA4", "expB1", "expB2", "expB3", "expB4", "expB5", "expC2", "expC5", "expC8", "expC11", "expE1", "expE2", "expE3", "expE4", "expE5", "expE11", "expE12", "expE13", "expE14", ): ctrl_path = os.path.join( "/".join(path[:-1]), f"ctrl_proj_open/ivaf_{self.ice_sheet}_{group}_{model}_ctrl_proj_open.nc", ) elif ( exp in ( "exp05", "exp06", "exp07", "exp08", "exp09", "exp10", "exp12", "exp13", "expA5", "expA6", "expA7", "expA8", "expB6", "expB7", "expB8", "expB9", "expB10", "expC3", "expC6", "expC9", "expC12", "expE6", "expE7", "expE8", "expE9", "expE10", "expE15", "expE16", "expE17", "expE18", ) or "expD" in exp ): ctrl_path = os.path.join( "/".join(path[:-1]), f"ctrl_proj_std/ivaf_{self.ice_sheet}_{group}_{model}_ctrl_proj_std.nc", ) elif exp in ( "expC1", "expC4", "expC7", "expC10", ): # N/A value for ocean_forcing in Nowicki, 2020 table A2 return -1 else: print(f"Experiment {exp} not recognized. Skipped.") return -1 else: # GrIS doesn't have ctrl_proj_open vs ctrl_proj_std ctrl_path = os.path.join( "/".join(path[:-1]), f"ctrl_proj/ivaf_{self.ice_sheet}_{group}_{model}_ctrl_proj.nc", ) # for some reason there is no ctrl_proj_open for AWI and JPL1, skip if group == "AWI" and "ctrl_proj_open" in ctrl_path: return -1 if group == "JPL1" and "ctrl_proj_open" in ctrl_path: return -1 # MUN_GISM1 is corrupted, skip if group == "MUN" and model == "GSM1": return -1 # folder is empty, skip elif group == "IMAU" and exp == "exp11": return -1 # bed file in NCAR_CISM/expD10 is empty, skip elif group == "NCAR" and exp in ("expD10", "expD11"): return -1 # lookup densities from csv subset_densities = densities[(densities.group == group) & (densities.model == model)] rhoi = subset_densities.rhoi.values[0] rhow = subset_densities.rhow.values[0] # load data if self.ice_sheet == "AIS" and group == "ULB": # ULB uses fETISh for AIS naming, not actual model name (fETISh_16km or fETISh_32km) naming_convention = f"{self.ice_sheet}_{group}_fETISh_{exp}.nc" else: naming_convention = f"{self.ice_sheet}_{group}_{model}_{exp}.nc" # get_xarray_data opens with decode_times=False so time stays as numeric # "days since X", which is easier to normalize across models later. bed = get_xarray_data( os.path.join(directory, f"topg_{naming_convention}"), ice_sheet=self.ice_sheet ) thickness = get_xarray_data( os.path.join(directory, f"lithk_{naming_convention}"), ice_sheet=self.ice_sheet ) mask = get_xarray_data( os.path.join(directory, f"sftgif_{naming_convention}"), ice_sheet=self.ice_sheet ) ground_mask = get_xarray_data( os.path.join(directory, f"sftgrf_{naming_convention}"), ice_sheet=self.ice_sheet ) length_time = len(thickness.time) try: bed = bed.transpose("x", "y", "time", ...) thickness = thickness.transpose("x", "y", "time", ...) mask = mask.transpose("x", "y", "time", ...) ground_mask = ground_mask.transpose("x", "y", "time", ...) except ValueError: bed = bed.transpose("x", "y", ...) thickness = thickness.transpose("x", "y", ...) mask = mask.transpose("x", "y", ...) ground_mask = ground_mask.transpose("x", "y", ...) # if time is not a dimension, add copies for each time step if "time" not in bed.dims or bed.dims["time"] == 1: try: bed = bed.drop_vars( [ "time", ] ) except ValueError: pass bed = bed.expand_dims(dim={"time": length_time}) if length_time == 86: bed["time"] = thickness[ "time" ] # most times just the bed file is missing the time index elif length_time > 86: if len(thickness.time.values) != len(set(thickness.time.values)): # has duplicates keep_indices = np.unique(thickness["time"], return_index=True)[ 1 ] # find non-duplicates bed = bed.isel(time=keep_indices) # only select non-duplicates thickness = thickness.isel(time=keep_indices) mask = mask.isel(time=keep_indices) ground_mask = ground_mask.isel(time=keep_indices) else: warnings.warn( f"At least one file in {exp} does not have a time index formatted correctly. Attempting to fix." ) start_idx = len(bed.time) - 86 bed = bed.sel(time=slice(bed.time.values[start_idx], len(bed.time))) thickness = thickness.sel( time=slice(thickness.time[start_idx], thickness.time[-1]) ) mask = mask.sel(time=slice(mask.time[start_idx], mask.time[-1])) ground_mask = ground_mask.sel( time=slice(ground_mask.time[start_idx], ground_mask.time[-1]) ) try: bed["time"] = thickness["time"].copy() except ValueError: print( f"Cannot fix time index for {exp} due to duplicate index values. Skipped." ) return -1 else: print(f"Only {len(bed.time)} time points for {exp}. Skipped.") return -1 # if -9999 instead of np.nan, replace (come back and optimize? couldn't figure out with xarray) if bed.topg[0, 0, 0] <= -9999.0 or bed.topg[0, 0, 0] >= 9999: topg = bed.topg.values topg[(np.where((topg <= -9999.0) | (topg >= 9999)))] = np.nan bed["topg"].values = topg del topg lithk = thickness.lithk.values lithk[(np.where((lithk <= -9999.0) | (lithk >= 9999)))] = np.nan thickness["lithk"].values = lithk del lithk sftgif = mask.sftgif.values sftgif[(np.where((sftgif <= -9999.0) | (sftgif >= 9999)))] = np.nan mask["sftgif"].values = sftgif del sftgif sftgrf = ground_mask.sftgrf.values sftgrf[(np.where((sftgrf <= -9999.0) | (sftgrf >= 9999)))] = np.nan ground_mask["sftgrf"].values = sftgrf del sftgrf # converts time (in "days from X" to numpy.datetime64) and subsets time from 2015 to 2100 # a few datasets do not have the time index formatted correctly if len(bed.time.attrs) == 0: if len(bed.time) == 86: bed["time"] = thickness[ "time" ] # most times just the bed file is missing the time index elif len(bed.time) > 86: # bed['time'] = thickness['time'].copy() warnings.warn( f"At least one file in {exp} does not have a time index formatted correctly. Attempting to fix." ) start_idx = len(bed.time) - 86 bed = bed.sel(time=slice(bed.time.values[start_idx], len(bed.time))) thickness = thickness.sel(time=slice(thickness.time[start_idx], thickness.time[-1])) mask = mask.sel(time=slice(mask.time[start_idx], mask.time[-1])) ground_mask = ground_mask.sel( time=slice(ground_mask.time[start_idx], ground_mask.time[-1]) ) try: bed["time"] = thickness["time"] except ValueError: print( f"Cannot fix time index for {exp} due to duplicate index values. Skipped." ) return -1 else: print(f"Only {len(bed.time)} time points for {exp}. Skipped.") return -1 bed = convert_and_subset_times(bed) thickness = convert_and_subset_times(thickness) mask = convert_and_subset_times(mask) ground_mask = convert_and_subset_times(ground_mask) length_time = len(thickness.time) # Interpolate values for x & y, for formatting purposes only, does not get used if len(set(thickness.y.values)) != len(scalefac_model): bed["x"], bed["y"] = interpolate_values(bed) thickness["x"], thickness["y"] = interpolate_values(thickness) mask["x"], mask["y"] = interpolate_values(mask) ground_mask["x"], ground_mask["y"] = interpolate_values(ground_mask) # clip masks if they are below 0 or above 1 if np.min(mask.sftgif.values) < 0 or np.max(mask.sftgif.values) > 1: mask["sftgif"] = np.clip(mask.sftgif, 0.0, 1.0) if np.min(ground_mask.sftgrf.values) < 0 or np.max(ground_mask.sftgrf.values) > 1: ground_mask["sftgrf"] = np.clip(ground_mask.sftgrf, 0.0, 1.0) # flip around axes so the order is (x, y, time) bed = bed.transpose("x", "y", "time", ...) bed_data = bed.topg.values thickness = thickness.transpose("x", "y", "time", ...) thickness_data = thickness.lithk.values mask = mask.transpose("x", "y", "time", ...) mask_data = mask.sftgif.values ground_mask = ground_mask.transpose("x", "y", "time", ...) ground_mask_data = ground_mask.sftgrf.values # for each time step, calculate ivaf ivaf = np.zeros(bed_data.shape) for i in range(length_time): # get data slices for current time thickness_i = thickness_data[:, :, i].copy() bed_i = bed_data[:, :, i].copy() mask_i = mask_data[:, :, i].copy() ground_mask_i = ground_mask_data[:, :, i].copy() # set data slices to zero where mask = 0 or any value is NaN thickness_i[ (mask_i == 0) | (np.isnan(mask_i)) | (np.isnan(thickness_i)) | (np.isnan(ground_mask_i)) | (np.isnan(bed_i)) ] = 0 bed_i[ (mask_i == 0) | (np.isnan(mask_i)) | (np.isnan(thickness_i)) | (np.isnan(ground_mask_i)) | (np.isnan(bed_i)) ] = 0 ground_mask_i[ (mask_i == 0) | np.isnan(mask_i) | np.isnan(thickness_i) | np.isnan(ground_mask_i) | np.isnan(bed_i) ] = 0 mask_i[ (mask_i == 0) | (np.isnan(mask_i)) | (np.isnan(thickness_i)) | (np.isnan(ground_mask_i)) | (np.isnan(bed_i)) ] = 0 # take min(bed_i, 0) bed_i[bed_i > 0] = 0 # calculate IVAF (based on MATLAB processing scripts from Seroussi, 2021) hf_i = thickness_i + ((rhow / rhoi) * bed_i) masked_output = hf_i * ground_mask_data[:, :, i] * mask_data[:, :, i] ivaf[:, :, i] = masked_output * scalefac_model * (self.resolution * 1000) ** 2 # Reuse the bed dataset's structure (coordinates, attributes) for the output file. ivaf_nc = bed.copy() if not ctrl_proj: ivaf_ctrl = xr.open_dataset(ctrl_path).transpose("x", "y", "time", ...) ivaf_with_ctrl = ivaf.copy() ivaf = ivaf - ivaf_ctrl.ivaf.values ivaf_nc["ivaf"] = (("x", "y", "time"), ivaf) if not ctrl_proj: ivaf_nc["ivaf_with_control"] = (("x", "y", "time"), ivaf_with_ctrl) ivaf_nc = ivaf_nc.drop_vars( [ "topg", ] ) ivaf_nc["sle"] = ivaf_nc.ivaf / 1e9 / 362.5 export_nc_path = os.path.join(directory, f"ivaf_{self.ice_sheet}_{group}_{model}_{exp}.nc") if os.path.exists(export_nc_path): os.remove(export_nc_path) ivaf_nc.to_netcdf(export_nc_path) print(f"{group}_{model}_{exp}: Processing successful.") return 1
[docs] def get_model_densities(zenodo_directory: str, output_path: str | None = None): """ Extracts density values (rhoi and rhow) from NetCDF files in the specified directory and returns them in a pandas DataFrame. Args: zenodo_directory (str): Path to the directory containing the NetCDF files. output_path (str, optional): Path to save the extracted density values as a CSV file. Defaults to None. Returns: pandas.DataFrame: A DataFrame containing the group, model, rhoi, and rhow values for each model run. """ results: list[dict] = [] for root, _, files in os.walk(zenodo_directory): for file in files: if file.endswith(".nc"): # Check if the file is a NetCDF file file_path = os.path.join(root, file) try: # Open the NetCDF file using xarray dataset = xr.open_dataset(file_path, decode_times=False).transpose( "x", "y", "time", ... ) # Extract values for rhoi and rhow if "rhoi" in dataset and "rhow" in dataset: rhoi_values = dataset["rhoi"].values rhow_values = dataset["rhow"].values # Append the filename and values to the results list results.append({"filename": file, "rhoi": rhoi_values, "rhow": rhow_values}) # Close the dataset dataset.close() except Exception as e: print(f"Error processing {file}: {e}") densities = [] last_filename = "" for result in results: if "ctrl_proj" in result["filename"] or "hist" in result["filename"]: continue elif "ILTS" in result["filename"]: fp = result["filename"].split("_") group = "ILTS_PIK" model = fp[-2] elif "ULB_fETISh" in result["filename"]: fp = result["filename"].split("_") group = "ULB" model = "fETISh_32km" if "32km" in result["filename"] else "fETISh_16km" else: fp = result["filename"].split("_") group = fp[-3] model = fp[-2] densities.append([group, model, result["rhoi"], result["rhow"]]) last_filename = result["filename"] df = pd.DataFrame(densities, columns=["group", "model", "rhoi", "rhow"]) df["rhoi"], df["rhow"] = df.rhoi.astype("float"), df.rhow.astype("float") df = df.drop_duplicates() ice_sheet = "AIS" if "AIS" in last_filename else "GIS" if output_path is not None: if output_path.endswith("/"): df.to_csv(f"{output_path}/{ice_sheet}_densities.csv", index=False) else: df.to_csv(output_path, index=False) return df
[docs] def interpolate_values(data): """ Interpolates missing values in the x and y dimensions of the input dataset using linear interpolation. Ensures that first and last values are properly adjusted to maintain consistency. Args: data (xarray.Dataset): A dataset containing x and y dimensions with potential missing values. Returns: tuple: A tuple containing the interpolated x and y arrays. """ y = pd.Series(data.y.values) y = y.replace(0, np.NaN) y = np.array(y.interpolate()) # first and last are NaNs, replace with correct values y[0] = y[1] - (y[2] - y[1]) y[-1] = y[-2] + (y[-2] - y[-3]) x = pd.Series(data.x.values) x = x.replace(0, np.NaN) x = np.array(x.interpolate()) # first and last are NaNs, replace with correct values x[0] = x[1] - (x[2] - x[1]) x[-1] = x[-2] + (x[-2] - x[-3]) return x, y
[docs] def get_xarray_data(dataset_fp, var_name=None, ice_sheet="AIS", convert_and_subset=False): """ Retrieves and processes data from an xarray dataset. Args: dataset_fp (str): The file path to the xarray dataset. var_name (str, optional): The name of the variable to retrieve from the dataset. Defaults to None. ice_sheet (str, optional): The ice sheet type ('AIS' or 'GrIS'). Defaults to 'AIS'. convert_and_subset (bool, optional): If True, converts and subsets the dataset for the target time range. Defaults to False. Returns: np.ndarray or xarray.Dataset: The extracted variable as a NumPy array or the entire processed dataset. """ dataset = xr.open_dataset( dataset_fp, decode_times=False, engine="netcdf4", ) try: dataset = dataset.transpose("time", "x", "y", ...) except (ValueError, KeyError): pass if "ivaf" in dataset.variables: pass else: # handle extra dimensions and variables try: dataset = dataset.drop_dims("nv4") except ValueError: pass for var in [ "z_bnds", "lat", "lon", "mapping", "time_bounds", "lat2d", "lon2d", "polar_stereographic", ]: try: dataset = dataset.drop(labels=[var]) except ValueError: pass if "z" in dataset.dims: dataset = dataset.mean(dim="z", skipna=True) # subset the dataset for 5km resolution (GrIS) if dataset.dims["x"] == 1681 and dataset.dims["y"] == 2881: dataset = dataset.sel(x=dataset.x.values[::5], y=dataset.y.values[::5]) if convert_and_subset: dataset = convert_and_subset_times(dataset) if var_name is not None: try: data = dataset[var_name].values except KeyError: return np.nan, np.nan x_dim = 761 if ice_sheet.lower() == "ais" else 337 y_dim = 761 if ice_sheet.lower() == "ais" else 577 if ( "time" not in dataset.dims or dataset.dims["time"] == 1 or (data.shape[1] == y_dim and data.shape[2] == x_dim) ): pass else: # TODO: fix this. this is just a weird way of tranposing, not sure if it even happens. grid_indices = np.array([0, 1, 2])[ (np.array(data.shape) == x_dim) | (np.array(data.shape) == y_dim) ] data = np.moveaxis(data, list(grid_indices), [1, 2]) if "time" not in dataset.dims: data_flattened = data.reshape( -1, ) else: data_flattened = data.reshape(len(dataset.time), -1) return data_flattened return dataset
[docs] class DatasetMerger: """ Merges pre-processed CSV forcing and projection files into a single dataset. This is a lower-level alternative to ``process_sectors()``. Use it when the intermediate per-run CSV files already exist on disk and you only need the join step (forcing ↔ projection matched by CMIP model and pathway). Args: ice_sheet (str): The ice sheet name ('AIS' or 'GrIS'). forcings (str): Directory containing forcing CSV files. projections (str): Directory containing projection CSV files. experiment_file (str): Path to the experiment metadata file (CSV or JSON). output_dir (str): Directory to save the merged ``dataset.csv``. Attributes: experiments (pd.DataFrame): Experiment metadata loaded from ``experiment_file``. forcing_paths (list): File paths for all forcing CSVs found under ``forcings``. projection_paths (list): File paths for all projection CSVs found under ``projections``. forcing_metadata (pd.DataFrame): Extracted CMIP model and pathway for each forcing file. """ def __init__(self, ice_sheet, forcings, projections, experiment_file, output_dir): self.ice_sheet = ice_sheet self.forcings = forcings self.projections = projections self.experiment_file = experiment_file self.output_dir = output_dir if self.experiment_file.endswith(".csv"): self.experiments = pd.read_csv(experiment_file) self.experiments.ice_sheet = self.experiments.ice_sheet.apply(lambda x: x.lower()) elif self.experiment_file.endswith(".json"): self.experiments = pd.read_json(experiment_file).T else: raise ValueError("Experiment file must be a CSV or JSON file.") self.forcing_paths = get_all_filepaths( path=self.forcings, filetype="csv", ) self.projection_paths = get_all_filepaths( path=self.projections, filetype="csv", ) self.forcing_metadata = self._get_forcing_metadata()
[docs] def merge_dataset(self): """ Merges forcing and projection datasets based on CMIP model and pathway metadata. Returns: int: Returns 0 upon successful merging and saving of the dataset. """ full_dataset = pd.DataFrame() self.experiments["exp"] = self.experiments["exp"].apply(lambda x: x.lower()) for i, projection in enumerate( tqdm( self.projection_paths, total=len(self.projection_paths), desc="Merging forcing & projection files", ) ): # get experiment from projection filepath exp = projection.replace(".csv", "").split("/")[-1].split("_")[-1] # make sure cases match when doing table lookup # get AOGCM value from table lookup try: aogcm = self.experiments.loc[ (self.experiments.exp == exp.lower()) & (self.experiments.ice_sheet == self.ice_sheet.lower()) ]["AOGCM"].values[0] except IndexError: aogcm = self.experiments.loc[self.experiments.exp == exp.lower()]["AOGCM"].values[0] proj_cmip_model = aogcm.split("_")[0] proj_pathway = aogcm.split("_")[-1] # names of CMIP models are slightly different, adjust based on AIS/GrIS directories if self.ice_sheet == "AIS": if proj_cmip_model == "csiro-mk3.6": proj_cmip_model = "csiro-mk3-6-0" elif proj_cmip_model == "ipsl-cm5-mr": proj_cmip_model = "ipsl-cm5a-mr" elif proj_cmip_model == "cnrm-esm2" or proj_cmip_model == "cnrm-cm6": proj_cmip_model = f"{proj_cmip_model}-1" elif self.ice_sheet == "GrIS": if proj_cmip_model.lower() == "noresm1-m": proj_cmip_model = "noresm1" elif proj_cmip_model.lower() == "ipsl-cm5-mr": proj_cmip_model = "ipsl-cm5" elif proj_cmip_model.lower() == "access1-3": proj_cmip_model = "access1.3" elif proj_cmip_model.lower() == "ukesm1-0-ll": proj_cmip_model = "ukesm1-cm6" # get forcing file from table lookup that matches projection forcing_files = self.forcing_metadata.file.loc[ (self.forcing_metadata.cmip_model == proj_cmip_model) & (self.forcing_metadata.pathway == proj_pathway) ] if forcing_files.empty: raise IndexError( f"Could not find forcing file for {aogcm}. Check formatting of experiment file." ) if len(forcing_files) > 1: forcings = pd.DataFrame() for file in forcing_files.values: forcings = pd.concat( [forcings, pd.read_csv(f"{self.forcings}/{file}.csv")], axis=1 ) else: forcing_file = forcing_files.values[0] forcings = pd.read_csv(f"{self.forcings}/{forcing_file}.csv") # load forcing and projection datasets projections = pd.read_csv(projection) # if forcings are longer than projections, cut off the beginning of the forcings if len(forcings) > len(projections): forcings = forcings.iloc[-len(projections) :].reset_index(drop=True) # add forcings and projections together and add some metadata merged_dataset = pd.concat([forcings, projections], axis=1) merged_dataset["time"] = np.arange(1, len(merged_dataset) + 1) merged_dataset["cmip_model"] = proj_cmip_model merged_dataset["pathway"] = proj_pathway merged_dataset["exp"] = exp merged_dataset["id"] = i # now add to dataset with all forcing/projection pairs full_dataset = pd.concat([full_dataset, merged_dataset]) # save the full dataset full_dataset.to_csv(f"{self.output_dir}/dataset.csv", index=False) return 0
[docs] def merge_sectors(self, _forcings_file=None, _projections_file=None, _save_dir=None): # Not yet implemented. Use process_sectors() for sector-level merging. pass
def _get_forcing_metadata(self): """ Extracts metadata from the forcing files to associate CMIP models with their respective pathways. Returns: pandas.DataFrame: DataFrame containing metadata with columns 'file', 'cmip_model', and 'pathway'. """ pairs = {} for forcing in self.forcing_paths: forcing = forcing.replace(".csv", "").split("/")[-1] cmip_model = forcing.split("_")[1] # GrIS has MAR3.9 in name, ignore if cmip_model == "MAR3.9": cmip_model = forcing.split("_")[2] elif cmip_model.lower() == "gris": cmip_model = forcing.split("_")[2] if "rcp" in forcing.lower() or "ssp" in forcing.lower(): for substring in forcing.lower().split("_"): if "rcp" in substring or "ssp" in substring: pathway = substring.lower() if len(pathway.split("-")) > 1 and ( "rcp" in pathway.split("-")[-1] or "ssp" in pathway.split("-")[-1] ): if len(pathway.split("-")) > 2: cmip_model = "-".join(pathway.split("-")[0:2]) pathway = pathway.split("-")[-1] else: cmip_model = pathway.split("-")[0] pathway = pathway.split("-")[-1] break else: pathway = "rcp85" if self.ice_sheet == "GrIS": if cmip_model.lower() == "noresm1-m": cmip_model = "noresm1" elif cmip_model.lower() == "ipsl-cm5-mr": cmip_model = "ipsl-cm5" elif cmip_model.lower() == "access1-3": cmip_model = "access1.3" elif cmip_model.lower() == "ukesm1-0-ll": cmip_model = "ukesm1-cm6" pairs[forcing] = [cmip_model.lower(), pathway.lower()] df = pd.DataFrame(pairs).T df = pd.DataFrame(pairs).T.reset_index() df.columns = ["file", "cmip_model", "pathway"] return df
[docs] def combine_gris_forcings(forcing_dir): """ Combines GrIS forcings from multiple CMIP model directories into consolidated NetCDF files. Args: forcing_dir (str): Directory containing the GrIS forcing files. Returns: int: 0 upon successful processing. """ atmosphere_dir = f"{forcing_dir}/GrIS/Atmosphere_Forcing/aSMB_observed/v1/" cmip_directories = next(os.walk(atmosphere_dir))[1] for cmip_dir in tqdm( cmip_directories, total=len(cmip_directories), desc="Processing CMIP directories" ): for var in ["aSMB", "aST"]: files = os.listdir(f"{atmosphere_dir}/{cmip_dir}/{var}") files = np.array([x for x in files if x.endswith(".nc")]) years = np.array([int(x.replace(".nc", "").split("-")[-1]) for x in files]) year_files = files[(years >= 2015) & (years <= 2100)] for i, file in enumerate(year_files): # first iteration, open dataset and store if i == 0: dataset = xr.open_dataset(f"{atmosphere_dir}/{cmip_dir}/{var}/{file}") for dim in ["nv", "nv4", "mapping"]: try: dataset = dataset.drop_dims(dim) except (ValueError, KeyError): pass dataset = dataset.drop("mapping") dataset = dataset.sel(x=dataset.x.values[::5], y=dataset.y.values[::5]) continue # following iterations, open dataset and concatenate data = xr.open_dataset(f"{atmosphere_dir}/{cmip_dir}/{var}/{file}") for dim in ["nv", "nv4"]: try: data = data.drop_dims(dim) except (ValueError, KeyError): pass data = data.drop("mapping") data = data.sel(x=data.x.values[::5], y=data.y.values[::5]) # data['time'] = pd.to_datetime(year, format='%Y') dataset = xr.concat([dataset, data], dim="time") # Now you have the dataset with the files loaded and time dimension set dataset.to_netcdf( os.path.join(atmosphere_dir, cmip_dir, f"GrIS_{cmip_dir}_{var}_combined.nc") ) return 0
[docs] def process_GrIS_atmospheric_sectors(forcing_directory, grid_file): """ Aggregate GrIS atmospheric forcing (aSMB and aST) to sector-level annual means. Reads annual NetCDF files from ``Atmosphere_Forcing/aSMB_observed/v1/`` and combines them per AOGCM via ``combine_gris_forcings()`` if combined files do not yet exist. Then averages Surface Mass Balance anomaly (aSMB) and surface temperature anomaly (aST) spatially over each of the 6 GrIS drainage basins. Args: forcing_directory (str): Root forcing directory (GHub layout expected). The function navigates to the ``Atmosphere_Forcing/aSMB_observed/v1/`` sub-directory automatically. grid_file (str or xarray.Dataset): Path to (or loaded) sector-definition NetCDF defining the 6 GrIS drainage-basin sectors. Returns: pandas.DataFrame: Rows indexed by (aogcm, sector, year) with columns ``aSMB``, ``aST``, ``aogcm``, ``year``, and ``sector``. """ start_time = time.time() path_to_forcings = "Atmosphere_Forcing/aSMB_observed/v1/" af_directory = ( f"{forcing_directory}/{path_to_forcings}" if not forcing_directory.endswith(path_to_forcings) else forcing_directory ) # check to see if GrIS forcings have been combined filepaths = get_all_filepaths(path=af_directory, contains="combined", filetype="nc") if not filepaths: combine_gris_forcings(af_directory) filepaths = get_all_filepaths(path=af_directory, contains="combined", filetype="nc") if not filepaths: raise ValueError("No combined files found. Check combine_gris_forcings function.") aogcm_directories = os.listdir(af_directory) aogcm_directories = [x for x in aogcm_directories if "DS_Store" not in x and "README" not in x] sectors = _format_grid_file(grid_file) unique_sectors = np.unique(sectors) all_data = [] for i, fp in enumerate(aogcm_directories): print("") print(f"Directory {i + 1} / {len(aogcm_directories)}") print(f"Directory: {fp.split('/')[-1]}") print(f"Time since start: {(time.time() - start_time) // 60} minutes") files = get_all_filepaths(path=f"{af_directory}/{fp}", contains="combined", filetype="nc") if len(files) != 2: raise ValueError(f"There should only be 2 combined files in each firectory, see {fp}.") st_and_smb = [] for file in files: dataset = xr.open_dataset(file, decode_times=False) dataset = convert_and_subset_times(dataset) # handle extra dimensions and variables try: dataset = dataset.drop_dims("nv4") except ValueError: pass for var in [ "z_bnds", "lat", "lon", "mapping", "time_bounds", "lat2d", "lon2d", "polar_stereographic", ]: try: dataset = dataset.drop(labels=[var]) except ValueError: pass if "z" in dataset.dims: dataset = dataset.mean(dim="z", skipna=True) dataset["sector"] = sectors formatted_aogcm = fp.rsplit("-", 1) formatted_aogcm = "_".join(formatted_aogcm).lower() aogcm_data = [] for sector in unique_sectors: mask = dataset.sector == sector sector_averages = dataset.where(mask, drop=True).mean(dim=["x", "y"]) sector_averages = sector_averages.to_dataframe() sector_averages["aogcm"] = formatted_aogcm sector_averages["year"] = np.arange(1, 87) sector_averages = sector_averages.reset_index(drop=True) aogcm_data.append(sector_averages) st_and_smb.append(pd.concat(aogcm_data)) all_data.append(pd.concat(st_and_smb, axis=1)) return pd.concat(all_data)
[docs] def process_AIS_atmospheric_sectors(forcing_directory, grid_file): """ Aggregate AIS atmospheric forcing to sector-level annual means. Searches ``Atmosphere_Forcing/`` for 8 km, 1995-2100 NetCDF files, loads each via ``ForcingFile``, and averages spatially over each of the 18 AIS sectors defined by the grid file. Args: forcing_directory (str): Root forcing directory (GHub layout expected). The function navigates to the ``Atmosphere_Forcing/`` sub-directory automatically. grid_file (str): Path to the AIS sector-definition NetCDF (e.g. ``AIS_sectors_8km.nc``). Returns: pandas.DataFrame: Rows indexed by (aogcm, sector, year) with one column per atmospheric forcing variable, plus ``aogcm``, ``year``, and ``sector``. """ ice_sheet = "AIS" start_time = time.time() path_to_forcings = "/Atmosphere_Forcing/" af_directory = ( f"{forcing_directory}/{path_to_forcings}" if not forcing_directory.endswith(path_to_forcings) else forcing_directory ) filepaths = get_all_filepaths(path=af_directory, filetype="nc") filepaths = [f for f in filepaths if "1995-2100" in f] filepaths = [f for f in filepaths if "8km" in f] if not filepaths: raise ValueError("No files found. Check the path to the forcing files.") gridfile = GridFile(ice_sheet=ice_sheet, filepath=grid_file) gridfile.format_grids() sectors = gridfile.get_sectors() unique_sectors = np.unique(sectors) all_data = [] for i, fp in enumerate(filepaths): print("") print(f"File {i + 1} / {len(filepaths)}") print(f"File: {fp.split('/')[-1]}") print(f"Time since start: {(time.time() - start_time) // 60} minutes") forcingfile = ForcingFile(ice_sheet, realm="atmos", filepath=fp) forcingfile.load(decode_times=False) forcingfile.format_timestamps() forcingfile.drop_vars( ["nv4", "z_bnds", "lat", "lon", "mapping", "time_bounds", "lat2d", "lon2d"] ) forcingfile.assign_sectors(gridfile) aogcm_data = [] for sector in unique_sectors: sector_averages = forcingfile.average_over_sector(sector) sector_averages = sector_averages.to_dataframe() sector_averages["aogcm"] = fp.split("/")[-3].lower() sector_averages["year"] = np.arange(1, 87) sector_averages = sector_averages.reset_index(drop=True) aogcm_data.append(sector_averages) all_data.append(pd.concat(aogcm_data)) atmospheric_df = pd.concat(all_data) atmospheric_df = atmospheric_df.loc[:, ~atmospheric_df.columns.duplicated()] return atmospheric_df
[docs] def process_AIS_oceanic_sectors(forcing_directory, grid_file): """ Aggregate AIS oceanic forcing to sector-level annual means. Loads thermal forcing, salinity, and ocean temperature NetCDFs from ``Ocean_Forcing/`` (8 km, 1995-2100 files), depth-averages each variable, and then spatially averages over each of the 18 AIS sectors. Args: forcing_directory (str): Root forcing directory (GHub layout expected). The function navigates to the ``Ocean_Forcing/`` sub-directory automatically. grid_file (str): Path to the AIS sector-definition NetCDF (e.g. ``AIS_sectors_8km.nc``). Returns: pandas.DataFrame: Rows indexed by (aogcm, sector, year) with columns ``thermal_forcing``, ``salinity``, ``temperature``, ``aogcm``, ``year``, and ``sector``. """ start_time = time.time() directory = ( f"{forcing_directory}/Ocean_Forcing/" if not forcing_directory.endswith("Ocean_Forcing/") else forcing_directory ) # Get all NC files that contain data from 1995-2100 filepaths = get_all_filepaths(path=directory, filetype="nc") filepaths = [f for f in filepaths if "1995-2100" in f] filepaths = [f for f in filepaths if "8km" in f] # In the case of ocean forcings, use the filepaths of the files to determine # which directories need to be used for OceanForcing processing. Change to # those directories rather than individual files. aogcms = list(set([f.split("/")[-3] for f in filepaths])) filepaths = [f"{directory}/{aogcm}/" for aogcm in aogcms] # Useful progress prints print("Files to be processed...") print([f.split("/")[-2] for f in filepaths]) gridfile = GridFile(ice_sheet="AIS", filepath=grid_file) gridfile.format_grids() sectors = gridfile.get_sectors() unique_sectors = np.unique(sectors) all_data = [] for i, directory in enumerate(filepaths): print("") print(f"File {i + 1} / {len(filepaths)}") print(f"File: {directory.split('/')[-1]}") print(f"Time since start: {(time.time() - start_time) // 60} minutes") files = os.listdir(f"{directory}/1995-2100/") if len(files) != 3: warnings.warn( f"Skipping {directory}/1995-2100/: expected 3 NetCDF files " f"(thermal_forcing, salinity, temperature), found {len(files)}: {files}" ) thermal_forcing_file = [f for f in files if "thermal_forcing" in f][0] salinity_file = [f for f in files if "salinity" in f][0] temperature_file = [f for f in files if "temperature" in f][0] tffile = ForcingFile( ice_sheet="AIS", realm="ocean", filepath=f"{directory}/1995-2100/{thermal_forcing_file}", varname="thermal_forcing", ) tffile.load(decode_times=False) tffile.format_timestamps() salfile = ForcingFile( ice_sheet="AIS", realm="ocean", filepath=f"{directory}/1995-2100/{salinity_file}", varname="salinity", ) salfile.load(decode_times=False) salfile.format_timestamps() tempfile = ForcingFile( ice_sheet="AIS", realm="ocean", filepath=f"{directory}/1995-2100/{temperature_file}", varname="temperature", ) tempfile.load(decode_times=False) tempfile.format_timestamps() aogcm_data = {"thermal_forcing": [], "salinity": [], "temperature": []} for datafile in [tffile, salfile, tempfile]: datafile.drop_vars( [ "nv4", "z_bnds", "lat", "lon", "mapping", "time_bounds", "lat2d", "lon2d", "polar_stereographic", ] ) datafile.aggregate_depth(method="mean") datafile.assign_sectors(gridfile) for sector in unique_sectors: sector_averages = datafile.average_over_sector(sector) sector_averages = sector_averages.to_dataframe() sector_averages["aogcm"] = _format_AIS_ocean_aogcm_name( directory.split("/")[-2].lower() ) sector_averages["year"] = np.arange(1, 87) sector_averages = sector_averages.reset_index(drop=True) aogcm_data[datafile.varname].append(sector_averages) df = pd.concat( [ pd.concat(aogcm_data["thermal_forcing"]), pd.concat(aogcm_data["salinity"]), pd.concat(aogcm_data["temperature"]), ], axis=1, ) df = df.loc[:, ~df.columns.duplicated()] all_data.append(df) return pd.concat(all_data)
[docs] def process_GrIS_oceanic_sectors(forcing_directory, grid_file): """ Aggregate GrIS oceanic forcing to sector-level annual means. Reads thermal forcing and basin runoff NetCDFs from ``Ocean_Forcing/Melt_Implementation/v4/`` and spatially averages each over the 6 GrIS drainage-basin sectors. Args: forcing_directory (str): Root forcing directory (GHub layout expected). The function navigates to the ``Ocean_Forcing/Melt_Implementation/v4/`` sub-directory automatically. grid_file (str or xarray.Dataset): Path to (or loaded) sector-definition NetCDF defining the 6 GrIS drainage-basin sectors. Returns: pandas.DataFrame: Rows indexed by (aogcm, sector, year) with columns ``thermal_forcing``, ``basin_runoff``, ``aogcm``, ``year``, and ``sector``. """ start_time = time.time() path_to_forcing = "Ocean_Forcing/Melt_Implementation/v4/" forcing_directory = ( f"{forcing_directory}/{path_to_forcing}" if not forcing_directory.endswith(path_to_forcing) else forcing_directory ) aogcm_directories = os.listdir(forcing_directory) aogcm_directories = [x for x in aogcm_directories if "DS_Store" not in x and "README" not in x] sectors = _format_grid_file(grid_file) unique_sectors = np.unique(sectors) all_data = [] for i, directory in enumerate(aogcm_directories): print("") print(f"Directory {i + 1} / {len(aogcm_directories)}") print(f"Directory: {directory.split('/')[-1]}") print(f"Time since start: {(time.time() - start_time) // 60} minutes") files = os.listdir(f"{forcing_directory}/{directory}") if len(files) != 2: warnings.warn( f"Skipping {forcing_directory}/{directory}: expected 2 NetCDF files " f"(thermalforcing, basinrunoff), found {len(files)}: {files}" ) thermal_forcing_file = [f for f in files if "thermalforcing" in f.lower()][0] basin_runoff_file = [f for f in files if "basinrunoff" in f.lower()][0] thermal_forcing = xr.open_dataset( f"{forcing_directory}/{directory}/{thermal_forcing_file}", decode_times=False ) basin_runoff = xr.open_dataset( f"{forcing_directory}/{directory}/{basin_runoff_file}", decode_times=False ) # subset the dataset for 5km resolution (GrIS) if thermal_forcing.sizes["x"] == 1681 and thermal_forcing.sizes["y"] == 2881: thermal_forcing = thermal_forcing.sel( x=thermal_forcing.x.values[::5], y=thermal_forcing.y.values[::5] ) basin_runoff = basin_runoff.sel( x=basin_runoff.x.values[::5], y=basin_runoff.y.values[::5] ) thermal_forcing = convert_and_subset_times(thermal_forcing) basin_runoff = convert_and_subset_times(basin_runoff) data = { "thermal_forcing": thermal_forcing, "basin_runoff": basin_runoff, } aogcm_data = { "thermal_forcing": [], "basin_runoff": [], } for name, dataset in data.items(): # handle extra dimensions and variables try: dataset = dataset.drop_dims("nv4") except ValueError: pass for var in [ "z_bnds", "lat", "lon", "mapping", "time_bounds", "lat2d", "lon2d", "polar_stereographic", ]: try: dataset = dataset.drop(labels=[var]) except ValueError: pass if "z" in dataset.dims: dataset = dataset.mean(dim="z", skipna=True) try: dataset["sector"] = sectors except ValueError: dataset["time"] = np.arange(1, 87) dataset["sector"] = sectors for sector in unique_sectors: mask = dataset.sector == sector sector_averages = dataset.where(mask, drop=True).mean(dim=["x", "y"]) sector_averages = sector_averages.to_dataframe() sector_averages["aogcm"] = _format_GrIS_ocean_aogcm_name(directory) sector_averages["year"] = np.arange(1, 87) sector_averages = sector_averages.reset_index(drop=True) aogcm_data[name].append(sector_averages) df = pd.concat( [ pd.concat(aogcm_data["thermal_forcing"]), pd.concat(aogcm_data["basin_runoff"]), ], axis=1, ) df = df.loc[:, ~df.columns.duplicated()] all_data.append(df) return pd.concat(all_data)
def _format_grid_file(grid_file): """ Formats a grid file to define sector-based data aggregation. Args: grid_file (str or xarray.Dataset): Path to the grid file or an xarray dataset. Returns: xarray.DataArray: An array representing sector labels for the grid. """ if isinstance(grid_file, str): grids = xr.open_dataset(grid_file) # .transpose('x', 'y',) sector_name = "sectors" if "8km" in grid_file.lower() else "ID" elif isinstance(grid_file, xr.Dataset): sector_name = "ID" if "Rignot" in grids.Description else "sectors" else: raise ValueError("grid_file must be a string or an xarray Dataset.") grids = grids.expand_dims(dim={"time": 86}) sectors = grids[sector_name] grids = grids.transpose("time", "x", "y", ...) return sectors
[docs] def process_AIS_outputs(zenodo_directory, with_ctrl=False): """ Load AIS IVAF scalar projections from Zenodo and convert to sea-level equivalent. Reads per-experiment NetCDF files from the ``ComputedScalarsPaper/`` sub-directory. Each file contains sector-level IVAF time series (``ivaf_sector_1`` … ``ivaf_sector_18``). Files with only 85 time steps have their first year duplicated to reach the required 86. SLE is computed as:: sle = -ivaf / 362.5 * 910 / (1e9 * 1000) following the sign convention and ice density (910 kg m⁻³) used in Seroussi et al. (2020) ISMIP6 scripts. Args: zenodo_directory (str): Path to the Zenodo download directory. The function looks inside ``ComputedScalarsPaper/`` automatically. with_ctrl (bool, optional): If ``True``, includes files that contain control projections (``ivaf_AIS_*`` files, excluding hist/ctrl filenames). Defaults to ``False``, which selects only ``ivaf_minus_ctrl_proj`` files. Returns: pandas.DataFrame: One row per (model, experiment, sector, year) with columns ``ivaf``, ``sle``, ``sector``, ``year``, ``id``, ``exp``, and ``model``. """ directory = ( f"{zenodo_directory}/ComputedScalarsPaper/" if not zenodo_directory.endswith("ComputedScalarsPaper") else zenodo_directory ) if not with_ctrl: files = get_all_filepaths(directory, contains="ivaf_minus_ctrl_proj", filetype="nc") else: files = get_all_filepaths( directory, contains="ivaf_AIS", filetype="nc", not_contains=["hist", "ctrl"] ) count = 0 all_files_data = [] for f in files: exp = f.replace(".nc", "").split("/")[-1].split("_")[-1] model = f"{f.replace('.nc', '').split('/')[-1].split('_')[-3]}_{f.replace('.nc', '').split('/')[-1].split('_')[-2]}" dataset = xr.open_dataset(f, decode_times=False) if len(dataset.time) == 85: count += 1 warnings.warn( f"{f.split('/')[-1]} does not contain 86 years. Inserting a copy into the first year." ) # Copy the first entry first_entry = dataset.isel({"time": 0}) # Assuming numeric coordinates, create a new coordinate value new_coord_value = ( first_entry["time"].values - 1 ) # Adjust this calculation based on your coordinate system # Set the new coordinate value for the copied entry first_entry["time"] = new_coord_value # Concatenate the new entry with the original dataset dataset = xr.concat([first_entry, dataset], dim="time") # dataset = convert_and_subset_times(dataset) all_sectors = [] for sector in range(1, 19): sector_x_data = dataset[f"ivaf_sector_{sector}"].to_dataframe().reset_index(drop=True) sector_x_data.rename(columns={f"ivaf_sector_{sector}": "ivaf"}, inplace=True) sector_x_data["sector"] = sector sector_x_data["year"] = np.arange(1, 87) sector_x_data["id"] = f"{model}_{exp}_sector{sector}" all_sectors.append(sector_x_data) full_dataset = pd.concat(all_sectors, axis=0) full_dataset["exp"] = exp full_dataset["model"] = model all_files_data.append(full_dataset) outputs = pd.concat(all_files_data) # outputs["sle"] = outputs["ivaf"] / 362.5 / 1e9 outputs["sle"] = ( -outputs["ivaf"] / 362.5 * 910 / (10**9 * 1000) ) # per Seroussi et al., ISMIP6 scripts (910=ice density) return outputs
[docs] def merge_datasets(forcings, projections, experiments_file, ice_sheet="AIS"): """ Join sector-level forcings and projections into a single analysis-ready DataFrame. Uses the experiment metadata to add the AOGCM name to the projections table, normalises AOGCM name formatting so the two tables join cleanly on ``(aogcm, year, sector)``, then performs an inner merge. Args: forcings (pd.DataFrame): Sector-level forcing DataFrame as produced by ``process_AIS/GrIS_atmospheric_sectors()`` + ``process_AIS/GrIS_oceanic_sectors()`` (or read from ``forcings.csv``). projections (pd.DataFrame): Sector-level projection DataFrame as produced by ``process_AIS/GrIS_outputs()`` (or read from ``projections.csv``). experiments_file (str or pd.DataFrame): Path to the experiment-metadata CSV (maps experiment IDs → AOGCM names) or a pre-loaded DataFrame. ice_sheet (str, optional): ``'AIS'`` or ``'GrIS'``. Defaults to ``'AIS'``. Returns: pandas.DataFrame: Merged dataset with one row per (model, experiment, sector, year), containing all forcing columns and the target SLE projection. """ if isinstance(experiments_file, str): experiments = pd.read_csv(experiments_file) elif isinstance(experiments_file, pd.DataFrame): experiments = experiments_file else: raise ValueError("experiments_file must be a string or a pandas DataFrame.") experiments = experiments[experiments.ice_sheet == ice_sheet] projections = pd.merge(projections, experiments, on="exp", how="inner") formatting_function = ( _format_AIS_forcings_aogcm_name if ice_sheet == "AIS" else _format_GrIS_forcings_aogcm_name ) forcings["aogcm"] = forcings["aogcm"].apply(formatting_function) projections.rename(columns={"AOGCM": "aogcm"}, inplace=True) forcings["sector"] = forcings.sector.astype(int) dataset = pd.merge(forcings, projections, on=["aogcm", "year", "sector"], how="inner") return dataset
[docs] def process_GrIS_outputs(zenodo_directory): """ Load GrIS IVAF scalar projections from Zenodo and convert to sea-level equivalent. Reads per-experiment NetCDF files from the ``v7_CMIP5_pub/`` sub-directory. Each file contains basin-level IVAF time series for the 6 GrIS drainage basins (``ivaf_no``, ``ivaf_ne``, ``ivaf_se``, ``ivaf_sw``, ``ivaf_cw``, ``ivaf_nw``). Files with only 85 time steps have their first year duplicated to reach the required 86. SLE is computed as:: sle = ivaf / 362.5 / 1e9 Args: zenodo_directory (str): Path to the Zenodo download directory. The function looks inside ``v7_CMIP5_pub/`` automatically. Returns: pandas.DataFrame: One row per (model, experiment, sector, year) with columns ``ivaf``, ``sle``, ``sector``, ``year``, ``id``, ``exp``, and ``model``. """ directory = ( f"{zenodo_directory}/v7_CMIP5_pub/" if not zenodo_directory.endswith("v7_CMIP5_pub") else zenodo_directory ) files = get_all_filepaths(directory, contains="rm", not_contains="ctrl_proj", filetype="nc") files = [f for f in files if "historical" not in f] count = 0 all_files_data = [] for f in files: exp = f.replace(".nc", "").split("/")[-1].split("_")[-1] exp = exp.replace("_05", "") model = f"{f.replace('.nc', '').split('/')[-1].split('_')[-3]}_{f.replace('.nc', '').split('/')[-1].split('_')[-2]}" dataset = xr.open_dataset(f, decode_times=False) if len(dataset.time) == 85: count += 1 warnings.warn( f"{f.split('/')[-1]} does not contain 86 years. Inserting a copy into the first year." ) # Copy the first entry first_entry = dataset.isel({"time": 0}) # Assuming numeric coordinates, create a new coordinate value new_coord_value = ( first_entry["time"].values - 1 ) # Adjust this calculation based on your coordinate system # Set the new coordinate value for the copied entry first_entry["time"] = new_coord_value # Concatenate the new entry with the original dataset dataset = xr.concat([first_entry, dataset], dim="time") sector_mapping = {"1": "no", "2": "ne", "3": "se", "4": "sw", "5": "cw", "6": "nw"} # dataset = convert_and_subset_times(dataset) all_sectors = [] for sector in range(1, 7): var_name = f"ivaf_{sector_mapping[str(sector)]}" sector_x_data = dataset[var_name].to_dataframe().reset_index(drop=True) sector_x_data.rename(columns={var_name: "ivaf"}, inplace=True) sector_x_data["sector"] = sector sector_x_data["year"] = np.arange(1, 87) sector_x_data["id"] = f"{model}_{exp}_sector{sector}" all_sectors.append(sector_x_data) full_dataset = pd.concat(all_sectors, axis=0) full_dataset["exp"] = exp full_dataset["model"] = model all_files_data.append(full_dataset) outputs = pd.concat(all_files_data) outputs["sle"] = outputs["ivaf"] / 362.5 / 1e9 return outputs
_DEFAULT_EXPERIMENTS_FILE = os.path.join( os.path.dirname(__file__), "data_files", "ismip6_experiments_updated.csv" )
[docs] def process_sectors( ice_sheet, forcing_directory, grid_file, zenodo_directory, experiments_file=_DEFAULT_EXPERIMENTS_FILE, export_directory=None, overwrite=False, with_ctrl=False, ): """ End-to-end pipeline that builds the sector-level training dataset from raw ISMIP6 files. This is the main entry point for data preparation. It reads raw climate forcing NetCDFs and pre-computed IVAF scalar projection files, aggregates both to sector-level annual time series (86 years, 2015-2100), joins them on (aogcm, year, sector), and returns a single analysis-ready DataFrame. Intermediate files are written to ``export_directory`` so individual stages can be skipped on re-runs (controlled by ``overwrite``): * ``<ice_sheet>_atmospheric.csv`` - sector-averaged atmospheric forcings * ``<ice_sheet>_oceanic.csv`` - sector-averaged oceanic forcings * ``forcings.csv`` - atmospheric + oceanic merged * ``projections.csv`` - IVAF projections by sector * ``dataset.csv`` - final merged dataset (also returned) Args: ice_sheet (str): Ice sheet to process: ``'AIS'`` (18 sectors) or ``'GrIS'`` (6 sectors). forcing_directory (str): Root directory of the ISMIP6 forcing data. Expected sub-structure mirrors the GHub layout (``Atmosphere_Forcing/``, ``Ocean_Forcing/``, etc.). grid_file (str): Path to the sector-definition NetCDF (e.g. ``AIS_sectors_8km.nc`` or ``GrIS_Basins_Rignot_sectors_5km.nc``). zenodo_directory (str): Directory containing the pre-computed IVAF scalar files from Zenodo (``ComputedScalarsPaper/`` for AIS, ``v7_CMIP5_pub/`` for GrIS). experiments_file (str): Path to the experiment-metadata CSV that maps experiment IDs to AOGCM names. Defaults to the bundled ``ismip6_experiments_updated.csv``. export_directory (str, optional): Directory to write intermediate and final CSVs. If ``None``, nothing is saved to disk. overwrite (bool, optional): If ``True``, re-process and overwrite any existing intermediate files. Defaults to ``False``. with_ctrl (bool, optional): AIS only — if ``True``, includes control projections in the output. Defaults to ``False``. Returns: pandas.DataFrame: Merged dataset with one row per (model, experiment, sector, year), containing both forcing variables and the target SLE projection. """ forcing_exists = os.path.exists(f"{export_directory}/forcings.csv") if not forcing_exists or (forcing_exists and overwrite): atmospheric_df = ( process_AIS_atmospheric_sectors(forcing_directory, grid_file) if ice_sheet == "AIS" else process_GrIS_atmospheric_sectors(forcing_directory, grid_file) ) atmospheric_df.to_csv(f"{export_directory}/{ice_sheet}_atmospheric.csv", index=False) oceanic_df = ( process_AIS_oceanic_sectors(forcing_directory, grid_file) if ice_sheet == "AIS" else process_GrIS_oceanic_sectors(forcing_directory, grid_file) ) oceanic_df.to_csv(f"{export_directory}/{ice_sheet}_oceanic.csv", index=False) atmospheric_df = pd.read_csv(f"{export_directory}/{ice_sheet}_atmospheric.csv") oceanic_df = pd.read_csv(f"{export_directory}/{ice_sheet}_oceanic.csv") atmospheric_df = atmospheric_df[[x for x in atmospheric_df.columns if ".1" not in x]] oceanic_df = oceanic_df[[x for x in oceanic_df.columns if ".1" not in x]] atmospheric_df = atmospheric_df.loc[:, ~atmospheric_df.columns.duplicated()] oceanic_df = oceanic_df.loc[:, ~oceanic_df.columns.duplicated()] forcings = pd.merge( atmospheric_df, oceanic_df, on=[ "aogcm", "year", "sector", ], how="inner", ) forcings.to_csv(f"{export_directory}/forcings.csv", index=False) else: forcings = pd.read_csv(f"{export_directory}/forcings.csv") projections_exists = os.path.exists(f"{export_directory}/projections.csv") if not projections_exists or (projections_exists and overwrite): projections = ( process_AIS_outputs( zenodo_directory, with_ctrl=with_ctrl, ) if ice_sheet == "AIS" else process_GrIS_outputs( zenodo_directory, ) ) projections.to_csv(f"{export_directory}/projections.csv", index=False) else: projections = pd.read_csv(f"{export_directory}/projections.csv") projections = projections.loc[:, ~projections.columns.duplicated()] dataset = merge_datasets( forcings, projections, experiments_file, ice_sheet, ) dataset = dataset[[x for x in dataset.columns if ".1" not in x]] if export_directory is not None: dataset.to_csv(f"{export_directory}/dataset.csv", index=False) return dataset
# --------------------------------------------------------------------------- # AOGCM name normalisation helpers # # ISMIP6 forcing directories and the experiment-metadata table use slightly # different AOGCM naming conventions (dots vs. dashes, version suffixes, etc.). # The four functions below canonicalise names so the forcings and projections # tables can be joined cleanly. Each is applied with DataFrame.apply() inside # merge_datasets() / DatasetMerger._get_forcing_metadata(). # --------------------------------------------------------------------------- def _format_AIS_ocean_aogcm_name(aogcm): """Normalise an AIS oceanic forcing directory AOGCM name for joining.""" aogcm = aogcm.lower() if ( aogcm == "ipsl-cm5a-mr_rcp2.6" or aogcm == "ipsl-cm5a-mr_rcp8.5" or aogcm == "hadgem2-es_rcp8.5" or aogcm == "csiro-mk3-6-0_rcp8.5" ): aogcm = aogcm.replace(".", "") elif ( aogcm == "cnrm-cm6-1_ssp585" or aogcm == "cnrm-esm2-1_ssp585" or aogcm == "cnrm-cm6-1_ssp126" ): aogcm = aogcm.replace("-1", "") aogcm = aogcm.replace("-", "_") elif aogcm == "ukesm1-0-ll_ssp585": aogcm = "ukesm1-0-ll" else: pass return aogcm def _format_AIS_forcings_aogcm_name(aogcm): """Normalise an AIS atmospheric forcing file AOGCM name for joining.""" aogcm = aogcm.lower() if ( aogcm == "noresm1-m_rcp2.6" or aogcm == "noresm1-m_rcp8.5" or aogcm == "miroc-esm-chem_rcp8.5" or aogcm == "ccsm4_rcp8.5" ): aogcm = aogcm.replace(".", "") elif aogcm == "csiro-mk3-6-0_rcp85": aogcm = "csiro-mk3.6_rcp85" elif aogcm == "ipsl-cm5a-mr_rcp26" or aogcm == "ipsl-cm5a-mr_rcp85": aogcm = aogcm.replace("a", "") else: pass return aogcm def _format_GrIS_forcings_aogcm_name(aogcm): """Normalise a GrIS atmospheric forcing file AOGCM name for joining.""" aogcm = aogcm.lower() if aogcm == "noresm1_rcp85": aogcm = "noresm1-m_rcp85" elif aogcm == "ukesm1-cm6_ssp585": aogcm = "ukesm1-0-ll_ssp585" elif aogcm == "csiro_mk3.6_rcp85": aogcm = "csiro-mk3.6_rcp85" elif aogcm == "hadgem2_es_rcp85": aogcm = "hadgem2-es_rcp8.5" elif aogcm == "ipsl-cm5_mr_rcp85": aogcm = "ipsl-cm5-mr_rcp8.5" elif aogcm == "noresm1_rcp85": aogcm = "noresm1-m_rcp8.5" else: pass return aogcm def _format_GrIS_ocean_aogcm_name(aogcm): """ Formats AOGCM names for GrIS oceanic forcing files to maintain consistency. Args: aogcm (str): The original AOGCM name. Returns: str: The formatted AOGCM name. """ aogcm = aogcm.lower() if aogcm == "access1-3_rcp8.5": aogcm = "access1.3_rcp85" elif aogcm == "csiro-mk3.6_rcp8.5": aogcm = aogcm.replace("8.5", "85") elif aogcm in ( "hadgem2-es_rcp8.5", "ipsl-cm5-mr_rcp8.5", "miroc5_rcp2.6", "miroc5_rcp8.5", "miroc5_rcp8.5", ): aogcm = aogcm.replace(".", "") elif aogcm == "noresm1-m_rcp8.5": aogcm = "noresm1_rcp85" elif aogcm == "ukesm1-0-ll_ssp585": aogcm = "ukesm1-cm6_ssp585" else: pass return aogcm