"""Evaluation metrics for ISEFlow sea-level projections.
This module collects the full set of metrics used to assess ISEFlow prediction
quality across three dimensions: point accuracy, distributional fidelity, and
uncertainty calibration.
Point accuracy metrics
----------------------
``r2_score`` — coefficient of determination (R²).
``mean_squared_error`` — MSE between predicted and true SLE.
``mean_absolute_error`` — MAE.
``mape`` — mean absolute percentage error (skips zero targets).
``relative_squared_error``— RSE = SS_res / SS_tot (0 = perfect, 1 = baseline mean).
``crps`` — continuous ranked probability score for Gaussian forecasts (lower is better; uses ``properscoring`` library).
Distribution metrics
--------------------
These compare the *distribution* of projected SLE values at 2100 across many
runs, rather than individual timestep accuracy:
``kl_divergence`` — Kullback-Leibler divergence between predicted and true PDFs.
``js_divergence`` — Jensen-Shannon divergence (symmetric, bounded [0, 1]).
``kolmogorov_smirnov``— KS two-sample test statistic and p-value.
``t_test`` — two-sample t-test statistic and p-value.
Uncertainty calibration metrics
--------------------------------
``calculate_ece`` — Expected Calibration Error (ECE). Measures
how well the predicted std aligns with actual errors by binning predictions
by uncertainty level and checking what fraction of true values fall within
±2σ (expected ≈ 95.4 % for a Gaussian). Lower ECE = better calibrated.
``mean_prediction_interval_width`` — average width of prediction intervals
(sharpness proxy; should be small while ECE stays low).
``winkler_score`` — proper scoring rule for interval forecasts at
significance level α. Penalises both wide intervals and violations.
Sector aggregation
------------------
``sum_by_sector`` — given a full 2-D (x, y) grid array and a sector-definition
NetCDF file, sums values within each sector mask to produce an
``(N_timesteps, N_sectors)`` matrix::
from ise.evaluation.metrics import sum_by_sector
sector_sums = sum_by_sector(predicted_grid, "AIS_sectors_8km.nc")
Usage example
-------------
::
from ise.evaluation.metrics import r2_score, crps, calculate_ece
r2 = r2_score(y_true, predictions)
score = crps(y_true, predictions, uncertainties["total"])
ece = calculate_ece(predictions, uncertainties["total"], y_true)
"""
import numpy as np
import properscoring as ps
import torch
import xarray as xr
from scipy.spatial.distance import jensenshannon
from scipy.stats import kstest, ttest_ind
from sklearn.metrics import r2_score as r2
[docs]
def sum_by_sector(array, grid_file):
"""
Computes the sum of values in a given array by predefined sectors using a grid file.
Args:
array (numpy.ndarray or torch.Tensor): A 2D or 3D array containing values to be summed by sector.
grid_file (str or xarray.Dataset): Path to the grid file defining sector boundaries or an xarray dataset.
Returns:
numpy.ndarray: A 2D array where each row represents a timestep and each column represents a sector.
Raises:
ValueError: If grid_file is not a valid string or xarray dataset.
"""
if isinstance(array, torch.Tensor):
array = array.cpu().detach().numpy()
if isinstance(grid_file, str):
grids = xr.open_dataset(grid_file)
sector_name = "sectors" if "ais" in grid_file.lower() else "ID"
elif isinstance(grid_file, xr.Dataset):
grids = grid_file
sector_name = "ID" if "Rignot" in grids.attrs.get("Description", "") else "sectors"
else:
raise ValueError("grid_file must be a string or an xarray Dataset.")
if len(array.shape) == 3:
num_timesteps = array.shape[0]
elif len(array.shape) == 2:
num_timesteps = 1
array = array.reshape((1, array.shape[0], array.shape[1]))
sectors = grids[sector_name].values
ice_sheet = "AIS" if 761 in array.shape else "GIS"
num_sectors = 18 if ice_sheet == "AIS" else 6
sums_by_sector = np.zeros((num_timesteps, num_sectors))
for i in range(array.shape[0]):
for sector in range(1, num_sectors + 1):
sums_by_sector[i, sector - 1] = np.sum(array[i, :, :][sectors == sector])
return sums_by_sector
[docs]
def r2_score(y_true, y_pred):
"""
Computes the coefficient of determination (R² score).
Args:
y_true (numpy.ndarray or list): The true values.
y_pred (numpy.ndarray or list): The predicted values.
Returns:
float: The R² score, where 1 indicates perfect predictions.
"""
return r2(y_true, y_pred)
[docs]
def mean_squared_error_sector(sum_sectors_true, sum_sectors_pred):
"""
Computes the mean squared error (MSE) between true and predicted sector-wise sums.
Args:
sum_sectors_true (numpy.ndarray): The true summed sector values.
sum_sectors_pred (numpy.ndarray): The predicted summed sector values.
Returns:
float: The mean squared error (MSE).
"""
return np.mean((sum_sectors_true - sum_sectors_pred) ** 2)
[docs]
def kl_divergence(p: np.ndarray, q: np.ndarray):
"""
Computes the Kullback-Leibler (KL) Divergence between two probability distributions.
Args:
p (numpy.ndarray): The first probability distribution.
q (numpy.ndarray): The second probability distribution.
Returns:
float: The KL divergence value.
Notes:
- The distributions p and q must be normalized (i.e., sum to 1).
- Small epsilon values are used to avoid numerical instability.
"""
p = np.asarray(p, dtype=np.float64)
q = np.asarray(q, dtype=np.float64)
# Normalize p and q to ensure they are probability distributions
p /= np.sum(p)
q /= np.sum(q)
# Clip values to avoid numerical instability
epsilon = 1e-10
p = np.clip(p, epsilon, 1)
q = np.clip(q, epsilon, 1)
# Compute KL divergence
return np.sum(p * np.log(p / q))
[docs]
def js_divergence(p: np.ndarray, q: np.ndarray):
"""
Computes the Jensen-Shannon Divergence (JSD) between two probability distributions.
Args:
p (numpy.ndarray): The first probability distribution.
q (numpy.ndarray): The second probability distribution.
Returns:
float: The Jensen-Shannon divergence value.
Notes:
- JSD is a smoothed and symmetric version of KL divergence.
- The function normalizes the distributions before computation.
"""
p = np.asarray(p, dtype=np.float64)
q = np.asarray(q, dtype=np.float64)
# Normalize p and q to ensure they are probability distributions
p /= np.sum(p)
q /= np.sum(q)
# Clip values to avoid numerical instability
epsilon = 1e-10
p = np.clip(p, epsilon, 1)
q = np.clip(q, epsilon, 1)
# Calculate the Jensen-Shannon Divergence
jsd = (
jensenshannon(p, q) ** 2
) # The function returns the square root, so square it for the divergence
return jsd
[docs]
def crps(y_true, y_pred, y_std):
"""
Computes the Continuous Ranked Probability Score (CRPS) for a Gaussian distribution.
Args:
y_true (numpy.ndarray): The true values.
y_pred (numpy.ndarray): The predicted mean values.
y_std (numpy.ndarray): The predicted standard deviations.
Returns:
numpy.ndarray: The computed CRPS values for each prediction.
"""
return ps.crps_gaussian(y_true, mu=y_pred, sig=y_std)
[docs]
def mape(y_true, y_pred):
"""
Computes the Mean Absolute Percentage Error (MAPE).
Args:
y_true (numpy.ndarray or list): The true values.
y_pred (numpy.ndarray or list): The predicted values.
Returns:
float: The MAPE value, expressed as a percentage.
Notes:
- MAPE ignores zero values in y_true to prevent division by zero.
- If all true values are zero, returns infinity.
"""
y_true, y_pred = np.array(y_true), np.array(y_pred)
# Avoid division by zero
non_zero_mask = y_true != 0
if not np.any(non_zero_mask):
return np.inf
mape = (
np.mean(np.abs((y_true[non_zero_mask] - y_pred[non_zero_mask]) / y_true[non_zero_mask]))
* 100
)
return mape
[docs]
def relative_squared_error(y_true, y_pred):
"""
Computes the Relative Squared Error (RSE), measuring the error relative to the variance in y_true.
Args:
y_true (numpy.ndarray or list): The true values.
y_pred (numpy.ndarray or list): The predicted values.
Returns:
float: The computed RSE value.
Notes:
- A lower RSE indicates better performance, with RSE=0 indicating perfect predictions.
"""
y_true, y_pred = np.array(y_true), np.array(y_pred)
ss_res = np.sum((y_true - y_pred) ** 2)
ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
rse = ss_res / ss_tot
return rse
[docs]
def kolmogorov_smirnov(x1, x2):
"""
Computes the Kolmogorov-Smirnov (KS) statistic to compare two distributions.
Args:
x1 (numpy.ndarray or list): The first dataset.
x2 (numpy.ndarray or list): The second dataset.
Returns:
tuple: (KS statistic, p-value).
"""
res = kstest(x1, x2)
return res.statistic, res.pvalue
[docs]
def t_test(x1, x2):
"""
Performs an independent two-sample t-test to compare the means of two distributions.
Args:
x1 (numpy.ndarray or list): The first dataset.
x2 (numpy.ndarray or list): The second dataset.
Returns:
tuple: (t-statistic, p-value).
"""
res = ttest_ind(x1, x2)
return res.statistic, res.pvalue
[docs]
def calculate_ece(predictions, uncertainties, true_values, bins=10):
"""
Computes the Expected Calibration Error (ECE) for a regression model.
Args:
predictions (numpy.ndarray): The predicted mean values.
uncertainties (numpy.ndarray): The predicted standard deviations.
true_values (numpy.ndarray): The true values.
bins (int, optional): The number of bins for uncertainty grouping. Defaults to 10.
Returns:
float: The Expected Calibration Error (ECE).
Notes:
- ECE measures how well predicted uncertainties align with actual errors.
- A lower ECE indicates better-calibrated uncertainty estimates.
"""
bin_limits = np.linspace(np.min(uncertainties), np.max(uncertainties), bins + 1)
ece = 0.0
total_count = len(predictions)
for i in range(bins):
bin_mask = (uncertainties >= bin_limits[i]) & (uncertainties < bin_limits[i + 1])
if np.sum(bin_mask) == 0:
continue
bin_predictions = predictions[bin_mask]
bin_uncertainties = uncertainties[bin_mask]
bin_true_values = true_values[bin_mask]
# Assume Gaussian distribution: about 95.4% of data should fall within ±2 standard deviations
lower_bounds = bin_predictions - 2 * bin_uncertainties
upper_bounds = bin_predictions + 2 * bin_uncertainties
in_range = (bin_true_values >= lower_bounds) & (bin_true_values <= upper_bounds)
observed_probability = np.mean(in_range)
expected_probability = 0.954 # For ±2 standard deviations in Gaussian distribution
# Calculate the absolute difference weighted by the number of elements in the bin
ece += np.abs(observed_probability - expected_probability) * np.sum(bin_mask) / total_count
return ece
[docs]
def mean_squared_error(y_true, y_pred):
"""
Computes the Mean Squared Error (MSE).
Args:
y_true (numpy.ndarray or list): The true values.
y_pred (numpy.ndarray or list): The predicted values.
Returns:
float: The Mean Squared Error (MSE).
"""
y_true, y_pred = np.array(y_true), np.array(y_pred)
mse = np.mean((y_true - y_pred) ** 2)
return mse
[docs]
def mean_absolute_error(y_true, y_pred):
"""
Computes the Mean Absolute Error (MAE).
Args:
y_true (numpy.ndarray or list): The true values.
y_pred (numpy.ndarray or list): The predicted values.
Returns:
float: The Mean Absolute Error (MAE).
"""
y_true, y_pred = np.array(y_true), np.array(y_pred)
mae = np.mean(np.abs(y_true - y_pred))
return mae
[docs]
def mean_prediction_interval_width(upper_bound, lower_bound):
"""
Computes the Mean Prediction Interval Width (MPIW).
Args:
upper_bound (numpy.ndarray or list): The upper bounds of the prediction intervals.
lower_bound (numpy.ndarray or list): The lower bounds of the prediction intervals.
Returns:
float: The Mean Prediction Interval Width (MPIW).
"""
upper_bound, lower_bound = np.array(upper_bound), np.array(lower_bound)
mpiw = np.mean(upper_bound - lower_bound)
return mpiw
[docs]
def winkler_score(y_true, y_pred, lower_bound, upper_bound, alpha=0.05):
"""
Computes the Winkler Score for prediction intervals.
Args:
y_true (numpy.ndarray or list): The true values.
y_pred (numpy.ndarray or list): The predicted mean values.
lower_bound (numpy.ndarray or list): The lower bounds of the prediction intervals.
upper_bound (numpy.ndarray or list): The upper bounds of the prediction intervals.
alpha (float, optional): The significance level for the prediction intervals. Defaults to 0.05.
Returns:
float: The Winkler Score.
"""
y_true, y_pred = np.array(y_true), np.array(y_pred)
lower_bound, upper_bound = np.array(lower_bound), np.array(upper_bound)
n = len(y_true)
score = np.zeros(n)
for i in range(n):
if y_true[i] < lower_bound[i]:
score[i] = (upper_bound[i] - lower_bound[i]) + (2 / alpha) * (
lower_bound[i] - y_true[i]
)
elif y_true[i] > upper_bound[i]:
score[i] = (upper_bound[i] - lower_bound[i]) + (2 / alpha) * (
y_true[i] - upper_bound[i]
)
else:
score[i] = upper_bound[i] - lower_bound[i]
return np.mean(score)