import time
import logging
import multiprocessing
import joblib
import warnings
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
import tensorflow as tf
import torch
from IPython.display import clear_output
from scikeras.wrappers import KerasClassifier
import sklearn
from sklearn import metrics
from pandas.testing import assert_index_equal
from xgboost.core import XGBoostError
from ml_grid.model_classes.H2OAutoMLClassifier import H2OAutoMLClassifier
from ml_grid.model_classes.H2OGBMClassifier import H2OGBMClassifier
from ml_grid.model_classes.H2ODRFClassifier import H2ODRFClassifier
from ml_grid.model_classes.H2OGAMClassifier import H2OGAMClassifier
from ml_grid.model_classes.H2ODeepLearningClassifier import H2ODeepLearningClassifier
from ml_grid.model_classes.H2OGLMClassifier import H2OGLMClassifier
from ml_grid.model_classes.H2ONaiveBayesClassifier import H2ONaiveBayesClassifier
from ml_grid.model_classes.H2ORuleFitClassifier import H2ORuleFitClassifier
from ml_grid.model_classes.H2OXGBoostClassifier import H2OXGBoostClassifier
from ml_grid.model_classes.H2OStackedEnsembleClassifier import (
H2OStackedEnsembleClassifier,
)
from ml_grid.model_classes.NeuralNetworkKerasClassifier import NeuralNetworkClassifier
# from sklearn.utils.testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning
from sklearn.metrics import *
from sklearn.model_selection import (
ParameterGrid,
RepeatedKFold,
KFold,
cross_validate,
)
from ml_grid.model_classes.keras_classifier_class import KerasClassifierClass
from ml_grid.pipeline.hyperparameter_search import HyperparameterSearch
from ml_grid.util.debug_print_statements import debug_print_statements_class
from ml_grid.util.global_params import global_parameters
from ml_grid.util.project_score_save import project_score_save_class
from ml_grid.util.validate_parameters import validate_parameters_helper
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from ml_grid.util.bayes_utils import is_skopt_space
from skopt.space import Categorical
# Global flag to ensure TensorFlow/GPU setup runs only once per process
_TF_INITIALIZED = False
# Define H2O model types at module level for reuse
H2O_MODEL_TYPES = (
H2OAutoMLClassifier,
H2OGBMClassifier,
H2ODRFClassifier,
H2OGAMClassifier,
H2ODeepLearningClassifier,
H2OGLMClassifier,
H2ONaiveBayesClassifier,
H2ORuleFitClassifier,
H2OXGBoostClassifier,
H2OStackedEnsembleClassifier,
)
# Disable TF Traceback Filtering to reduce overhead in Keras model building
try:
tf.debugging.disable_traceback_filtering()
except (AttributeError, ImportError):
pass
class grid_search_crossvalidate:
def __init__(
self,
algorithm_implementation: Any,
parameter_space: Union[Dict, List[Dict]],
method_name: str,
ml_grid_object: Any,
sub_sample_parameter_val: int = 100,
project_score_save_class_instance: Optional[project_score_save_class] = None,
):
"""Initializes and runs a cross-validated hyperparameter search.
Performs grid, randomized, or Bayesian search for hyperparameters and logs results.
Args:
algorithm_implementation (Any): The scikit-learn compatible estimator
instance.
parameter_space (Union[Dict, List[Dict]]): The dictionary or list of
dictionaries defining the hyperparameter search space.
method_name (str): The name of the algorithm method.
ml_grid_object (Any): The main pipeline object containing all data
(X_train, y_train, etc.) and parameters for the current
iteration.
sub_sample_parameter_val (int, optional): A value used to limit
the number of iterations in a randomized search. Defaults to 100.
project_score_save_class_instance (Optional[project_score_save_class], optional):
An instance of the score saving class. Defaults to None.
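Example:
    A minimal usage sketch. ``pipeline_obj`` is a hypothetical placeholder for
    the real ml_grid pipeline object, which must expose ``X_train``, ``y_train``,
    ``X_test``, ``y_test``, ``X_test_orig``, ``y_test_orig``, ``local_param_dict``,
    ``verbose`` and ``logger``::

        from sklearn.linear_model import LogisticRegression

        gscv = grid_search_crossvalidate(
            algorithm_implementation=LogisticRegression(),
            parameter_space={"C": [0.1, 1.0, 10.0]},
            method_name="LogisticRegression",
            ml_grid_object=pipeline_obj,  # hypothetical pipeline instance
        )
        auc = gscv.grid_search_cross_validate_score_result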
"""
# Set warning filters
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
self.logger = logging.getLogger("ml_grid")
self.global_params = global_parameters
self.verbose = self.global_params.verbose
if self.verbose < 8:
self.logger.debug("Clearing output.")
clear_output(wait=True)
self.project_score_save_class_instance = project_score_save_class_instance
self.sub_sample_param_space_pct = self.global_params.sub_sample_param_space_pct
random_grid_search = self.global_params.random_grid_search
self.sub_sample_parameter_val = sub_sample_parameter_val
# Detect nested parallelism: force n_jobs=1 if running inside a worker process
if multiprocessing.current_process().daemon:
self.global_params.grid_n_jobs = 1
grid_n_jobs = 1
else:
grid_n_jobs = self.global_params.grid_n_jobs
# Configure GPU usage and job limits for specific models
is_gpu_model = (
"keras" in method_name.lower()
or "xgb" in method_name.lower()
or "catboost" in method_name.lower()
or "neural" in method_name.lower()
)
is_h2o_model = isinstance(algorithm_implementation, H2O_MODEL_TYPES)
global _TF_INITIALIZED
if is_gpu_model or is_h2o_model:
grid_n_jobs = 1
# Disable H2O Progress Bar to save time
if is_h2o_model:
try:
import h2o
h2o.no_progress()
except ImportError:
pass
except Exception:
pass
# One-time TF/GPU Setup
if is_gpu_model and not _TF_INITIALIZED:
try:
gpu_devices = tf.config.experimental.list_physical_devices("GPU")
if gpu_devices:
for device in gpu_devices:
try:
tf.config.experimental.set_memory_growth(device, True)
except RuntimeError:
pass
else:
# Explicitly set CPU as the visible device for TensorFlow to avoid CUDA init errors
tf.config.set_visible_devices([], "GPU")
tf.config.run_functions_eagerly(False)
except Exception as e:
self.logger.warning(f"Could not configure GPU for TensorFlow: {e}")
finally:
_TF_INITIALIZED = True
self.metric_list = self.global_params.metric_list
self.error_raise = self.global_params.error_raise
if self.verbose >= 3:
self.logger.info(f"Cross-validating {method_name}")
self.global_parameters = global_parameters
self.ml_grid_object_iter = ml_grid_object
self.X_train = self.ml_grid_object_iter.X_train
self.y_train = self.ml_grid_object_iter.y_train
self.X_test = self.ml_grid_object_iter.X_test
self.y_test = self.ml_grid_object_iter.y_test
self.X_test_orig = self.ml_grid_object_iter.X_test_orig
self.y_test_orig = self.ml_grid_object_iter.y_test_orig
# Ensure X_train is a DataFrame
if not isinstance(self.X_train, pd.DataFrame):
self.X_train = pd.DataFrame(self.X_train).rename(columns=str)
# Ensure y_train is a Series with aligned index
if not isinstance(self.y_train, (pd.Series, pd.DataFrame)):
self.y_train = pd.Series(self.y_train, index=self.X_train.index)
# Enforce consistent target variable name for H2O compatibility
self.y_train.name = "outcome"
# Drop ID column if present
if "client_idcode" in self.X_train.columns:
self.logger.debug("Dropping 'client_idcode' from training data.")
self.X_train = self.X_train.drop(columns=["client_idcode"], errors="ignore")
if isinstance(self.X_test, pd.DataFrame):
self.X_test = self.X_test.drop(
columns=["client_idcode"], errors="ignore"
)
if isinstance(self.X_test_orig, pd.DataFrame):
self.X_test_orig = self.X_test_orig.drop(
columns=["client_idcode"], errors="ignore"
)
max_param_space_iter_value = ( # hard limit on param space exploration
self.global_params.max_param_space_iter_value
)
# Allow local override for max_param_space_iter_value
if (
self.ml_grid_object_iter.local_param_dict.get("max_param_space_iter_value")
is not None
):
max_param_space_iter_value = self.ml_grid_object_iter.local_param_dict.get(
"max_param_space_iter_value"
)
if "svc" in method_name.lower():
self.logger.info(
"Applying StandardScaler for SVC to prevent convergence issues."
)
scaler = StandardScaler()
self.X_train = pd.DataFrame(
scaler.fit_transform(self.X_train),
columns=self.X_train.columns,
index=self.X_train.index,
)
self.X_test = pd.DataFrame(
scaler.transform(self.X_test),
columns=self.X_test.columns,
index=self.X_test.index,
)
self.X_test_orig = pd.DataFrame(
scaler.transform(self.X_test_orig),
columns=self.X_test_orig.columns,
index=self.X_test_orig.index,
)
# Optimize y_test and y_test_orig to reduce metric calculation overhead
self.y_test = self._optimize_y(self.y_test)
self.y_test_orig = self._optimize_y(self.y_test_orig)
# Use faster CV strategy in test mode
if getattr(self.global_parameters, "test_mode", False):
self.logger.info("Test mode enabled. Using fast KFold(n_splits=2) for CV.")
self.cv = KFold(n_splits=2, shuffle=True, random_state=1)
else:
# Use the full, robust CV strategy for production runs
self.cv = RepeatedKFold(
# 2 splits x 2 repeats keeps the total number of fits low for faster iteration.
n_splits=2,
n_repeats=2,
random_state=1,
)
start = time.time()
current_algorithm = algorithm_implementation
# Silence verbose models to keep logs clean and reduce I/O overhead
if hasattr(current_algorithm, "set_params"):
if "catboost" in method_name.lower():
ml_grid_object.logger.info(
"Silencing CatBoost verbose output and file writing."
)
current_algorithm.set_params(verbose=0, allow_writing_files=False)
elif "xgb" in method_name.lower():
ml_grid_object.logger.info("Silencing XGBoost verbose output.")
current_algorithm.set_params(verbosity=0)
elif "keras" in method_name.lower() or "neural" in method_name.lower():
ml_grid_object.logger.info("Silencing Keras verbose output.")
try:
current_algorithm.set_params(verbose=0)
except Exception:
pass
# Check for GPU availability and set device for torch-based models
if "simbsig" in str(type(algorithm_implementation)):
if not torch.cuda.is_available():
self.logger.info(
"No CUDA GPU detected. Forcing simbsig model to use CPU."
)
if hasattr(current_algorithm, "set_params"):
current_algorithm.set_params(device="cpu")
else:
self.logger.info(
"CUDA GPU detected. Allowing simbsig model to use GPU."
)
self.logger.debug(f"Algorithm implementation: {algorithm_implementation}")
parameters = parameter_space # Keep a reference to the original
if ml_grid_object.verbose >= 3:
self.logger.debug(
f"algorithm_implementation: {algorithm_implementation}, type: {type(algorithm_implementation)}"
)
# Validate parameters
if not self.global_params.bayessearch:
parameters = validate_parameters_helper(
algorithm_implementation=algorithm_implementation,
parameters=parameter_space,
ml_grid_object=ml_grid_object,
)
# Ensure list-based parameters are wrapped in Categorical for Bayesian search
if self.global_params.bayessearch:
self.logger.debug("Validating parameter space for Bayesian search...")
if isinstance(parameter_space, list):
for i, space in enumerate(parameter_space):
new_space = {}
for key, value in space.items():
is_list_of_choices = (
isinstance(value, (list, np.ndarray))
and len(value) > 0
and not isinstance(value[0], list)
)
if is_list_of_choices and not is_skopt_space(value):
self.logger.warning(
f"Auto-correcting param '{key}' for BayesSearch: wrapping list in Categorical."
)
new_space[key] = Categorical(value)
else:
new_space[key] = value
parameter_space[i] = new_space
elif isinstance(parameter_space, dict):
new_parameter_space = {}
for key, value in parameter_space.items():
is_list_of_choices = (
isinstance(value, (list, np.ndarray))
and len(value) > 0
and not isinstance(value[0], list)
)
if is_list_of_choices and not is_skopt_space(value):
self.logger.warning(
f"Auto-correcting param '{key}' for BayesSearch: wrapping list in Categorical."
)
new_parameter_space[key] = Categorical(value)
else:
new_parameter_space[key] = value
parameter_space = new_parameter_space
parameters = parameter_space
# Determine n_iter
try:
n_iter_v = getattr(self.global_params, "n_iter", 2)
if n_iter_v is None:
n_iter_v = 2
n_iter_v = int(n_iter_v)
except (ValueError, TypeError):
self.logger.warning(
"Invalid or missing n_iter in global_params. Defaulting to 2."
)
n_iter_v = 2
# Allow local override from run_params/local_param_dict
local_n_iter = self.ml_grid_object_iter.local_param_dict.get("n_iter")
if local_n_iter is not None:
try:
n_iter_v = int(local_n_iter)
self.logger.info(
f"Overriding global n_iter with local value: {n_iter_v}"
)
except (ValueError, TypeError):
self.logger.warning(
f"Invalid local n_iter value: {local_n_iter}. Ignoring override."
)
if max_param_space_iter_value is not None:
if n_iter_v > max_param_space_iter_value:
self.logger.info(
f"Capping n_iter ({n_iter_v}) to max_param_space_iter_value ({max_param_space_iter_value})"
)
n_iter_v = max_param_space_iter_value
# Log grid size or iterations
# Check for skopt space to avoid ParameterGrid errors
is_bayes_space = False
if isinstance(parameter_space, list):
for space in parameter_space:
if isinstance(space, dict) and any(
is_skopt_space(v) for v in space.values()
):
is_bayes_space = True
break
elif isinstance(parameter_space, dict):
if any(is_skopt_space(v) for v in parameter_space.values()):
is_bayes_space = True
if (
not self.global_params.bayessearch
and not random_grid_search
and not is_bayes_space
):
try:
pg = len(ParameterGrid(parameter_space))
self.logger.info(f"Parameter grid size: {pg}")
except TypeError:
self.logger.warning(
"Could not calculate ParameterGrid size (likely skopt objects)."
)
pg = "N/A"
else:
self.logger.info(f"Using n_iter={n_iter_v} for search.")
pg = "N/A"
# Dynamically adjust KNN parameter space for small datasets
if "kneighbors" in method_name.lower() or "simbsig" in method_name.lower():
self._adjust_knn_parameters(parameter_space)
self.logger.debug(
"Adjusted KNN n_neighbors parameter space to prevent errors on small CV folds."
)
# Check if dataset is too small for CatBoost
if "catboost" in method_name.lower():
min_samples_required = 10 # CatBoost needs a reasonable amount of data
if len(self.X_train) < min_samples_required:
self.logger.warning(
f"Dataset too small for CatBoost ({len(self.X_train)} samples < {min_samples_required} required). "
f"Skipping {method_name}."
)
# Return early with default scores
self.grid_search_cross_validate_score_result = 0.5
return
# Adjust CatBoost parameters for small datasets
if "catboost" in method_name.lower():
self._adjust_catboost_parameters(parameter_space)
self.logger.debug(
"Adjusted CatBoost subsample parameter space to prevent errors on small CV folds."
)
# Force sequential search for H2O/GPU models
original_grid_n_jobs = self.global_parameters.grid_n_jobs
if is_gpu_model or is_h2o_model:
self.global_parameters.grid_n_jobs = 1
try:
# Instantiate and run the hyperparameter grid/random search
search = HyperparameterSearch(
algorithm=current_algorithm,
parameter_space=parameters, # Use the validated/modified parameters
method_name=method_name,
global_params=self.global_parameters,
sub_sample_pct=self.sub_sample_param_space_pct,  # Fraction of the parameter space to explore
max_iter=n_iter_v, # Maximum iterations for randomized search
ml_grid_object=ml_grid_object,
cv=self.cv,
)
if self.global_parameters.verbose >= 3:
self.logger.debug("Running hyperparameter search")
# Define default scores early to handle timeouts in search phase
default_scores = {
"test_accuracy": np.array([0.5]),
"test_f1": np.array([0.5]),
"test_auc": np.array([0.5]),
"fit_time": np.array([0]),
"score_time": np.array([0]),
"train_score": np.array([0.5]),
"test_recall": np.array([0.5]),
}
failed = False
scores = None
# Initialize start_time early
start_time = time.time()
try:
# Verify initial index alignment
try:
assert_index_equal(self.X_train.index, self.y_train.index)
ml_grid_object.logger.debug(
"Index alignment PASSED before search.run_search"
)
except AssertionError:
ml_grid_object.logger.error(
"Index alignment FAILED before search.run_search"
)
raise
# Ensure y_train is a Series for consistency
if not isinstance(self.y_train, pd.Series):
ml_grid_object.logger.error(
f"y_train is not a pandas Series, but {type(self.y_train)}. Converting to Series."
)
self.y_train = pd.Series(self.y_train, index=self.X_train.index)
# Reset indices for integer-based indexing
X_train_reset = self.X_train.reset_index(drop=True)
y_train_reset = self.y_train.reset_index(drop=True)
ml_grid_object.logger.debug(
f"X_train index after reset: {X_train_reset.index[:5]}"
)
ml_grid_object.logger.debug(
f"y_train index after reset: {y_train_reset.index[:5]}"
)
# Convert y to numpy for ALL models
y_train_search = self._optimize_y(y_train_reset)
if not is_h2o_model:
X_train_search = X_train_reset.values
else:
X_train_search = X_train_reset
# Skip parameter validation overhead
with sklearn.config_context(skip_parameter_validation=True):
# Pass reset data to search
if is_h2o_model:
try:
import h2o
h2o.no_progress()
except Exception:
pass
# Force threading backend for search
with joblib.parallel_backend("threading"):
current_algorithm = search.run_search(
X_train_search, y_train_search
)
except TimeoutError:
self.logger.warning("Timeout occurred during hyperparameter search.")
failed = "Timeout"
scores = default_scores
except KeyboardInterrupt:
if "catboost" in method_name.lower():
self.logger.warning(
"KeyboardInterrupt detected during hyperparameter search. "
"This is likely a signal handling artifact (e.g. from CatBoost) triggered by the timeout. "
"Treating as a timeout."
)
failed = "KeyboardInterrupt"
scores = default_scores
else:
raise
except Exception as e:
if "dual coefficients or intercepts are not finite" in str(e):
self.logger.warning(
f"SVC failed to fit due to data issues: {e}. Returning default score."
)
self.grid_search_cross_validate_score_result = 0.5
return
self.logger.error(
f"An exception occurred during hyperparameter search for {method_name}: {e}",
exc_info=True,
)
raise e
finally:
# Restore the original grid_n_jobs setting
self.global_parameters.grid_n_jobs = original_grid_n_jobs
# Skip final CV in test mode
if not failed and getattr(self.global_parameters, "test_mode", False):
self.logger.info(
"Test mode enabled. Skipping final cross-validation for speed."
)
self.grid_search_cross_validate_score_result = 0.5 # Return a valid float
# Final cleanup for H2O models
self._shutdown_h2o_if_needed(current_algorithm)
return
if not failed and self.global_parameters.verbose >= 3:
self.logger.debug("Fitting final model")
if not failed and self.y_train.nunique() < 2:
raise ValueError(
"Only one class present in y_train. ROC AUC score is not defined "
"in that case. grid_search_cross_validate>>>cross_validate"
)
if not failed and self.global_parameters.verbose >= 1:
self.logger.info("Getting cross validation scores")
self.logger.debug(
f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}"
)
self.logger.debug(f"y_train value counts:\n{self.y_train.value_counts()}")
# Warn if the final cross-validation takes longer than this many seconds
time_threshold = 60
keras_model_types = (NeuralNetworkClassifier, KerasClassifierClass)
is_h2o_model = isinstance(current_algorithm, H2O_MODEL_TYPES)
is_keras_model = isinstance(current_algorithm, keras_model_types)
# H2O and Keras models require single-threaded execution for CV
final_cv_n_jobs = 1 if is_h2o_model or is_keras_model else grid_n_jobs
if is_h2o_model or is_keras_model:
self.logger.debug(
"H2O or Keras model detected. Forcing n_jobs=1 for final cross-validation."
)
try:
if failed:
raise TimeoutError
if isinstance(current_algorithm, H2O_MODEL_TYPES):
X_train_final = self.X_train # Pass DataFrame directly
y_train_final = self._optimize_y(self.y_train)
else:
X_train_final = self.X_train.values # Use NumPy array for other models
y_train_final = self._optimize_y(self.y_train)
scores = None
# Check for user override to force second CV
force_second_cv = self.ml_grid_object_iter.local_param_dict.get(
"force_second_cv", getattr(self.global_params, "force_second_cv", False)
)
if force_second_cv:
self.logger.info(
"force_second_cv is True. Skipping cached result extraction to run fresh cross-validation."
)
# Check if we can reuse results from HyperparameterSearch
if (
not force_second_cv
and hasattr(current_algorithm, "cv_results_")
and hasattr(current_algorithm, "best_index_")
):
try:
self.logger.info(
"Using cached cross-validation results from HyperparameterSearch."
)
results = current_algorithm.cv_results_
index = current_algorithm.best_index_
n_splits = self.cv.get_n_splits()
temp_scores = {}
# Extract fit and score times
if "split0_fit_time" in results:
temp_scores["fit_time"] = np.array(
[
results[f"split{k}_fit_time"][index]
for k in range(n_splits)
]
)
else:
# Fallback: Use mean time repeated if split times are missing (e.g. BayesSearchCV)
temp_scores["fit_time"] = np.full(
n_splits, results["mean_fit_time"][index]
)
if "split0_score_time" in results:
temp_scores["score_time"] = np.array(
[
results[f"split{k}_score_time"][index]
for k in range(n_splits)
]
)
else:
# Fallback: Use mean score time.
default_times = np.zeros(index + 1)
temp_scores["score_time"] = np.full(
n_splits,
results.get("mean_score_time", default_times)[index],
)
# Extract metric scores
for metric in self.metric_list:
# Test scores
test_key = f"test_{metric}"
temp_scores[test_key] = np.array(
[
results[f"split{k}_test_{metric}"][index]
for k in range(n_splits)
]
)
# Train scores (if available)
train_key = f"train_{metric}"
train_col = (
f"split0_train_{metric}" # Check existence on first split
)
if train_col in results:
temp_scores[train_key] = np.array(
[
results[f"split{k}_train_{metric}"][index]
for k in range(n_splits)
]
)
scores = temp_scores
except Exception as e:
self.logger.warning(
f"Could not extract cached CV results: {e}. Falling back to standard CV."
)
scores = None
if scores is None:
if isinstance(
current_algorithm, (KerasClassifier, KerasClassifierClass)
):
self.logger.debug("Fitting Keras model with internal CV handling.")
y_train_values = (
self.y_train.values
if hasattr(self.y_train, "values")
else self.y_train
)
X_train_values = (
self.X_train.values
if hasattr(self.X_train, "values")
else self.X_train
)
current_algorithm.fit(
X_train_values, y_train_values, cv=self.cv, verbose=0
)
# Since fit already did the CV, create a dummy scores dictionary.
scores = {
"test_roc_auc": [
current_algorithm.score(self.X_test, self.y_test)  # y_test was already converted to a NumPy array
]
}
else:
# Skip parameter validation overhead
with sklearn.config_context(skip_parameter_validation=True):
# Ensure H2O progress is disabled before CV
if is_h2o_model:
try:
import h2o
h2o.no_progress()
except Exception:
pass
# Always use threading backend
backend = "threading"
with joblib.parallel_backend(backend):
scores = cross_validate(
current_algorithm,
X_train_final,
y_train_final, # Use optimized y (numpy for sklearn, Series for H2O)
scoring=self.metric_list,
cv=self.cv,
n_jobs=final_cv_n_jobs, # Use adjusted n_jobs
pre_dispatch="2*n_jobs",
error_score=self.error_raise, # Raise error if cross-validation fails
)
# Pre-compile the predict function for Keras/TF models
if isinstance(
current_algorithm,
(
KerasClassifier,
KerasClassifierClass,
NeuralNetworkClassifier,
),
):
try:
self.logger.debug(
"Pre-compiling TensorFlow predict function to avoid retracing."
)
n_features = self.X_train.shape[1]
# Define an input signature that allows for variable batch size.
input_signature = [
tf.TensorSpec(
shape=(None, n_features), dtype=tf.float32
)
]
# Access the underlying Keras model via .model_
current_algorithm.model_.predict.get_concrete_function(
input_signature
)
except Exception as e:
self.logger.warning(
f"Could not pre-compile TF function. Performance may be impacted. Error: {e}"
)
except XGBoostError as e:
if "cuda" in str(e).lower() or "memory" in str(e).lower():
self.logger.warning(
"GPU memory error detected during cross-validation, falling back to CPU..."
)
current_algorithm.set_params(tree_method="hist")
try:
scores = cross_validate(
current_algorithm,
X_train_final,
y_train_final, # Use optimized y
scoring=self.metric_list,
cv=self.cv,
n_jobs=final_cv_n_jobs, # Use adjusted n_jobs
pre_dispatch="2*n_jobs",
error_score=self.error_raise, # Raise error if cross-validation fails
)
except Exception as e:
self.logger.error(
f"An unexpected error occurred during cross-validation attempt 2: {e}",
exc_info=True,
)
self.logger.warning("Returning default scores")
failed = True
scores = default_scores # Use default scores for other errors
except ValueError as e:
# Handle specific ValueError if AdaBoostClassifier fails due to poor performance
if (
"BaseClassifier in AdaBoostClassifier ensemble is worse than random"
in str(e)
):
self.logger.warning(f"AdaBoostClassifier failed: {e}")
self.logger.warning(
"Skipping AdaBoostClassifier due to poor base classifier performance."
)
# Set default scores if the AdaBoostClassifier fails
failed = True
scores = default_scores # Use default scores
else:
self.logger.error(
f"An unexpected ValueError occurred during cross-validation: {e}",
exc_info=True,
)
failed = True
scores = default_scores # Use default scores for other errors
except RuntimeError as e:
self.logger.error(
f"A RuntimeError occurred during cross-validation (often H2O related): {e}",
exc_info=True,
)
self.logger.warning("Returning default scores.")
failed = True
scores = default_scores
except TimeoutError:
self.logger.warning("Timeout occurred during cross-validation.")
failed = "Timeout"
scores = default_scores
except KeyboardInterrupt:
if "catboost" in method_name.lower():
self.logger.warning(
"KeyboardInterrupt detected during cross-validation. "
"This is likely a signal handling artifact (e.g. from CatBoost) triggered by the timeout. "
"Treating as a timeout."
)
failed = "KeyboardInterrupt"
scores = default_scores
else:
raise
except Exception as e:
# Catch any other general exceptions and log them
self.logger.error(
f"An unexpected error occurred during cross-validation: {e}",
exc_info=True,
)
failed = True
scores = default_scores # Use default scores if an error occurs
# End the timer
end_time = time.time()
# Calculate elapsed time
elapsed_time = end_time - start_time
if self.global_parameters.verbose >= 1:
# Print a warning if the execution time exceeds the threshold
if elapsed_time > time_threshold:
self.logger.warning(
f"Cross-validation took too long ({elapsed_time:.2f} seconds). "
"Consider optimizing the parameters or reducing CV folds."
)
else:
self.logger.info(
f"Cross-validation for {method_name} completed in {elapsed_time:.2f} seconds."
)
if self.global_parameters.verbose >= 4:
debug_print_statements_class(scores).debug_print_scores()
# AUC plotting is disabled: the previous implementation passed a
# classifier trained on the test dataset.
plot_auc = False
if plot_auc:
self.logger.debug("Plotting AUC is disabled.")
try:
best_pred_orig = current_algorithm.predict(self.X_test) # exp
except Exception:
best_pred_orig = np.zeros(len(self.X_test))
# Call the update_score_log method on the provided instance
if self.project_score_save_class_instance:
self.project_score_save_class_instance.update_score_log(
ml_grid_object=ml_grid_object,
scores=scores,
best_pred_orig=best_pred_orig,
current_algorithm=current_algorithm,
method_name=method_name,
pg=pg,
start=start,
n_iter_v=n_iter_v,
failed=failed,
)
else:
self.logger.warning(
"No project_score_save_class_instance provided. Skipping score logging."
)
# calculate metric for optimisation
try:
y_test_np = (
self.y_test.values if hasattr(self.y_test, "values") else self.y_test
)
auc = metrics.roc_auc_score(y_test_np, best_pred_orig)
except Exception:
auc = 0.5
self.grid_search_cross_validate_score_result = auc
self._shutdown_h2o_if_needed(current_algorithm)
def _optimize_y(self, y):
"""Helper to optimize y for sklearn/H2O to reduce type_of_target overhead."""
# Convert to numpy if it's a Series or Categorical
if hasattr(y, "dtype") and isinstance(y.dtype, pd.CategoricalDtype):
y_opt = y.cat.codes.values
elif hasattr(y, "values"):
y_opt = y.values
else:
y_opt = y
# Force integer encoding
if not pd.api.types.is_integer_dtype(y_opt):
try:
y_opt = y_opt.astype(int)
except (ValueError, TypeError):
y_opt, _ = pd.factorize(y_opt, sort=True)
y_opt = y_opt.astype(int)
# Ensure contiguous array for speed in np.unique and other ops
return np.ascontiguousarray(y_opt)
def _adjust_knn_parameters(self, parameter_space: Union[Dict, List[Dict]]):
"""
Dynamically adjusts the 'n_neighbors' parameter for KNN-based models
to prevent errors on small datasets during cross-validation.
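
Illustrative sketch (the fold size is hypothetical)::

    # With a smallest training fold of 5 samples, max_n_neighbors == 5:
    [n for n in [3, 5, 15, 31] if n <= 5]   # -> [3, 5]
    # A skopt integer space would instead have its upper bound clamped to 5.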
"""
# Correctly calculate the training fold size
dummy_indices = np.arange(len(self.X_train))
train_indices, _ = next(self.cv.split(dummy_indices))
n_samples_train_fold = len(train_indices)
n_samples_test_fold = len(self.X_train) - n_samples_train_fold
max_n_neighbors = max(1, n_samples_train_fold)
self.logger.debug(
f"KNN constraints - train_fold_size={n_samples_train_fold}, "
f"test_fold_size={n_samples_test_fold}, max_n_neighbors={max_n_neighbors}"
)
def adjust_param(param_value):
if is_skopt_space(param_value):
# For skopt.space objects, adjust the upper bound
new_high = min(param_value.high, max_n_neighbors)
new_low = min(param_value.low, new_high)
param_value.high = new_high
param_value.low = new_low
self.logger.debug(
f"Adjusted skopt space: low={new_low}, high={new_high}"
)
elif isinstance(param_value, (list, np.ndarray)):
# For lists, filter the values
new_param_value = [n for n in param_value if n <= max_n_neighbors]
if not new_param_value:
self.logger.warning(
f"All n_neighbors values filtered out. Using [{max_n_neighbors}]"
)
return [max_n_neighbors]
self.logger.debug(f"Filtered n_neighbors list: {new_param_value}")
return new_param_value
return param_value
if isinstance(parameter_space, list):
for params in parameter_space:
if "n_neighbors" in params:
params["n_neighbors"] = adjust_param(params["n_neighbors"])
elif isinstance(parameter_space, dict) and "n_neighbors" in parameter_space:
parameter_space["n_neighbors"] = adjust_param(
parameter_space["n_neighbors"]
)
def _adjust_catboost_parameters(self, parameter_space: Union[Dict, List[Dict]]):
"""
Dynamically adjusts the 'subsample' parameter for CatBoost to prevent
errors on small datasets during cross-validation.
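
Illustrative sketch (the fold size is hypothetical)::

    # With a smallest training fold of 20 samples, min_subsample == 1/20 == 0.05:
    [s for s in [0.01, 0.05, 0.5, 1.0] if s >= 0.05]   # -> [0.05, 0.5, 1.0]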
"""
n_splits = self.cv.get_n_splits()
# Approximate the size of the smallest training fold (for RepeatedKFold, get_n_splits() counts repeats).
n_samples_in_fold = len(self.X_train) - (len(self.X_train) // n_splits)
# Ensure n_samples_in_fold is at least 1 to avoid division by zero
n_samples_in_fold = max(1, n_samples_in_fold)
# If the training fold is extremely small, force subsample to 1.0
# to prevent CatBoost from failing on constant features.
if n_samples_in_fold <= 2:
min_subsample = 1.0
else:
# The minimum subsample value must be at least 1/n_samples so that at least one sample is drawn
min_subsample = 1.0 / n_samples_in_fold
def adjust_param(param_value):
if is_skopt_space(param_value):
# For skopt.space objects (Real), adjust the lower bound
new_low = max(param_value.low, min_subsample)
# Ensure the new low is not higher than the high
if new_low > param_value.high:
new_low = param_value.high
param_value.low = new_low
# If the fold is tiny, force the entire space to be 1.0
if n_samples_in_fold <= 2:
param_value.low = param_value.high = 1.0
elif isinstance(param_value, (list, np.ndarray)):
# For lists, filter the values
new_param_value = [s for s in param_value if s >= min_subsample]
if not new_param_value:
# If all values are filtered out, use the smallest valid value
return [
(
min(p for p in param_value if p > 0)
if any(p > 0 for p in param_value)
else 1.0
)
]
return new_param_value
# If the fold is tiny, force subsample to 1.0
if n_samples_in_fold <= 2:
return [1.0] if isinstance(param_value, list) else 1.0
return param_value
if isinstance(parameter_space, list):
for params in parameter_space:
if "subsample" in params:
params["subsample"] = adjust_param(params["subsample"])
elif isinstance(parameter_space, dict) and "subsample" in parameter_space:
parameter_space["subsample"] = adjust_param(parameter_space["subsample"])
# Also adjust 'rsm' (colsample_bylevel) which can cause the same issue
if isinstance(parameter_space, list):
for params in parameter_space:
if "rsm" in params:
params["rsm"] = adjust_param(params["rsm"])
elif isinstance(parameter_space, dict) and "rsm" in parameter_space:
parameter_space["rsm"] = adjust_param(parameter_space["rsm"])
def _shutdown_h2o_if_needed(self, algorithm: Any):
"""Safely shuts down the H2O cluster if the algorithm is an H2O model."""
# Use the module-level tuple
if isinstance(algorithm, H2O_MODEL_TYPES):
import h2o
cluster = h2o.cluster()
if cluster and cluster.is_running():
self.logger.info(
"H2O model finished. Leaving cluster running for next H2O model."
)
def dummy_auc() -> float:
"""Returns a constant AUC score of 0.5.
This function is intended as a placeholder or for use in scenarios where
a valid AUC score cannot be calculated but a value is required.
Returns:
float: A constant value of 0.5.
"""
return 0.5
# Create a scorer using make_scorer
# dummy_auc_scorer = make_scorer(dummy_auc)
def scale_data(X_train: pd.DataFrame) -> pd.DataFrame:
"""Scales the data to a [0, 1] range if it's not already scaled.
Args:
X_train (pd.DataFrame): Training features.
Returns:
pd.DataFrame: Scaled training features.
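
Example:
    A minimal sketch with illustrative data::

        import pandas as pd
        X = pd.DataFrame({"a": [0.0, 10.0], "b": [5.0, 2.0]})
        scale_data(X)                                 # rescaled into [0, 1]
        scale_data(pd.DataFrame({"a": [0.1, 0.9]}))   # already scaled, returned as-is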
"""
# Initialize MinMaxScaler
scaler = MinMaxScaler(feature_range=(0, 1))
# Check if data is already scaled
min_val = X_train.min().min()
max_val = X_train.max().max()
# If data is not scaled, then scale it
if min_val < 0 or max_val > 1:
# Fit and transform the data
X_train_scaled = pd.DataFrame(
scaler.fit_transform(X_train), columns=X_train.columns, index=X_train.index
)
return X_train_scaled
else:
# If data is already scaled, return it as is
return X_train