"""Dimensionality reduction for ice sheet emulator inputs.
This module provides PCA and DimensionProcessor for reducing input feature
dimensions and aligning data with fitted transforms (e.g. after scaling).
"""
import numpy as np
import pandas as pd
import torch
from torch import nn
from ise.data.scaler import LogScaler, RobustScaler, StandardScaler
[docs]
class PCA(nn.Module):
"""
Principal Component Analysis (PCA) using PyTorch.
This class performs Principal Component Analysis (PCA) for dimensionality
reduction, leveraging PyTorch's built-in operations. The model can be
trained on input data to extract principal components and later transform
new data into the reduced-dimensional space.
Attributes:
n_components (int or float): Number of principal components to keep.
If an integer, it represents the exact number of components.
If a float between 0 and 1, it represents the proportion of variance to retain.
mean (torch.Tensor): Mean of the input data, computed during fitting.
components (torch.Tensor): Principal components (eigenvectors) of the fitted data.
singular_values (torch.Tensor): Singular values from singular value decomposition.
explained_variance (torch.Tensor): Variance explained by each principal component.
explained_variance_ratio (torch.Tensor): Proportion of total variance explained by each component.
device (str): The computation device ('cuda' if available, otherwise 'cpu').
"""
def __init__(self, n_components):
super().__init__()
self.n_components = n_components
self.mean = None
self.components = None
self.singular_values = None
self.explained_variance = None
self.explained_variance_ratio = None
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.to(self.device)
[docs]
def fit(self, X):
"""
Fits the PCA model to the input data.
Computes the principal components, singular values, explained variance,
and variance ratios using singular value decomposition.
Args:
X (np.ndarray or pd.DataFrame): Input data of shape (n_samples, n_features).
Returns:
PCA: The fitted PCA model instance.
Raises:
ValueError: If `n_components` is neither a valid float (0,1) nor an integer.
"""
# Center the data
X = self._to_tensor(X)
self.mean = torch.mean(X, dim=0)
X_centered = X - self.mean
# Compute low-rank PCA
U, S, V = torch.pca_lowrank(
X_centered,
q=301,
)
# Compute total variance
total_variance = S.pow(2).sum()
explained_variance = S.pow(2)
explained_variance_ratio = explained_variance / total_variance
# Determine the number of components for the desired variance
if isinstance(self.n_components, float) and 0 < self.n_components < 1:
cumulative_variance_ratio = torch.cumsum(explained_variance_ratio, dim=0)
# Number of components to explain desired variance
num_components = torch.searchsorted(cumulative_variance_ratio, self.n_components) + 1
self.n_components = min(num_components, S.size(0))
elif isinstance(self.n_components, int):
self.n_components = min(self.n_components, S.size(0))
else:
raise ValueError("n_components must be a float in the range (0, 1) or an integer.")
self.components = V[:, : self.n_components]
self.singular_values = S[: self.n_components]
self.explained_variance = explained_variance[: self.n_components]
self.explained_variance_ratio = explained_variance_ratio[: self.n_components]
return self
[docs]
def save(self, path):
"""
Saves the fitted PCA model to a specified file.
Args:
path (str): File path where the model should be saved.
Raises:
RuntimeError: If the PCA model has not been fitted before saving.
"""
if self.mean is None or self.components is None:
raise RuntimeError("PCA model has not been fitted yet.")
torch.save(
{
"n_components": self.n_components,
"mean": self.mean,
"components": self.components,
"singular_values": self.singular_values,
"explained_variance": self.explained_variance,
"explained_variance_ratio": self.explained_variance_ratio,
},
path,
)
def _to_tensor(self, x):
"""
Converts input data into a PyTorch tensor.
Args:
x (np.ndarray, pd.DataFrame, or torch.Tensor): Input data.
Returns:
torch.Tensor: Converted data as a PyTorch tensor.
Raises:
ValueError: If input data type is not supported.
"""
if x is None:
return None
if isinstance(x, pd.DataFrame):
x = x.values
elif isinstance(x, np.ndarray):
x = torch.tensor(x)
elif isinstance(x, torch.Tensor):
pass
else:
raise ValueError("Data must be a pandas dataframe, numpy array, or PyTorch tensor")
return x
[docs]
@staticmethod
def load(path):
"""
Loads a previously saved PCA model.
Args:
path (str): Path to the saved PCA model.
Returns:
PCA: Loaded PCA model instance.
Raises:
FileNotFoundError: If the specified file does not exist.
RuntimeError: If the loaded model is not a valid PCA instance.
"""
checkpoint = torch.load(path)
model = PCA(checkpoint["n_components"])
model.mean = checkpoint["mean"]
model.components = checkpoint["components"]
model.singular_values = checkpoint["singular_values"]
model.explained_variance = checkpoint["explained_variance"]
model.explained_variance_ratio = checkpoint["explained_variance_ratio"]
return model
[docs]
class DimensionProcessor(nn.Module):
"""
A pipeline for dimensionality reduction using PCA and data scaling.
This class integrates a PCA model for dimensionality reduction and a
scaling method for normalization. It facilitates transformation between
the original data space and the reduced PCA space.
Args:
pca_model (str or PCA): A pre-trained PCA model or a file path to a saved PCA model.
scaler_model (str or StandardScaler, RobustScaler, LogScaler):
A scaler model instance or a file path to a saved scaler model.
scaler_method (str, optional): Scaling method to use.
Must be one of 'standard', 'robust', or 'log'. Defaults to 'standard'.
Attributes:
device (str): Computation device ('cuda' if available, otherwise 'cpu').
pca (PCA): PCA model used for dimensionality reduction.
scaler (StandardScaler, RobustScaler, or LogScaler): Scaling model for data normalization.
Raises:
ValueError: If the `pca_model` or `scaler_model` are invalid.
RuntimeError: If the PCA model has not been fitted.
"""
def __init__(
self,
pca_model,
scaler_model,
scaler_method="standard",
):
super().__init__()
self.device = "cuda" if torch.cuda.is_available() else "cpu"
# LOAD PCA
if isinstance(pca_model, str):
self.pca = PCA.load(pca_model)
elif isinstance(pca_model, PCA):
self.pca = pca_model
else:
raise ValueError("pca_model must be a path (str) or a PCA instance")
if self.pca.mean is None or self.pca.components is None:
raise RuntimeError("PCA model has not been fitted yet.")
# LOAD SCALER
if scaler_method == "standard":
scaler_class = StandardScaler
elif scaler_method == "robust":
scaler_class = RobustScaler
elif scaler_method == "log":
scaler_class = LogScaler
else:
raise ValueError("scaler_method must be 'standard', 'robust', or 'log'")
if isinstance(scaler_model, str):
self.scaler = scaler_class.load(scaler_model)
elif isinstance(scaler_model, scaler_class):
self.scaler = scaler_model
else:
raise ValueError("pca_model must be a path (str) or a PCA instance")
# if self.scaler.mean_ is None or self.scaler.scale_ is None:
# raise RuntimeError("This StandardScalerPyTorch instance is not fitted yet.")
self.scaler.to(self.device)
self.pca.to(self.device)
self.to(self.device)
[docs]
def to_pca(self, data):
"""
Applies PCA transformation to the input data.
Args:
data (torch.Tensor or pd.DataFrame): Input data for transformation.
Returns:
torch.Tensor: Data transformed into the PCA space.
"""
data = data.to(self.device)
scaled = self.scaler.transform(data) # scale
return self.pca.transform(scaled) # convert to pca
[docs]
def to_grid(self, pcs, unscale=True):
"""
Converts PCA-transformed data back to the original feature space.
Args:
pcs (torch.Tensor or pd.DataFrame): Principal components to transform.
unscale (bool, optional): Whether to apply inverse scaling. Defaults to True.
Returns:
torch.Tensor or pd.DataFrame: Reconstructed data in the original feature space.
"""
if not isinstance(pcs, torch.Tensor):
if isinstance(pcs, pd.DataFrame):
pcs = pcs.values
pcs = torch.tensor(pcs, dtype=torch.float32).to(self.device)
else:
pcs = pcs.to(self.device)
# Ensure components and mean are on the same device as pcs
components = self.pca.components.to(self.device)
pca_mean = self.pca.mean.to(self.device)
# Now, the operation should not cause a device mismatch error
scaled_grid = torch.mm(pcs, components.t()) + pca_mean
if unscale:
return self.scaler.inverse_transform(scaled_grid)
# scale = self.scaler.scale_.to(self.device)
# scaler_mean = self.scaler.mean_.to(self.device)
# unscaled_grid = scaled_grid * scale + scaler_mean
# return unscaled_grid
return scaled_grid