"""Feature engineering for ISMIP6 emulator training datasets.
This module transforms the raw merged dataset (output of ``ise.data.process``)
into the scaled, lagged, train/val/test-split arrays consumed by
``ISEFlow.fit()``. The primary interface is the ``FeatureEngineer`` class,
backed by a set of standalone functions that can also be called independently.
Pipeline stages
---------------
The typical preprocessing sequence is::
from ise.data.feature_engineer import FeatureEngineer
fe = FeatureEngineer("AIS", data=df)
fe.add_model_characteristics() # merge ISM config one-hot columns
fe.drop_outliers( # remove SLE < -26.3 mm (physics bound)
method="explicit",
column="sle",
expression=[("sle", "<", -26.3)],
)
fe.backfill_outliers() # replace extreme spikes with the next valid value (bfill)
fe.add_lag_variables(lag=5) # add t-1 … t-5 copies of forcing vars
fe.split_data(output_directory="splits/") # 70/15/15 by simulation id
X_scaled, y_scaled = fe.scale_data(method="standard", save_dir="splits/")
Key design choices
------------------
- **Split granularity:** train/val/test is done by *simulation id*, not by
individual rows, so no future data leaks into the validation set. The
default split is 70/15/15 with ``random_state=1``.
- **Outlier threshold:** ``drop_outliers`` with ``expression=[("sle", "<", -26.3)]``
removes every run that contains a timestep with SLE below -26.3 mm (a
magnitude of 26.3 mm is treated as a physical bound for individual sectors).
- **Lag variables:** ``add_lag_variables(lag=5)`` adds t-1 through t-5 copies
of each atmospheric and oceanic forcing column within each 86-year segment,
respecting projection boundaries so lag values do not cross between runs.
- **Model characteristics:** ``add_model_characteristics()`` merges the
ISM configuration CSV (e.g. ``AIS_model_characteristics.csv``) and
one-hot encodes categorical columns such as numerics, stress balance, etc.
Standalone functions (also usable without FeatureEngineer)
----------------------------------------------------------
``split_training_data`` — train/val/test split by simulation id.
``add_lag_variables`` — add t-k lag columns within each 86-step segment.
``backfill_outliers`` — replace extreme y-values with the next valid row's value (bfill).
``drop_outliers`` — remove entire runs containing outlier timesteps.
``add_model_characteristics`` — merge and encode ISM config metadata.
``scale_data`` — apply a pre-fitted sklearn scaler from disk.
``fill_mrro_nans`` — impute missing ``mrro_anomaly`` values.
"""
import json
import os
import pickle
import warnings
import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from tqdm import tqdm
[docs]
class FeatureEngineer:
"""
A class for performing feature engineering on a given dataset, including preprocessing,
scaling, dataset splitting, and outlier handling.
Args:
ice_sheet (str): The name of the ice sheet being analyzed.
data (pd.DataFrame): The input dataset.
fill_mrro_nans (bool, optional): Whether to fill missing values in the 'mrro' column. Defaults to False.
split_dataset (bool, optional): Whether to split the dataset into training, validation, and test sets. Defaults to False.
train_size (float, optional): Proportion of data to use for training. Defaults to 0.7.
val_size (float, optional): Proportion of data to use for validation. Defaults to 0.15.
test_size (float, optional): Proportion of data to use for testing. Defaults to 0.15.
output_directory (str, optional): Directory to save the split datasets. Defaults to None.
Attributes:
data (pd.DataFrame): The input dataset.
train_size (float): Proportion of training data.
val_size (float): Proportion of validation data.
test_size (float): Proportion of testing data.
output_directory (str): Directory to save datasets.
scaler_X_path (str): Path to the saved input feature scaler.
scaler_y_path (str): Path to the saved target variable scaler.
scaler_X (scaler object): Scaler for input features.
scaler_y (scaler object): Scaler for target variables.
train (pd.DataFrame): Training dataset.
val (pd.DataFrame): Validation dataset.
test (pd.DataFrame): Test dataset.
_including_model_characteristics (bool): Whether model characteristics have been included.
Methods:
split_data: Splits dataset into train, validation, and test sets.
fill_mrro_nans: Fills missing values in the 'mrro' column.
scale_data: Scales input and target variables using a specified method.
unscale_data: Reverses the scaling transformation.
add_lag_variables: Adds lag features to the dataset.
backfill_outliers: Replaces extreme values in target variables.
drop_outliers: Removes outliers based on specified criteria.
add_model_characteristics: Merges model characteristics into the dataset.
"""
def __init__(
self,
ice_sheet,
data: pd.DataFrame,
fill_mrro_nans: bool = False,
split_dataset: bool = False,
train_size: float = 0.7,
val_size: float = 0.15,
test_size: float = 0.15,
output_directory: str | None = None,
):
self.data = data
try:
self.data = self.data.sort_values(by=["model", "exp", "sector", "year"])
except:
pass
self.train_size = train_size
self.val_size = val_size
self.test_size = test_size
self.output_directory = output_directory
self.ice_sheet = ice_sheet
self.scaler_X_path = None
self.scaler_y_path = None
self.scaler_X = None
self.scaler_y = None
self.train = None
self.val = None
self.test = None
if fill_mrro_nans:
self.data = self.fill_mrro_nans(method="zero")
if split_dataset:
self.train, self.val, self.test = self.split_data(
data, train_size, val_size, test_size, output_directory, random_state=1
)
self._including_model_characteristics = False
[docs]
def split_data(
self,
data=None,
train_size=None,
val_size=None,
test_size=None,
output_directory=None,
random_state=1,
):
"""
Splits the dataset into training, validation, and test sets.
Args:
data (pd.DataFrame, optional): The input dataset. Defaults to None.
train_size (float, optional): Proportion of training data. Defaults to None.
val_size (float, optional): Proportion of validation data. Defaults to None.
test_size (float, optional): Proportion of testing data. Defaults to None.
output_directory (str, optional): Directory to save split datasets. Defaults to None.
random_state (int, optional): Random seed for reproducibility. Defaults to 42.
Returns:
tuple: Training, validation, and test datasets as pandas DataFrames.
"""
if data is not None:
self.data = data
if train_size is not None:
self.train_size = train_size
if val_size is not None:
self.val_size = val_size
if output_directory is not None:
self.output_directory = output_directory
self.train, self.val, self.test = split_training_data(
self.data,
self.train_size,
self.val_size,
self.test_size,
self.output_directory,
random_state,
)
return self.train, self.val, self.test
[docs]
def fill_mrro_nans(self, method, data=None):
"""
Fills missing values in the 'mrro' column.
Args:
method (str): The method used to fill missing values.
data (pd.DataFrame, optional): The dataset. Defaults to None.
Returns:
pd.DataFrame: The dataset with missing values filled.
"""
if data is not None:
self.data = data
if "mrro_anomaly" not in self.data.columns:
print("mrro_anomaly not in columns, skipping fill_mrro_nans()")
return self.data
self.data = fill_mrro_nans(self.data, method)
return self.data
[docs]
def scale_data(self, X=None, y=None, method="standard", save_dir=None):
"""
Scales input (X) and target (y) variables using a specified scaling method.
Args:
X (pd.DataFrame or np.ndarray, optional): Input data. Defaults to None.
y (pd.DataFrame or np.ndarray, optional): Target data. Defaults to None.
method (str, optional): Scaling method ('standard', 'minmax', 'robust'). Defaults to 'standard'.
save_dir (str, optional): Directory to save scalers. Defaults to None.
Returns:
tuple: Scaled X and y values.
"""
dropped_data = pd.DataFrame(index=self.data.index)
if X is not None:
self.X = X
else:
if self._including_model_characteristics:
dropped_columns = [
"id",
"cmip_model",
"pathway",
"exp",
"ice_sheet",
"Scenario",
"Tier",
"aogcm",
"id",
"exp",
"model",
"ivaf",
"year",
] + list(self.data.columns[self.data.dtypes == bool]) # noqa: E721
else:
dropped_columns = [
"id",
"cmip_model",
"pathway",
"exp",
"ice_sheet",
"Scenario",
"Ocean forcing",
"Ocean sensitivity",
"Ice shelf fracture",
"Tier",
"aogcm",
"id",
"exp",
"model",
"ivaf",
"year",
]
dropped_columns = [x for x in self.data.columns if x in dropped_columns]
dropped_data = self.data[dropped_columns]
self.X = self.data.drop(
columns=[x for x in self.data.columns if "sle" in x] + dropped_columns
)
if y is not None:
self.y = y
else:
self.y = self.data[[x for x in self.data.columns if "sle" in x]]
if self.scaler_X_path is not None and self.scaler_y_path is not None:
with open(self.scaler_X_path, "rb") as f:
scaler_X = pickle.load(f)
with open(self.scaler_y_path, "rb") as f:
scaler_y = pickle.load(f)
return scaler_X.transform(self.X), scaler_y.transform(self.y)
elif self.scaler_X is not None and self.scaler_y is not None:
return self.scaler_X.transform(self.X), self.scaler_y.transform(self.y)
if (self.X is None and X is None) or (self.y is None and y is None):
raise ValueError(
"X and y must be provided if they are not already stored in the class instance."
)
# Initialize the scalers based on the method
if method == "standard":
scaler_X = StandardScaler()
scaler_y = StandardScaler()
elif method == "minmax":
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()
elif method == "robust":
scaler_X = RobustScaler()
scaler_y = RobustScaler()
else:
raise ValueError("method must be 'standard', 'minmax', or 'robust'")
# Store scalers in the class instance for potential future use
self.scaler_X, self.scaler_y = scaler_X, scaler_y
scaler_X.fit(self.X)
X_scaled = scaler_X.transform(self.X)
# Fit and transform y
if isinstance(self.y, pd.DataFrame):
y_data = self.y.values
elif isinstance(self.y, np.ndarray):
y_data = self.y
else:
raise TypeError("X must be either a pandas DataFrame or a NumPy array.")
scaler_y.fit(y_data)
y_scaled = scaler_y.transform(y_data)
self.scaler_X, self.scaler_y = scaler_X, scaler_y
# Optionally save the scalers
if save_dir is not None:
if os.path.exists(f"{save_dir}/scalers/"):
self.scaler_X_path = f"{save_dir}/scalers/scaler_X.pkl"
self.scaler_y_path = f"{save_dir}/scalers/scaler_y.pkl"
else:
self.scaler_X_path = f"{save_dir}/scaler_X.pkl"
self.scaler_y_path = f"{save_dir}/scaler_y.pkl"
with open(self.scaler_X_path, "wb") as f:
pickle.dump(scaler_X, f)
with open(self.scaler_y_path, "wb") as f:
pickle.dump(scaler_y, f)
self.data = pd.concat(
[
pd.DataFrame(X_scaled, columns=self.X.columns, index=self.X.index),
pd.DataFrame(y_scaled, columns=self.y.columns, index=self.y.index),
dropped_data,
],
axis=1,
)
return X_scaled, y_scaled
[docs]
def unscale_data(self, X=None, y=None, scaler_X_path=None, scaler_y_path=None):
"""
Reverses the scaling transformation for input (X) and target (y) variables.
Args:
X (pd.DataFrame or np.ndarray, optional): The input data to be unscaled. Defaults to None.
y (pd.DataFrame, np.ndarray, or torch.Tensor, optional): The target data to be unscaled. Defaults to None.
scaler_X_path (str, optional): Path to the stored input scaler. Defaults to None.
scaler_y_path (str, optional): Path to the stored target scaler. Defaults to None.
Returns:
tuple: Unscaled X and y data.
"""
if scaler_X_path is not None:
self.scaler_X_path = scaler_X_path
if scaler_y_path is not None:
self.scaler_y_path = scaler_y_path
if isinstance(y, torch.Tensor):
y = y.detach().cpu().numpy()
# Load scaler for X
if X is not None:
if self.scaler_X_path is None and self.scaler_X is None:
raise ValueError(
"scaler_X_path must be provided if X is not None and self.scaler_X is None."
)
if self.scaler_X is None:
with open(self.scaler_X_path, "rb") as f:
scaler_X = pickle.load(f)
else:
scaler_X = self.scaler_X
X_unscaled = scaler_X.inverse_transform(X)
if isinstance(X, pd.DataFrame):
X_unscaled = pd.DataFrame(X_unscaled, columns=X.columns, index=X.index)
else:
X_unscaled = None
# Load scaler for y
if y is not None:
if self.scaler_y_path is None and self.scaler_y is None:
raise ValueError(
"scaler_y_path must be provided if y is not None and self.scaler_y is None."
)
if self.scaler_y is None:
with open(self.scaler_y_path, "rb") as f:
scaler_y = pickle.load(f)
else:
scaler_y = self.scaler_y
y_unscaled = scaler_y.inverse_transform(y)
if isinstance(y, pd.DataFrame):
y_unscaled = pd.DataFrame(y_unscaled, columns=y.columns, index=y.index)
else:
y_unscaled = None
return X_unscaled, y_unscaled
[docs]
def add_lag_variables(self, lag, data=None):
"""
Adds lagged versions of predictor variables to the dataset.
Args:
lag (int): Number of time steps to lag the variables.
data (pd.DataFrame, optional): The dataset. If not provided, the class attribute 'data' is used.
Returns:
FeatureEngineer: The modified instance with lag variables added.
"""
if data is not None:
self.data = data
self.data = add_lag_variables(self.data, lag)
return self
[docs]
def backfill_outliers(self, percentile=99.999, data=None):
"""
Replaces extreme values in target variables with the previous row's value.
Args:
percentile (float, optional): Percentile threshold for identifying outliers. Defaults to 99.999.
data (pd.DataFrame, optional): The dataset. If not provided, the class attribute 'data' is used.
Returns:
FeatureEngineer: The modified instance with outliers handled.
"""
if data is not None:
self.data = data
self.data = backfill_outliers(self.data, percentile=percentile)
return self
[docs]
def drop_outliers(self, method, column, expression=None, quantiles=[0.01, 0.99], data=None):
"""
Drops simulations that are outliers based on the provided method.
Args:
method (str): Method of outlier deletion ('quantile' or 'explicit').
column (str): Column used for detecting outliers.
expression (list[tuple], optional): List of filtering expressions in the form [(column, operator, value)]. Defaults to None.
quantiles (list[float], optional): Quantiles for 'quantile' method. Defaults to [0.01, 0.99].
data (pd.DataFrame, optional): The dataset. If not provided, the class attribute 'data' is used.
Returns:
FeatureEngineer: The modified instance with outliers removed.
"""
if data is not None:
self.data = data
self.data = drop_outliers(self.data, column, method, expression, quantiles)
return self
[docs]
def add_model_characteristics(
self,
data=None,
model_char_path=None,
encode=True,
ids_path=None,
):
"""
Merges model characteristic data with the dataset.
Args:
data (pd.DataFrame, optional): The dataset. If not provided, the class attribute 'data' is used.
model_char_path (str, optional): Path to the model characteristics file. Defaults to the internal path.
encode (bool, optional): Whether to one-hot encode categorical characteristics. Defaults to True.
ids_path (str, optional): Path to an additional ID mapping file. Defaults to None.
Returns:
FeatureEngineer: The modified instance with model characteristics added.
"""
if data is not None:
self.data = data
if model_char_path is None:
model_char_path = os.path.join(
os.path.dirname(__file__),
"data_files",
f"{self.ice_sheet}_model_characteristics.csv",
)
self.data = add_model_characteristics(self.data, model_char_path, encode, ids_path=ids_path)
self._including_model_characteristics = True
return self
[docs]
def exclude_fetish_models(self, data=None, exclude="both"):
"""
Excludes specific models from the dataset.
Args:
data (pd.DataFrame, optional): The dataset. If not provided, the class attribute 'data' is used.
Returns:
FeatureEngineer: The modified instance with specific models excluded.
"""
if data is not None:
self.data = data
self.data = exclude_fetish_models(self.data, exclude)
return self
[docs]
def scale_data(data, scaler_path):
    """
    Scales the provided dataset using a pre-trained scaler.

    Only the columns the scaler reports via ``get_feature_names_out()`` are
    transformed; every other column is passed through unchanged. The original
    column order is preserved.

    Args:
        data (pd.DataFrame): The dataset to be scaled.
        scaler_path (str): Path to the saved (pickled) scaler.

    Returns:
        pd.DataFrame: The scaled dataset.

    Raises:
        KeyError: If ``data`` lacks a column the scaler expects.
    """
    # NOTE(security): pickle.load executes arbitrary code; only load scaler
    # files that come from a trusted source.
    with open(scaler_path, "rb") as f:
        scaler = pickle.load(f)

    scaled_columns = list(scaler.get_feature_names_out())
    missing = [c for c in scaled_columns if c not in data.columns]
    if missing:
        raise KeyError(f"Columns required by the scaler are missing from data: {missing}")

    scaled_lookup = set(scaled_columns)
    passthrough_columns = [c for c in data.columns if c not in scaled_lookup]

    scaled = pd.DataFrame(
        scaler.transform(data[scaled_columns]),
        columns=scaled_columns,
        index=data.index,
    )
    combined = pd.concat([scaled, data[passthrough_columns]], axis=1)
    combined = combined[data.columns]
    # Guard against repeated labels in the result.
    return combined.loc[:, ~combined.columns.duplicated()]
[docs]
def add_model_characteristics(
    data, model_char_path=None, encode=True, ids_path=None
) -> pd.DataFrame:
    """
    Adds model characteristics to the dataset.

    The characteristics CSV is left-joined onto ``data`` on the ``model``
    column. With ``encode=True`` the characteristic columns (all CSV columns
    except ``initial_year``, ``model`` and ``Scenario``) and the experiment flag
    columns ``Ocean forcing`` / ``Ocean sensitivity`` / ``Ice shelf fracture``
    are one-hot encoded. Columns that are not present after the merge are
    skipped rather than raising.

    Args:
        data (pd.DataFrame): The input dataset.
        model_char_path (str, optional): Path to the model characteristics file. Defaults to
            ``data_files/AIS_model_characteristics.csv`` next to this module.
        encode (bool, optional): Whether to one-hot encode categorical characteristics. Defaults to True.
        ids_path (str, optional): Currently unused; kept for interface compatibility. Defaults to None.

    Returns:
        pd.DataFrame: The dataset with model characteristics added.
    """
    if model_char_path is None:
        model_char_path = os.path.join(
            os.path.dirname(__file__), "data_files", "AIS_model_characteristics.csv"
        )
    model_chars = pd.read_csv(model_char_path)
    all_data = pd.merge(data, model_chars, on="model", how="left")
    if not encode:
        return all_data

    never_encoded = {"initial_year", "model", "Scenario"}
    experiment_flag_columns = ["Ocean forcing", "Ocean sensitivity", "Ice shelf fracture"]
    candidates = [c for c in model_chars.columns if c not in never_encoded]
    candidates += experiment_flag_columns

    # De-duplicate (order preserving) and keep only columns that actually exist,
    # so datasets without the experiment flag columns do not raise KeyError.
    encode_columns = [c for c in dict.fromkeys(candidates) if c in all_data.columns]
    if not encode_columns:
        return all_data
    return pd.get_dummies(all_data, columns=encode_columns)
[docs]
def backfill_outliers(data, percentile=99.999):
    """
    Replaces extreme y-values with the value from the next valid row (bfill).

    Thresholds are computed once over all y-columns pooled together (columns
    whose name contains ``"sle"``): values above the ``percentile`` percentile or
    below the ``100 - percentile`` percentile are set to NaN and then back-filled.

    Notes:
        - ``data`` is modified in place and also returned.
        - Back-filling also fills NaNs that were already present in a y-column.
        - NaNs at the end of a column have no following value and stay NaN.
        - If there are no y-columns, or they hold no non-NaN values, ``data`` is
          returned unchanged.

    Args:
        data (pd.DataFrame): The dataset containing y-values.
        percentile (float, optional): The percentile threshold to define upper extreme values. Defaults to 99.999.

    Returns:
        pd.DataFrame: The dataset with extreme values replaced using backfill.
    """
    y_columns = [col for col in data.columns if "sle" in col]
    if not y_columns:
        return data

    pooled = pd.concat([data[col].dropna() for col in y_columns])
    if pooled.empty:
        return data

    upper_threshold, lower_threshold = np.percentile(pooled, [percentile, 100 - percentile])

    for col in y_columns:
        values = data[col]
        is_extreme = (values > upper_threshold) | (values < lower_threshold)
        # mask() blanks the extremes; bfill() pulls the next valid value upward.
        data[col] = values.mask(is_extreme).bfill()
    return data
[docs]
def add_lag_variables(
    data: pd.DataFrame, lag: int, verbose=True, projection_length: int = 86
) -> pd.DataFrame:
    """
    Adds lagged variables to the input dataset, creating time-shifted versions of the predictor variables.

    The rows are processed in consecutive segments of ``projection_length`` rows
    so that lagged values never cross from one segment into the next. Within a
    segment, the first ``k`` rows of a ``.lag{k}`` column (which have no earlier
    value) are back-filled from the first available lagged value.

    Output column order is: the temporal indicator (``time`` if present, else
    ``year``), all other carried-through columns, the forcing columns, then the
    ``.lag1`` … ``.lag{lag}`` copies. The result has a fresh 0..n-1 index.

    Args:
        data (pd.DataFrame): The dataset containing time series data, ordered so that
            each projection occupies ``projection_length`` consecutive rows.
        lag (int): The number of time steps to lag the variables. Values below 1 add no columns.
        verbose (bool, optional): Whether to display a progress bar. Defaults to True.
        projection_length (int, optional): Number of rows per segment. Defaults to 86.

    Returns:
        pd.DataFrame: The dataset with lagged variables added. Trailing rows that do
        not fill a whole segment are dropped (with a warning).

    Raises:
        ValueError: If ``projection_length`` is smaller than 1.
    """
    if projection_length < 1:
        raise ValueError("projection_length must be a positive integer")

    # Columns eligible for lagging. "year" is listed so that it is not swept into
    # the carried-through columns by the "everything else" rule below.
    lag_candidates = frozenset(
        (
            "year",
            "pr_anomaly",
            "evspsbl_anomaly",
            "mrro_anomaly",
            "smb_anomaly",
            "ts_anomaly",
            "thermal_forcing",
            "salinity",
            "temperature",
            "aST",
            "aSMB",
            "basin_runoff",
        )
    )
    temporal_indicator = "time" if "time" in data.columns else "year"

    # Temporal indicator first (when present), then every non-candidate column.
    # Excluding the indicator from the second part prevents selecting "time" twice.
    carried_columns = [temporal_indicator] if temporal_indicator in data.columns else []
    carried_columns += [
        c for c in data.columns if c not in lag_candidates and c != temporal_indicator
    ]
    lagged_columns = [c for c in data.columns if c not in carried_columns]

    def lag_segment(segment: pd.DataFrame) -> pd.DataFrame:
        """Build carried + base + lagged columns for one projection segment."""
        carried = segment[carried_columns].reset_index(drop=True)
        base = segment[lagged_columns].reset_index(drop=True)
        lag_frames = [
            base.shift(shift).add_suffix(f".lag{shift}").bfill() for shift in range(1, lag + 1)
        ]
        return pd.concat([carried, base, *lag_frames], axis=1)

    remainder = len(data) % projection_length
    if remainder != 0:
        warnings.warn(
            f"Data length {len(data)} is not divisible by projection_length "
            f"{projection_length}; dropping {remainder} trailing rows."
        )

    num_segments = len(data) // projection_length
    if num_segments == 0:
        # Nothing to process: return an empty frame that still has the output columns.
        return lag_segment(data.iloc[:0])

    segment_starts = range(0, num_segments * projection_length, projection_length)
    if verbose:
        iterator = tqdm(segment_starts, total=num_segments, desc="Adding lag variables")
    else:
        iterator = segment_starts

    processed_segments = [
        lag_segment(data.iloc[start : start + projection_length]) for start in iterator
    ]
    # ignore_index avoids every segment repeating the labels 0..projection_length-1.
    return pd.concat(processed_segments, axis=0, ignore_index=True)
[docs]
def exclude_fetish_models(data: pd.DataFrame, exclude: str = "both") -> pd.DataFrame:
    """
    Remove rows belonging to the fETISh model variants.

    Args:
        data (pd.DataFrame): The input DataFrame; must expose a ``model`` column.
        exclude (str, optional): Which variant(s) to remove: '16km', '32km', or 'both'.
            Defaults to 'both'.

    Returns:
        pd.DataFrame: The rows of ``data`` whose model is not one of the excluded variants.

    Raises:
        ValueError: If ``exclude`` is not '16km', '32km', or 'both'.
    """
    variants_by_option = (
        ("16km", ("fETISh_16km",)),
        ("32km", ("fETISh_32km",)),
        ("both", ("fETISh_16km", "fETISh_32km")),
    )
    for option, variants in variants_by_option:
        if exclude == option:
            keep_row = data.model != variants[0]
            for variant in variants[1:]:
                keep_row = keep_row & (data.model != variant)
            return data[keep_row]
    raise ValueError("exclude must be '16km', '32km', or 'both'")
[docs]
def fill_mrro_nans(data: pd.DataFrame, method) -> pd.DataFrame:
    """
    Fills the NaN values in the mrro columns with the given method.

    "mrro columns" are all columns whose name contains ``"mrro"``. For every
    method except 'drop' the frame is modified in place and also returned;
    'drop' returns a new frame without the affected rows.

    Args:
        data (pd.DataFrame): The input DataFrame.
        method (str or int): How to fill NaN values (case-insensitive):
            'zero' (also '0' or the number 0), 'mean', 'median', 'drop', or
            'mean_by_year' (fills ``mrro_anomaly`` with the mean of its ``year`` group).

    Returns:
        pd.DataFrame: The DataFrame with NaN values filled according to the specified method.

    Raises:
        ValueError: If the method is not one of the supported values.
    """
    # Normalise first so that a numeric 0 never reaches str.lower().
    if isinstance(method, str):
        method_key = method.lower()
    elif method == 0:
        method_key = "zero"
    else:
        method_key = str(method)
    if method_key == "0":
        method_key = "zero"

    mrro_columns = [x for x in data.columns if "mrro" in x]

    # Per-column fill value for the simple strategies.
    fill_value_for = {
        "zero": lambda series: 0,
        "mean": lambda series: series.mean(),
        "median": lambda series: series.median(),
    }

    if method_key in fill_value_for:
        compute_fill = fill_value_for[method_key]
        for col in mrro_columns:
            data[col] = data[col].fillna(compute_fill(data[col]))
    elif method_key == "drop":
        data = data.dropna(subset=mrro_columns)
    elif method_key == "mean_by_year":
        data["mrro_anomaly"] = data.groupby("year")["mrro_anomaly"].transform(
            lambda x: x.fillna(x.mean())
        )
    else:
        raise ValueError("method must be 'zero', 'mean', 'median', 'drop', or 'mean_by_year'")
    return data
[docs]
def split_training_data(
    data, train_size, val_size, test_size=None, output_directory=None, random_state=1
):
    """
    Splits the dataset into training, validation, and test sets by simulation id.

    The unique values of the ``id`` column are shuffled with a seeded generator
    and cut into three consecutive groups; every row follows its id, so a
    simulation is never divided between splits.

    Args:
        data (str or pd.DataFrame): The dataset or path to a CSV of the dataset to be split.
        train_size (float): Proportion of ids to use for training.
        val_size (float): Proportion of ids to use for validation.
        test_size (float, optional): Accepted for interface symmetry but not used:
            the test set is always the ids remaining after train and validation.
        output_directory (str, optional): Directory (created if missing) in which to save
            ``ids.json``, ``train.csv``, ``val.csv`` and ``test.csv``. Defaults to None.
        random_state (int, optional): Seed for reproducibility. Defaults to 1.

    Returns:
        tuple: Training, validation, and test datasets as pandas DataFrames.

    Raises:
        ValueError: If ``data`` is neither a path nor a DataFrame.
        ValueError: If the dataset does not contain an 'id' column.

    Warns:
        UserWarning: If the dataset length is not divisible by 86, indicating incomplete projections.
    """
    if isinstance(data, str):
        data = pd.read_csv(data)
    elif not isinstance(data, pd.DataFrame):
        raise ValueError("data must be a path (str) or a pandas DataFrame")

    if len(data) % 86 != 0:
        warnings.warn(
            "Length of data must be divisible by 86, if not there are incomplete projections."
        )

    if "id" not in data.columns:
        raise ValueError("data must have a column named 'id'")

    # unique() returns a fresh array in order of first appearance, so shuffling
    # it in place does not touch the caller's data.
    total_ids = np.asarray(data["id"].unique())
    rng = np.random.default_rng(random_state)
    rng.shuffle(total_ids)

    n_ids = len(total_ids)
    train_end = int(n_ids * train_size)
    val_end = int(n_ids * (train_size + val_size))
    train_ids = total_ids[:train_end]
    val_ids = total_ids[train_end:val_end]
    test_ids = total_ids[val_end:]

    train = data[data["id"].isin(train_ids)]
    val = data[data["id"].isin(val_ids)]
    test = data[data["id"].isin(test_ids)]

    if output_directory is not None:
        os.makedirs(output_directory, exist_ok=True)
        # tolist() converts NumPy scalars (e.g. int64) to plain Python values,
        # which json can serialise.
        id_assignment = {
            "train_ids": train_ids.tolist(),
            "val_ids": val_ids.tolist(),
            "test_ids": test_ids.tolist(),
        }
        with open(os.path.join(output_directory, "ids.json"), "w") as file:
            json.dump(id_assignment, file)
        train.to_csv(os.path.join(output_directory, "train.csv"), index=False)
        val.to_csv(os.path.join(output_directory, "val.csv"), index=False)
        test.to_csv(os.path.join(output_directory, "test.csv"), index=False)

    return train, val, test
[docs]
def drop_outliers(
data: pd.DataFrame,
column: str,
method: str,
expression: list[tuple] | None = None,
quantiles: list[float] = [0.01, 0.99],
):
"""
Removes outliers from the dataset based on a specified method.
Args:
data (pd.DataFrame): The dataset containing the column with potential outliers.
column (str): The column to assess for outliers.
method (str): The method of outlier detection ('quantile' or 'explicit').
expression (list of tuples, optional): A list of conditions in the format [(column, operator, value)] for explicit filtering. Defaults to None.
quantiles (list of float, optional): Quantiles for filtering when using the 'quantile' method. Defaults to [0.01, 0.99].
Returns:
pd.DataFrame: The dataset with outliers removed.
Raises:
AttributeError: If the method is 'quantile' but no quantiles are provided.
AttributeError: If the method is 'explicit' but no expression is provided.
ValueError: If the operator in the expression is not recognized.
"""
# Check if method is quantile
if method.lower() == "quantile":
if quantiles is None:
raise AttributeError("If method == quantile, quantiles argument cannot be None")
# Calculate lower and upper quantiles
lower_sle, upper_sle = np.quantile(np.array(data[column]), quantiles)
# Filter outlier data based on quantiles
outlier_data = data[(data[column] <= lower_sle) | (data[column] >= upper_sle)]
# Check if method is explicit
elif method.lower() == "explicit":
if expression is None:
raise AttributeError("If method == explicit, expression argument cannot be None")
elif not isinstance(expression, list) or not isinstance(expression[0], tuple):
raise AttributeError(
'Expression argument must be a list of tuples, e.g. [("sle", ">", 20), ("sle", "<", -20)]'
)
outlier_data = data.copy()
# Apply subset expressions to filter outlier data
subset_dfs = []
for subset_expression in expression:
column, operator, value = subset_expression
if operator.lower() in ("equal", "equals", "=", "=="):
outlier_dataframe = outlier_data[outlier_data[column] == value]
elif operator.lower() in ("not equal", "not equals", "!=", "~="):
outlier_dataframe = outlier_data[outlier_data[column] != value]
elif operator.lower() in ("greater than", "greater", ">=", ">"):
outlier_dataframe = outlier_data[outlier_data[column] > value]
elif operator.lower() in ("less than", "less", "<=", "<"):
outlier_dataframe = outlier_data[outlier_data[column] < value]
else:
raise ValueError(f'Operator must be in ["==", "!=", ">", "<"], received {operator}')
subset_dfs.append(outlier_dataframe)
outlier_data = pd.concat(subset_dfs)
# Check if outlier_data is empty
if outlier_data.empty:
return data
# Create dataframe of experiments with outliers (want to delete the entire 86 rows)
outlier_runs = pd.DataFrame()
# TODO: Check to see if this works
outlier_runs["modelname"] = outlier_data["model"]
outlier_runs["exp_id"] = outlier_data["exp"]
try:
outlier_runs["sector"] = outlier_data["sector"]
sectors = True
except KeyError:
sectors = False
outlier_runs_list = outlier_runs.values.tolist()
unique_outliers = [list(x) for x in set(tuple(x) for x in outlier_runs_list)]
data["outlier"] = False
# Drop those runs
for i in tqdm(unique_outliers, total=len(unique_outliers), desc="Dropping outliers"):
modelname = i[0]
exp_id = i[1]
if sectors:
sector = i[2]
data.loc[
(data.model == modelname) & (data.exp == exp_id) & (data.sector == sector),
"outlier",
] = True
else:
data.loc[(data.model == modelname) & (data.exp == exp_id), "outlier"] = True
data = data[~data["outlier"]]
return data