Source code for mp_time_split.utils.split

from warnings import warn

import numpy as np
from sklearn.model_selection import KFold, TimeSeriesSplit, train_test_split
from sklearn.model_selection._split import _BaseKFold
from sklearn.utils import indexable
from sklearn.utils.validation import _num_samples

AVAILABLE_MODES = ["TimeSeriesSplit", "TimeSeriesOverflowSplit", "TimeKFold"]


def mp_time_split(
    X,
    mode="TimeSeriesSplit",
    use_trainval_test: bool = True,
    n_cv_splits: int = 5,
    max_train_size=None,
    test_size=None,
    gap=0,
):
    """Build order-preserving cross-validation splits, optionally with a hold-out.

    ``X`` is assumed to be sorted chronologically already; nothing here
    reorders or shuffles it.

    Parameters
    ----------
    X : array-like of shape (n_samples, ...)
        Samples to split. Only its length and ordering are used.
    mode : str, default="TimeSeriesSplit"
        Name of the splitter to use; must be one of ``AVAILABLE_MODES``.
    use_trainval_test : bool, default=True
        If True, an unshuffled ``train_test_split`` with ``test_size=0.2``
        first sets aside the trailing part of ``X`` as a final test set and
        cross-validation is performed on the leading part only. If False,
        cross-validation is performed on all of ``X``.
    n_cv_splits : int, default=5
        Number of cross-validation splits requested from the splitter.
    max_train_size : int, default=None
        Forwarded to the splitter. Must be None for ``mode="TimeKFold"``.
    test_size : int, default=None
        Forwarded to the splitter. Must be None for ``mode="TimeKFold"``.
    gap : int, default=0
        Forwarded to the splitter. Must be 0 for ``mode="TimeKFold"``.

    Returns
    -------
    trainval_splits : list of (ndarray, ndarray)
        One ``(train_index, val_index)`` pair per cross-validation split.
        Indices are positions within the train/validation portion, which is
        the leading slice of ``X``.
    test_split : (ndarray, ndarray)
        Only returned when ``use_trainval_test`` is True:
        ``(trainval_index, test_index)`` as positions within ``X``.

    Raises
    ------
    NotImplementedError
        If ``mode`` is not in ``AVAILABLE_MODES``, or if ``gap``,
        ``max_train_size`` or ``test_size`` is set to a non-default value
        together with ``mode="TimeKFold"``.
    """
    if mode not in AVAILABLE_MODES:
        raise NotImplementedError(
            f"mode={mode} not implemented. Use one of {AVAILABLE_MODES}"
        )

    if use_trainval_test:
        # shuffle=False keeps the original ordering, so the hold-out is the
        # trailing portion of X.
        X_trainval, _ = train_test_split(X, shuffle=False, test_size=0.2)
    else:
        X_trainval = X

    if mode == "TimeSeriesSplit":
        # `gap` is forwarded; it used to be hard-coded to 0, which silently
        # discarded the caller's value.
        splitter = TimeSeriesSplit(
            n_splits=n_cv_splits,
            max_train_size=max_train_size,
            test_size=test_size,
            gap=gap,
        )
    elif mode == "TimeSeriesOverflowSplit":
        splitter = TimeSeriesOverflowSplit(
            n_splits=n_cv_splits,
            max_train_size=max_train_size,
            test_size=test_size,
            gap=gap,
        )
    elif mode == "TimeKFold":
        if gap != 0:
            raise NotImplementedError(
                "non-zero `gap` specified, not implemented for TimeKFold"
            )
        if max_train_size is not None:
            raise NotImplementedError(
                "non-None `max_train_size` specified, not implemented for TimeKFold"
            )
        if test_size is not None:
            raise NotImplementedError(
                "non-None `test_size` specified, not implemented for TimeKFold"
            )
        splitter = TimeKFold(n_splits=n_cv_splits)

    trainval_splits = list(splitter.split(X_trainval))

    if not use_trainval_test:
        return trainval_splits

    # _num_samples accepts anything with a usable `shape` or `len`, so plain
    # sequences work here as well as arrays and DataFrames.
    num_samples = _num_samples(X)
    n_trainval = _num_samples(X_trainval)
    test_split = (np.arange(0, n_trainval), np.arange(n_trainval, num_samples))
    return trainval_splits, test_split
class TimeSeriesOverflowSplit(_BaseKFold):
    """Time series cross-validator whose test sets run to the end of the data.

    Training indices are exactly those produced by
    :class:`sklearn.model_selection.TimeSeriesSplit` configured with the same
    ``n_splits``, ``max_train_size``, ``test_size`` and ``gap``. The test set
    of each split, however, is not limited to a single fold: it starts where
    the corresponding ``TimeSeriesSplit`` test fold starts and extends through
    the last sample (it "overflows" into all later folds).

    Parameters
    ----------
    n_splits : int, default=5
        Number of splits. Must be at least 2.
    max_train_size : int, default=None
        Maximum size for a single training set.
    test_size : int, default=None
        Size of one underlying ``TimeSeriesSplit`` test fold, which controls
        where each test set starts. When None, ``TimeSeriesSplit`` uses
        ``n_samples // (n_splits + 1)``.
    gap : int, default=0
        Number of samples to exclude between the end of each train set and
        the start of the test set.

    Examples
    --------
    >>> import numpy as np
    >>> X = np.arange(12).reshape(6, 2)
    >>> tsos = TimeSeriesOverflowSplit(n_splits=2)
    >>> for train_index, test_index in tsos.split(X):
    ...     print("TRAIN:", train_index, "TEST:", test_index)
    TRAIN: [0 1] TEST: [2 3 4 5]
    TRAIN: [0 1 2 3] TEST: [4 5]
    """

    def __init__(self, n_splits=5, *, max_train_size=None, test_size=None, gap=0):
        # Shuffling would destroy the time ordering, so it is always disabled.
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.max_train_size = max_train_size
        self.test_size = test_size
        self.gap = gap

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples
            and `n_features` is the number of features.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.

        Yields
        ------
        train : ndarray
            The training set indices for that split.
        test : ndarray
            The testing set indices for that split: every index from the
            start of the underlying ``TimeSeriesSplit`` test fold through
            ``n_samples - 1``.
        """
        X, y, groups = indexable(X, y, groups)
        n_samples = _num_samples(X)

        # test_size=None is resolved by TimeSeriesSplit itself to
        # n_samples // (n_splits + 1).
        tscv = TimeSeriesSplit(
            n_splits=self.n_splits,
            max_train_size=self.max_train_size,
            test_size=self.test_size,
            gap=self.gap,
        )

        for train_index, fold_index in tscv.split(X):
            # Start the test set at the fold start rather than taking
            # "everything not in train": the latter would put the `gap`
            # samples, and any samples older than a `max_train_size` window,
            # into the test set. With gap=0 and max_train_size=None the two
            # definitions coincide.
            yield train_index, np.arange(fold_index[0], n_samples)
class TimeKFold(_BaseKFold):
    """Cumulative K-fold cross-validator for chronologically ordered samples.

    The samples are cut into ``n_splits + 1`` consecutive folds with an
    unshuffled :class:`sklearn.model_selection.KFold`. The ``k``-th split
    (counting from 1) trains on the first ``k`` folds and tests on every
    remaining sample, so the final fold is never used for training and no
    test set is empty.

    Parameters
    ----------
    n_splits : int, default=5
        Number of train/test splits to generate. Must be at least 2.
    shuffle : bool, default=False
        Accepted for signature compatibility only; always treated as False.
    random_state : object, default=None
        Accepted for signature compatibility only; always treated as None.

    Examples
    --------
    >>> import numpy as np
    >>> X = np.arange(12).reshape(6, 2)
    >>> tkf = TimeKFold(n_splits=2)
    >>> for train_index, test_index in tkf.split(X):
    ...     print("TRAIN:", train_index, "TEST:", test_index)
    TRAIN: [0 1] TEST: [2 3 4 5]
    TRAIN: [0 1 2 3] TEST: [4 5]
    """

    def __init__(self, n_splits=5, *, shuffle=False, random_state=None):
        caller_overrode_defaults = shuffle or random_state is not None
        if caller_overrode_defaults:
            warn(
                "`shuffle` and `random_state` for compatibility only. These are fixed to `False` and `None`, respectively."  # noqa: E501
            )
        super().__init__(n_splits=n_splits, shuffle=False, random_state=None)

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples
            and `n_features` is the number of features.
        y : array-like of shape (n_samples,), default=None
            Only passed through ``indexable`` together with ``X``; not used
            to build the splits.
        groups : array-like of shape (n_samples,), default=None
            Only passed through ``indexable`` together with ``X``; not used
            to build the splits.

        Yields
        ------
        train : ndarray
            The training set indices for that split.
        test : ndarray
            The testing set indices for that split.
        """
        X, y, groups = indexable(X, y, groups)
        n_samples = _num_samples(X)

        # One fold more than requested, so that dropping the last one below
        # leaves a non-empty test set for the final split.
        fold_maker = KFold(n_splits=self.n_splits + 1)
        folds = [fold for _, fold in fold_maker.split(X)]
        every_index = list(range(n_samples))

        seen = np.empty(0, dtype=int)
        for fold in folds[:-1]:
            # Grow the training set by one fold; test on everything else.
            seen = np.concatenate((seen, fold))
            yield seen, np.setdiff1d(every_index, seen)