from typing import Any, Dict, List
import numpy as np
import keras
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from skopt.space import Categorical
from ml_grid.pipeline.data import pipe
from ml_grid.util.param_space import ParamSpace
class _DummyClassifier(BaseEstimator, ClassifierMixin):
"""A dummy classifier to act as a placeholder for a missing model."""
def __init__(self, kernel_size=None, filter_sizes=None, layers=None, **kwargs):
self.kernel_size = kernel_size
self.filter_sizes = filter_sizes
self.layers = layers
def fit(self, X, y, **kwargs):
self.classes_ = np.unique(y)
return self
def predict(self, X, **kwargs):
return np.zeros(len(X), dtype=int)
def predict_proba(self, X, **kwargs):
proba = np.zeros((len(X), len(self.classes_)))
if proba.shape[1] > 0:
proba[:, 0] = 1.0
return proba
def _fit(self, X, y):
return self.fit(X, y)
def _predict(self, X, **kwargs):
return self.predict(X, **kwargs)
def _predict_proba(self, X, **kwargs):
return self.predict_proba(X, **kwargs)
try:
from aeon.classification.deep_learning import TapNetClassifier
except ImportError:
try:
from aeon.classification.deep_learning._tapnet import TapNetClassifier
except ImportError:
TapNetClassifier = _DummyClassifier
class TimeSeriesStandardScaler(BaseEstimator, TransformerMixin):
    """Standardises 3D time-series data of shape (n_samples, n_dims, n_timesteps).

    Infinite values are converted to NaN, missing values are median-imputed,
    and each dimension is scaled to zero mean and unit variance.
    """

    def __init__(self, epsilon=1e-6):
        self.epsilon = epsilon
        self.scaler = StandardScaler()
        self.imputer = SimpleImputer(strategy="median")

def fit(self, X, y=None):
n_samples, n_dims, n_timesteps = X.shape
# Reshape to (n_samples * n_timesteps, n_dims) to scale each dimension
X_reshaped = X.transpose(0, 2, 1).reshape(-1, n_dims)
# Handle infs/nans before anything else
X_reshaped = np.where(np.isinf(X_reshaped), np.nan, X_reshaped)
# Impute missing values before scaling
X_imputed = self.imputer.fit_transform(X_reshaped)
self.scaler.fit(X_imputed)
# If scale is near zero (constant feature), use 1.0 to avoid division by zero and value explosion
self.scaler.scale_ = np.where(
self.scaler.scale_ < self.epsilon, 1.0, self.scaler.scale_
)
        return self

    def transform(self, X):
        # Mirror fit: reshape -> replace infs -> impute -> scale, then restore
        # the original (n_samples, n_dims, n_timesteps) layout.
        n_samples, n_dims, n_timesteps = X.shape
        X_reshaped = X.transpose(0, 2, 1).reshape(-1, n_dims)
        X_reshaped = np.where(np.isinf(X_reshaped), np.nan, X_reshaped)
        X_imputed = self.imputer.transform(X_reshaped)
        X_scaled = self.scaler.transform(X_imputed)
        return X_scaled.reshape(n_samples, n_timesteps, n_dims).transpose(0, 2, 1)
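
# Illustrative usage of TimeSeriesStandardScaler (assumption: inputs are 3D
# panels of shape (n_samples, n_dims, n_timesteps), as fit/transform expect):
#
#     X = np.random.rand(4, 2, 10)
#     scaler = TimeSeriesStandardScaler()
#     X_scaled = scaler.fit(X).transform(X)  # same shape, per-dimension standardised
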
class TapNetClassifierWrapper(TapNetClassifier):
    """TapNetClassifier subclass with defensive fixes for hyperparameter search."""

    def fit(self, X, y, **kwargs):
        # Search spaces deliver hashable tuples; TapNet expects lists, so
        # coerce tuple-valued hyperparameters before fitting.
        if isinstance(self.kernel_size, tuple):
self.kernel_size = list(self.kernel_size)
if isinstance(self.filter_sizes, tuple):
self.filter_sizes = list(self.filter_sizes)
if isinstance(self.layers, tuple):
self.layers = list(self.layers)
return super().fit(X, y, **kwargs)
def _fit(self, X, y):
if self.metrics is None:
self._metrics = []
elif isinstance(self.metrics, str):
self._metrics = [self.metrics]
else:
self._metrics = self.metrics
# Clone optimizer to ensure a fresh instance for each fit/fold
if hasattr(self, "optimizer") and isinstance(
self.optimizer, keras.optimizers.Optimizer
):
self.optimizer = self.optimizer.from_config(self.optimizer.get_config())
return super()._fit(X, y)
def _predict_proba(self, X, **kwargs):
# Safety net: intercept NaNs in probability outputs
proba = super()._predict_proba(X, **kwargs)
if np.isnan(proba).any():
# Replace NaNs with uniform probability (1/n_classes)
n_classes = len(self.classes_)
uniform_prob = 1.0 / n_classes
proba = np.where(np.isnan(proba), uniform_prob, proba)
# Normalize rows to sum to 1
row_sums = proba.sum(axis=1)
# Avoid division by zero if a row is all zeros
row_sums[row_sums == 0] = 1
proba = proba / row_sums[:, np.newaxis]
return proba
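    # Worked example of the repair above (illustrative values): with two
    # classes, a raw row [nan, 0.4] becomes [0.5, 0.4] after the 1/n_classes
    # fill; row-normalisation then yields [0.5/0.9, 0.4/0.9] ~= [0.556, 0.444].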
def _predict(self, X, **kwargs):
# Use our safe predict_proba to generate predictions
probs = self._predict_proba(X, **kwargs)
return np.array([self.classes_[np.argmax(prob)] for prob in probs])
class TapNetClassifier_class:
"""A wrapper for the aeon TapNetClassifier time-series classifier.
This class provides a consistent interface for the TapNetClassifier,
including defining a hyperparameter search space.
Attributes:
algorithm_implementation: An instance of the aeon TapNetClassifier.
method_name (str): The name of the classifier method.
parameter_space (Dict[str, List[Any]]): The hyperparameter search space
for the classifier.
"""
[docs]
algorithm_implementation: TapNetClassifier
[docs]
parameter_space: Dict[str, List[Any]]
def __init__(self, ml_grid_object: pipe):
"""Initializes the TapNetClassifier_class.
Args:
ml_grid_object (pipe): An instance of the main data pipeline object.
"""
verbose_param = ml_grid_object.verbose
param_space = ParamSpace(
ml_grid_object.local_param_dict.get("param_space_size")
)
log_epoch = param_space.param_dict.get("log_epoch")
if isinstance(log_epoch, list):
log_epoch = log_epoch[0]
random_state_val = ml_grid_object.global_params.random_state_val
tapnet_model = TapNetClassifierWrapper()
self.algorithm_implementation = Pipeline(
[("scaler", TimeSeriesStandardScaler()), ("model", tapnet_model)]
)
self.method_name = "TapNetClassifier"
if getattr(ml_grid_object.global_params, "test_mode", False):
self.parameter_space = {
"model__n_epochs": [1],
"model__layers": [(64, 64)],
"model__filter_sizes": [(64, 64)],
"model__kernel_size": [(3, 3)],
"model__verbose": [0],
"model__optimizer": [keras.optimizers.Adam(learning_rate=0.001)],
}
return
if ml_grid_object.global_params.bayessearch:
base_params = {
"filter_sizes": Categorical([(256, 256, 128), (128, 128, 64)]),
"kernel_size": Categorical([(8, 5, 3), (4, 3, 2)]),
"layers": Categorical([(500, 300, 100), (400, 200, 100)]),
"n_epochs": log_epoch,
"batch_size": Categorical([16, 32]),
"dropout": Categorical([0.5, 0.3, 0.2]),
"use_mini_batch_size": Categorical([False]),
"dilation": Categorical([1]),
"activation": Categorical(["sigmoid", "relu"]),
"loss": Categorical(
["binary_crossentropy", "categorical_crossentropy"]
),
"optimizer": Categorical(
[
keras.optimizers.Adam(learning_rate=0.00001, clipnorm=1.0),
keras.optimizers.SGD(learning_rate=0.00001, clipnorm=1.0),
]
),
"use_bias": Categorical([True, False]),
"use_rp": Categorical([True, False]),
"use_att": Categorical([True, False]),
"use_lstm": Categorical([True, False]),
"use_cnn": Categorical([True, False]),
"verbose": Categorical([verbose_param]),
"random_state": Categorical([random_state_val]),
}
self.parameter_space = {
f"model__{key}": value for key, value in base_params.items()
}
else:
base_params = {
"filter_sizes": [(256, 256, 128), (128, 128, 64)],
"kernel_size": [(8, 5, 3), (4, 3, 2)],
"layers": [(500, 300, 100), (400, 200, 100)],
"n_epochs": [100],
"batch_size": [16, 32],
"dropout": [0.5, 0.3, 0.2],
"use_mini_batch_size": [False],
"dilation": [1],
"activation": ["sigmoid", "relu"],
"loss": ["binary_crossentropy", "categorical_crossentropy"],
"optimizer": [
keras.optimizers.Adam(learning_rate=0.00001, clipnorm=1.0),
keras.optimizers.SGD(learning_rate=0.00001, clipnorm=1.0),
],
"use_bias": [True, False],
"use_rp": [True, False],
"use_att": [True, False],
"use_lstm": [True, False],
"use_cnn": [True, False],
"verbose": [verbose_param],
"random_state": [random_state_val],
}
self.parameter_space = {
f"model__{key}": value for key, value in base_params.items()
}
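

# Illustrative usage sketch (assumptions: `ml_grid_object` is a configured
# `pipe` instance, X has shape (n_samples, n_dims, n_timesteps), and the
# parameter space is consumed by a GridSearchCV-style searcher):
#
#     from sklearn.model_selection import GridSearchCV
#
#     tapnet = TapNetClassifier_class(ml_grid_object)
#     search = GridSearchCV(
#         tapnet.algorithm_implementation,
#         tapnet.parameter_space,
#         cv=3,
#     )
#     search.fit(X, y)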