from typing import Any, Dict, List
import numpy as np
import keras
from aeon.classification.deep_learning import EncoderClassifier
from skopt.space import Categorical
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.base import BaseEstimator, TransformerMixin
from ml_grid.pipeline.data import pipe
class TimeSeriesStandardScaler(BaseEstimator, TransformerMixin):
    """Standardise 3D time-series data of shape (n_samples, n_dims, n_timesteps).

    Each dimension (channel) is scaled independently across all samples and
    timesteps: infinities are converted to NaN, NaNs are imputed with the
    per-dimension median, and near-zero scales are clamped to 1.0 so constant
    features do not explode under division.

    Attributes:
        epsilon (float): Threshold below which a feature's scale is treated as
            zero and replaced by 1.0.
        scaler (StandardScaler): Per-dimension standardiser fitted in ``fit``.
        imputer (SimpleImputer): Median imputer fitted in ``fit``.
    """

    def __init__(self, epsilon=1e-6):
        # sklearn convention: store every constructor argument on self so that
        # get_params/set_params and clone() work. The original code dropped
        # `epsilon`, which made `fit` raise AttributeError at self.epsilon.
        self.epsilon = epsilon
        self.scaler = StandardScaler()
        self.imputer = SimpleImputer(strategy="median")

    def fit(self, X, y=None):
        """Fit the imputer and scaler on X.

        Args:
            X: Array of shape (n_samples, n_dims, n_timesteps).
            y: Ignored; present for sklearn API compatibility.

        Returns:
            self
        """
        n_samples, n_dims, n_timesteps = X.shape
        # Reshape to (n_samples * n_timesteps, n_dims) to scale each dimension
        X_reshaped = X.transpose(0, 2, 1).reshape(-1, n_dims)
        # Handle infs/nans before anything else
        X_reshaped = np.where(np.isinf(X_reshaped), np.nan, X_reshaped)
        # Impute missing values before scaling
        X_imputed = self.imputer.fit_transform(X_reshaped)
        self.scaler.fit(X_imputed)
        # If scale is near zero (constant feature), use 1.0 to avoid division
        # by zero and value explosion
        self.scaler.scale_ = np.where(
            self.scaler.scale_ < self.epsilon, 1.0, self.scaler.scale_
        )
        return self

    def transform(self, X):
        """Impute and scale X, returning data in the original 3D layout.

        Required by TransformerMixin / Pipeline; mirrors the reshape used in
        ``fit`` and inverts it on the way out.

        Args:
            X: Array of shape (n_samples, n_dims, n_timesteps).

        Returns:
            Scaled array with the same shape as X.
        """
        n_samples, n_dims, n_timesteps = X.shape
        X_reshaped = X.transpose(0, 2, 1).reshape(-1, n_dims)
        X_reshaped = np.where(np.isinf(X_reshaped), np.nan, X_reshaped)
        X_scaled = self.scaler.transform(self.imputer.transform(X_reshaped))
        # Undo the (samples*timesteps, dims) flattening back to (n, d, t).
        return X_scaled.reshape(n_samples, n_timesteps, n_dims).transpose(0, 2, 1)
class EncoderClassifierWrapper(EncoderClassifier):
    """Hardened wrapper around aeon's EncoderClassifier for hyperparameter search.

    Adjustments over the base class:
        * ``fit`` coerces tuple-valued ``kernel_size``/``n_filters`` candidates
          (tuples are used in search spaces because they are hashable) back to
          the lists the base estimator expects.
        * ``_fit`` normalises ``metrics`` and rebuilds the Keras optimizer so
          repeated fits/folds do not share optimizer state.
        * ``_predict_proba`` replaces NaN probabilities with a uniform
          distribution and renormalises the affected output.
    """

    def fit(self, X, y, **kwargs):
        """Fit the classifier, converting tuple hyperparameters to lists first.

        Args:
            X: Training time series (aeon 3D format — presumably
                (n_samples, n_dims, n_timesteps); TODO confirm against caller).
            y: Class labels.
            **kwargs: Forwarded to the base ``fit``.

        Returns:
            self (via the base-class ``fit``).
        """
        if isinstance(self.kernel_size, tuple):
            self.kernel_size = list(self.kernel_size)
        if isinstance(self.n_filters, tuple):
            self.n_filters = list(self.n_filters)
        return super().fit(X, y, **kwargs)

    def _fit(self, X, y):
        # Normalise `metrics` to a list: None -> [], "name" -> ["name"].
        if self.metrics is None:
            self._metrics = []
        elif isinstance(self.metrics, str):
            self._metrics = [self.metrics]
        else:
            self._metrics = self.metrics
        # Clone optimizer to ensure a fresh instance for each fit/fold
        # (Keras optimizers carry state; reusing one across fits is unsafe).
        if hasattr(self, "optimizer") and isinstance(
            self.optimizer, keras.optimizers.Optimizer
        ):
            self.optimizer = self.optimizer.from_config(self.optimizer.get_config())
        return super()._fit(X, y)

    def _predict_proba(self, X, **kwargs):
        # Safety net: intercept NaNs in probability outputs
        proba = super()._predict_proba(X, **kwargs)
        if np.isnan(proba).any():
            # Replace NaNs with uniform probability (1/n_classes)
            n_classes = len(self.classes_)
            uniform_prob = 1.0 / n_classes
            proba = np.where(np.isnan(proba), uniform_prob, proba)
            # Normalize rows to sum to 1
            row_sums = proba.sum(axis=1)
            # Avoid division by zero if a row is all zeros
            row_sums[row_sums == 0] = 1
            proba = proba / row_sums[:, np.newaxis]
        return proba

    def _predict(self, X, **kwargs):
        # Use our safe predict_proba to generate predictions
        probs = self._predict_proba(X, **kwargs)
        return np.array([self.classes_[np.argmax(prob)] for prob in probs])
class EncoderClassifier_class:
    """A wrapper for the aeon EncoderClassifier time-series classifier.

    This class provides a consistent interface for the EncoderClassifier,
    including defining a hyperparameter search space.

    Attributes:
        algorithm_implementation: An instance of the aeon EncoderClassifier.
        method_name (str): The name of the classifier method.
        parameter_space (Dict[str, List[Any]]): The hyperparameter search space
            for the classifier.
    """

    algorithm_implementation: EncoderClassifier
    parameter_space: Dict[str, List[Any]]

    @staticmethod
    def _filter_kernel_space(param_space, n_timesteps, bayes):
        """Drop over-long kernel sizes and prefix keys for the pipeline.

        Shared between the Bayesian (skopt ``Categorical``) and plain grid
        spaces — the original code duplicated this loop for each.

        Args:
            param_space: List of parameter-space dicts.
            n_timesteps: Series length; any kernel longer than this is invalid.
            bayes: True when ``kernel_size`` values are skopt ``Categorical``
                dimensions, False when they are plain lists.

        Returns:
            Filtered spaces with every key prefixed ``model__`` so the search
            targets the "model" step of the Pipeline. Configurations whose
            kernel options are all invalid are dropped entirely.
        """
        filtered = []
        for config in param_space:
            kernel_options = (
                config["kernel_size"].categories if bayes else config["kernel_size"]
            )
            # Keep only kernel tuples where every kernel fits the series.
            valid_kernels = [
                ks for ks in kernel_options if all(k <= n_timesteps for k in ks)
            ]
            if not valid_kernels:
                continue
            new_config = config.copy()
            new_config["kernel_size"] = (
                Categorical(valid_kernels) if bayes else valid_kernels
            )
            filtered.append(
                {f"model__{key}": value for key, value in new_config.items()}
            )
        return filtered

    def __init__(self, ml_grid_object: pipe):
        """Initializes the EncoderClassifier_class.

        Args:
            ml_grid_object (pipe): An instance of the main data pipeline object.
        """
        random_state_val = ml_grid_object.global_params.random_state_val
        # Dynamically constrain kernel_size based on the length of the time series.
        # aeon format is (n_samples, n_dims, n_timesteps)
        n_timesteps = ml_grid_object.X_train.shape[2]
        encoder_model = EncoderClassifierWrapper()
        self.algorithm_implementation = Pipeline(
            [("scaler", TimeSeriesStandardScaler()), ("model", encoder_model)]
        )
        self.method_name = "EncoderClassifier"

        if getattr(ml_grid_object.global_params, "test_mode", False):
            # Minimal single-candidate space for fast smoke tests.
            self.parameter_space = [
                {
                    "model__n_epochs": [1],
                    "model__n_filters": [(32,)],
                    "model__kernel_size": [(3,)],
                    "model__fc_units": [32],
                    "model__verbose": [0],
                    "model__optimizer": [keras.optimizers.Adam(learning_rate=0.001)],
                }
            ]
            return

        # Common parameters for all configurations
        common_params_bayes = {
            "max_pool_size": Categorical([2, 3]),
            "activation": Categorical(["sigmoid", "relu", "tanh"]),
            "dropout_proba": Categorical([0.0, 0.2, 0.5]),
            "padding": Categorical(["same", "valid"]),
            "strides": Categorical([1, 2]),
            "fc_units": Categorical([128, 256, 512]),
            "save_best_model": [True],
            "save_last_model": [False],
            "random_state": [random_state_val],
            "use_mini_batch_size": Categorical([False]),
            "optimizer": Categorical(
                [
                    keras.optimizers.Adam(learning_rate=0.00001, clipnorm=1.0),
                    keras.optimizers.SGD(learning_rate=0.00001, clipnorm=1.0),
                ]
            ),
        }
        common_params_grid = {
            "max_pool_size": [2, 3],
            "activation": ["sigmoid", "relu", "tanh"],
            "dropout_proba": [0.0, 0.2, 0.5],
            "padding": ["same", "valid"],
            "strides": [1, 2],
            "fc_units": [128, 256, 512],
            "save_best_model": [False],
            "save_last_model": [False],
            "random_state": [random_state_val],
            "use_mini_batch_size": [False],
            "optimizer": [
                keras.optimizers.Adam(learning_rate=0.00001, clipnorm=1.0),
                keras.optimizers.SGD(learning_rate=0.00001, clipnorm=1.0),
            ],
        }

        # Define the full potential parameter space: one config per number of
        # encoder layers (1, 2, or 3 kernel/filter stages).
        full_param_space_bayes = [
            {
                "kernel_size": Categorical([(5,), (11,), (21,)]),
                "n_filters": Categorical([(128,), (256,), (512,)]),
                **common_params_bayes,
            },
            {
                "kernel_size": Categorical([(5, 11), (5, 21), (11, 21)]),
                "n_filters": Categorical([(128, 256), (128, 512), (256, 512)]),
                **common_params_bayes,
            },
            {
                "kernel_size": Categorical([(5, 11, 21)]),
                "n_filters": Categorical([(128, 256, 512)]),
                **common_params_bayes,
            },
        ]
        full_param_space_grid = [
            {
                "kernel_size": [(5,), (11,), (21,)],
                "n_filters": [(128,), (256,), (512,)],
                **common_params_grid,
            },
            {
                "kernel_size": [(5, 11), (5, 21), (11, 21)],
                "n_filters": [(128, 256), (128, 512), (256, 512)],
                **common_params_grid,
            },
            {
                "kernel_size": [(5, 11, 21)],
                "n_filters": [(128, 256, 512)],
                **common_params_grid,
            },
        ]

        # Filter the spaces to only include valid kernel sizes
        if ml_grid_object.global_params.bayessearch:
            filtered = self._filter_kernel_space(
                full_param_space_bayes, n_timesteps, bayes=True
            )
        else:
            filtered = self._filter_kernel_space(
                full_param_space_grid, n_timesteps, bayes=False
            )
        # Fall back to a single empty config so the search can still run with
        # estimator defaults when every kernel option was filtered out.
        self.parameter_space = filtered if filtered else [{}]