Source code for bsix.utils.load_data

import numpy as np
import pandas as pd
import re

from importlib import resources
from scipy.stats import mode
from sklearn.model_selection import train_test_split
from statsmodels.stats.outliers_influence import variance_inflation_factor

def _prepare_data(df, test_size, validation_size, seed):

    """
    Prepare the dataset for training, validation and testing.
    """

    from sklearn.model_selection import StratifiedGroupKFold

    # Remove rows with 0.0 or less in time columns
    if all(name in df.columns for name in ["time_start", "time_stop"]):
        event_labels = ["event"]
        time_labels = ["time_start", "time_stop"]

        labels = event_labels + time_labels

        # For time-varying data, only filter on time_stop (the event/censoring time)
        df = df[df["time_stop"] > 0]
    elif all(name in df.columns for name in ["event1", "time1"]):
        event_labels = df.filter(regex=r'^event\d+$').columns.tolist()
        time_labels = df.filter(regex=r'^time\d+$').columns.tolist()

        labels = event_labels + time_labels
        labels.sort(key=lambda x: int(re.search(r'(\d+)$', x).group()))

        df = df[(df[time_labels] >= 0).all(axis=1)]
    else:
        event_labels = ["event"]
        time_labels = ["time"]

        labels = event_labels + time_labels

        df = df[df["time"] > 0]
    
    df = df.dropna()
    df = df.reset_index(drop=True)

    index = df.index

    # Print dataset information
    df.info()
    print()
    print(df.describe(include="all"))
    print()

    # One-hot encoding for categorical variables (excluding labels)
    categorical_cols = df.select_dtypes(include=['object']).columns.tolist()
    # Remove label columns from categorical encoding
    categorical_cols = [col for col in categorical_cols if col not in labels]
    df = pd.get_dummies(df, columns=categorical_cols, drop_first=True)

    feature_names = [x for x in df.columns.to_list() if x not in labels]
    
    # Split dataset into train and test sets
    # If "identifier" is not present, use standard stratified splitting
    if "identifier" not in feature_names:
        X_train, X_test, y_train, y_test, train_idx, test_idx = train_test_split(df[feature_names], df[labels], index, test_size=test_size, random_state=seed, stratify=df[event_labels])
        
        X_train, X_validation, y_train, y_validation, train_idx, val_idx = train_test_split(X_train, y_train, train_idx, test_size=validation_size, random_state=seed, stratify=y_train[event_labels])

        X_train = np.array(X_train, np.float32)   
        y_train = np.array(y_train, np.float32)
        X_validation = np.array(X_validation, np.float32)
        y_validation = np.array(y_validation, np.float32)
        X_test = np.array(X_test, np.float32)
        y_test = np.array(y_test, np.float32)
    # If "identifier" is present, use stratified group splitting
    if "identifier" in feature_names:
        n_groups = df["identifier"].nunique()
        n_splits_outer = min(int(1 / test_size), max(2, n_groups // 2))
        n_splits_inner = min(int(1 / validation_size), max(2, n_groups // 4))
        
        group_split_outer = StratifiedGroupKFold(n_splits=n_splits_outer, shuffle=True, random_state=seed)
        if len(event_labels) == 1: # Time varying
            train_val_idx, test_idx = next(group_split_outer.split(df[feature_names], df[event_labels], groups=df["identifier"]))
        elif len(event_labels) > 1: # Multitask (multi-progression)
            stratify_train_test = df[event_labels].astype(str).agg('_'.join, axis=1)
            train_val_idx, test_idx = next(group_split_outer.split(df[feature_names], stratify_train_test, groups=df["identifier"]))

        X_train_val = df.iloc[train_val_idx]
        y_train_val = df.iloc[train_val_idx][labels]
        X_test = df.iloc[test_idx][feature_names]
        y_test = df.iloc[test_idx][labels]

        group_split_inner = StratifiedGroupKFold(n_splits=n_splits_inner, shuffle=True, random_state=seed)
        if len(event_labels) == 1: # Time varying
            train_idx, val_idx = next(group_split_inner.split(X_train_val, y_train_val[event_labels], groups=X_train_val["identifier"]))
        elif len(event_labels) > 1: # Multitask (multi-progression)
            stratify_train_val = y_train_val[event_labels].astype(str).agg('_'.join, axis=1)
            train_idx, val_idx = next(group_split_inner.split(X_train_val, stratify_train_val, groups=X_train_val["identifier"]))

        feature_names.remove("identifier")
            
        X_train = np.array(X_train_val.iloc[train_idx][feature_names].values, np.float32)
        y_train = np.array(y_train_val.iloc[train_idx][labels].values, np.float32)
        X_validation = np.array(X_train_val.iloc[val_idx][feature_names].values, np.float32)
        y_validation = np.array(y_train_val.iloc[val_idx][labels].values, np.float32)
        X_test = np.array(X_test[feature_names].values, np.float32)
        y_test = np.array(y_test[labels].values, np.float32)

    return X_train, y_train, X_validation, y_validation, X_test, y_test, train_idx, val_idx, test_idx, feature_names

def _toDataframe(data):

    """
    Convert the HDF5 format to a DataFrame.
    """
    
    df = pd.DataFrame(data[0])

    # Time varying data
    if all(name in df.columns for name in ["time_start", "time_stop"]): 
        df["event"] = data[1]
        df["time_start"] = data[2][:, 0]
        df["time_stop"] = data[2][:, 1]
    # Multitask (multi-progression) data
    elif all(name in df.columns for name in ["event1", "time1"]): 
        progression_colon = len(df.filter(regex=r'^event\d+$').columns.tolist())
        for i in range(progression_colon):
            df[f"event{i+1}"] = data[1][:, i]
            df[f"time{i+1}"] = data[2][:, i]
    # Standard data
    else:
        df["event"] = data[1]
        df["time"] = data[2]

    return df

[docs] def load_data_hdf(data_dir, dataset_name): """ Load dataset from a HDF5 file. """ import h5py # Load dataset path = resources.files(data_dir) / dataset_name f = h5py.File(path, "r") data = [f["x"][()], f["e"][()], f["t"][()]] f.close() df = _toDataframe(data) print(f"\n- - - - {dataset_name} (hdf5) - - - -\n") return df
[docs] def load_data_arff(data_dir, dataset_name): """ Load dataset from a ARFF file. """ from scipy.io import arff # Load dataset path = resources.files(data_dir) / dataset_name data, meta = arff.loadarff(path) df = pd.DataFrame(data) # Decode byte strings to UTF-8 strings for object columns for col in df.select_dtypes([object]).columns: df[col] = df[col].apply(lambda x: x.decode('utf-8') if isinstance(x, bytes) else x) print(f"\n- - - - {dataset_name} (arff) - - - -\n") return df
[docs] def load_data_csv(data_dir, dataset_name): """ Load dataset from a CSV file. """ # Load dataset path = resources.files(data_dir) / dataset_name df = pd.read_csv(path) print(f"\n- - - - {dataset_name} (csv) - - - -\n") return df
def _transformTrainValidationTest(X, y): """ Transform the data format for train, validation and test sets. """ from sksurv.util import Surv survival_X = X.copy() # Standard: [event, time] if y.shape[1] == 2: _yE = y[:, 0].astype(np.float32) _yT = y[:, 1].astype(np.float32) survival_y = Surv.from_arrays(event=_yE, time=_yT) # Time varying: [event, time_start, time_stop] elif y.shape[1] == 3: _yE = y[:, 0].astype(np.float32) _yTstart = y[:, 1].astype(np.float32) _yTstop = y[:, 2].astype(np.float32) _yT = np.array([_yTstart, _yTstop]) dtype = [('event', '?'), ('time_start', 'f8'), ('time_stop', 'f8')] survival_y = np.empty(len(y), dtype=dtype) survival_y['event'] = y[:, 0].astype(bool) survival_y['time_start'] = y[:, 1].astype(float) survival_y['time_stop'] = y[:, 2].astype(float) # Multitask (multi-progression): [event1, time1, event2, time2, ...] else: _yE = y[:, 0::2].astype(np.float32) # (0, 2, 4...) _yT = y[:, 1::2].astype(np.float32) # (1, 3, 5...) number_progressions = y.shape[1] // 2 surv_list = [Surv.from_arrays(event=_yE[:, i], time=_yT[:, i]) for i in range(number_progressions)] survival_y = np.array(surv_list).T survival = { "x" : survival_X, "t" : _yT, "e" : _yE, } return survival_X, survival_y, survival def _filter_low_variance(X, threshold=0.80): """ Filter out columns with low variance. """ _, counts = mode(X, axis=0, keepdims=True) mask = (counts[0] / X.shape[0]) <= threshold return mask def _filter_high_correlation(X, threshold=0.80): """ Filter out columns with high correlation. """ corr_matrix = np.nan_to_num(np.abs(np.corrcoef(X.T))) upper = np.triu(corr_matrix, k=1) drop_indices = np.unique(np.where(upper > threshold)[1]) mask = np.ones(X.shape[1], dtype=bool) mask[drop_indices] = False return mask def _filter_high_vif(X, threshold=5.0): """ Filter out columns with high Variance Inflation Factor (VIF). """ vif_values = np.zeros(X.shape[1]) for i in range(X.shape[1]): vif_values[i] = variance_inflation_factor(X, i) mask = vif_values <= threshold return mask
[docs] def get_data(df=None, data_dir="bsix.datasets", dataset_name="colon.csv", test_size=0.2, validation_size=0.2, scaler_name="standard", scaler=None, to_multitask=False, seed=0): """ Load and preprocess the dataset. """ if df is not None: X_train, y_train, X_validation, y_validation, X_test, y_test, train_idx, val_idx, test_idx, feature_names = _prepare_data(df, test_size, validation_size, seed) elif ".h5" in dataset_name: X_train, y_train, X_validation, y_validation, X_test, y_test, train_idx, val_idx, test_idx, feature_names = _prepare_data(load_data_hdf(data_dir, dataset_name), test_size, validation_size, seed) elif ".arff" in dataset_name: X_train, y_train, X_validation, y_validation, X_test, y_test, train_idx, val_idx, test_idx, feature_names = _prepare_data(load_data_arff(data_dir, dataset_name), test_size, validation_size, seed) elif ".csv" in dataset_name: X_train, y_train, X_validation, y_validation, X_test, y_test, train_idx, val_idx, test_idx, feature_names = _prepare_data(load_data_csv(data_dir, dataset_name), test_size, validation_size, seed) else: print("ERROR : Wrong format of dataset.") return -1 mask = _filter_low_variance(X_train) X_train = X_train[:, mask] X_validation = X_validation[:, mask] X_test = X_test[:, mask] feature_names = [feature_names[i] for i in range(len(feature_names)) if mask[i]] if X_train.shape[1] > 1: mask = _filter_high_correlation(X_train) X_train = X_train[:, mask] X_validation = X_validation[:, mask] X_test = X_test[:, mask] feature_names = [feature_names[i] for i in range(len(feature_names)) if mask[i]] if X_train.shape[1] > 1: mask = _filter_high_vif(X_train) X_train = X_train[:, mask] X_validation = X_validation[:, mask] X_test = X_test[:, mask] feature_names = [feature_names[i] for i in range(len(feature_names)) if mask[i]] # Convert to DataFrame for scaling X_train_df = pd.DataFrame(X_train, columns=feature_names) X_validation_df = pd.DataFrame(X_validation, columns=feature_names) X_test_df = pd.DataFrame(X_test, columns=feature_names) # Scale data if scaler is None: if scaler_name == "log": from sklearn.preprocessing import FunctionTransformer scaler = FunctionTransformer(func=np.log1p).set_output(transform="pandas") elif scaler_name == "minmax": from sklearn.preprocessing import MinMaxScaler scaler = MinMaxScaler((0, 1)).set_output(transform="pandas") elif scaler_name == "standard": from sklearn.preprocessing import StandardScaler scaler = StandardScaler().set_output(transform="pandas") elif scaler_name == "robust": from sklearn.preprocessing import RobustScaler scaler = RobustScaler().set_output(transform="pandas") elif scaler_name == "product": from sklearn.pipeline import Pipeline from sklearn.preprocessing import FunctionTransformer, MinMaxScaler scaler = Pipeline([('minmax', MinMaxScaler((1, 10))), ('log', FunctionTransformer(func=np.log))]).set_output(transform="pandas") scaler = scaler.fit(X_train_df) X_train_df = scaler.transform(X_train_df) X_validation_df = scaler.transform(X_validation_df) X_test_df = scaler.transform(X_test_df) else: X_train_df = scaler.transform(X_train_df) X_validation_df = scaler.transform(X_validation_df) X_test_df = scaler.transform(X_test_df) # Convert back to numpy arrays X_train = np.array(X_train_df.values, np.float32) X_validation = np.array(X_validation_df.values, np.float32) X_test = np.array(X_test_df.values, np.float32) # Transform data for train, validation and test sets X_train, y_train, _ = _transformTrainValidationTest(X_train, y_train) X_validation, y_validation, _ = _transformTrainValidationTest(X_validation, y_validation) X_test, y_test, _ = _transformTrainValidationTest(X_test, y_test) # Adapt "y" for multitasking when there is only one progression if to_multitask and y_train.ndim == 1: y_train = y_train[:, np.newaxis] y_validation = y_validation[:, np.newaxis] y_test = y_test[:, np.newaxis] return X_train, y_train, X_validation, y_validation, X_test, y_test, train_idx, val_idx, test_idx, feature_names, scaler