import time
import traceback
import logging
import warnings
from typing import Any, Dict, List, Optional, Union
import keras
import numpy as np
import pandas as pd
import tensorflow as tf
import torch
from IPython.display import clear_output
from numpy import absolute, mean, std
from scikeras.wrappers import KerasClassifier
from sklearn import metrics
from IPython.display import display
from catboost import CatBoostError
from pandas.testing import assert_index_equal
from xgboost.core import XGBoostError
from ml_grid.model_classes.H2OAutoMLClassifier import H2OAutoMLClassifier
from ml_grid.model_classes.H2OGBMClassifier import H2OGBMClassifier
from ml_grid.model_classes.H2ODRFClassifier import H2ODRFClassifier
from ml_grid.model_classes.H2OGAMClassifier import H2OGAMClassifier
from ml_grid.model_classes.H2ODeepLearningClassifier import H2ODeepLearningClassifier
from ml_grid.model_classes.H2OGLMClassifier import H2OGLMClassifier
from ml_grid.model_classes.H2ONaiveBayesClassifier import H2ONaiveBayesClassifier
from ml_grid.model_classes.H2ORuleFitClassifier import H2ORuleFitClassifier
from ml_grid.model_classes.H2OXGBoostClassifier import H2OXGBoostClassifier
from ml_grid.model_classes.H2OStackedEnsembleClassifier import (
H2OStackedEnsembleClassifier,
)
from ml_grid.model_classes.NeuralNetworkKerasClassifier import NeuralNetworkClassifier
# from sklearn.utils.testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning
from sklearn.metrics import (
    classification_report,
    f1_score,
    make_scorer,
    matthews_corrcoef,
    roc_auc_score,
)
from sklearn.model_selection import (
GridSearchCV,
ParameterGrid,
RandomizedSearchCV,
RepeatedKFold,
KFold,
cross_validate,
)
from ml_grid.model_classes.keras_classifier_class import KerasClassifierClass
from ml_grid.pipeline.hyperparameter_search import HyperparameterSearch
from ml_grid.util.debug_print_statements import debug_print_statements_class
from ml_grid.util.global_params import global_parameters
from ml_grid.util.project_score_save import project_score_save_class
from ml_grid.util.validate_parameters import validate_parameters_helper
from sklearn.preprocessing import MinMaxScaler
from ml_grid.util.bayes_utils import calculate_combinations, is_skopt_space
from skopt.space import Categorical
class grid_search_crossvalidate:
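    """Runs a cross-validated hyperparameter search for a single estimator.

    All of the work happens in ``__init__``; once construction completes, the
    ROC AUC of the tuned model on the held-out test set is available as
    ``grid_search_cross_validate_score_result`` (a fixed 0.5 is stored when the
    run is skipped, e.g. in test mode or when the dataset is too small for
    CatBoost).
    """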
def __init__(
self,
algorithm_implementation: Any,
parameter_space: Union[Dict, List[Dict]],
method_name: str,
ml_grid_object: Any,
sub_sample_parameter_val: int = 100,
project_score_save_class_instance: Optional[project_score_save_class] = None,
):
"""Initializes and runs a cross-validated hyperparameter search.
This class takes a given algorithm, its parameter space, and data from
the main pipeline object to perform either a grid search, randomized
search, or Bayesian search for the best hyperparameters. It then logs
the results.
Args:
algorithm_implementation (Any): The scikit-learn compatible estimator
instance.
parameter_space (Union[Dict, List[Dict]]): The dictionary or list of
dictionaries defining the hyperparameter search space.
method_name (str): The name of the algorithm method.
ml_grid_object (Any): The main pipeline object containing all data
(X_train, y_train, etc.) and parameters for the current
iteration.
sub_sample_parameter_val (int, optional): A value used to limit
the number of iterations in a randomized search. Defaults to 100.
project_score_save_class_instance (Optional[project_score_save_class], optional):
An instance of the score saving class. Defaults to None.
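        Example (illustrative sketch; assumes a populated ``ml_grid_object``
        and any scikit-learn compatible estimator, e.g. LogisticRegression)::

            searcher = grid_search_crossvalidate(
                algorithm_implementation=LogisticRegression(),
                parameter_space={"C": [0.1, 1.0, 10.0]},
                method_name="LogisticRegression",
                ml_grid_object=ml_grid_object,
            )
            auc = searcher.grid_search_cross_validate_score_result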
"""
# Set each warning filter individually for robustness
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
self.logger = logging.getLogger("ml_grid")
self.global_params = global_parameters
self.verbose = self.global_params.verbose
if self.verbose < 8:
self.logger.debug("Clearing output.")
clear_output(wait=True)
self.project_score_save_class_instance = project_score_save_class_instance
self.sub_sample_param_space_pct = self.global_params.sub_sample_param_space_pct
random_grid_search = self.global_params.random_grid_search
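        # Search strategy: global_params.bayessearch selects Bayesian search,
        # random_grid_search selects a randomized search, and otherwise an
        # exhaustive search over the full parameter grid is performed
        # (dispatched via HyperparameterSearch below).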
self.sub_sample_parameter_val = sub_sample_parameter_val
grid_n_jobs = self.global_params.grid_n_jobs
# Configure GPU usage and job limits for specific models
is_gpu_model = (
"keras" in method_name.lower()
or "xgb" in method_name.lower()
or "catboost" in method_name.lower()
)
if is_gpu_model:
grid_n_jobs = 1
try:
gpu_devices = tf.config.experimental.list_physical_devices("GPU")
if gpu_devices:
for device in gpu_devices:
tf.config.experimental.set_memory_growth(device, True)
                else:
                    # No GPU found: hide all GPU devices from TensorFlow so it
                    # runs on CPU and avoids CUDA initialization errors.
                    tf.config.set_visible_devices([], "GPU")
except Exception as e:
self.logger.warning(f"Could not configure GPU for TensorFlow: {e}")
self.metric_list = self.global_params.metric_list
self.error_raise = self.global_params.error_raise
if self.verbose >= 3:
self.logger.info(f"Cross-validating {method_name}")
self.global_parameters = global_parameters
self.ml_grid_object_iter = ml_grid_object
self.X_train = self.ml_grid_object_iter.X_train
self.y_train = self.ml_grid_object_iter.y_train
self.X_test = self.ml_grid_object_iter.X_test
self.y_test = self.ml_grid_object_iter.y_test
self.X_test_orig = self.ml_grid_object_iter.X_test_orig
self.y_test_orig = self.ml_grid_object_iter.y_test_orig
# --- ROBUST DATA TYPE HANDLING ---
# Ensure X_train is a pandas DataFrame and y_train is a pandas Series
# with aligned indices. This handles inputs being numpy arrays (from tests)
# or pandas objects, preventing AttributeError and ensuring consistency.
# 1. Ensure X_train is a DataFrame.
if not isinstance(self.X_train, pd.DataFrame):
self.X_train = pd.DataFrame(self.X_train).rename(columns=str)
# 2. Ensure y_train is a Series, using X_train's index for alignment.
if not isinstance(self.y_train, (pd.Series, pd.DataFrame)):
self.y_train = pd.Series(self.y_train, index=self.X_train.index)
# 3. Ensure target is categorical for classification models (especially H2O).
self.y_train = self.y_train.astype("category")
# --- CRITICAL FIX for H2O Stacked Ensemble response column mismatch ---
# Enforce a consistent name for the target variable series. This prevents
# the "response_column must match" error in H2O StackedEnsemble.
self.y_train.name = "outcome"
max_param_space_iter_value = (
self.global_params.max_param_space_iter_value
) # hard limit on param space exploration
if "svc" in method_name.lower():
self.X_train = scale_data(self.X_train)
self.X_test = scale_data(self.X_test)
# --- PERFORMANCE FIX for testing ---
# Use a much faster CV strategy when in test_mode.
# This MUST be defined before HyperparameterSearch is instantiated.
if getattr(self.global_parameters, "test_mode", False):
self.logger.info("Test mode enabled. Using fast KFold(n_splits=2) for CV.")
self.cv = KFold(n_splits=2, shuffle=True, random_state=1)
else:
# Use the full, robust CV strategy for production runs
self.cv = RepeatedKFold(
# Using 2 splits for faster iteration and larger training folds.
n_splits=2,
n_repeats=2,
random_state=1,
)
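        # RepeatedKFold(n_splits=2, n_repeats=2) yields 4 train/validation
        # fits per candidate parameter set.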
start = time.time()
current_algorithm = algorithm_implementation
# Silence verbose models like CatBoost to keep logs clean
if "catboost" in method_name.lower() and hasattr(
current_algorithm, "set_params"
):
ml_grid_object.logger.info("Silencing CatBoost verbose output.")
current_algorithm.set_params(verbose=0)
# Check for GPU availability and set device for torch-based models
if "simbsig" in str(type(algorithm_implementation)):
if not torch.cuda.is_available():
self.logger.info(
"No CUDA GPU detected. Forcing simbsig model to use CPU."
)
if hasattr(current_algorithm, "set_params"):
current_algorithm.set_params(device="cpu")
else:
self.logger.info(
"CUDA GPU detected. Allowing simbsig model to use GPU."
)
self.logger.debug(f"Algorithm implementation: {algorithm_implementation}")
parameters = parameter_space # Keep a reference to the original
if ml_grid_object.verbose >= 3:
self.logger.debug(
f"algorithm_implementation: {algorithm_implementation}, type: {type(algorithm_implementation)}"
)
# Validate parameters for non-Bayesian searches
if not self.global_params.bayessearch:
parameters = validate_parameters_helper(
algorithm_implementation=algorithm_implementation,
parameters=parameter_space,
ml_grid_object=ml_grid_object,
)
# --- FIX for skopt ValueError ---
# If using Bayesian search, ensure all list-based parameters are wrapped
# in skopt.space.Categorical to prevent "can only convert an array of size 1" error.
if self.global_params.bayessearch:
self.logger.debug("Validating parameter space for Bayesian search...")
if isinstance(
parameter_space, list
): # For models like LogisticRegression with multiple dicts
# This part remains the same as it handles lists of dictionaries correctly.
for i, space in enumerate(parameter_space):
new_space = {}
for key, value in space.items():
# --- REFINED FIX for skopt ValueError ---
# Check if the value is a list of potential choices that needs wrapping.
# This is true if it's a list/array, not already a skopt space,
# and its elements are not lists themselves (e.g., for H2O's 'hidden' param).
                        is_list_of_choices = (
                            isinstance(value, (list, np.ndarray))
                            and len(value) > 0
                            and not isinstance(value[0], list)
                        )
if is_list_of_choices and not is_skopt_space(value):
self.logger.warning(
f"Auto-correcting param '{key}' for BayesSearch: wrapping list in Categorical."
)
new_space[key] = Categorical(value)
else: # It's a skopt object, a single value, or a list of lists (like for 'hidden')
new_space[key] = value
parameter_space[i] = new_space
elif isinstance(parameter_space, dict): # For standard single-dict spaces
# This is the key change: iterate and build a new dictionary
# to avoid issues with modifying a dictionary while iterating.
new_parameter_space = {}
for key, value in parameter_space.items():
# --- REFINED FIX for skopt ValueError ---
                    is_list_of_choices = (
                        isinstance(value, (list, np.ndarray))
                        and len(value) > 0
                        and not isinstance(value[0], list)
                    )
if is_list_of_choices and not is_skopt_space(value):
self.logger.warning(
f"Auto-correcting param '{key}' for BayesSearch: wrapping list in Categorical."
)
new_parameter_space[key] = Categorical(value)
else: # It's a skopt object, a single value, or a list of lists (like for 'hidden')
new_parameter_space[key] = value
parameter_space = new_parameter_space
        # Use the n_iter parameter from the config.
        # Default to 2 if not present, preventing AttributeError.
        n_iter_v = getattr(self.global_params, "n_iter", 2)
# For GridSearchCV, n_iter is not used, but we calculate the grid size for logging.
if not self.global_params.bayessearch and not random_grid_search:
pg = len(ParameterGrid(parameter_space))
self.logger.info(f"Parameter grid size: {pg}")
else:
# For Random and Bayes search, log the number of iterations
self.logger.info(f"Using n_iter={n_iter_v} for search.")
# Calculate pg for logging purposes
pg = (
len(ParameterGrid(parameter_space))
if not self.global_params.bayessearch
else "N/A"
)
# Dynamically adjust KNN parameter space for small datasets
if "kneighbors" in method_name.lower() or "simbsig" in method_name.lower():
self._adjust_knn_parameters(parameter_space)
self.logger.debug(
"Adjusted KNN n_neighbors parameter space to prevent errors on small CV folds."
)
# Check if dataset is too small for CatBoost
if "catboost" in method_name.lower():
min_samples_required = 10 # CatBoost needs a reasonable amount of data
if len(self.X_train) < min_samples_required:
self.logger.warning(
f"Dataset too small for CatBoost ({len(self.X_train)} samples < {min_samples_required} required). "
f"Skipping {method_name}."
)
# Return early with default scores
self.grid_search_cross_validate_score_result = 0.5
return
# Dynamically adjust CatBoost subsample parameter for small datasets
if "catboost" in method_name.lower():
self._adjust_catboost_parameters(parameter_space)
self.logger.debug(
"Adjusted CatBoost subsample parameter space to prevent errors on small CV folds."
)
# --- CRITICAL FIX for H2OStackedEnsemble ---
# The special handling logic has been moved inside the H2OStackedEnsembleClassifier
# class itself, making it a self-contained scikit-learn meta-estimator.
# No special orchestration is needed here anymore.
# Instantiate and run the hyperparameter grid/random search
search = HyperparameterSearch(
algorithm=current_algorithm,
parameter_space=parameter_space,
method_name=method_name,
global_params=self.global_parameters,
            sub_sample_pct=self.sub_sample_param_space_pct,  # Percentage of the parameter space to explore (from config)
max_iter=n_iter_v, # Maximum iterations for randomized search
ml_grid_object=ml_grid_object,
cv=self.cv,
)
if self.global_parameters.verbose >= 3:
self.logger.debug("Running hyperparameter search")
try:
# Verify initial index alignment
try:
assert_index_equal(self.X_train.index, self.y_train.index)
ml_grid_object.logger.debug(
"Index alignment PASSED before search.run_search"
)
except AssertionError:
ml_grid_object.logger.error(
"Index alignment FAILED before search.run_search"
)
raise
# Ensure y_train is a Series for consistency
if not isinstance(self.y_train, pd.Series):
ml_grid_object.logger.error(
f"y_train is not a pandas Series, but {type(self.y_train)}. Converting to Series."
)
self.y_train = pd.Series(self.y_train, index=self.X_train.index)
# CRITICAL FIX: Reset indices to ensure integer-based indexing for sklearn
# This prevents "String indexing is not supported with 'axis=0'" errors
X_train_reset = self.X_train.reset_index(drop=True)
y_train_reset = self.y_train.reset_index(drop=True)
ml_grid_object.logger.debug(
f"X_train index after reset: {X_train_reset.index[:5]}"
)
ml_grid_object.logger.debug(
f"y_train index after reset: {y_train_reset.index[:5]}"
)
# Pass reset data to search
current_algorithm = search.run_search(X_train_reset, y_train_reset)
except Exception as e:
# Log the error and re-raise it to stop the entire execution,
# allowing the main loop in main.py to handle it based on error_raise.
self.logger.error(
f"An exception occurred during hyperparameter search for {method_name}: {e}",
exc_info=True,
)
raise e
# --- PERFORMANCE FIX for testing ---
# If in test_mode, we have already verified that the search runs without crashing.
# We can skip the final, slow cross-validation and return a dummy score.
if getattr(self.global_parameters, "test_mode", False):
self.logger.info(
"Test mode enabled. Skipping final cross-validation for speed."
)
self.grid_search_cross_validate_score_result = 0.5 # Return a valid float
# Final cleanup for H2O models
self._shutdown_h2o_if_needed(current_algorithm)
return
if self.global_parameters.verbose >= 3:
self.logger.debug("Fitting final model")
# In production, we re-fit the best estimator on the full training data before CV.
# In test_mode, the estimator from the search is already fitted, and re-fitting
# can invalidate complex models like H2OStackedEnsemble before the final assert.
metric_list = self.metric_list
# Catch only one class present AUC not defined:
if len(np.unique(self.y_train)) < 2:
raise ValueError(
"Only one class present in y_train. ROC AUC score is not defined "
"in that case. grid_search_cross_validate>>>cross_validate"
)
if self.global_parameters.verbose >= 1:
self.logger.info("Getting cross validation scores")
self.logger.debug(
f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}"
)
self.logger.debug(f"y_train value counts:\n{self.y_train.value_counts()}")
# Set a time threshold in seconds
time_threshold = 60 # For example, 60 seconds
start_time = time.time()
# Define default scores (e.g., mean score of 0.5 for binary classification)
# Default scores if cross-validation fails
default_scores = {
"test_accuracy": [
0.5 # Default to random classifier performance
],
"test_f1": [0.5], # Default F1 score (again, 0.5 for random classification)
"test_auc": [0.5], # Default ROC AUC score (0.5 for random classifier)
"fit_time": [0], # No fitting time if the model fails
"score_time": [0], # No scoring time if the model fails
"train_score": [0.5], # Default train score
"test_recall": [0.5],
#'test_auc': [0.5] # ?
}
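        # Note: sklearn's cross_validate reports scores under keys of the form
        # "test_<scorer_name>"; the defaults above approximate that layout.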
# --- CRITICAL FIX for H2O multiprocessing error ---
# H2O models cannot be pickled and sent to other processes for parallel
# execution with joblib. We must detect if the current algorithm is an
# H2O model and, if so, force n_jobs=1 for cross_validate.
h2o_model_types = (
H2OAutoMLClassifier,
H2OGBMClassifier,
H2ODRFClassifier,
H2OGAMClassifier,
H2ODeepLearningClassifier,
H2OGLMClassifier,
H2ONaiveBayesClassifier,
H2ORuleFitClassifier,
H2OXGBoostClassifier,
H2OStackedEnsembleClassifier,
)
# Keras/TensorFlow models also require single-threaded execution.
keras_model_types = (NeuralNetworkClassifier, KerasClassifierClass)
is_h2o_model = isinstance(current_algorithm, h2o_model_types)
is_keras_model = isinstance(current_algorithm, keras_model_types)
final_cv_n_jobs = 1 if is_h2o_model or is_keras_model else grid_n_jobs
        if is_h2o_model or is_keras_model:
            self.logger.debug(
                "H2O or Keras model detected. Forcing n_jobs=1 for final cross-validation."
            )
failed = False
try:
# H2O models require pandas DataFrames with column names, while other
# sklearn models can benefit from using NumPy arrays.
if isinstance(current_algorithm, h2o_model_types):
X_train_final = self.X_train # Pass DataFrame directly
else:
X_train_final = self.X_train.values # Use NumPy array for other models
# --- FIX for UnboundLocalError ---
# Consolidate Keras and non-Keras logic to ensure 'scores' is always assigned.
if isinstance(current_algorithm, (KerasClassifier, KerasClassifierClass)):
self.logger.debug("Fitting Keras model with internal CV handling.")
y_train_values = self.y_train.values
current_algorithm.fit(self.X_train, y_train_values, cv=self.cv)
# Since fit already did the CV, create a dummy scores dictionary.
scores = {
"test_roc_auc": [
current_algorithm.score(self.X_test, self.y_test.values)
]
}
else:
# For all other models, perform standard cross-validation.
# --- FIX for UnboundLocalError ---
# Move the fit call inside the try block. If fit fails, the except
# block will catch it and assign default scores, preventing the error.
if not getattr(self.global_parameters, "test_mode", False):
# Fit on the full training data first
current_algorithm.fit(self.X_train, self.y_train)
# --- CRITICAL FIX: Pass the pandas Series, not the numpy array ---
# Passing the numpy array (y_train.to_numpy()) causes index misalignment
# with the pandas DataFrame (X_train_final) inside sklearn's CV,
# which introduces NaNs into the target column and makes H2O fail.
scores = cross_validate(
current_algorithm,
X_train_final,
self.y_train, # Pass the pandas Series to preserve index alignment
scoring=self.metric_list,
cv=self.cv,
n_jobs=final_cv_n_jobs, # Use adjusted n_jobs
pre_dispatch=80,
error_score=self.error_raise, # Raise error if cross-validation fails
)
# --- TENSORFLOW PERFORMANCE FIX (Corrected Position) ---
# Pre-compile the predict function for Keras/TF models to avoid retracing warnings.
# This is done AFTER fitting and before cross-validation.
if isinstance(
current_algorithm,
(KerasClassifier, KerasClassifierClass, NeuralNetworkClassifier),
):
try:
self.logger.debug(
"Pre-compiling TensorFlow predict function to avoid retracing."
)
n_features = self.X_train.shape[1]
# Define an input signature that allows for variable batch size.
input_signature = [
tf.TensorSpec(shape=(None, n_features), dtype=tf.float32)
]
# Access the underlying Keras model via .model_
current_algorithm.model_.predict.get_concrete_function(
input_signature
)
except Exception as e:
self.logger.warning(
f"Could not pre-compile TF function. Performance may be impacted. Error: {e}"
)
except XGBoostError as e:
if "cuda" in str(e).lower() or "memory" in str(e).lower():
self.logger.warning(
"GPU memory error detected during cross-validation, falling back to CPU..."
)
current_algorithm.set_params(tree_method="hist")
try:
scores = cross_validate(
current_algorithm,
X_train_final,
self.y_train, # Use pandas Series for consistency
scoring=self.metric_list,
cv=self.cv,
n_jobs=final_cv_n_jobs, # Use adjusted n_jobs
pre_dispatch=80,
error_score=self.error_raise, # Raise error if cross-validation fails
)
except Exception as e:
self.logger.error(
f"An unexpected error occurred during cross-validation attempt 2: {e}",
exc_info=True,
)
self.logger.warning("Returning default scores")
failed = True
scores = default_scores # Use default scores for other errors
except ValueError as e:
# Handle specific ValueError if AdaBoostClassifier fails due to poor performance
if (
"BaseClassifier in AdaBoostClassifier ensemble is worse than random"
in str(e)
):
self.logger.warning(f"AdaBoostClassifier failed: {e}")
self.logger.warning(
"Skipping AdaBoostClassifier due to poor base classifier performance."
)
# Set default scores if the AdaBoostClassifier fails
scores = default_scores # Use default scores
else:
self.logger.error(
f"An unexpected ValueError occurred during cross-validation: {e}",
exc_info=True,
)
scores = default_scores # Use default scores for other errors
        except RuntimeError as e:
            # --- FIX for UnboundLocalError with H2OStackedEnsemble ---
            # RuntimeErrors can be raised by H2O models during fit (e.g. a base
            # model training failure) or predict. Re-raise them to aid
            # development; note that this bypasses the default-score fallback below.
            raise e
            self.logger.error(
                f"A RuntimeError occurred during cross-validation (often H2O related): {e}",
                exc_info=True,
            )
            self.logger.warning("Returning default scores.")
            failed = True
            scores = default_scores
except Exception as e:
# Catch any other general exceptions and log them
self.logger.error(
f"An unexpected error occurred during cross-validation: {e}",
exc_info=True,
)
scores = default_scores # Use default scores if an error occurs
# End the timer
end_time = time.time()
# Calculate elapsed time
elapsed_time = end_time - start_time
if self.global_parameters.verbose >= 1:
# Print a warning if the execution time exceeds the threshold
if elapsed_time > time_threshold:
self.logger.warning(
f"Cross-validation took too long ({elapsed_time:.2f} seconds). "
"Consider optimizing the parameters or reducing CV folds."
)
else:
self.logger.info(
f"Cross-validation for {method_name} completed in {elapsed_time:.2f} seconds."
)
current_algorithm_scores = scores
# scores_tuple_list.append((method_name, current_algorithm_scores, grid))
if self.global_parameters.verbose >= 4:
debug_print_statements_class(scores).debug_print_scores()
plot_auc = False
if plot_auc:
# This was passing a classifier trained on the test dataset....
self.logger.debug("Plotting AUC is disabled.")
# plot_auc_results(current_algorithm, self.X_test_orig[self.X_train.columns], self.y_test_orig, self.cv)
# plot_auc_results(grid.best_estimator_, X_test_orig, self.y_test_orig, cv)
# this should be x_test...?
best_pred_orig = current_algorithm.predict(self.X_test) # exp
# Call the update_score_log method on the provided instance
if self.project_score_save_class_instance:
self.project_score_save_class_instance.update_score_log(
ml_grid_object=ml_grid_object,
scores=scores,
best_pred_orig=best_pred_orig,
current_algorithm=current_algorithm,
method_name=method_name,
pg=pg,
start=start,
n_iter_v=n_iter_v,
failed=failed,
)
else:
self.logger.warning(
"No project_score_save_class_instance provided. Skipping score logging."
)
# calculate metric for optimisation
auc = metrics.roc_auc_score(self.y_test, best_pred_orig)
self.grid_search_cross_validate_score_result = auc
self._shutdown_h2o_if_needed(current_algorithm)
def _adjust_knn_parameters(self, parameter_space: Union[Dict, List[Dict]]):
"""
Dynamically adjusts the 'n_neighbors' parameter for KNN-based models
to prevent errors on small datasets during cross-validation.
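        For example, with a training fold of 8 samples a candidate list such as
        [3, 5, 10, 25] is filtered to [3, 5].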
"""
n_splits = self.cv.get_n_splits()
# --- CRITICAL FIX: Correctly calculate the training fold size ---
# The previous calculation was incorrect for some CV strategies.
# This method is robust: create a dummy split to get the exact train fold size.
dummy_indices = np.arange(len(self.X_train))
train_indices, _ = next(self.cv.split(dummy_indices))
n_samples_train_fold = len(train_indices)
n_samples_test_fold = len(self.X_train) - n_samples_train_fold
max_n_neighbors = max(1, n_samples_train_fold)
self.logger.debug(
f"KNN constraints - train_fold_size={n_samples_train_fold}, "
f"test_fold_size={n_samples_test_fold}, max_n_neighbors={max_n_neighbors}"
)
def adjust_param(param_value):
if is_skopt_space(param_value):
# For skopt.space objects, adjust the upper bound
new_high = min(param_value.high, max_n_neighbors)
new_low = min(param_value.low, new_high)
param_value.high = new_high
param_value.low = new_low
self.logger.debug(
f"Adjusted skopt space: low={new_low}, high={new_high}"
)
elif isinstance(param_value, (list, np.ndarray)):
# For lists, filter the values
new_param_value = [n for n in param_value if n <= max_n_neighbors]
if not new_param_value:
self.logger.warning(
f"All n_neighbors values filtered out. Using [{max_n_neighbors}]"
)
return [max_n_neighbors]
self.logger.debug(f"Filtered n_neighbors list: {new_param_value}")
return new_param_value
return param_value
if isinstance(parameter_space, list):
for params in parameter_space:
if "n_neighbors" in params:
params["n_neighbors"] = adjust_param(params["n_neighbors"])
elif isinstance(parameter_space, dict) and "n_neighbors" in parameter_space:
parameter_space["n_neighbors"] = adjust_param(
parameter_space["n_neighbors"]
)
def _adjust_catboost_parameters(self, parameter_space: Union[Dict, List[Dict]]):
"""
Dynamically adjusts the 'subsample' parameter for CatBoost to prevent
errors on small datasets during cross-validation.
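        For example, with a training fold of 5 samples the minimum allowed
        subsample is 1/5 = 0.2, so a candidate list of [0.1, 0.5, 1.0] becomes
        [0.5, 1.0].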
"""
        # Determine the exact training fold size with a dummy split; this is
        # robust across CV strategies (get_n_splits() on RepeatedKFold returns
        # n_splits * n_repeats, which would overestimate the fold size).
        dummy_indices = np.arange(len(self.X_train))
        train_indices, _ = next(self.cv.split(dummy_indices))
        # Ensure n_samples_in_fold is at least 1 to avoid division by zero
        n_samples_in_fold = max(1, len(train_indices))
# If the training fold is extremely small, force subsample to 1.0
# to prevent CatBoost from failing on constant features.
if n_samples_in_fold <= 2:
min_subsample = 1.0
else:
# The minimum subsample value must be > 1/n_samples to ensure at least one sample is chosen
min_subsample = 1.0 / n_samples_in_fold
def adjust_param(param_value):
if is_skopt_space(param_value):
# For skopt.space objects (Real), adjust the lower bound
new_low = max(param_value.low, min_subsample)
# Ensure the new low is not higher than the high
if new_low > param_value.high:
new_low = param_value.high
param_value.low = new_low
# If the fold is tiny, force the entire space to be 1.0
if n_samples_in_fold <= 2:
param_value.low = param_value.high = 1.0
elif isinstance(param_value, (list, np.ndarray)):
# For lists, filter the values
new_param_value = [s for s in param_value if s >= min_subsample]
if not new_param_value:
# If all values are filtered out, use the smallest valid value
return [
(
min(p for p in param_value if p > 0)
if any(p > 0 for p in param_value)
else 1.0
)
]
return new_param_value
# If the fold is tiny, force subsample to 1.0
if n_samples_in_fold <= 2:
return [1.0] if isinstance(param_value, list) else 1.0
return param_value
if isinstance(parameter_space, list):
for params in parameter_space:
if "subsample" in params:
params["subsample"] = adjust_param(params["subsample"])
elif isinstance(parameter_space, dict) and "subsample" in parameter_space:
parameter_space["subsample"] = adjust_param(parameter_space["subsample"])
# Also adjust 'rsm' (colsample_bylevel) which can cause the same issue
if isinstance(parameter_space, list):
for params in parameter_space:
if "rsm" in params:
params["rsm"] = adjust_param(params["rsm"])
elif isinstance(parameter_space, dict) and "rsm" in parameter_space:
parameter_space["rsm"] = adjust_param(parameter_space["rsm"])
def _shutdown_h2o_if_needed(self, algorithm: Any):
"""Safely shuts down the H2O cluster if the algorithm is an H2O model."""
h2o_model_types = (
H2OAutoMLClassifier,
H2OGBMClassifier,
H2ODRFClassifier,
H2OGAMClassifier,
H2ODeepLearningClassifier,
H2OGLMClassifier,
H2ONaiveBayesClassifier,
H2ORuleFitClassifier,
H2OXGBoostClassifier,
H2OStackedEnsembleClassifier,
)
if isinstance(algorithm, h2o_model_types):
# --- FIX for repeated H2O cluster shutdown ---
# We no longer shut down the cluster after each model.
# The cluster is now managed globally and should be shut down
# at the end of the entire experiment run.
import h2o
cluster = h2o.cluster()
if cluster and cluster.is_running():
self.logger.info(
"H2O model finished. Leaving cluster running for next H2O model."
)
# The shutdown call was removed from H2OBaseClassifier. The cluster is managed globally.
def dummy_auc() -> float:
"""Returns a constant AUC score of 0.5.
This function is intended as a placeholder or for use in scenarios where
a valid AUC score cannot be calculated but a value is required.
Returns:
float: A constant value of 0.5.
"""
return 0.5
# Create a scorer using make_scorer
# dummy_auc_scorer = make_scorer(dummy_auc)
def scale_data(X_train: pd.DataFrame) -> pd.DataFrame:
"""Scales the data to a [0, 1] range if it's not already scaled.
Args:
X_train (pd.DataFrame): Training features.
Returns:
pd.DataFrame: Scaled training features.
"""
# Initialize MinMaxScaler
scaler = MinMaxScaler(feature_range=(0, 1))
# Check if data is already scaled
min_val = X_train.min().min()
max_val = X_train.max().max()
# If data is not scaled, then scale it
if min_val < 0 or max_val > 1:
# Fit and transform the data
        X_train_scaled = pd.DataFrame(
            scaler.fit_transform(X_train),
            columns=X_train.columns,
            index=X_train.index,  # preserve the original index for downstream alignment
        )
return X_train_scaled
else:
# If data is already scaled, return it as is
return X_train