Source code for ml_grid.pipeline.grid_search_cross_validate

import time
import traceback
import warnings
from typing import Any, Dict, List, Optional, Union

import keras
import numpy as np
import pandas as pd
import tensorflow as tf
from IPython.display import clear_output, display
from numpy import absolute, mean, std
from scikeras.wrappers import KerasClassifier
from sklearn import metrics
from xgboost.core import XGBoostError

from sklearn.exceptions import ConvergenceWarning
from sklearn.metrics import (
    classification_report,
    f1_score,
    make_scorer,
    matthews_corrcoef,
    roc_auc_score,
)
from sklearn.model_selection import (
    GridSearchCV,
    ParameterGrid,
    RandomizedSearchCV,
    RepeatedKFold,
    cross_validate,
)

from ml_grid.model_classes.keras_classifier_class import kerasClassifier_class
from ml_grid.pipeline.hyperparameter_search import HyperparameterSearch
from ml_grid.util.debug_print_statements import debug_print_statements_class
from ml_grid.util.global_params import global_parameters
from ml_grid.util.project_score_save import project_score_save_class
from ml_grid.util.validate_parameters import validate_parameters_helper
from sklearn.preprocessing import MinMaxScaler
from ml_grid.util.bayes_utils import calculate_combinations
from skopt.space import Categorical

class grid_search_crossvalidate:

    def __init__(
        self,
        algorithm_implementation: Any,
        parameter_space: Union[Dict, List[Dict]],
        method_name: str,
        ml_grid_object: Any,
        sub_sample_parameter_val: int = 100,
    ):
        """Initializes and runs a cross-validated hyperparameter search.

        This class takes a given algorithm, its parameter space, and data from
        the main pipeline object to perform either a grid search, randomized
        search, or Bayesian search for the best hyperparameters. It then logs
        the results.

        Args:
            algorithm_implementation (Any): The scikit-learn compatible
                estimator instance.
            parameter_space (Union[Dict, List[Dict]]): The dictionary or list
                of dictionaries defining the hyperparameter search space.
            method_name (str): The name of the algorithm method.
            ml_grid_object (Any): The main pipeline object containing all data
                (X_train, y_train, etc.) and parameters for the current
                iteration.
            sub_sample_parameter_val (int, optional): A value used to limit the
                number of iterations in a randomized search. Defaults to 100.
        """
        warnings.filterwarnings("ignore")
        warnings.filterwarnings("ignore", category=FutureWarning)
        warnings.filterwarnings("ignore", category=ConvergenceWarning)
        warnings.filterwarnings("ignore", category=UserWarning)
        self.global_params = global_parameters
        self.verbose = self.global_params.verbose
        if self.verbose < 8:
            print("Clearing output")
            clear_output(wait=True)
        self.sub_sample_param_space_pct = self.global_params.sub_sample_param_space_pct
        random_grid_search = self.global_params.random_grid_search
        self.sub_sample_parameter_val = sub_sample_parameter_val
        grid_n_jobs = self.global_params.grid_n_jobs

        # Configure GPU usage and job limits for specific models
        if (
            "keras" in method_name.lower()
            or "xgb" in method_name.lower()
            or "catboost" in method_name.lower()
        ):
            grid_n_jobs = 1
            try:
                gpu_devices = tf.config.experimental.list_physical_devices("GPU")
                for device in gpu_devices:
                    tf.config.experimental.set_memory_growth(device, True)
            except Exception as e:
                print(f"Could not configure GPU for TensorFlow: {e}")
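        # Rationale (an inference from the check above, not documented
        # pipeline behaviour): GPU-backed models (Keras, XGBoost, CatBoost)
        # manage their own device parallelism, so running several joblib
        # workers against one GPU risks exhausting device memory. Memory
        # growth makes TensorFlow allocate VRAM incrementally rather than
        # reserving it all upfront.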
        self.metric_list = self.global_params.metric_list
        self.error_raise = self.global_params.error_raise

        if self.verbose >= 3:
            print(f"Cross-validating {method_name}")
        self.global_parameters = global_parameters
        self.ml_grid_object_iter = ml_grid_object
        self.X_train = self.ml_grid_object_iter.X_train
        self.y_train = self.ml_grid_object_iter.y_train
        self.X_test = self.ml_grid_object_iter.X_test
        self.y_test = self.ml_grid_object_iter.y_test
        self.X_test_orig = self.ml_grid_object_iter.X_test_orig
        self.y_test_orig = self.ml_grid_object_iter.y_test_orig
        # Hard limit on parameter-space exploration
        max_param_space_iter_value = self.global_params.max_param_space_iter_value

        if "svc" in method_name.lower():
            self.X_train = scale_data(self.X_train)
            self.X_test = scale_data(self.X_test)
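        # Note: SVC kernels are distance-based, so features on large scales
        # dominate the decision function; scale_data (defined at module
        # level below) maps features into [0, 1] only when they are not
        # already within that range.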
        self.cv = RepeatedKFold(
            n_splits=max(2, min(len(self.X_train), 2) + 1),
            n_repeats=2,
            random_state=1,
        )
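        # Note: min(len(self.X_train), 2) is 2 for any training set with at
        # least two rows, so n_splits resolves to 3 in practice: 3 folds
        # repeated twice (6 fits per candidate) with a fixed random_state
        # for reproducibility.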
        start = time.time()

        current_algorithm = algorithm_implementation
        if self.verbose >= 1:
            print(f"algorithm_implementation: {algorithm_implementation}")
        parameters = parameter_space

        if self.global_params.bayessearch is False:
            n_iter_v = np.nan
        else:
            n_iter_v = 2

        if ml_grid_object.verbose >= 3:
            print(
                "algorithm_implementation: ",
                algorithm_implementation,
                " type: ",
                type(algorithm_implementation),
            )

        if self.global_params.bayessearch is False:
            # Validate parameters
            parameters = validate_parameters_helper(
                algorithm_implementation=algorithm_implementation,
                parameters=parameters,
                ml_grid_object=ml_grid_object,
            )

        # Estimate the size of the parameter space. (A legacy inline
        # RandomizedSearchCV/GridSearchCV implementation was removed here;
        # the search is now delegated to HyperparameterSearch below.)
        if not self.global_parameters.bayessearch:
            pg = len(ParameterGrid(parameter_space))
        else:
            pg = calculate_combinations(parameter_space, steps=n_iter_v)  # untested n_iter_v

        if (random_grid_search and n_iter_v > 100000) or (
            random_grid_search is False and pg > 100000
        ):
            print(f"Warning: parameter grid too large: pg={pg}, n_iter_v={n_iter_v}")

        if self.global_parameters.verbose >= 1:
            if random_grid_search:
                print(
                    f"Randomized parameter grid size for {current_algorithm}\n"
                    f": Full: {pg}, (mean * {self.sub_sample_param_space_pct}): "
                    f"{self.sub_sample_parameter_val}, current: {n_iter_v}"
                )
            else:
                print(f"Parameter grid size: Full: {pg}")

        # Determine the number of search iterations, clamped to
        # [2, max_param_space_iter_value].
        if self.global_parameters.bayessearch:
            n_iter_v = pg + 2
        else:
            n_iter_v = int(len(ParameterGrid(parameter_space))) + 2  # review relevance and value
        if self.sub_sample_parameter_val < n_iter_v:
            n_iter_v = self.sub_sample_parameter_val
        if n_iter_v < 2:
            print("Warning: n_iter_v < 2, setting to 2")
            n_iter_v = 2
        if n_iter_v > max_param_space_iter_value:
            print(
                f"Warning: n_iter_v > max_param_space_iter_value, setting {max_param_space_iter_value}"
            )
            n_iter_v = max_param_space_iter_value
        print("n_iter_v = ", n_iter_v)

        # Instantiate and run the hyperparameter grid/random/Bayesian search.
        search = HyperparameterSearch(
            algorithm=current_algorithm,
            parameter_space=parameter_space,
            method_name=method_name,
            global_params=self.global_parameters,
            sub_sample_pct=self.sub_sample_param_space_pct,  # Fraction of the space to explore
            max_iter=n_iter_v,  # Maximum iterations for randomized search
            ml_grid_object=ml_grid_object,
        )

        if self.global_parameters.verbose >= 3:
print("Running hyperparameter search") try: current_algorithm = search.run_search(self.X_train, self.y_train) except XGBoostError as e: if 'cuda' in str(e).lower() or 'memory' in str(e).lower(): print("GPU memory error detected, falling back to CPU...") # Change the tree_method in parameter_space dynamically if isinstance(parameter_space, list): for param_dict in parameter_space: if 'tree_method' in param_dict: param_dict['tree_method'] = Categorical(['hist']) if self.global_params.bayessearch else ["hist"] elif isinstance(parameter_space, dict) and 'tree_method' in parameter_space: parameter_space['tree_method'] = Categorical(['hist']) if self.global_params.bayessearch else ["hist"] search = HyperparameterSearch( algorithm=current_algorithm, parameter_space=parameter_space, method_name=method_name, global_params=self.global_parameters, sub_sample_pct=self.sub_sample_param_space_pct, max_iter=n_iter_v, ml_grid_object=ml_grid_object ) # Try again with non-gpu method. current_algorithm = search.run_search(self.X_train, self.y_train) else: print("unknown xgb error") print(e) except Exception as e: print(e) print("Failed to run search in gridsearch cross validate") if self.global_parameters.verbose >= 3: print("Fitting final model") #current_algorithm = grid.best_estimator_ current_algorithm.fit(self.X_train, self.y_train) metric_list = self.metric_list # Catch only one class present AUC not defined: #dummy_auc_scorer = make_scorer(dummy_auc) if len(np.unique(self.y_train)) < 2: raise ValueError("Only one class present in y_train. ROC AUC score is not defined in that case. grid_search_cross_validate>>>cross_validate") if self.global_parameters.verbose >= 1: print("Getting cross validation scores") print(self.X_train.shape, self.y_train.shape) print("y_train value counts:") print(self.y_train.value_counts()) # Set a time threshold in seconds time_threshold = 60 # For example, 60 seconds start_time = time.time() # Define default scores (e.g., mean score of 0.5 for binary classification) # Default scores if cross-validation fails default_scores = { 'test_accuracy': [0.5], # Default to random classifier performance (0.5 for binary classification) 'test_f1': [0.5], # Default F1 score (again, 0.5 for random classification) 'test_auc': [0.5], # Default ROC AUC score (0.5 for random classifier) #is only auc not roc_auc? 'fit_time': [0], # No fitting time if the model fails 'score_time': [0], # No scoring time if the model fails 'train_score': [0.5], # Default train score 'test_recall':[0.5] #'test_auc': [0.5] # ? 
        }

        failed = False
        try:
            # Perform the cross-validation
            scores = cross_validate(
                current_algorithm,
                self.X_train,
                self.y_train,
                scoring=self.metric_list,
                cv=self.cv,
                n_jobs=grid_n_jobs,  # Full CV on the final best model
                pre_dispatch=80,
                error_score=self.error_raise,  # Raise an error if cross-validation fails
            )
        except XGBoostError as e:
            if "cuda" in str(e).lower() or "memory" in str(e).lower():
                print("GPU memory error detected, falling back to CPU...")
                current_algorithm.set_params(tree_method="hist")
                try:
                    scores = cross_validate(
                        current_algorithm,
                        self.X_train,
                        self.y_train,
                        scoring=self.metric_list,
                        cv=self.cv,
                        n_jobs=grid_n_jobs,
                        pre_dispatch=80,
                        error_score=self.error_raise,
                    )
                except Exception as e:
                    print(f"An unexpected error occurred during cross-validation attempt 2: {e}")
                    print("Returning default scores")
                    failed = True
                    scores = default_scores
            else:
                # Bug fix: previously `scores` was left unbound on non-GPU
                # XGBoost errors, raising a NameError further down.
                print(f"An XGBoost error occurred during cross-validation: {e}")
                failed = True
                scores = default_scores
        except ValueError as e:
            # AdaBoostClassifier can fail when its base classifier performs
            # worse than random.
            if "BaseClassifier in AdaBoostClassifier ensemble is worse than random" in str(e):
                print(f"AdaBoostClassifier failed: {e}")
                print("Skipping AdaBoostClassifier due to poor base classifier performance.")
            else:
                print(f"An unexpected error occurred during cross-validation: {e}")
            failed = True
            scores = default_scores
        except Exception as e:
            # Catch any other general exceptions and log them
            print(f"An error occurred during cross-validation: {e}")
            failed = True
            scores = default_scores

        # Calculate elapsed time
        end_time = time.time()
        elapsed_time = end_time - start_time

        if self.global_parameters.verbose >= 1:
            if elapsed_time > time_threshold:
                print(
                    f"Warning: Cross-validation took too long ({elapsed_time:.2f} seconds). "
                    "Consider optimizing the parameters or reducing CV folds."
                )
            else:
                print(f"Cross-validation {method_name} completed in {elapsed_time:.2f} seconds.")

        current_algorithm_scores = scores

        if self.global_parameters.verbose >= 4:
            debug_print_statements_class(scores).debug_print_scores()

        # AUC plotting is disabled: the previous implementation evaluated a
        # classifier on data it was trained on.

        best_pred_orig = current_algorithm.predict(self.X_test)

        project_score_save_class.update_score_log(
            ml_grid_object=ml_grid_object,
            scores=scores,
            best_pred_orig=best_pred_orig,
            current_algorithm=current_algorithm,
            method_name=method_name,
            pg=pg,
            start=start,
            n_iter_v=n_iter_v,
            failed=failed,
        )

        # Calculate the metric used for optimisation.
        auc = metrics.roc_auc_score(self.y_test, best_pred_orig)
        self.grid_search_cross_validate_score_result = auc
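
# A minimal, hypothetical usage sketch (not called by the pipeline). It
# assumes `ml_grid_object` is a fully initialised pipeline iteration with
# X_train/y_train/X_test/y_test populated; the estimator, parameter space,
# and method name below are illustrative only.
def _example_grid_search_crossvalidate(ml_grid_object: Any) -> float:
    """Illustrative only: runs the cross-validated search for one estimator."""
    from sklearn.linear_model import LogisticRegression

    result = grid_search_crossvalidate(
        algorithm_implementation=LogisticRegression(),
        parameter_space={"C": [0.1, 1.0, 10.0]},
        method_name="logistic_regression",
        ml_grid_object=ml_grid_object,
    )
    # The optimisation metric: ROC AUC on the held-out test set.
    return result.grid_search_cross_validate_score_result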
def dummy_auc() -> float:
    """Returns a constant AUC score of 0.5.

    This function is intended as a placeholder or for use in scenarios where a
    valid AUC score cannot be calculated but a value is required.

    Returns:
        float: A constant value of 0.5.
    """
    return 0.5
# Create a scorer using make_scorer:
# dummy_auc_scorer = make_scorer(dummy_auc)
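# A minimal sketch of how the scorer commented out above could be wired up
# (an assumption, not pipeline behaviour: the constant baseline is only
# useful as a fallback). dummy_auc takes no arguments, so it is wrapped to
# match the (y_true, y_pred) signature that make_scorer expects.
dummy_auc_scorer = make_scorer(lambda y_true, y_pred: dummy_auc())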
def scale_data(X_train: pd.DataFrame) -> pd.DataFrame:
    """Scales the data to a [0, 1] range if it's not already scaled.

    Args:
        X_train (pd.DataFrame): Training features.

    Returns:
        pd.DataFrame: Scaled training features.
    """
    # Initialize MinMaxScaler
    scaler = MinMaxScaler(feature_range=(0, 1))

    # Check whether the data already lies within [0, 1]
    min_val = X_train.min().min()
    max_val = X_train.max().max()

    if min_val < 0 or max_val > 1:
        # Fit and transform the data
        return pd.DataFrame(
            scaler.fit_transform(X_train), columns=X_train.columns
        )

    # Data is already scaled; return it unchanged.
    return X_train
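
# A minimal usage sketch for scale_data (illustrative only; the frame below
# is hypothetical and not part of the pipeline).
def _example_scale_data() -> None:
    """Demonstrates that scale_data maps unscaled features into [0, 1]."""
    df = pd.DataFrame({"age": [10.0, 50.0, 90.0], "score": [-1.0, 0.0, 2.0]})
    scaled = scale_data(df)
    # MinMaxScaler maps each column's min to 0 and its max to 1.
    assert scaled.min().min() >= 0.0
    assert scaled.max().max() <= 1.0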