"""End-to-end ISMIP6 data processing pipeline for ISEFlow training data.
This module converts raw ISMIP6 forcing and projection files into the
sector-level, analysis-ready ``dataset.csv`` consumed by ``FeatureEngineer``
and ultimately by ``ISEFlow.fit()``.
Public entry points
-------------------
``process_sectors`` (main entry point):
End-to-end pipeline. Reads raw ISMIP6 forcing NetCDFs from the GHub
directory layout and pre-computed IVAF scalar projection files from
Zenodo, aggregates both to sector-level annual time series (86 years,
2015-2100), joins them on (aogcm, year, sector), and returns a single
``pandas.DataFrame``::
from ise.data.process import process_sectors
dataset = process_sectors(
ice_sheet="AIS",
forcing_directory="/path/to/GHub/AIS/",
grid_file="/path/to/AIS_sectors_8km.nc",
zenodo_directory="/path/to/zenodo_download/",
export_directory="outputs/",
)
Intermediate CSVs (``AIS_atmospheric.csv``, ``AIS_oceanic.csv``,
``forcings.csv``, ``projections.csv``, ``dataset.csv``) are written to
``export_directory`` so individual stages are skipped on re-runs
(controlled by ``overwrite=False``).
``ProjectionProcessor``:
Only needed when starting from **raw 3-D ISMIP6 NetCDF output files**
rather than the pre-computed Zenodo scalar files. Computes Ice Volume
Above Flotation (IVAF) from bed topography, ice thickness, and ice/grounded
fraction at each grid cell, subtracts the matched control-run IVAF, and
writes ``ivaf_<ice_sheet>_<group>_<model>_<exp>.nc`` files::
from ise.data.process import ProjectionProcessor
processor = ProjectionProcessor(
ice_sheet="AIS",
forcings_directory="/path/to/forcing/",
projections_directory="/path/to/projections/",
scalefac_path="af2_scalefac.nc",
densities_path="AIS_densities.csv",
)
processor.process()
``DatasetMerger``:
Lower-level alternative to ``process_sectors()`` for when intermediate
per-run CSV files already exist on disk. Performs only the join step
(forcing ↔ projection matched by CMIP model and pathway).
Supporting functions
--------------------
``process_AIS_atmospheric_sectors`` / ``process_GrIS_atmospheric_sectors``
Aggregate atmospheric forcing NetCDFs to sector-level annual means.
``process_AIS_oceanic_sectors`` / ``process_GrIS_oceanic_sectors``
Aggregate oceanic forcing NetCDFs to sector-level annual means.
``process_AIS_outputs`` / ``process_GrIS_outputs``
Load pre-computed IVAF scalar projections from Zenodo and convert to SLE.
``merge_datasets``
Join sector-level forcings and projections DataFrames on (aogcm, year, sector).
``get_model_densities``
Extract ice/water density values (rhoi, rhow) from raw ISMIP6 NetCDFs.
``combine_gris_forcings``
Concatenate annual GrIS atmospheric NetCDF files into per-AOGCM combined files.
"""
import os
import time
import warnings
import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm
from ise.data.forcings import ForcingFile
from ise.data.grids import GridFile
from ise.data.utils import convert_and_subset_times
from ise.utils.functions import get_all_filepaths
[docs]
class ProjectionProcessor:
"""
A class for processing ISMIP6 projections (outputs) for ice sheet models, specifically for calculating
Ice Volume Above Flotation (IVAF), handling control projections, and processing experimental projections.
Args:
ice_sheet (str): The ice sheet being analyzed ('AIS' or 'GIS').
forcings_directory (str): Path to the directory containing forcing datasets.
projections_directory (str): Path to the directory containing projection datasets.
scalefac_path (str, optional): Path to the NetCDF file containing scaling factors for each grid cell. Defaults to None.
densities_path (str, optional): Path to the CSV file containing density data for models. Defaults to None.
Attributes:
forcings_directory (str): Path to forcing data.
projections_directory (str): Path to projection data.
densities_path (str): Path to density dataset.
scalefac_path (str): Path to scaling factor dataset.
ice_sheet (str): Ice sheet identifier ('AIS' or 'GIS').
resolution (int): Resolution of the dataset (5 for GIS, 8 for AIS).
Methods:
process(): Processes ISMIP6 projections by calculating IVAF and subtracting control projections.
Note:
This class is only needed when starting from raw 3-D ISMIP6 NetCDF output
files. If you are using the pre-computed scalar files from Zenodo
(``ComputedScalarsPaper/`` for AIS, ``v7_CMIP5_pub/`` for GrIS), call
``process_sectors()`` directly instead.
"""
def __init__(
self,
ice_sheet,
forcings_directory,
projections_directory,
scalefac_path=None,
densities_path=None,
):
self.forcings_directory = forcings_directory
self.projections_directory = projections_directory
self.densities_path = densities_path
self.scalefac_path = scalefac_path
self.ice_sheet = ice_sheet.upper()
if self.ice_sheet.lower() in ("gris", "gis"):
self.ice_sheet = "GIS"
self.resolution = 5 if self.ice_sheet == "GIS" else 8
[docs]
def process(self):
"""
Process ISMIP6 projections by calculating IVAF for control and experiment projections,
subtracting out control IVAF from experiments, and exporting IVAF files.
For each model run the method:
1. Loads bed topography, ice thickness, ice fraction, and grounded fraction.
2. Computes IVAF at every grid cell and time step.
3. Subtracts the matched control-run IVAF to isolate the forced signal.
4. Writes ``ivaf_<ice_sheet>_<group>_<model>_<exp>.nc`` next to the input files.
Returns:
int: 1 if processing is successful.
Raises:
ValueError: If projections_directory is not specified.
"""
if self.projections_directory is None:
raise ValueError("Projections path must be specified")
self._calculate_ivaf_minus_control(
self.projections_directory, self.densities_path, self.scalefac_path
)
return 1
def _calculate_ivaf_minus_control(
self, data_directory: str, densities_fp: str, scalefac_path: str
):
"""
Calculates the Ice Volume Above Flotation (IVAF) for each model run within the given directory.
This method processes control projections first and then processes experimental projections
by subtracting the control IVAF.
Args:
data_directory (str): Path to the directory containing projection data files.
densities_fp (str or pd.DataFrame): Path to a CSV file containing density values or a preloaded pandas DataFrame.
scalefac_path (str): Path to a NetCDF file containing scaling factors for each grid cell.
Returns:
int: 1 indicating successful IVAF calculation.
Raises:
ValueError: If densities_fp is not provided or is of an invalid type.
"""
# error handling for densities argument (must be str filepath or dataframe)
if densities_fp is None:
raise ValueError(
"densities_fp must be specified. Run get_model_densities() to get density data."
)
if isinstance(densities_fp, str):
densities = pd.read_csv(densities_fp)
elif isinstance(densities_fp, pd.DataFrame):
pass
else:
raise ValueError("densities argument must be a string or a pandas DataFrame.")
# open scaling model
scalefac_ds = xr.open_dataset(scalefac_path)
scalefac_model = np.transpose(scalefac_ds.af2.values, (1, 0))
# adjust scaling model based on desired resolution
if self.ice_sheet == "AIS":
scalefac_model = scalefac_model[:: self.resolution, :: self.resolution]
elif self.ice_sheet == "GIS" and scalefac_model.shape != (337, 577):
if scalefac_model.shape[0] == 6081:
raise ValueError(
f"Scalefac model must be 337x577 for GIS, received {scalefac_model.shape}. Make sure you are using the GIS scaling model and not the AIS."
)
raise ValueError(
f"Scalefac model must be 337x577 for GIS, received {scalefac_model.shape}."
)
# get all files in directory with "ctrl_proj" and "exp" in them and store separately
ctrl_proj_dirs = []
exp_dirs = []
for root, dirs, _ in os.walk(data_directory):
for directory in dirs:
if "ctrl_proj" in directory:
ctrl_proj_dirs.append(os.path.join(root, directory))
elif "exp" in directory:
exp_dirs.append(os.path.join(root, directory))
else:
pass
# Control projections must be processed first so their ivaf files exist
# when the experimental runs try to open them for subtraction.
for directory in ctrl_proj_dirs:
self._calculate_ivaf_single_file(directory, densities, scalefac_model, ctrl_proj=True)
for directory in exp_dirs:
self._calculate_ivaf_single_file(directory, densities, scalefac_model, ctrl_proj=False)
return 1
def _calculate_ivaf_single_file(self, directory, densities, scalefac_model, ctrl_proj=False):
"""
Calculates the Ice Volume Above Flotation (IVAF) for a single model run.
Args:
directory (str): The directory containing the model run data.
densities (pd.DataFrame): DataFrame containing density values for different groups and models.
scalefac_model (numpy.ndarray): Scaling factor matrix for the ice sheet model grid.
ctrl_proj (bool, optional): If True, processes a control projection. Defaults to False.
Returns:
int: 1 if successful, -1 if the run is skipped due to missing or corrupted data.
"""
# directory = r"/gpfs/data/kbergen/pvankatw/pvankatw-bfoxkemp/GHub-ISMIP6-Projection-GrIS/AWI/ISSM1/exp09"
# get metadata from path
path = directory.split("/")
exp = path[-1]
model = path[-2]
group = path[-3]
# Determine which control to use based on experiment (only applies to AIS) per Nowicki, 2020
if not ctrl_proj:
if self.ice_sheet == "AIS":
if exp in (
"exp01",
"exp02",
"exp03",
"exp04",
"exp11",
"expA1",
"expA2",
"expA3",
"expA4",
"expB1",
"expB2",
"expB3",
"expB4",
"expB5",
"expC2",
"expC5",
"expC8",
"expC11",
"expE1",
"expE2",
"expE3",
"expE4",
"expE5",
"expE11",
"expE12",
"expE13",
"expE14",
):
ctrl_path = os.path.join(
"/".join(path[:-1]),
f"ctrl_proj_open/ivaf_{self.ice_sheet}_{group}_{model}_ctrl_proj_open.nc",
)
elif (
exp
in (
"exp05",
"exp06",
"exp07",
"exp08",
"exp09",
"exp10",
"exp12",
"exp13",
"expA5",
"expA6",
"expA7",
"expA8",
"expB6",
"expB7",
"expB8",
"expB9",
"expB10",
"expC3",
"expC6",
"expC9",
"expC12",
"expE6",
"expE7",
"expE8",
"expE9",
"expE10",
"expE15",
"expE16",
"expE17",
"expE18",
)
or "expD" in exp
):
ctrl_path = os.path.join(
"/".join(path[:-1]),
f"ctrl_proj_std/ivaf_{self.ice_sheet}_{group}_{model}_ctrl_proj_std.nc",
)
elif exp in (
"expC1",
"expC4",
"expC7",
"expC10",
): # N/A value for ocean_forcing in Nowicki, 2020 table A2
return -1
else:
print(f"Experiment {exp} not recognized. Skipped.")
return -1
else:
# GrIS doesn't have ctrl_proj_open vs ctrl_proj_std
ctrl_path = os.path.join(
"/".join(path[:-1]),
f"ctrl_proj/ivaf_{self.ice_sheet}_{group}_{model}_ctrl_proj.nc",
)
# for some reason there is no ctrl_proj_open for AWI and JPL1, skip
if group == "AWI" and "ctrl_proj_open" in ctrl_path:
return -1
if group == "JPL1" and "ctrl_proj_open" in ctrl_path:
return -1
# MUN_GISM1 is corrupted, skip
if group == "MUN" and model == "GSM1":
return -1
# folder is empty, skip
elif group == "IMAU" and exp == "exp11":
return -1
# bed file in NCAR_CISM/expD10 is empty, skip
elif group == "NCAR" and exp in ("expD10", "expD11"):
return -1
# lookup densities from csv
subset_densities = densities[(densities.group == group) & (densities.model == model)]
rhoi = subset_densities.rhoi.values[0]
rhow = subset_densities.rhow.values[0]
# load data
if self.ice_sheet == "AIS" and group == "ULB":
# ULB uses fETISh for AIS naming, not actual model name (fETISh_16km or fETISh_32km)
naming_convention = f"{self.ice_sheet}_{group}_fETISh_{exp}.nc"
else:
naming_convention = f"{self.ice_sheet}_{group}_{model}_{exp}.nc"
# get_xarray_data opens with decode_times=False so time stays as numeric
# "days since X", which is easier to normalize across models later.
bed = get_xarray_data(
os.path.join(directory, f"topg_{naming_convention}"), ice_sheet=self.ice_sheet
)
thickness = get_xarray_data(
os.path.join(directory, f"lithk_{naming_convention}"), ice_sheet=self.ice_sheet
)
mask = get_xarray_data(
os.path.join(directory, f"sftgif_{naming_convention}"), ice_sheet=self.ice_sheet
)
ground_mask = get_xarray_data(
os.path.join(directory, f"sftgrf_{naming_convention}"), ice_sheet=self.ice_sheet
)
length_time = len(thickness.time)
try:
bed = bed.transpose("x", "y", "time", ...)
thickness = thickness.transpose("x", "y", "time", ...)
mask = mask.transpose("x", "y", "time", ...)
ground_mask = ground_mask.transpose("x", "y", "time", ...)
except ValueError:
bed = bed.transpose("x", "y", ...)
thickness = thickness.transpose("x", "y", ...)
mask = mask.transpose("x", "y", ...)
ground_mask = ground_mask.transpose("x", "y", ...)
# if time is not a dimension, add copies for each time step
if "time" not in bed.dims or bed.dims["time"] == 1:
try:
bed = bed.drop_vars(
[
"time",
]
)
except ValueError:
pass
bed = bed.expand_dims(dim={"time": length_time})
if length_time == 86:
bed["time"] = thickness[
"time"
] # most times just the bed file is missing the time index
elif length_time > 86:
if len(thickness.time.values) != len(set(thickness.time.values)): # has duplicates
keep_indices = np.unique(thickness["time"], return_index=True)[
1
] # find non-duplicates
bed = bed.isel(time=keep_indices) # only select non-duplicates
thickness = thickness.isel(time=keep_indices)
mask = mask.isel(time=keep_indices)
ground_mask = ground_mask.isel(time=keep_indices)
else:
warnings.warn(
f"At least one file in {exp} does not have a time index formatted correctly. Attempting to fix."
)
start_idx = len(bed.time) - 86
bed = bed.sel(time=slice(bed.time.values[start_idx], len(bed.time)))
thickness = thickness.sel(
time=slice(thickness.time[start_idx], thickness.time[-1])
)
mask = mask.sel(time=slice(mask.time[start_idx], mask.time[-1]))
ground_mask = ground_mask.sel(
time=slice(ground_mask.time[start_idx], ground_mask.time[-1])
)
try:
bed["time"] = thickness["time"].copy()
except ValueError:
print(
f"Cannot fix time index for {exp} due to duplicate index values. Skipped."
)
return -1
else:
print(f"Only {len(bed.time)} time points for {exp}. Skipped.")
return -1
# if -9999 instead of np.nan, replace (come back and optimize? couldn't figure out with xarray)
if bed.topg[0, 0, 0] <= -9999.0 or bed.topg[0, 0, 0] >= 9999:
topg = bed.topg.values
topg[(np.where((topg <= -9999.0) | (topg >= 9999)))] = np.nan
bed["topg"].values = topg
del topg
lithk = thickness.lithk.values
lithk[(np.where((lithk <= -9999.0) | (lithk >= 9999)))] = np.nan
thickness["lithk"].values = lithk
del lithk
sftgif = mask.sftgif.values
sftgif[(np.where((sftgif <= -9999.0) | (sftgif >= 9999)))] = np.nan
mask["sftgif"].values = sftgif
del sftgif
sftgrf = ground_mask.sftgrf.values
sftgrf[(np.where((sftgrf <= -9999.0) | (sftgrf >= 9999)))] = np.nan
ground_mask["sftgrf"].values = sftgrf
del sftgrf
# converts time (in "days from X" to numpy.datetime64) and subsets time from 2015 to 2100
# a few datasets do not have the time index formatted correctly
if len(bed.time.attrs) == 0:
if len(bed.time) == 86:
bed["time"] = thickness[
"time"
] # most times just the bed file is missing the time index
elif len(bed.time) > 86:
# bed['time'] = thickness['time'].copy()
warnings.warn(
f"At least one file in {exp} does not have a time index formatted correctly. Attempting to fix."
)
start_idx = len(bed.time) - 86
bed = bed.sel(time=slice(bed.time.values[start_idx], len(bed.time)))
thickness = thickness.sel(time=slice(thickness.time[start_idx], thickness.time[-1]))
mask = mask.sel(time=slice(mask.time[start_idx], mask.time[-1]))
ground_mask = ground_mask.sel(
time=slice(ground_mask.time[start_idx], ground_mask.time[-1])
)
try:
bed["time"] = thickness["time"]
except ValueError:
print(
f"Cannot fix time index for {exp} due to duplicate index values. Skipped."
)
return -1
else:
print(f"Only {len(bed.time)} time points for {exp}. Skipped.")
return -1
bed = convert_and_subset_times(bed)
thickness = convert_and_subset_times(thickness)
mask = convert_and_subset_times(mask)
ground_mask = convert_and_subset_times(ground_mask)
length_time = len(thickness.time)
# Interpolate values for x & y, for formatting purposes only, does not get used
if len(set(thickness.y.values)) != len(scalefac_model):
bed["x"], bed["y"] = interpolate_values(bed)
thickness["x"], thickness["y"] = interpolate_values(thickness)
mask["x"], mask["y"] = interpolate_values(mask)
ground_mask["x"], ground_mask["y"] = interpolate_values(ground_mask)
# clip masks if they are below 0 or above 1
if np.min(mask.sftgif.values) < 0 or np.max(mask.sftgif.values) > 1:
mask["sftgif"] = np.clip(mask.sftgif, 0.0, 1.0)
if np.min(ground_mask.sftgrf.values) < 0 or np.max(ground_mask.sftgrf.values) > 1:
ground_mask["sftgrf"] = np.clip(ground_mask.sftgrf, 0.0, 1.0)
# flip around axes so the order is (x, y, time)
bed = bed.transpose("x", "y", "time", ...)
bed_data = bed.topg.values
thickness = thickness.transpose("x", "y", "time", ...)
thickness_data = thickness.lithk.values
mask = mask.transpose("x", "y", "time", ...)
mask_data = mask.sftgif.values
ground_mask = ground_mask.transpose("x", "y", "time", ...)
ground_mask_data = ground_mask.sftgrf.values
# for each time step, calculate ivaf
ivaf = np.zeros(bed_data.shape)
for i in range(length_time):
# get data slices for current time
thickness_i = thickness_data[:, :, i].copy()
bed_i = bed_data[:, :, i].copy()
mask_i = mask_data[:, :, i].copy()
ground_mask_i = ground_mask_data[:, :, i].copy()
# set data slices to zero where mask = 0 or any value is NaN
thickness_i[
(mask_i == 0)
| (np.isnan(mask_i))
| (np.isnan(thickness_i))
| (np.isnan(ground_mask_i))
| (np.isnan(bed_i))
] = 0
bed_i[
(mask_i == 0)
| (np.isnan(mask_i))
| (np.isnan(thickness_i))
| (np.isnan(ground_mask_i))
| (np.isnan(bed_i))
] = 0
ground_mask_i[
(mask_i == 0)
| np.isnan(mask_i)
| np.isnan(thickness_i)
| np.isnan(ground_mask_i)
| np.isnan(bed_i)
] = 0
mask_i[
(mask_i == 0)
| (np.isnan(mask_i))
| (np.isnan(thickness_i))
| (np.isnan(ground_mask_i))
| (np.isnan(bed_i))
] = 0
# take min(bed_i, 0)
bed_i[bed_i > 0] = 0
# calculate IVAF (based on MATLAB processing scripts from Seroussi, 2021)
hf_i = thickness_i + ((rhow / rhoi) * bed_i)
masked_output = hf_i * ground_mask_data[:, :, i] * mask_data[:, :, i]
ivaf[:, :, i] = masked_output * scalefac_model * (self.resolution * 1000) ** 2
# Reuse the bed dataset's structure (coordinates, attributes) for the output file.
ivaf_nc = bed.copy()
if not ctrl_proj:
ivaf_ctrl = xr.open_dataset(ctrl_path).transpose("x", "y", "time", ...)
ivaf_with_ctrl = ivaf.copy()
ivaf = ivaf - ivaf_ctrl.ivaf.values
ivaf_nc["ivaf"] = (("x", "y", "time"), ivaf)
if not ctrl_proj:
ivaf_nc["ivaf_with_control"] = (("x", "y", "time"), ivaf_with_ctrl)
ivaf_nc = ivaf_nc.drop_vars(
[
"topg",
]
)
ivaf_nc["sle"] = ivaf_nc.ivaf / 1e9 / 362.5
export_nc_path = os.path.join(directory, f"ivaf_{self.ice_sheet}_{group}_{model}_{exp}.nc")
if os.path.exists(export_nc_path):
os.remove(export_nc_path)
ivaf_nc.to_netcdf(export_nc_path)
print(f"{group}_{model}_{exp}: Processing successful.")
return 1
[docs]
def get_model_densities(zenodo_directory: str, output_path: str | None = None):
"""
Extracts density values (rhoi and rhow) from NetCDF files in the specified directory and returns them in a pandas DataFrame.
Args:
zenodo_directory (str): Path to the directory containing the NetCDF files.
output_path (str, optional): Path to save the extracted density values as a CSV file. Defaults to None.
Returns:
pandas.DataFrame: A DataFrame containing the group, model, rhoi, and rhow values for each model run.
"""
results: list[dict] = []
for root, _, files in os.walk(zenodo_directory):
for file in files:
if file.endswith(".nc"): # Check if the file is a NetCDF file
file_path = os.path.join(root, file)
try:
# Open the NetCDF file using xarray
dataset = xr.open_dataset(file_path, decode_times=False).transpose(
"x", "y", "time", ...
)
# Extract values for rhoi and rhow
if "rhoi" in dataset and "rhow" in dataset:
rhoi_values = dataset["rhoi"].values
rhow_values = dataset["rhow"].values
# Append the filename and values to the results list
results.append({"filename": file, "rhoi": rhoi_values, "rhow": rhow_values})
# Close the dataset
dataset.close()
except Exception as e:
print(f"Error processing {file}: {e}")
densities = []
last_filename = ""
for result in results:
if "ctrl_proj" in result["filename"] or "hist" in result["filename"]:
continue
elif "ILTS" in result["filename"]:
fp = result["filename"].split("_")
group = "ILTS_PIK"
model = fp[-2]
elif "ULB_fETISh" in result["filename"]:
fp = result["filename"].split("_")
group = "ULB"
model = "fETISh_32km" if "32km" in result["filename"] else "fETISh_16km"
else:
fp = result["filename"].split("_")
group = fp[-3]
model = fp[-2]
densities.append([group, model, result["rhoi"], result["rhow"]])
last_filename = result["filename"]
df = pd.DataFrame(densities, columns=["group", "model", "rhoi", "rhow"])
df["rhoi"], df["rhow"] = df.rhoi.astype("float"), df.rhow.astype("float")
df = df.drop_duplicates()
ice_sheet = "AIS" if "AIS" in last_filename else "GIS"
if output_path is not None:
if output_path.endswith("/"):
df.to_csv(f"{output_path}/{ice_sheet}_densities.csv", index=False)
else:
df.to_csv(output_path, index=False)
return df
[docs]
def interpolate_values(data):
"""
Interpolates missing values in the x and y dimensions of the input dataset using linear interpolation.
Ensures that first and last values are properly adjusted to maintain consistency.
Args:
data (xarray.Dataset): A dataset containing x and y dimensions with potential missing values.
Returns:
tuple: A tuple containing the interpolated x and y arrays.
"""
y = pd.Series(data.y.values)
y = y.replace(0, np.NaN)
y = np.array(y.interpolate())
# first and last are NaNs, replace with correct values
y[0] = y[1] - (y[2] - y[1])
y[-1] = y[-2] + (y[-2] - y[-3])
x = pd.Series(data.x.values)
x = x.replace(0, np.NaN)
x = np.array(x.interpolate())
# first and last are NaNs, replace with correct values
x[0] = x[1] - (x[2] - x[1])
x[-1] = x[-2] + (x[-2] - x[-3])
return x, y
[docs]
def get_xarray_data(dataset_fp, var_name=None, ice_sheet="AIS", convert_and_subset=False):
"""
Retrieves and processes data from an xarray dataset.
Args:
dataset_fp (str): The file path to the xarray dataset.
var_name (str, optional): The name of the variable to retrieve from the dataset. Defaults to None.
ice_sheet (str, optional): The ice sheet type ('AIS' or 'GrIS'). Defaults to 'AIS'.
convert_and_subset (bool, optional): If True, converts and subsets the dataset for the target time range. Defaults to False.
Returns:
np.ndarray or xarray.Dataset: The extracted variable as a NumPy array or the entire processed dataset.
"""
dataset = xr.open_dataset(
dataset_fp,
decode_times=False,
engine="netcdf4",
)
try:
dataset = dataset.transpose("time", "x", "y", ...)
except (ValueError, KeyError):
pass
if "ivaf" in dataset.variables:
pass
else:
# handle extra dimensions and variables
try:
dataset = dataset.drop_dims("nv4")
except ValueError:
pass
for var in [
"z_bnds",
"lat",
"lon",
"mapping",
"time_bounds",
"lat2d",
"lon2d",
"polar_stereographic",
]:
try:
dataset = dataset.drop(labels=[var])
except ValueError:
pass
if "z" in dataset.dims:
dataset = dataset.mean(dim="z", skipna=True)
# subset the dataset for 5km resolution (GrIS)
if dataset.dims["x"] == 1681 and dataset.dims["y"] == 2881:
dataset = dataset.sel(x=dataset.x.values[::5], y=dataset.y.values[::5])
if convert_and_subset:
dataset = convert_and_subset_times(dataset)
if var_name is not None:
try:
data = dataset[var_name].values
except KeyError:
return np.nan, np.nan
x_dim = 761 if ice_sheet.lower() == "ais" else 337
y_dim = 761 if ice_sheet.lower() == "ais" else 577
if (
"time" not in dataset.dims
or dataset.dims["time"] == 1
or (data.shape[1] == y_dim and data.shape[2] == x_dim)
):
pass
else:
# TODO: fix this. this is just a weird way of tranposing, not sure if it even happens.
grid_indices = np.array([0, 1, 2])[
(np.array(data.shape) == x_dim) | (np.array(data.shape) == y_dim)
]
data = np.moveaxis(data, list(grid_indices), [1, 2])
if "time" not in dataset.dims:
data_flattened = data.reshape(
-1,
)
else:
data_flattened = data.reshape(len(dataset.time), -1)
return data_flattened
return dataset
[docs]
class DatasetMerger:
"""
Merges pre-processed CSV forcing and projection files into a single dataset.
This is a lower-level alternative to ``process_sectors()``. Use it when the
intermediate per-run CSV files already exist on disk and you only need the
join step (forcing ↔ projection matched by CMIP model and pathway).
Args:
ice_sheet (str): The ice sheet name ('AIS' or 'GrIS').
forcings (str): Directory containing forcing CSV files.
projections (str): Directory containing projection CSV files.
experiment_file (str): Path to the experiment metadata file (CSV or JSON).
output_dir (str): Directory to save the merged ``dataset.csv``.
Attributes:
experiments (pd.DataFrame): Experiment metadata loaded from ``experiment_file``.
forcing_paths (list): File paths for all forcing CSVs found under ``forcings``.
projection_paths (list): File paths for all projection CSVs found under ``projections``.
forcing_metadata (pd.DataFrame): Extracted CMIP model and pathway for each forcing file.
"""
def __init__(self, ice_sheet, forcings, projections, experiment_file, output_dir):
self.ice_sheet = ice_sheet
self.forcings = forcings
self.projections = projections
self.experiment_file = experiment_file
self.output_dir = output_dir
if self.experiment_file.endswith(".csv"):
self.experiments = pd.read_csv(experiment_file)
self.experiments.ice_sheet = self.experiments.ice_sheet.apply(lambda x: x.lower())
elif self.experiment_file.endswith(".json"):
self.experiments = pd.read_json(experiment_file).T
else:
raise ValueError("Experiment file must be a CSV or JSON file.")
self.forcing_paths = get_all_filepaths(
path=self.forcings,
filetype="csv",
)
self.projection_paths = get_all_filepaths(
path=self.projections,
filetype="csv",
)
self.forcing_metadata = self._get_forcing_metadata()
[docs]
def merge_dataset(self):
"""
Merges forcing and projection datasets based on CMIP model and pathway metadata.
Returns:
int: Returns 0 upon successful merging and saving of the dataset.
"""
full_dataset = pd.DataFrame()
self.experiments["exp"] = self.experiments["exp"].apply(lambda x: x.lower())
for i, projection in enumerate(
tqdm(
self.projection_paths,
total=len(self.projection_paths),
desc="Merging forcing & projection files",
)
):
# get experiment from projection filepath
exp = projection.replace(".csv", "").split("/")[-1].split("_")[-1]
# make sure cases match when doing table lookup
# get AOGCM value from table lookup
try:
aogcm = self.experiments.loc[
(self.experiments.exp == exp.lower())
& (self.experiments.ice_sheet == self.ice_sheet.lower())
]["AOGCM"].values[0]
except IndexError:
aogcm = self.experiments.loc[self.experiments.exp == exp.lower()]["AOGCM"].values[0]
proj_cmip_model = aogcm.split("_")[0]
proj_pathway = aogcm.split("_")[-1]
# names of CMIP models are slightly different, adjust based on AIS/GrIS directories
if self.ice_sheet == "AIS":
if proj_cmip_model == "csiro-mk3.6":
proj_cmip_model = "csiro-mk3-6-0"
elif proj_cmip_model == "ipsl-cm5-mr":
proj_cmip_model = "ipsl-cm5a-mr"
elif proj_cmip_model == "cnrm-esm2" or proj_cmip_model == "cnrm-cm6":
proj_cmip_model = f"{proj_cmip_model}-1"
elif self.ice_sheet == "GrIS":
if proj_cmip_model.lower() == "noresm1-m":
proj_cmip_model = "noresm1"
elif proj_cmip_model.lower() == "ipsl-cm5-mr":
proj_cmip_model = "ipsl-cm5"
elif proj_cmip_model.lower() == "access1-3":
proj_cmip_model = "access1.3"
elif proj_cmip_model.lower() == "ukesm1-0-ll":
proj_cmip_model = "ukesm1-cm6"
# get forcing file from table lookup that matches projection
forcing_files = self.forcing_metadata.file.loc[
(self.forcing_metadata.cmip_model == proj_cmip_model)
& (self.forcing_metadata.pathway == proj_pathway)
]
if forcing_files.empty:
raise IndexError(
f"Could not find forcing file for {aogcm}. Check formatting of experiment file."
)
if len(forcing_files) > 1:
forcings = pd.DataFrame()
for file in forcing_files.values:
forcings = pd.concat(
[forcings, pd.read_csv(f"{self.forcings}/{file}.csv")], axis=1
)
else:
forcing_file = forcing_files.values[0]
forcings = pd.read_csv(f"{self.forcings}/{forcing_file}.csv")
# load forcing and projection datasets
projections = pd.read_csv(projection)
# if forcings are longer than projections, cut off the beginning of the forcings
if len(forcings) > len(projections):
forcings = forcings.iloc[-len(projections) :].reset_index(drop=True)
# add forcings and projections together and add some metadata
merged_dataset = pd.concat([forcings, projections], axis=1)
merged_dataset["time"] = np.arange(1, len(merged_dataset) + 1)
merged_dataset["cmip_model"] = proj_cmip_model
merged_dataset["pathway"] = proj_pathway
merged_dataset["exp"] = exp
merged_dataset["id"] = i
# now add to dataset with all forcing/projection pairs
full_dataset = pd.concat([full_dataset, merged_dataset])
# save the full dataset
full_dataset.to_csv(f"{self.output_dir}/dataset.csv", index=False)
return 0
[docs]
def merge_sectors(self, _forcings_file=None, _projections_file=None, _save_dir=None):
# Not yet implemented. Use process_sectors() for sector-level merging.
pass
def _get_forcing_metadata(self):
"""
Extracts metadata from the forcing files to associate CMIP models with their respective pathways.
Returns:
pandas.DataFrame: DataFrame containing metadata with columns 'file', 'cmip_model', and 'pathway'.
"""
pairs = {}
for forcing in self.forcing_paths:
forcing = forcing.replace(".csv", "").split("/")[-1]
cmip_model = forcing.split("_")[1]
# GrIS has MAR3.9 in name, ignore
if cmip_model == "MAR3.9":
cmip_model = forcing.split("_")[2]
elif cmip_model.lower() == "gris":
cmip_model = forcing.split("_")[2]
if "rcp" in forcing.lower() or "ssp" in forcing.lower():
for substring in forcing.lower().split("_"):
if "rcp" in substring or "ssp" in substring:
pathway = substring.lower()
if len(pathway.split("-")) > 1 and (
"rcp" in pathway.split("-")[-1] or "ssp" in pathway.split("-")[-1]
):
if len(pathway.split("-")) > 2:
cmip_model = "-".join(pathway.split("-")[0:2])
pathway = pathway.split("-")[-1]
else:
cmip_model = pathway.split("-")[0]
pathway = pathway.split("-")[-1]
break
else:
pathway = "rcp85"
if self.ice_sheet == "GrIS":
if cmip_model.lower() == "noresm1-m":
cmip_model = "noresm1"
elif cmip_model.lower() == "ipsl-cm5-mr":
cmip_model = "ipsl-cm5"
elif cmip_model.lower() == "access1-3":
cmip_model = "access1.3"
elif cmip_model.lower() == "ukesm1-0-ll":
cmip_model = "ukesm1-cm6"
pairs[forcing] = [cmip_model.lower(), pathway.lower()]
df = pd.DataFrame(pairs).T
df = pd.DataFrame(pairs).T.reset_index()
df.columns = ["file", "cmip_model", "pathway"]
return df
[docs]
def combine_gris_forcings(forcing_dir):
"""
Combines GrIS forcings from multiple CMIP model directories into consolidated NetCDF files.
Args:
forcing_dir (str): Directory containing the GrIS forcing files.
Returns:
int: 0 upon successful processing.
"""
atmosphere_dir = f"{forcing_dir}/GrIS/Atmosphere_Forcing/aSMB_observed/v1/"
cmip_directories = next(os.walk(atmosphere_dir))[1]
for cmip_dir in tqdm(
cmip_directories, total=len(cmip_directories), desc="Processing CMIP directories"
):
for var in ["aSMB", "aST"]:
files = os.listdir(f"{atmosphere_dir}/{cmip_dir}/{var}")
files = np.array([x for x in files if x.endswith(".nc")])
years = np.array([int(x.replace(".nc", "").split("-")[-1]) for x in files])
year_files = files[(years >= 2015) & (years <= 2100)]
for i, file in enumerate(year_files):
# first iteration, open dataset and store
if i == 0:
dataset = xr.open_dataset(f"{atmosphere_dir}/{cmip_dir}/{var}/{file}")
for dim in ["nv", "nv4", "mapping"]:
try:
dataset = dataset.drop_dims(dim)
except (ValueError, KeyError):
pass
dataset = dataset.drop("mapping")
dataset = dataset.sel(x=dataset.x.values[::5], y=dataset.y.values[::5])
continue
# following iterations, open dataset and concatenate
data = xr.open_dataset(f"{atmosphere_dir}/{cmip_dir}/{var}/{file}")
for dim in ["nv", "nv4"]:
try:
data = data.drop_dims(dim)
except (ValueError, KeyError):
pass
data = data.drop("mapping")
data = data.sel(x=data.x.values[::5], y=data.y.values[::5])
# data['time'] = pd.to_datetime(year, format='%Y')
dataset = xr.concat([dataset, data], dim="time")
# Now you have the dataset with the files loaded and time dimension set
dataset.to_netcdf(
os.path.join(atmosphere_dir, cmip_dir, f"GrIS_{cmip_dir}_{var}_combined.nc")
)
return 0
[docs]
def process_GrIS_atmospheric_sectors(forcing_directory, grid_file):
"""
Aggregate GrIS atmospheric forcing (aSMB and aST) to sector-level annual means.
Reads annual NetCDF files from ``Atmosphere_Forcing/aSMB_observed/v1/`` and
combines them per AOGCM via ``combine_gris_forcings()`` if combined files do
not yet exist. Then averages Surface Mass Balance anomaly (aSMB) and surface
temperature anomaly (aST) spatially over each of the 6 GrIS drainage basins.
Args:
forcing_directory (str): Root forcing directory (GHub layout expected).
The function navigates to the ``Atmosphere_Forcing/aSMB_observed/v1/``
sub-directory automatically.
grid_file (str or xarray.Dataset): Path to (or loaded) sector-definition
NetCDF defining the 6 GrIS drainage-basin sectors.
Returns:
pandas.DataFrame: Rows indexed by (aogcm, sector, year) with columns
``aSMB``, ``aST``, ``aogcm``, ``year``, and ``sector``.
"""
start_time = time.time()
path_to_forcings = "Atmosphere_Forcing/aSMB_observed/v1/"
af_directory = (
f"{forcing_directory}/{path_to_forcings}"
if not forcing_directory.endswith(path_to_forcings)
else forcing_directory
)
# check to see if GrIS forcings have been combined
filepaths = get_all_filepaths(path=af_directory, contains="combined", filetype="nc")
if not filepaths:
combine_gris_forcings(af_directory)
filepaths = get_all_filepaths(path=af_directory, contains="combined", filetype="nc")
if not filepaths:
raise ValueError("No combined files found. Check combine_gris_forcings function.")
aogcm_directories = os.listdir(af_directory)
aogcm_directories = [x for x in aogcm_directories if "DS_Store" not in x and "README" not in x]
sectors = _format_grid_file(grid_file)
unique_sectors = np.unique(sectors)
all_data = []
for i, fp in enumerate(aogcm_directories):
print("")
print(f"Directory {i + 1} / {len(aogcm_directories)}")
print(f"Directory: {fp.split('/')[-1]}")
print(f"Time since start: {(time.time() - start_time) // 60} minutes")
files = get_all_filepaths(path=f"{af_directory}/{fp}", contains="combined", filetype="nc")
if len(files) != 2:
raise ValueError(f"There should only be 2 combined files in each firectory, see {fp}.")
st_and_smb = []
for file in files:
dataset = xr.open_dataset(file, decode_times=False)
dataset = convert_and_subset_times(dataset)
# handle extra dimensions and variables
try:
dataset = dataset.drop_dims("nv4")
except ValueError:
pass
for var in [
"z_bnds",
"lat",
"lon",
"mapping",
"time_bounds",
"lat2d",
"lon2d",
"polar_stereographic",
]:
try:
dataset = dataset.drop(labels=[var])
except ValueError:
pass
if "z" in dataset.dims:
dataset = dataset.mean(dim="z", skipna=True)
dataset["sector"] = sectors
formatted_aogcm = fp.rsplit("-", 1)
formatted_aogcm = "_".join(formatted_aogcm).lower()
aogcm_data = []
for sector in unique_sectors:
mask = dataset.sector == sector
sector_averages = dataset.where(mask, drop=True).mean(dim=["x", "y"])
sector_averages = sector_averages.to_dataframe()
sector_averages["aogcm"] = formatted_aogcm
sector_averages["year"] = np.arange(1, 87)
sector_averages = sector_averages.reset_index(drop=True)
aogcm_data.append(sector_averages)
st_and_smb.append(pd.concat(aogcm_data))
all_data.append(pd.concat(st_and_smb, axis=1))
return pd.concat(all_data)
[docs]
def process_AIS_atmospheric_sectors(forcing_directory, grid_file):
"""
Aggregate AIS atmospheric forcing to sector-level annual means.
Searches ``Atmosphere_Forcing/`` for 8 km, 1995-2100 NetCDF files, loads
each via ``ForcingFile``, and averages spatially over each of the 18 AIS
sectors defined by the grid file.
Args:
forcing_directory (str): Root forcing directory (GHub layout expected).
The function navigates to the ``Atmosphere_Forcing/`` sub-directory
automatically.
grid_file (str): Path to the AIS sector-definition NetCDF
(e.g. ``AIS_sectors_8km.nc``).
Returns:
pandas.DataFrame: Rows indexed by (aogcm, sector, year) with one column
per atmospheric forcing variable, plus ``aogcm``, ``year``,
and ``sector``.
"""
ice_sheet = "AIS"
start_time = time.time()
path_to_forcings = "/Atmosphere_Forcing/"
af_directory = (
f"{forcing_directory}/{path_to_forcings}"
if not forcing_directory.endswith(path_to_forcings)
else forcing_directory
)
filepaths = get_all_filepaths(path=af_directory, filetype="nc")
filepaths = [f for f in filepaths if "1995-2100" in f]
filepaths = [f for f in filepaths if "8km" in f]
if not filepaths:
raise ValueError("No files found. Check the path to the forcing files.")
gridfile = GridFile(ice_sheet=ice_sheet, filepath=grid_file)
gridfile.format_grids()
sectors = gridfile.get_sectors()
unique_sectors = np.unique(sectors)
all_data = []
for i, fp in enumerate(filepaths):
print("")
print(f"File {i + 1} / {len(filepaths)}")
print(f"File: {fp.split('/')[-1]}")
print(f"Time since start: {(time.time() - start_time) // 60} minutes")
forcingfile = ForcingFile(ice_sheet, realm="atmos", filepath=fp)
forcingfile.load(decode_times=False)
forcingfile.format_timestamps()
forcingfile.drop_vars(
["nv4", "z_bnds", "lat", "lon", "mapping", "time_bounds", "lat2d", "lon2d"]
)
forcingfile.assign_sectors(gridfile)
aogcm_data = []
for sector in unique_sectors:
sector_averages = forcingfile.average_over_sector(sector)
sector_averages = sector_averages.to_dataframe()
sector_averages["aogcm"] = fp.split("/")[-3].lower()
sector_averages["year"] = np.arange(1, 87)
sector_averages = sector_averages.reset_index(drop=True)
aogcm_data.append(sector_averages)
all_data.append(pd.concat(aogcm_data))
atmospheric_df = pd.concat(all_data)
atmospheric_df = atmospheric_df.loc[:, ~atmospheric_df.columns.duplicated()]
return atmospheric_df
[docs]
def process_AIS_oceanic_sectors(forcing_directory, grid_file):
"""
Aggregate AIS oceanic forcing to sector-level annual means.
Loads thermal forcing, salinity, and ocean temperature NetCDFs from
``Ocean_Forcing/`` (8 km, 1995-2100 files), depth-averages each variable,
and then spatially averages over each of the 18 AIS sectors.
Args:
forcing_directory (str): Root forcing directory (GHub layout expected).
The function navigates to the ``Ocean_Forcing/`` sub-directory
automatically.
grid_file (str): Path to the AIS sector-definition NetCDF
(e.g. ``AIS_sectors_8km.nc``).
Returns:
pandas.DataFrame: Rows indexed by (aogcm, sector, year) with columns
``thermal_forcing``, ``salinity``, ``temperature``, ``aogcm``,
``year``, and ``sector``.
"""
start_time = time.time()
directory = (
f"{forcing_directory}/Ocean_Forcing/"
if not forcing_directory.endswith("Ocean_Forcing/")
else forcing_directory
)
# Get all NC files that contain data from 1995-2100
filepaths = get_all_filepaths(path=directory, filetype="nc")
filepaths = [f for f in filepaths if "1995-2100" in f]
filepaths = [f for f in filepaths if "8km" in f]
# In the case of ocean forcings, use the filepaths of the files to determine
# which directories need to be used for OceanForcing processing. Change to
# those directories rather than individual files.
aogcms = list(set([f.split("/")[-3] for f in filepaths]))
filepaths = [f"{directory}/{aogcm}/" for aogcm in aogcms]
# Useful progress prints
print("Files to be processed...")
print([f.split("/")[-2] for f in filepaths])
gridfile = GridFile(ice_sheet="AIS", filepath=grid_file)
gridfile.format_grids()
sectors = gridfile.get_sectors()
unique_sectors = np.unique(sectors)
all_data = []
for i, directory in enumerate(filepaths):
print("")
print(f"File {i + 1} / {len(filepaths)}")
print(f"File: {directory.split('/')[-1]}")
print(f"Time since start: {(time.time() - start_time) // 60} minutes")
files = os.listdir(f"{directory}/1995-2100/")
if len(files) != 3:
warnings.warn(
f"Skipping {directory}/1995-2100/: expected 3 NetCDF files "
f"(thermal_forcing, salinity, temperature), found {len(files)}: {files}"
)
thermal_forcing_file = [f for f in files if "thermal_forcing" in f][0]
salinity_file = [f for f in files if "salinity" in f][0]
temperature_file = [f for f in files if "temperature" in f][0]
tffile = ForcingFile(
ice_sheet="AIS",
realm="ocean",
filepath=f"{directory}/1995-2100/{thermal_forcing_file}",
varname="thermal_forcing",
)
tffile.load(decode_times=False)
tffile.format_timestamps()
salfile = ForcingFile(
ice_sheet="AIS",
realm="ocean",
filepath=f"{directory}/1995-2100/{salinity_file}",
varname="salinity",
)
salfile.load(decode_times=False)
salfile.format_timestamps()
tempfile = ForcingFile(
ice_sheet="AIS",
realm="ocean",
filepath=f"{directory}/1995-2100/{temperature_file}",
varname="temperature",
)
tempfile.load(decode_times=False)
tempfile.format_timestamps()
aogcm_data = {"thermal_forcing": [], "salinity": [], "temperature": []}
for datafile in [tffile, salfile, tempfile]:
datafile.drop_vars(
[
"nv4",
"z_bnds",
"lat",
"lon",
"mapping",
"time_bounds",
"lat2d",
"lon2d",
"polar_stereographic",
]
)
datafile.aggregate_depth(method="mean")
datafile.assign_sectors(gridfile)
for sector in unique_sectors:
sector_averages = datafile.average_over_sector(sector)
sector_averages = sector_averages.to_dataframe()
sector_averages["aogcm"] = _format_AIS_ocean_aogcm_name(
directory.split("/")[-2].lower()
)
sector_averages["year"] = np.arange(1, 87)
sector_averages = sector_averages.reset_index(drop=True)
aogcm_data[datafile.varname].append(sector_averages)
df = pd.concat(
[
pd.concat(aogcm_data["thermal_forcing"]),
pd.concat(aogcm_data["salinity"]),
pd.concat(aogcm_data["temperature"]),
],
axis=1,
)
df = df.loc[:, ~df.columns.duplicated()]
all_data.append(df)
return pd.concat(all_data)
[docs]
def process_GrIS_oceanic_sectors(forcing_directory, grid_file):
"""
Aggregate GrIS oceanic forcing to sector-level annual means.
Reads thermal forcing and basin runoff NetCDFs from
``Ocean_Forcing/Melt_Implementation/v4/`` and spatially averages each
over the 6 GrIS drainage-basin sectors.
Args:
forcing_directory (str): Root forcing directory (GHub layout expected).
The function navigates to the ``Ocean_Forcing/Melt_Implementation/v4/``
sub-directory automatically.
grid_file (str or xarray.Dataset): Path to (or loaded) sector-definition
NetCDF defining the 6 GrIS drainage-basin sectors.
Returns:
pandas.DataFrame: Rows indexed by (aogcm, sector, year) with columns
``thermal_forcing``, ``basin_runoff``, ``aogcm``, ``year``,
and ``sector``.
"""
start_time = time.time()
path_to_forcing = "Ocean_Forcing/Melt_Implementation/v4/"
forcing_directory = (
f"{forcing_directory}/{path_to_forcing}"
if not forcing_directory.endswith(path_to_forcing)
else forcing_directory
)
aogcm_directories = os.listdir(forcing_directory)
aogcm_directories = [x for x in aogcm_directories if "DS_Store" not in x and "README" not in x]
sectors = _format_grid_file(grid_file)
unique_sectors = np.unique(sectors)
all_data = []
for i, directory in enumerate(aogcm_directories):
print("")
print(f"Directory {i + 1} / {len(aogcm_directories)}")
print(f"Directory: {directory.split('/')[-1]}")
print(f"Time since start: {(time.time() - start_time) // 60} minutes")
files = os.listdir(f"{forcing_directory}/{directory}")
if len(files) != 2:
warnings.warn(
f"Skipping {forcing_directory}/{directory}: expected 2 NetCDF files "
f"(thermalforcing, basinrunoff), found {len(files)}: {files}"
)
thermal_forcing_file = [f for f in files if "thermalforcing" in f.lower()][0]
basin_runoff_file = [f for f in files if "basinrunoff" in f.lower()][0]
thermal_forcing = xr.open_dataset(
f"{forcing_directory}/{directory}/{thermal_forcing_file}", decode_times=False
)
basin_runoff = xr.open_dataset(
f"{forcing_directory}/{directory}/{basin_runoff_file}", decode_times=False
)
# subset the dataset for 5km resolution (GrIS)
if thermal_forcing.sizes["x"] == 1681 and thermal_forcing.sizes["y"] == 2881:
thermal_forcing = thermal_forcing.sel(
x=thermal_forcing.x.values[::5], y=thermal_forcing.y.values[::5]
)
basin_runoff = basin_runoff.sel(
x=basin_runoff.x.values[::5], y=basin_runoff.y.values[::5]
)
thermal_forcing = convert_and_subset_times(thermal_forcing)
basin_runoff = convert_and_subset_times(basin_runoff)
data = {
"thermal_forcing": thermal_forcing,
"basin_runoff": basin_runoff,
}
aogcm_data = {
"thermal_forcing": [],
"basin_runoff": [],
}
for name, dataset in data.items():
# handle extra dimensions and variables
try:
dataset = dataset.drop_dims("nv4")
except ValueError:
pass
for var in [
"z_bnds",
"lat",
"lon",
"mapping",
"time_bounds",
"lat2d",
"lon2d",
"polar_stereographic",
]:
try:
dataset = dataset.drop(labels=[var])
except ValueError:
pass
if "z" in dataset.dims:
dataset = dataset.mean(dim="z", skipna=True)
try:
dataset["sector"] = sectors
except ValueError:
dataset["time"] = np.arange(1, 87)
dataset["sector"] = sectors
for sector in unique_sectors:
mask = dataset.sector == sector
sector_averages = dataset.where(mask, drop=True).mean(dim=["x", "y"])
sector_averages = sector_averages.to_dataframe()
sector_averages["aogcm"] = _format_GrIS_ocean_aogcm_name(directory)
sector_averages["year"] = np.arange(1, 87)
sector_averages = sector_averages.reset_index(drop=True)
aogcm_data[name].append(sector_averages)
df = pd.concat(
[
pd.concat(aogcm_data["thermal_forcing"]),
pd.concat(aogcm_data["basin_runoff"]),
],
axis=1,
)
df = df.loc[:, ~df.columns.duplicated()]
all_data.append(df)
return pd.concat(all_data)
def _format_grid_file(grid_file):
"""
Formats a grid file to define sector-based data aggregation.
Args:
grid_file (str or xarray.Dataset): Path to the grid file or an xarray dataset.
Returns:
xarray.DataArray: An array representing sector labels for the grid.
"""
if isinstance(grid_file, str):
grids = xr.open_dataset(grid_file) # .transpose('x', 'y',)
sector_name = "sectors" if "8km" in grid_file.lower() else "ID"
elif isinstance(grid_file, xr.Dataset):
sector_name = "ID" if "Rignot" in grids.Description else "sectors"
else:
raise ValueError("grid_file must be a string or an xarray Dataset.")
grids = grids.expand_dims(dim={"time": 86})
sectors = grids[sector_name]
grids = grids.transpose("time", "x", "y", ...)
return sectors
[docs]
def process_AIS_outputs(zenodo_directory, with_ctrl=False):
"""
Load AIS IVAF scalar projections from Zenodo and convert to sea-level equivalent.
Reads per-experiment NetCDF files from the ``ComputedScalarsPaper/``
sub-directory. Each file contains sector-level IVAF time series
(``ivaf_sector_1`` … ``ivaf_sector_18``). Files with only 85 time steps
have their first year duplicated to reach the required 86.
SLE is computed as::
sle = -ivaf / 362.5 * 910 / (1e9 * 1000)
following the sign convention and ice density (910 kg m⁻³) used in
Seroussi et al. (2020) ISMIP6 scripts.
Args:
zenodo_directory (str): Path to the Zenodo download directory. The
function looks inside ``ComputedScalarsPaper/`` automatically.
with_ctrl (bool, optional): If ``True``, includes files that contain
control projections (``ivaf_AIS_*`` files, excluding hist/ctrl
filenames). Defaults to ``False``, which selects only
``ivaf_minus_ctrl_proj`` files.
Returns:
pandas.DataFrame: One row per (model, experiment, sector, year) with
columns ``ivaf``, ``sle``, ``sector``, ``year``, ``id``, ``exp``,
and ``model``.
"""
directory = (
f"{zenodo_directory}/ComputedScalarsPaper/"
if not zenodo_directory.endswith("ComputedScalarsPaper")
else zenodo_directory
)
if not with_ctrl:
files = get_all_filepaths(directory, contains="ivaf_minus_ctrl_proj", filetype="nc")
else:
files = get_all_filepaths(
directory, contains="ivaf_AIS", filetype="nc", not_contains=["hist", "ctrl"]
)
count = 0
all_files_data = []
for f in files:
exp = f.replace(".nc", "").split("/")[-1].split("_")[-1]
model = f"{f.replace('.nc', '').split('/')[-1].split('_')[-3]}_{f.replace('.nc', '').split('/')[-1].split('_')[-2]}"
dataset = xr.open_dataset(f, decode_times=False)
if len(dataset.time) == 85:
count += 1
warnings.warn(
f"{f.split('/')[-1]} does not contain 86 years. Inserting a copy into the first year."
)
# Copy the first entry
first_entry = dataset.isel({"time": 0})
# Assuming numeric coordinates, create a new coordinate value
new_coord_value = (
first_entry["time"].values - 1
) # Adjust this calculation based on your coordinate system
# Set the new coordinate value for the copied entry
first_entry["time"] = new_coord_value
# Concatenate the new entry with the original dataset
dataset = xr.concat([first_entry, dataset], dim="time")
# dataset = convert_and_subset_times(dataset)
all_sectors = []
for sector in range(1, 19):
sector_x_data = dataset[f"ivaf_sector_{sector}"].to_dataframe().reset_index(drop=True)
sector_x_data.rename(columns={f"ivaf_sector_{sector}": "ivaf"}, inplace=True)
sector_x_data["sector"] = sector
sector_x_data["year"] = np.arange(1, 87)
sector_x_data["id"] = f"{model}_{exp}_sector{sector}"
all_sectors.append(sector_x_data)
full_dataset = pd.concat(all_sectors, axis=0)
full_dataset["exp"] = exp
full_dataset["model"] = model
all_files_data.append(full_dataset)
outputs = pd.concat(all_files_data)
# outputs["sle"] = outputs["ivaf"] / 362.5 / 1e9
outputs["sle"] = (
-outputs["ivaf"] / 362.5 * 910 / (10**9 * 1000)
) # per Seroussi et al., ISMIP6 scripts (910=ice density)
return outputs
[docs]
def merge_datasets(forcings, projections, experiments_file, ice_sheet="AIS"):
"""
Join sector-level forcings and projections into a single analysis-ready DataFrame.
Uses the experiment metadata to add the AOGCM name to the projections table,
normalises AOGCM name formatting so the two tables join cleanly on
``(aogcm, year, sector)``, then performs an inner merge.
Args:
forcings (pd.DataFrame): Sector-level forcing DataFrame as produced by
``process_AIS/GrIS_atmospheric_sectors()`` + ``process_AIS/GrIS_oceanic_sectors()``
(or read from ``forcings.csv``).
projections (pd.DataFrame): Sector-level projection DataFrame as produced by
``process_AIS/GrIS_outputs()`` (or read from ``projections.csv``).
experiments_file (str or pd.DataFrame): Path to the experiment-metadata CSV
(maps experiment IDs → AOGCM names) or a pre-loaded DataFrame.
ice_sheet (str, optional): ``'AIS'`` or ``'GrIS'``. Defaults to ``'AIS'``.
Returns:
pandas.DataFrame: Merged dataset with one row per (model, experiment,
sector, year), containing all forcing columns and the target SLE
projection.
"""
if isinstance(experiments_file, str):
experiments = pd.read_csv(experiments_file)
elif isinstance(experiments_file, pd.DataFrame):
experiments = experiments_file
else:
raise ValueError("experiments_file must be a string or a pandas DataFrame.")
experiments = experiments[experiments.ice_sheet == ice_sheet]
projections = pd.merge(projections, experiments, on="exp", how="inner")
formatting_function = (
_format_AIS_forcings_aogcm_name if ice_sheet == "AIS" else _format_GrIS_forcings_aogcm_name
)
forcings["aogcm"] = forcings["aogcm"].apply(formatting_function)
projections.rename(columns={"AOGCM": "aogcm"}, inplace=True)
forcings["sector"] = forcings.sector.astype(int)
dataset = pd.merge(forcings, projections, on=["aogcm", "year", "sector"], how="inner")
return dataset
[docs]
def process_GrIS_outputs(zenodo_directory):
"""
Load GrIS IVAF scalar projections from Zenodo and convert to sea-level equivalent.
Reads per-experiment NetCDF files from the ``v7_CMIP5_pub/`` sub-directory.
Each file contains basin-level IVAF time series for the 6 GrIS drainage
basins (``ivaf_no``, ``ivaf_ne``, ``ivaf_se``, ``ivaf_sw``, ``ivaf_cw``,
``ivaf_nw``). Files with only 85 time steps have their first year duplicated
to reach the required 86.
SLE is computed as::
sle = ivaf / 362.5 / 1e9
Args:
zenodo_directory (str): Path to the Zenodo download directory. The
function looks inside ``v7_CMIP5_pub/`` automatically.
Returns:
pandas.DataFrame: One row per (model, experiment, sector, year) with
columns ``ivaf``, ``sle``, ``sector``, ``year``, ``id``, ``exp``,
and ``model``.
"""
directory = (
f"{zenodo_directory}/v7_CMIP5_pub/"
if not zenodo_directory.endswith("v7_CMIP5_pub")
else zenodo_directory
)
files = get_all_filepaths(directory, contains="rm", not_contains="ctrl_proj", filetype="nc")
files = [f for f in files if "historical" not in f]
count = 0
all_files_data = []
for f in files:
exp = f.replace(".nc", "").split("/")[-1].split("_")[-1]
exp = exp.replace("_05", "")
model = f"{f.replace('.nc', '').split('/')[-1].split('_')[-3]}_{f.replace('.nc', '').split('/')[-1].split('_')[-2]}"
dataset = xr.open_dataset(f, decode_times=False)
if len(dataset.time) == 85:
count += 1
warnings.warn(
f"{f.split('/')[-1]} does not contain 86 years. Inserting a copy into the first year."
)
# Copy the first entry
first_entry = dataset.isel({"time": 0})
# Assuming numeric coordinates, create a new coordinate value
new_coord_value = (
first_entry["time"].values - 1
) # Adjust this calculation based on your coordinate system
# Set the new coordinate value for the copied entry
first_entry["time"] = new_coord_value
# Concatenate the new entry with the original dataset
dataset = xr.concat([first_entry, dataset], dim="time")
sector_mapping = {"1": "no", "2": "ne", "3": "se", "4": "sw", "5": "cw", "6": "nw"}
# dataset = convert_and_subset_times(dataset)
all_sectors = []
for sector in range(1, 7):
var_name = f"ivaf_{sector_mapping[str(sector)]}"
sector_x_data = dataset[var_name].to_dataframe().reset_index(drop=True)
sector_x_data.rename(columns={var_name: "ivaf"}, inplace=True)
sector_x_data["sector"] = sector
sector_x_data["year"] = np.arange(1, 87)
sector_x_data["id"] = f"{model}_{exp}_sector{sector}"
all_sectors.append(sector_x_data)
full_dataset = pd.concat(all_sectors, axis=0)
full_dataset["exp"] = exp
full_dataset["model"] = model
all_files_data.append(full_dataset)
outputs = pd.concat(all_files_data)
outputs["sle"] = outputs["ivaf"] / 362.5 / 1e9
return outputs
_DEFAULT_EXPERIMENTS_FILE = os.path.join(
os.path.dirname(__file__), "data_files", "ismip6_experiments_updated.csv"
)
[docs]
def process_sectors(
ice_sheet,
forcing_directory,
grid_file,
zenodo_directory,
experiments_file=_DEFAULT_EXPERIMENTS_FILE,
export_directory=None,
overwrite=False,
with_ctrl=False,
):
"""
End-to-end pipeline that builds the sector-level training dataset from raw ISMIP6 files.
This is the main entry point for data preparation. It reads raw climate
forcing NetCDFs and pre-computed IVAF scalar projection files, aggregates
both to sector-level annual time series (86 years, 2015-2100), joins them
on (aogcm, year, sector), and returns a single analysis-ready DataFrame.
Intermediate files are written to ``export_directory`` so individual stages
can be skipped on re-runs (controlled by ``overwrite``):
* ``<ice_sheet>_atmospheric.csv`` - sector-averaged atmospheric forcings
* ``<ice_sheet>_oceanic.csv`` - sector-averaged oceanic forcings
* ``forcings.csv`` - atmospheric + oceanic merged
* ``projections.csv`` - IVAF projections by sector
* ``dataset.csv`` - final merged dataset (also returned)
Args:
ice_sheet (str): Ice sheet to process: ``'AIS'`` (18 sectors) or ``'GrIS'`` (6 sectors).
forcing_directory (str): Root directory of the ISMIP6 forcing data. Expected
sub-structure mirrors the GHub layout (``Atmosphere_Forcing/``,
``Ocean_Forcing/``, etc.).
grid_file (str): Path to the sector-definition NetCDF (e.g.
``AIS_sectors_8km.nc`` or ``GrIS_Basins_Rignot_sectors_5km.nc``).
zenodo_directory (str): Directory containing the pre-computed IVAF scalar
files from Zenodo (``ComputedScalarsPaper/`` for AIS,
``v7_CMIP5_pub/`` for GrIS).
experiments_file (str): Path to the experiment-metadata CSV that maps
experiment IDs to AOGCM names. Defaults to the bundled
``ismip6_experiments_updated.csv``.
export_directory (str, optional): Directory to write intermediate and final
CSVs. If ``None``, nothing is saved to disk.
overwrite (bool, optional): If ``True``, re-process and overwrite any
existing intermediate files. Defaults to ``False``.
with_ctrl (bool, optional): AIS only — if ``True``, includes control
projections in the output. Defaults to ``False``.
Returns:
pandas.DataFrame: Merged dataset with one row per (model, experiment,
sector, year), containing both forcing variables and the target SLE
projection.
"""
forcing_exists = os.path.exists(f"{export_directory}/forcings.csv")
if not forcing_exists or (forcing_exists and overwrite):
atmospheric_df = (
process_AIS_atmospheric_sectors(forcing_directory, grid_file)
if ice_sheet == "AIS"
else process_GrIS_atmospheric_sectors(forcing_directory, grid_file)
)
atmospheric_df.to_csv(f"{export_directory}/{ice_sheet}_atmospheric.csv", index=False)
oceanic_df = (
process_AIS_oceanic_sectors(forcing_directory, grid_file)
if ice_sheet == "AIS"
else process_GrIS_oceanic_sectors(forcing_directory, grid_file)
)
oceanic_df.to_csv(f"{export_directory}/{ice_sheet}_oceanic.csv", index=False)
atmospheric_df = pd.read_csv(f"{export_directory}/{ice_sheet}_atmospheric.csv")
oceanic_df = pd.read_csv(f"{export_directory}/{ice_sheet}_oceanic.csv")
atmospheric_df = atmospheric_df[[x for x in atmospheric_df.columns if ".1" not in x]]
oceanic_df = oceanic_df[[x for x in oceanic_df.columns if ".1" not in x]]
atmospheric_df = atmospheric_df.loc[:, ~atmospheric_df.columns.duplicated()]
oceanic_df = oceanic_df.loc[:, ~oceanic_df.columns.duplicated()]
forcings = pd.merge(
atmospheric_df,
oceanic_df,
on=[
"aogcm",
"year",
"sector",
],
how="inner",
)
forcings.to_csv(f"{export_directory}/forcings.csv", index=False)
else:
forcings = pd.read_csv(f"{export_directory}/forcings.csv")
projections_exists = os.path.exists(f"{export_directory}/projections.csv")
if not projections_exists or (projections_exists and overwrite):
projections = (
process_AIS_outputs(
zenodo_directory,
with_ctrl=with_ctrl,
)
if ice_sheet == "AIS"
else process_GrIS_outputs(
zenodo_directory,
)
)
projections.to_csv(f"{export_directory}/projections.csv", index=False)
else:
projections = pd.read_csv(f"{export_directory}/projections.csv")
projections = projections.loc[:, ~projections.columns.duplicated()]
dataset = merge_datasets(
forcings,
projections,
experiments_file,
ice_sheet,
)
dataset = dataset[[x for x in dataset.columns if ".1" not in x]]
if export_directory is not None:
dataset.to_csv(f"{export_directory}/dataset.csv", index=False)
return dataset
# ---------------------------------------------------------------------------
# AOGCM name normalisation helpers
#
# ISMIP6 forcing directories and the experiment-metadata table use slightly
# different AOGCM naming conventions (dots vs. dashes, version suffixes, etc.).
# The four functions below canonicalise names so the forcings and projections
# tables can be joined cleanly. Each is applied with DataFrame.apply() inside
# merge_datasets() / DatasetMerger._get_forcing_metadata().
# ---------------------------------------------------------------------------
def _format_AIS_ocean_aogcm_name(aogcm):
"""Normalise an AIS oceanic forcing directory AOGCM name for joining."""
aogcm = aogcm.lower()
if (
aogcm == "ipsl-cm5a-mr_rcp2.6"
or aogcm == "ipsl-cm5a-mr_rcp8.5"
or aogcm == "hadgem2-es_rcp8.5"
or aogcm == "csiro-mk3-6-0_rcp8.5"
):
aogcm = aogcm.replace(".", "")
elif (
aogcm == "cnrm-cm6-1_ssp585"
or aogcm == "cnrm-esm2-1_ssp585"
or aogcm == "cnrm-cm6-1_ssp126"
):
aogcm = aogcm.replace("-1", "")
aogcm = aogcm.replace("-", "_")
elif aogcm == "ukesm1-0-ll_ssp585":
aogcm = "ukesm1-0-ll"
else:
pass
return aogcm
def _format_AIS_forcings_aogcm_name(aogcm):
"""Normalise an AIS atmospheric forcing file AOGCM name for joining."""
aogcm = aogcm.lower()
if (
aogcm == "noresm1-m_rcp2.6"
or aogcm == "noresm1-m_rcp8.5"
or aogcm == "miroc-esm-chem_rcp8.5"
or aogcm == "ccsm4_rcp8.5"
):
aogcm = aogcm.replace(".", "")
elif aogcm == "csiro-mk3-6-0_rcp85":
aogcm = "csiro-mk3.6_rcp85"
elif aogcm == "ipsl-cm5a-mr_rcp26" or aogcm == "ipsl-cm5a-mr_rcp85":
aogcm = aogcm.replace("a", "")
else:
pass
return aogcm
def _format_GrIS_forcings_aogcm_name(aogcm):
"""Normalise a GrIS atmospheric forcing file AOGCM name for joining."""
aogcm = aogcm.lower()
if aogcm == "noresm1_rcp85":
aogcm = "noresm1-m_rcp85"
elif aogcm == "ukesm1-cm6_ssp585":
aogcm = "ukesm1-0-ll_ssp585"
elif aogcm == "csiro_mk3.6_rcp85":
aogcm = "csiro-mk3.6_rcp85"
elif aogcm == "hadgem2_es_rcp85":
aogcm = "hadgem2-es_rcp8.5"
elif aogcm == "ipsl-cm5_mr_rcp85":
aogcm = "ipsl-cm5-mr_rcp8.5"
elif aogcm == "noresm1_rcp85":
aogcm = "noresm1-m_rcp8.5"
else:
pass
return aogcm
def _format_GrIS_ocean_aogcm_name(aogcm):
"""
Formats AOGCM names for GrIS oceanic forcing files to maintain consistency.
Args:
aogcm (str): The original AOGCM name.
Returns:
str: The formatted AOGCM name.
"""
aogcm = aogcm.lower()
if aogcm == "access1-3_rcp8.5":
aogcm = "access1.3_rcp85"
elif aogcm == "csiro-mk3.6_rcp8.5":
aogcm = aogcm.replace("8.5", "85")
elif aogcm in (
"hadgem2-es_rcp8.5",
"ipsl-cm5-mr_rcp8.5",
"miroc5_rcp2.6",
"miroc5_rcp8.5",
"miroc5_rcp8.5",
):
aogcm = aogcm.replace(".", "")
elif aogcm == "noresm1-m_rcp8.5":
aogcm = "noresm1_rcp85"
elif aogcm == "ukesm1-0-ll_ssp585":
aogcm = "ukesm1-cm6_ssp585"
else:
pass
return aogcm