"""
Main PipelineOptimizer class implementation.
"""
from typing import Dict, List, Tuple, Optional, Union
import numpy as np
import numpy.typing as npt
import logging
import warnings
from sklearn.pipeline import Pipeline
from sklearn.linear_model import Ridge
from sklearn.model_selection import GroupShuffleSplit, LeavePGroupsOut
from sklearn.metrics import mean_squared_error
# Import Bayesian Optimization
from bayes_opt import BayesianOptimization
# Import from package modules
from spectoprep.pipeline.config import AVAILABLE_STEPS, INCOMPATIBLE_SETS, DEFAULT_PARAM_BOUNDS
from spectoprep.pipeline.builder import build_preprocessor_from_bayes
from spectoprep.modelling.ridge import OptimizedRidgeCV
from spectoprep.pipeline.utils import generate_pipeline_configurations
# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")
class PipelineOptimizer:
"""
A class for optimizing machine learning pipelines using Bayesian optimization.
It precomputes possible pipeline configurations and then searches over both
the pipeline configuration (encoded as an index) and the hyperparameters.
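
    Example:
        A minimal usage sketch (the import path and the synthetic data below are
        illustrative assumptions, not part of the library's documented API)::

            import numpy as np
            from spectoprep.pipeline.optimizer import PipelineOptimizer  # module path assumed

            rng = np.random.default_rng(0)
            X = rng.normal(size=(60, 120))
            y = rng.normal(size=60)
            groups = np.repeat(np.arange(12), 5)  # 12 groups of 5 samples each

            opt = PipelineOptimizer(
                X_train=X,
                y_train=y,
                preprocessing_steps=["scaler", "pca"],
                groups=groups,
            )
            best_params, best_pipeline = opt.bayesian_optimize(init_points=5, n_iter=20)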
"""
def __init__(
self,
X_train: npt.NDArray,
y_train: npt.NDArray,
preprocessing_steps: Optional[List[str]] = None,
X_test: Optional[npt.NDArray] = None,
y_test: Optional[npt.NDArray] = None,
cv_method: str = "group_shuffle_split",
n_splits: int = 3,
test_size: float = 0.3,
n_groups_out: int = 2,
random_state: int = 42,
groups: Optional[npt.NDArray] = None,
max_pipeline_length: int = 5,
n_jobs: int = -1,
        allowed_preprocess_combinations: Optional[Union[int, List[int], Tuple[int, ...]]] = (1, 2),
log_level: str = "INFO"
):
"""
Initialize the PipelineOptimizer.
Args:
X_train: Training features.
y_train: Training targets.
preprocessing_steps: List of preprocessing steps to use.
X_test: Test features (optional).
y_test: Test targets (optional).
cv_method: Either "group_shuffle_split" or "leave_p_group_out".
n_splits: Number of CV splits.
test_size: Test set fraction (if using GroupShuffleSplit).
n_groups_out: Number of groups left out (if using LeavePGroupsOut).
random_state: Random seed.
groups: Optional group labels; if None, one group per sample is used.
max_pipeline_length: Maximum number of steps in pipeline.
n_jobs: Number of parallel jobs for compatible estimators.
allowed_preprocess_combinations: Allowed lengths for preprocessing combinations.
log_level: Logging level (INFO, DEBUG, WARNING, ERROR).
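
        Example:
            Construction with an explicit held-out test set and leave-p-groups-out
            validation (a sketch; the arrays and group labels are assumed to be
            prepared by the caller)::

                opt = PipelineOptimizer(
                    X_train, y_train,
                    preprocessing_steps=["scaler", "select_k_best"],
                    X_test=X_test, y_test=y_test,
                    cv_method="leave_p_group_out",
                    n_groups_out=2,
                    groups=group_labels,
                )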
"""
# Set up logging
numeric_level = getattr(logging, log_level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f"Invalid log level: {log_level}")
logging.basicConfig(
level=numeric_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger("PipelineOptimizer")
# Store data attributes
self.X_train = X_train
self.y_train = np.ravel(y_train)
self.X_test = X_test
self.y_test = np.ravel(y_test) if y_test is not None else None
# Store cross-validation parameters
self.cv_method = cv_method
self.n_splits = n_splits
self.test_size = test_size
self.n_groups_out = n_groups_out
# Store other configuration
self.random_state = random_state
self.max_pipeline_length = max_pipeline_length
self.n_jobs = n_jobs
self.allowed_preprocess_combinations = allowed_preprocess_combinations
# Handle groups
if groups is not None:
self.groups = np.ravel(groups)
else:
self.groups = np.arange(self.X_train.shape[0])
# Validate and set preprocessing steps
self.preprocessing_steps = self._validate_preprocessing_steps(preprocessing_steps)
# Validate inputs
self._validate_inputs()
# Generate all valid pipeline configurations
self.all_pipelines = generate_pipeline_configurations(
self.preprocessing_steps,
INCOMPATIBLE_SETS,
self.max_pipeline_length,
self.allowed_preprocess_combinations
)
# Update pipeline config bounds
self.param_bounds = DEFAULT_PARAM_BOUNDS.copy()
self.param_bounds["pipeline_config"] = (0, len(self.all_pipelines) - 1)
# Log initialization information
self.logger.info("Initialized PipelineOptimizer")
self.logger.info(f"Number of preprocessing steps: {len(self.preprocessing_steps)}")
self.logger.info(f"Number of possible pipeline configurations: {len(self.all_pipelines)}")
self.logger.info(f"Using {cv_method} validation method")
def _validate_preprocessing_steps(self, steps: Optional[List[str]] = None) -> List[str]:
"""
Validate and standardize preprocessing steps.
Args:
steps: List of preprocessing step names
Returns:
Validated list of preprocessing step names
"""
if steps is None:
# Default steps
steps = ['scaler', 'pca', 'robust_scaler', 'select_k_best']
if isinstance(steps, str):
steps = [steps]
invalid_steps = set(steps) - set(AVAILABLE_STEPS.keys())
if invalid_steps:
raise ValueError(f"Invalid preprocessing steps: {invalid_steps}. Available: {list(AVAILABLE_STEPS.keys())}")
return list(steps)
def _validate_inputs(self) -> None:
"""
Validate input data and parameters.
Raises:
ValueError: If inputs are invalid
"""
if not isinstance(self.X_train, np.ndarray) or not isinstance(self.y_train, np.ndarray):
raise ValueError("X_train and y_train must be numpy arrays")
if self.X_train.shape[0] != self.y_train.shape[0]:
raise ValueError(f"X_train and y_train must have the same number of samples. Got {self.X_train.shape[0]} and {self.y_train.shape[0]}")
if (self.X_test is None) != (self.y_test is None):
raise ValueError("Both X_test and y_test must be provided together")
if self.X_test is not None and self.X_test.shape[1] != self.X_train.shape[1]:
raise ValueError(f"X_test and X_train must have the same number of features. Got {self.X_test.shape[1]} and {self.X_train.shape[1]}")
if self.cv_method not in ["group_shuffle_split", "leave_p_group_out"]:
raise ValueError("cv_method must be 'group_shuffle_split' or 'leave_p_group_out'")
if len(self.groups) != self.X_train.shape[0]:
raise ValueError("Groups must have the same number of samples as X_train.")
    # Updated to use the held-out test set for validation when available,
    # falling back to group-based cross-validation otherwise.
def bayes_objective(self, **params) -> float:
"""
Objective function for Bayesian optimization.
Args:
**params: Parameters to evaluate
Returns:
float: Negative RMSE or penalty score on error
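
        Example:
            All parameters arrive as floats from the optimizer; ``pipeline_config``
            is rounded and clamped to an index into ``self.all_pipelines``. A direct
            call could look like the sketch below, assuming ``opt`` is an initialized
            PipelineOptimizer and that any step-specific hyperparameters required by
            the selected configuration are also present in ``params``::

                params = {"pipeline_config": 3.7, "ridge_alpha": 1.2}
                score = opt.bayes_objective(**params)  # negative RMSE, or -1e6 on failure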
"""
try:
# Extract pipeline configuration index and limit to valid range
pipeline_config_index = int(round(params["pipeline_config"]))
pipeline_config_index = max(0, min(pipeline_config_index, len(self.all_pipelines) - 1))
pipeline_config = self.all_pipelines[pipeline_config_index]
# Build pipeline steps
steps = []
for step in pipeline_config:
transformer = build_preprocessor_from_bayes(
step, params, self.X_train.shape, self.random_state, self.n_jobs
)
steps.append((step, transformer))
# Add Ridge regression as the final estimator
ridge_alpha = params["ridge_alpha"]
steps.append(("ridge", Ridge(alpha=ridge_alpha, random_state=self.random_state)))
pipeline = Pipeline(steps)
# If test data is available, use it directly for validation
if self.X_test is not None and self.y_test is not None:
try:
pipeline.fit(self.X_train, self.y_train)
preds = pipeline.predict(self.X_test)
preds = np.ravel(preds)
# Calculate metrics
rmse = np.sqrt(mean_squared_error(self.y_test, preds))
r2 = 1 - np.sum((self.y_test - preds)**2) / np.sum((self.y_test - np.mean(self.y_test))**2)
score = -rmse
self.logger.info(f"Pipeline config: {pipeline_config}, RMSE on test set: {rmse:.4f}, R²: {r2:.4f}")
return score
except np.linalg.LinAlgError as e:
self.logger.warning(f"LinAlgError with test set: {str(e)}")
# Fall back to cross-validation if there's an error
except Exception as e:
self.logger.warning(f"Error with test set: {str(e)}")
# Fall back to cross-validation if there's an error
# If no test data or error occurred, use cross-validation
# Configure cross-validation
if self.cv_method == "group_shuffle_split":
cv = GroupShuffleSplit(
n_splits=self.n_splits,
test_size=self.test_size,
random_state=self.random_state
)
else:
cv = LeavePGroupsOut(n_groups=self.n_groups_out)
# Perform cross-validation
all_predictions = []
all_actuals = []
for train_idx, val_idx in cv.split(self.X_train, self.y_train, groups=self.groups):
try:
X_train_fold = self.X_train[train_idx]
X_val_fold = self.X_train[val_idx]
y_train_fold = self.y_train[train_idx]
y_val_fold = self.y_train[val_idx]
# Check condition number to avoid numerical instability
if np.linalg.cond(X_train_fold) > 1e10:
self.logger.warning("High condition number detected. Skipping this fold.")
continue
pipeline.fit(X_train_fold, y_train_fold)
preds = pipeline.predict(X_val_fold)
preds = np.ravel(preds)
all_predictions.extend(preds)
all_actuals.extend(y_val_fold)
except np.linalg.LinAlgError as e:
self.logger.warning(f"LinAlgError in fold: {str(e)}")
continue
except Exception as e:
self.logger.warning(f"Error in fold: {str(e)}")
continue
# Check if we have valid predictions
if not all_predictions:
self.logger.warning("No valid predictions. Returning penalty score.")
return -1e6 # Penalty score
# Calculate metrics
rmse = np.sqrt(mean_squared_error(np.array(all_actuals), np.array(all_predictions)))
r2 = 1 - np.sum((np.array(all_actuals) - np.array(all_predictions))**2) / np.sum((np.array(all_actuals) - np.mean(np.array(all_actuals)))**2)
            # Optimize negative RMSE (R² is computed for logging only)
            score = -rmse
self.logger.info(f"Pipeline config: {pipeline_config}, CV RMSE: {rmse:.4f}, R²: {r2:.4f}")
return score
except Exception as e:
self.logger.error(f"Error in bayes_objective: {str(e)}")
return -1e6 # Penalty score for failed configurations
def bayesian_optimize(self, init_points: int = 10, n_iter: int = 50, acquisition_function: str = "ei") -> Tuple[Dict, Pipeline]:
"""
Run Bayesian optimization to find the best pipeline configuration and hyperparameters.
Args:
init_points: Number of random initial points
n_iter: Number of Bayesian optimization iterations
acquisition_function: Acquisition function for Bayesian optimization
Returns:
Tuple containing:
- Dict of best parameters
- Fitted Pipeline with best configuration
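
        Example:
            A short sketch, assuming ``opt`` is an initialized PipelineOptimizer::

                best_params, best_pipeline = opt.bayesian_optimize(init_points=10, n_iter=50)
                print(best_params["ridge_alpha"])
                preds = best_pipeline.predict(opt.X_train)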
"""
# Create optimizer
optimizer = BayesianOptimization(
f=self.bayes_objective,
pbounds=self.param_bounds,
random_state=self.random_state,
verbose=2
)
        # Validate the acquisition function name. Note that it is not forwarded to
        # bayes_opt here: how the acquisition function is supplied depends on the
        # installed bayes_opt version (older releases accept it in ``maximize``,
        # newer ones in the ``BayesianOptimization`` constructor).
        if acquisition_function not in ["ucb", "ei", "poi"]:
            self.logger.warning(f"Unknown acquisition function: {acquisition_function}. Using 'ei'.")
            acquisition_function = "ei"
# Run optimization
optimizer.maximize(
init_points=init_points,
n_iter=n_iter
)
# Store optimizer for later analysis
self.optimizer = optimizer
# Extract best parameters
best_params = optimizer.max["params"]
# Build the best pipeline from the best parameters
pipeline_config_index = int(round(best_params["pipeline_config"]))
pipeline_config_index = max(0, min(pipeline_config_index, len(self.all_pipelines) - 1))
best_pipeline_config = self.all_pipelines[pipeline_config_index]
# Create and fit the best pipeline
steps = []
for step in best_pipeline_config:
transformer = build_preprocessor_from_bayes(
step, best_params, self.X_train.shape, self.random_state, self.n_jobs
)
steps.append((step, transformer))
ridge_alpha = best_params["ridge_alpha"]
steps.append(("ridge", Ridge(alpha=ridge_alpha, random_state=self.random_state)))
best_pipeline = Pipeline(steps)
best_pipeline.fit(self.X_train, self.y_train)
# Log best pipeline details
self.logger.info(f"Best pipeline config: {best_pipeline_config}")
self.logger.info(f"Best pipeline score: {optimizer.max['target']}")
for step, transformer in steps:
self.logger.info(f"Step: {step}, Transformer: {transformer}")
return best_params, best_pipeline
def get_best_pipeline_predictions(self, best_pipeline: Pipeline) -> Tuple[npt.NDArray, float, float]:
"""
        Get predictions using the best pipeline: on the held-out test set if one was
        provided, otherwise out-of-fold predictions from group-based cross-validation.
Args:
best_pipeline: Fitted pipeline object
Returns:
Tuple containing:
- Predictions array
- RMSE score
- R² score
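
        Example:
            A sketch, assuming ``opt`` has already been optimized::

                best_params, best_pipeline = opt.bayesian_optimize()
                preds, rmse, r2 = opt.get_best_pipeline_predictions(best_pipeline)
                print(f"RMSE: {rmse:.4f}, R²: {r2:.4f}")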
"""
# Fit the pipeline to training data
best_pipeline.fit(self.X_train, self.y_train)
# If test data is available, use it for evaluation
if self.X_test is not None:
preds = best_pipeline.predict(self.X_test)
preds = np.ravel(preds)
rmse = np.sqrt(mean_squared_error(self.y_test, preds))
r2 = 1 - np.sum((self.y_test - preds)**2) / np.sum((self.y_test - np.mean(self.y_test))**2)
else:
# If no test set, use cross-validation predictions
if self.cv_method == "group_shuffle_split":
cv = GroupShuffleSplit(
n_splits=self.n_splits,
test_size=self.test_size,
random_state=self.random_state
)
else:
cv = LeavePGroupsOut(n_groups=self.n_groups_out)
all_preds = []
all_true = []
for train_idx, val_idx in cv.split(self.X_train, self.y_train, groups=self.groups):
X_train_fold = self.X_train[train_idx]
X_val_fold = self.X_train[val_idx]
y_train_fold = self.y_train[train_idx]
y_val_fold = self.y_train[val_idx]
best_pipeline.fit(X_train_fold, y_train_fold)
preds = best_pipeline.predict(X_val_fold)
preds = np.ravel(preds)
all_preds.extend(preds)
all_true.extend(y_val_fold)
preds = np.array(all_preds)
true_vals = np.array(all_true)
rmse = np.sqrt(mean_squared_error(true_vals, preds))
r2 = 1 - np.sum((true_vals - preds)**2) / np.sum((true_vals - np.mean(true_vals))**2)
return preds, rmse, r2
def get_all_tested_pipelines(self) -> List[Dict]:
"""
Get details of all tested pipeline configurations.
Returns:
List of dictionaries with pipeline details
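
        Example:
            Inspecting trials after optimization (sketch, assuming ``opt`` has been
            optimized)::

                for trial in opt.get_all_tested_pipelines():
                    print(trial["trial"], trial["pipeline_config"], trial["rmse"])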
"""
if not hasattr(self, "optimizer"):
raise AttributeError("No optimizer found. Please run bayesian_optimize() first.")
results = []
for i, res in enumerate(self.optimizer.res):
params = res["params"]
pipeline_index = int(round(params["pipeline_config"]))
pipeline_index = max(0, min(pipeline_index, len(self.all_pipelines) - 1))
pipeline_config = self.all_pipelines[pipeline_index]
# Calculate metrics from the saved objective score (negative RMSE)
rmse = -res["target"] if res["target"] > -1e5 else float('inf')
# Create a dictionary with trial information
result_dict = {
"trial": i,
"pipeline_config": pipeline_config,
"params": {k: v for k, v in params.items() if k != "pipeline_config"},
"rmse": rmse,
"r2": None # R² is not directly available from objective score
}
results.append(result_dict)
return results
def print_evaluated_pipelines(self) -> None:
"""
Print details for all evaluated pipelines from the Bayesian optimizer.
This method assumes that bayesian_optimize() has been run and that
self.optimizer exists.
"""
if not hasattr(self, "optimizer"):
self.logger.warning("No optimizer found. Please run bayesian_optimize() first.")
return
print("Evaluated pipelines:")
for i, res in enumerate(self.optimizer.res):
params = res["params"]
# Convert the continuous pipeline_config parameter to an integer index
pipeline_index = int(round(params["pipeline_config"]))
# Clamp the index to the valid range
pipeline_index = max(0, min(pipeline_index, len(self.all_pipelines) - 1))
pipeline_config = self.all_pipelines[pipeline_index]
target = res["target"]
print(f"Trial {i}:")
print(f" Pipeline configuration: {pipeline_config}")
print(f" Hyperparameters: {params}")
print(f" Objective (score): {target:.4f}")
def export_best_pipeline(self, file_path: str) -> None:
"""
Export the best pipeline configuration and hyperparameters to a file.
Args:
file_path: Path to save the export file
Raises:
AttributeError: If optimizer hasn't been run yet
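
        Example:
            Shape of the exported JSON (values are illustrative; step-specific
            hyperparameters are included alongside ``ridge_alpha``)::

                {
                  "best_score": -0.41,
                  "pipeline_config": ["scaler", "pca"],
                  "hyperparameters": {"ridge_alpha": 1.27}
                }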
"""
if not hasattr(self, "optimizer"):
raise AttributeError("No optimizer found. Please run bayesian_optimize() first.")
import json
best_params = self.optimizer.max["params"]
pipeline_index = int(round(best_params["pipeline_config"]))
pipeline_index = max(0, min(pipeline_index, len(self.all_pipelines) - 1))
best_pipeline_config = self.all_pipelines[pipeline_index]
export_data = {
"best_score": self.optimizer.max["target"],
"pipeline_config": list(best_pipeline_config),
"hyperparameters": {k: v for k, v in best_params.items() if k != "pipeline_config"}
}
with open(file_path, 'w') as f:
json.dump(export_data, f, indent=2)
self.logger.info(f"Best pipeline configuration exported to {file_path}")
def summarize_optimization(self) -> Dict:
"""
Generate a summary of the optimization results.
Returns:
Dictionary containing optimization summary metrics
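
        Example:
            A sketch of the returned dictionary (values are illustrative)::

                {
                    "n_trials": 60,
                    "n_unique_configs": 9,
                    "best_score": -0.41,
                    "best_pipeline": ["scaler", "pca"],
                    "improvement": 0.12,
                    "converged": True,
                    "best_rmse": 0.41,
                }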
"""
if not hasattr(self, "optimizer"):
raise AttributeError("No optimizer found. Please run bayesian_optimize() first.")
results = self.optimizer.res
targets = [r["target"] for r in results]
# Extract best pipeline configuration
best_params = self.optimizer.max["params"]
        pipeline_index = int(round(best_params["pipeline_config"]))
        pipeline_index = max(0, min(pipeline_index, len(self.all_pipelines) - 1))
        best_pipeline_config = self.all_pipelines[pipeline_index]
        # Calculate improvement and convergence metrics. Improvement is measured from
        # the worst of the first (up to) five evaluations to the best score found overall.
initial_performance = min(targets[:5]) if len(targets) >= 5 else min(targets)
final_performance = self.optimizer.max["target"]
improvement = final_performance - initial_performance
# Check for convergence by looking at the last few iterations
n_last = min(5, len(targets))
recent_targets = targets[-n_last:]
converged = (max(recent_targets) - min(recent_targets)) < 0.001
# Count unique pipeline configurations evaluated
unique_configs = set()
for res in results:
idx = int(round(res["params"]["pipeline_config"]))
idx = max(0, min(idx, len(self.all_pipelines) - 1))
unique_configs.add(idx)
# Create summary
summary = {
"n_trials": len(results),
"n_unique_configs": len(unique_configs),
"best_score": final_performance,
"best_pipeline": list(best_pipeline_config),
"improvement": improvement,
"converged": converged,
"best_rmse": -final_performance if final_performance > -1e5 else float('inf')
}
return summary