
Datasets API Reference

This page provides detailed API documentation for all dataset classes in Samay.


LPTMDataset

Dataset class for LPTM model.

LPTMDataset(name=None, datetime_col=None, path=None, batchsize=16, mode='train', boundaries=[0, 0, 0], horizon=0, task_name='forecasting', label_col=None, stride=10, seq_len=512, **kwargs)

Bases: BaseDataset

Dataset class for the LPTM model.

Data format: dict with keys:
- input_ts: np.ndarray, historical time series data
- actual_ts: np.ndarray, actual time series data

Source code in Samay\src\samay\dataset.py
def __init__(
    self,
    name=None,
    datetime_col=None,
    path=None,
    batchsize=16,
    mode="train",
    boundaries=[0, 0, 0],
    horizon=0,
    task_name="forecasting",
    label_col=None,
    stride=10,
    seq_len=512,
    **kwargs,
):
    super().__init__(
        name=name,
        datetime_col=datetime_col,
        path=path,
        batchsize=batchsize,
        mode=mode,
    )
    self.task_name = task_name
    self.label_col = "label" if label_col is None else label_col

    self.seq_len = seq_len
    self.stride = stride
    self.forecast_horizon = horizon
    self.boundaries = boundaries

    self.max_col_num = 64
    self.pad = False
    self._read_data()

    self.one_chunk_num = (
        self.length_timeseries - self.seq_len - self.forecast_horizon
    ) // self.stride + 1
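
The number of training windows above follows the standard sliding-window count. A small arithmetic sketch with illustrative values (not tied to any particular dataset):

# Illustrative only: windows produced by (length - seq_len - horizon) // stride + 1
length_timeseries = 17420
seq_len = 512
forecast_horizon = 192
stride = 10

one_chunk_num = (length_timeseries - seq_len - forecast_horizon) // stride + 1
print(one_chunk_num)  # 1672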

TimesfmDataset

Dataset class for TimesFM model.

TimesfmDataset(name=None, datetime_col='ds', path=None, batchsize=4, mode='train', boundaries=(0, 0, 0), context_len=128, horizon_len=32, freq='h', normalize=False, stride=10, **kwargs)

Bases: BaseDataset

Dataset class for TimesFM model.

Data format: dict with keys:
- input_ts: np.ndarray, historical time series data
- actual_ts: np.ndarray, actual time series data

Source code in Samay\src\samay\dataset.py
def __init__(
    self,
    name=None,
    datetime_col="ds",
    path=None,
    batchsize=4,
    mode="train",
    boundaries=(0, 0, 0),
    context_len=128,
    horizon_len=32,
    freq="h",
    normalize=False,
    stride=10,
    **kwargs,
):
    super().__init__(
        name=name,
        datetime_col=datetime_col,
        path=path,
        batchsize=batchsize,
        mode=mode,
    )
    self.context_len = context_len
    self.horizon_len = horizon_len
    self.freq = freq
    self.normalize = normalize
    self.stride = stride
    self.data = pd.read_csv(self.data_path)
    if boundaries == (0, 0, 0):
        # Default boundaries: train 50%, val 20%, test 30%
        self.boundaries = [
            int(len(self.data) * 0.5),
            int(len(self.data) * 0.7),
            len(self.data) - 1,
        ]
    elif boundaries == (-1, -1, -1):
        # use all data for training
        self.boundaries = [0, 0, len(self.data) - 1]
    else:
        self.boundaries = boundaries
    self.horizon_len = min(self.horizon_len, int(0.3 * len(self.data) + 1))
    self.ts_cols = [col for col in self.data.columns if col != self.datetime_col]
    tfdtl = TimeSeriesdata(
        data_path=self.data_path,
        datetime_col=self.datetime_col,
        num_cov_cols=None,
        cat_cov_cols=None,
        ts_cols=np.array(self.ts_cols),
        train_range=[0, self.boundaries[0]],
        val_range=[self.boundaries[0], self.boundaries[1]],
        test_range=[self.boundaries[1], self.boundaries[2]],
        hist_len=self.context_len,
        pred_len=self.horizon_len,
        batch_size=64,
        freq=self.freq,
        normalize=self.normalize,
        epoch_len=None,
        holiday=False,
        permute=False,
    )
    if self.normalize:
        self.scaler = tfdtl.scaler
    self.num_ts = len(self.ts_cols)
    if self.mode == "train":
        tfset = tfdtl.torch_dataset(mode="train", shift=self.stride)
    else:
        tfset = tfdtl.torch_dataset(mode="test", shift=self.horizon_len)
    self.dataset = tfset
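
As a quick check on the default split logic above, for a hypothetical 10,000-row CSV the boundaries work out as follows (illustrative only):

# Illustrative only: default TimesfmDataset boundaries for a 10,000-row file.
n = 10_000
boundaries = [int(n * 0.5), int(n * 0.7), n - 1]
print(boundaries)  # [5000, 7000, 9999] -> train 50%, val 20%, test 30%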

MomentDataset

Dataset class for MOMENT model supporting multiple tasks.

MomentDataset(name=None, datetime_col=None, path=None, batchsize=64, mode='train', boundaries=[0, 0, 0], horizon_len=0, task_name='forecasting', label_col=None, stride=10, **kwargs)

Bases: BaseDataset

Dataset class for Moment model.

Data format: dict with keys:
- input_ts: np.ndarray, historical time series data
- actual_ts: np.ndarray, actual time series data

Source code in Samay\src\samay\dataset.py
def __init__(
    self,
    name=None,
    datetime_col=None,
    path=None,
    batchsize=64,
    mode="train",
    boundaries=[0, 0, 0],
    horizon_len=0,
    task_name="forecasting",
    label_col=None,
    stride=10,
    **kwargs,
):
    super().__init__(
        name=name,
        datetime_col=datetime_col,
        path=path,
        batchsize=batchsize,
        mode=mode,
    )
    self.task_name = task_name
    self.label_col = "label" if label_col is None else label_col
    self.mode = mode

    self.seq_len = 512
    self.stride = (
        stride if (self.mode == "train" or horizon_len == 0) else horizon_len
    )
    self.forecast_horizon = horizon_len
    self.boundaries = boundaries
    self.max_col_num = 64

    self.pad = False
    self._read_data()

    self.one_chunk_num = (
        self.length_timeseries - self.seq_len - self.forecast_horizon
    ) // self.stride + 1

ChronosDataset

Dataset class for Chronos model with tokenization.

ChronosDataset(name=None, datetime_col='ds', path=None, boundaries=[0, 0, 0], batch_size=16, mode=None, stride=10, tokenizer_class='MeanScaleUniformBins', drop_prob=0.2, min_past=64, np_dtype=np.float32, config=None)

Bases: BaseDataset

Dataset class for Chronos model.

Data format: dict with keys:
- input_ts: np.ndarray, historical time series data
- actual_ts: np.ndarray, actual time series data

Source code in Samay\src\samay\dataset.py
def __init__(
    self,
    name=None,
    datetime_col="ds",
    path=None,
    boundaries=[0, 0, 0],
    batch_size=16,
    mode=None,
    stride=10,
    tokenizer_class="MeanScaleUniformBins",
    drop_prob=0.2,
    min_past=64,
    np_dtype=np.float32,
    config=None,
):
    super().__init__(
        name=name,
        datetime_col=datetime_col,
        path=path,
        batchsize=batch_size,
        mode=mode,
    )
    # Todo: implement ChronosDataset
    assert tokenizer_class is not None, "Tokenizer is required for ChronosDataset"

    if not config:
        self.config = ChronosConfig(
            tokenizer_class="MeanScaleUniformBins",
            tokenizer_kwargs={"low_limit": -15.0, "high_limit": 15.0},
            n_tokens=4096,
            n_special_tokens=2,
            pad_token_id=0,
            eos_token_id=1,
            use_eos_token=True,
            model_type="seq2seq",
            context_length=512,
            prediction_length=64,
            num_samples=20,
            temperature=1.0,
            top_k=50,
            top_p=1.0,
        )
    else:
        self.config = ChronosConfig(**config)
    assert type(self.config) == ChronosConfig, (
        "Config must be an instance of ChronosConfig"
    )
    assert self.config.model_type in ("seq2seq", "causal"), (
        "Model type must be either 'seq2seq' or 'causal'"
    )

    self.context_len = self.config.context_length
    self.horizon_len = self.config.prediction_length
    self.drop_prob = drop_prob if self.config.model_type == "seq2seq" else 0.0
    self.min_past = min_past or self.config.prediction_length
    self.model_type = self.config.model_type
    self.mode = mode
    self.np_dtype = np_dtype
    self.boundaries = boundaries
    self.stride = stride
    self.batchsize = batch_size
    self.max_col_num = 16

    self.pad = False
    self._read_data()
    self.preprocess()

    self.one_chunk_num = (
        self.length_timeseries - self.context_len - self.horizon_len
    ) // self.stride + 1
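
Because config is passed as a plain dict and unpacked into ChronosConfig(**config), a custom configuration has to supply every field. A hedged sketch that reuses the defaults shown in the source above and only changes the prediction length (the import path and values are assumptions for illustration):

from samay.dataset import ChronosDataset

config = dict(
    tokenizer_class="MeanScaleUniformBins",
    tokenizer_kwargs={"low_limit": -15.0, "high_limit": 15.0},
    n_tokens=4096,
    n_special_tokens=2,
    pad_token_id=0,
    eos_token_id=1,
    use_eos_token=True,
    model_type="seq2seq",
    context_length=512,
    prediction_length=128,  # changed from the default of 64
    num_samples=20,
    temperature=1.0,
    top_k=50,
    top_p=1.0,
)

dataset = ChronosDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    config=config,
)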

MoiraiDataset

Dataset class for MOIRAI model with frequency support.

MoiraiDataset(name=None, datetime_col='date', path=None, boundaries=(0, 0, 0), context_len=128, horizon_len=32, patch_size=16, batch_size=16, freq=None, start_date=None, end_date=None, operation='mean', normalize=True, mode='train', htune=False, data_config=None, **kwargs)

Bases: BaseDataset

Dataset class for Moirai model. It ingests data in the form of a (num_variates x num_timesteps) matrix.

Source code in Samay\src\samay\dataset.py
def __init__(
    self,
    name=None,
    datetime_col="date",
    path=None,
    boundaries=(0, 0, 0),
    context_len=128,
    horizon_len=32,
    patch_size=16,
    batch_size=16,
    freq=None,
    start_date=None,
    end_date=None,
    operation="mean",
    normalize=True,
    mode="train",
    htune=False,  # hyperparameter tuning
    data_config=None,
    **kwargs,
):
    super().__init__(
        name=name,
        datetime_col=datetime_col,
        path=path,
        batchsize=batch_size,
        mode=mode,
    )
    self.context_len = context_len
    self.horizon_len = horizon_len
    self.patch_size = patch_size
    self.batch_size = batch_size
    self.mode = mode
    self.htune = htune
    self.boundaries = boundaries
    self.normalize = normalize
    self.kwargs = kwargs
    if data_config:
        self.target_dim = data_config.get("target_dim", 1)
        self.feat_dynamic_real_dim = data_config.get("feat_dynamic_real_dim", 0)
        self.past_feat_dynamic_real_dim = data_config.get(
            "past_feat_dynamic_real_dim", 0
        )
    else:
        self.target_dim = 1
        self.feat_dynamic_real_dim = 0
        self.past_feat_dynamic_real_dim = 0

    self._read_data()  # read from path into a pandas dataframe
    # Preprocess the data - infer freq, take subset or normalize
    self._preprocess(
        start_date=start_date, end_date=end_date, freq=freq, operation=operation
    )
    self.start_date = self.dataset.index[0]
    self.train_transforms = self.default_transforms()
    self.test_transforms = self.default_transforms()

    # Split the dataset into train, val, test
    if self.mode == "train":  # no windowing
        self.dataset = self.dataset[: self.boundaries[0]]
        self.gen_train_val_data()
    elif self.mode == "val":  # no windowing
        self.dataset = self.dataset[self.boundaries[0] : self.boundaries[1]]
        self.gen_train_val_data()
    elif self.mode == "test":
        # whole dataset sent
        self.gen_test_data()
    else:
        raise ValueError(f"Unsupported mode: {self.mode}")

add_past_fields(data: dict, ts_fields: list = [], past_ts_fields: list = [], dummy_val: float = 0.0, lead_time: int = 0, target_field: str = 'target', is_pad_field: str = 'is_pad', observed_value_field: str = 'observed_target', start_field: str = 'start', forecast_start_field: str = 'forecast_start', output_NTC: bool = True, mode='train')

Add the following fields:

(a) past_target: the past target data
(b) past_observed_target: the past target data with a missing-values indicator
(c) past_is_pad: indicates whether the added value was a padding value
(d) past_feat_dynamic_real: the past dynamic real features
(e) past_observed_feat_dynamic_real: the past dynamic real features with a missing-values indicator

Source code in Samay\src\samay\dataset.py
def add_past_fields(
    self,
    data: dict,
    ts_fields: list = [],
    past_ts_fields: list = [],
    dummy_val: float = 0.0,
    lead_time: int = 0,
    target_field: str = "target",
    is_pad_field: str = "is_pad",
    observed_value_field: str = "observed_target",
    start_field: str = "start",
    forecast_start_field: str = "forecast_start",
    output_NTC: bool = True,
    mode="train",
):
    """Add the following fields:
    (a) past_target: The past target data
    (b) past_observed_target: The past target data with missing values indicator
    (c) past_is_pad: Indicates if the added value was a padding value
    (d) past_feat_dynamic_real: The past dynamic real features
    (e) past_observed_feat_dynamic_real: The past dynamic real features with missing values indicator
    """
    pred_len = self.horizon_len
    target = data[target_field]
    num_windows = 1 + ((target.shape[-1] - self.past_length) // pred_len)

    # Sample indices from the target field using the instance sampler
    if mode == "train":
        sampled_indices = [
            self.past_length + i * pred_len for i in range(num_windows + 1)
        ]
    elif mode == "test":
        sampled_indices = custom_train_instance_split(target)
    else:
        raise ValueError(f"Unsupported mode: {mode}")

    # Columns to be sliced
    slice_cols = ts_fields + past_ts_fields + [target_field, observed_value_field]

    transformed_data = []
    # Iterate over the sampled indices
    for i in range(len(sampled_indices)):
        idx = sampled_indices[i]
        # Calculate the padding length if the index is less than past_length
        d = data.copy()
        pad_length = max(
            0,
            self.past_length
            - d[target_field][..., (idx - self.past_length) : idx].shape[-1],
        )

        # Iterate over the fields to be sliced
        for field in slice_cols:
            # Slice the past piece of the field
            if pad_length == 0:
                past_piece = d[field][..., (idx - self.past_length) : idx]
            else:
                pad_block = np.full(
                    shape=d[field].shape[:-1] + (pad_length,),
                    fill_value=dummy_val,
                    dtype=d[field].dtype,
                )
                past_piece = np.concatenate(
                    [pad_block, d[field][..., (idx - self.past_length) : idx]],
                    axis=-1,
                )

            # # Slice the future piece of the field
            # future_piece = d[field][..., (idx + lead_time) : (idx + lead_time + pred_len)]
            future_piece = np.full(
                shape=d[field].shape[:-1] + (pred_len,),
                fill_value=dummy_val,
                dtype=d[field].dtype,
            )

            # If the field is in time series fields, concatenate past and future pieces
            if field in ts_fields:
                piece = np.concatenate([past_piece, future_piece], axis=-1)
                if output_NTC:
                    piece = piece.transpose()
                d[field] = piece
            else:
                if output_NTC:
                    past_piece = past_piece.transpose()
                    # future_piece = future_piece.transpose()
                if field not in past_ts_fields:
                    d["past_" + field] = past_piece
                    # d["future_" + field] = future_piece
                    del d[field]
                else:
                    d[field] = past_piece

        # Create a padding indicator for the past piece
        pad_indicator = np.zeros(self.past_length)
        if pad_length > 0:
            pad_indicator[:pad_length] = 1
        d["past_" + (is_pad_field)] = pad_indicator

        # Set the forecast start field
        d[forecast_start_field] = (d[start_field] + idx + lead_time).to_timestamp()

        # Append the transformed data
        transformed_data.append(d)

    # Return the transformed data
    return transformed_data
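
The left-padding performed above can be seen in isolation with a simplified NumPy sketch (the slice start is clamped at 0 here purely for clarity; sizes are illustrative):

import numpy as np

# Simplified sketch of the pad_length / pad_block logic above.
past_length = 8
target = np.array([1.0, 2.0, 3.0, 4.0, 5.0])  # only 5 past values available
idx = 5

window = target[max(0, idx - past_length):idx]
pad_length = max(0, past_length - window.shape[-1])   # 3
pad_block = np.zeros(pad_length, dtype=target.dtype)
past_piece = np.concatenate([pad_block, window])
print(past_piece)  # [0. 0. 0. 1. 2. 3. 4. 5.]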

default_transforms() -> transforms.Compose

Default transformations for the dataset

Source code in Samay\src\samay\dataset.py
def default_transforms(self) -> transforms.Compose:
    """Default transformations for the dataset"""
    transforms_list = []

    # Convert the target data to numpy array
    transforms_list.append(
        AsNumpy(
            field="target",
            expected_ndim=1 if self.target_dim == 1 else 2,
            dtype=np.float32,
        )
    )

    if self.target_dim == 1:
        # Fix missing values
        transforms_list.append(
            AddObservedValues(
                target_field="target",
                output_field="observed_target",
                imputation_method=CausalMeanNaNFix(),
                dtype=bool,
            )
        )

        # Add dimension to target
        transforms_list.append(ArrExpandDims(field="target", axis=0))
        transforms_list.append(ArrExpandDims(field="observed_target", axis=0))
    else:
        transforms_list.append(
            AddObservedValues(
                target_field="target",
                output_field="observed_target",
                dtype=bool,
            )
        )

    if self.feat_dynamic_real_dim > 0:
        transforms_list.append(
            AsNumpy(
                field="feat_dynamic_real",
                expected_ndim=2,
                dtype=np.float32,
            )
        )
        transforms_list.append(
            AddObservedValues(
                target_field="feat_dynamic_real",
                output_field="observed_feat_dynamic_real",
                dtype=bool,
            )
        )

    if self.past_feat_dynamic_real_dim > 0:
        transforms_list.append(
            AsNumpyArray(
                field="past_feat_dynamic_real",
                expected_ndim=2,
                dtype=np.float32,
            )
        )
        transforms_list.append(
            AddObservedValuesIndicator(
                target_field="past_feat_dynamic_real",
                output_field="past_observed_feat_dynamic_real",
                dtype=bool,
            )
        )

    # Convert list of transforms to a single transformation
    comp_transform = transforms.Compose(transforms_list)

    return comp_transform

gen_test_data()

Generates test data based on the boundaries

Returns:

np.ndarray: Test data

Source code in Samay\src\samay\dataset.py
def gen_test_data(self):
    """Generates test data based on the boundaries

    Returns:
        np.ndarray: Test data
    """
    data = []
    num_windows = (
        1
        if (self.dataset.shape[0] - self.boundaries[1]) < self.horizon_len + self.context_len
        else (self.dataset.shape[0] - self.boundaries[1] - self.context_len) // self.horizon_len
    )
    for i in range(self.dataset.shape[1]):
        for j in range(num_windows):
            if j == num_windows - 1:
                start_idx = self.dataset.shape[0] - self.horizon_len
            else:
                start_idx = self.boundaries[1] + self.context_len + j * self.horizon_len
            end_idx = start_idx + self.horizon_len
            data.append(
                (
                    {  # input
                        "start": Period(self.start_date, freq=freq_mapping(self.freq)),
                        "target": self.dataset.iloc[max(0, start_idx-self.context_len):start_idx, i].values,
                        "item_id": self.dataset.columns[i],
                    },
                    {  # label
                        "start": Period(self.start_date, freq=freq_mapping(self.freq)),
                        "target": self.dataset.iloc[start_idx:end_idx, i].values,
                        "item_id": self.dataset.columns[i],
                    },
                )
            )

    self.dataset = MoiraiTorch(data)
    self.data = data

gen_train_val_data()

Generates training and validation data based on the boundaries

Returns:

np.ndarray: Training and validation data

Source code in Samay\src\samay\dataset.py
def gen_train_val_data(self):
    """Generates training and validation data based on the boundaries

    Returns:
        np.ndarray: Training and Validation data
    """
    data = []
    # Each column is a separate time series
    # Each time series is appended to the data list
    for i in range(self.dataset.shape[1]):
        data.append(
            {
                "start": Period(self.start_date, freq=freq_mapping(self.freq)),
                "target": self.dataset.iloc[:, i].values,
                "item_id": self.dataset.columns[i],
            }
        )

    self.dataset = MoiraiTorch(data)
    self.data = data

get_dataloader()

Returns the iterator for data batches for the dataset based on the mode

Returns:

torch.utils.data.DataLoader: Depends on the mode

Source code in Samay\src\samay\dataset.py
def get_dataloader(self):
    """Returns the iterator for data batches for the dataset based on the mode

    Returns:
        torch.utils.data.DataLoader: Depends on the mode
    """
    if self.mode == "train":
        self.prep_train_test_data(mode="train")
        if self.kwargs:
            batch_size = self.kwargs.get("batch_size", self.batch_size)
            num_workers = self.kwargs.get("num_workers", 0)
            pin_memory = self.kwargs.get("pin_memory", False)
            persistent_workers = self.kwargs.get("persistent_workers", False)

            return DataLoader(
                self.batched_data,
                batch_size=batch_size,
                shuffle=True,
                num_workers=num_workers,
                pin_memory=pin_memory,
                persistent_workers=persistent_workers,
            )
        return DataLoader(
            self.batched_data, batch_size=self.batch_size, shuffle=True
        )
    else:
        self.prep_train_test_data(mode="test")
        return DataLoader(
            self.batched_data, batch_size=self.batch_size, shuffle=False
        )

prep_train_test_data(mode='train')

Apply transforms on the data and add the past fields (past target, past observed target, etc)

Source code in Samay\src\samay\dataset.py
def prep_train_test_data(self, mode="train"):
    """Apply transforms on the data and add the past fields (past target, past observed target, etc)"""
    ts_fields = []
    if self.feat_dynamic_real_dim > 0:
        ts_fields.append("feat_dynamic_real")
        ts_fields.append("observed_feat_dynamic_real")
    past_ts_fields = []
    if self.past_feat_dynamic_real_dim > 0:
        past_ts_fields.append("past_feat_dynamic_real")
        past_ts_fields.append("past_observed_feat_dynamic_real")

    if mode == "train":
        # STEP 1: Apply the transforms on the data
        while self.train_transforms.transforms:
            t = self.train_transforms.transforms.pop(0)
            self.data = [t(x) for x in self.data]
        # STEP 2: Linearize the data and add the required fields
        transformed_data = []
        for x in self.data:
            transformed_data.extend(
                self.add_past_fields(
                    data=x,
                    mode="train",
                    ts_fields=ts_fields,
                    past_ts_fields=past_ts_fields,
                )
            )
        self.data = transformed_data
        # STEP 3: Convert the data to a MoiraiTorch object
        self.batched_data = MoiraiTorch(self.data)

    elif mode == "test":
        # STEP 1: Apply the transforms on the data
        data = [x[0] for x in self.data]  # only input part
        while self.test_transforms.transforms:
            t = self.test_transforms.transforms.pop(0)
            data = [t(x) for x in data]
        # STEP 2: Linearize the data and add the required fields
        transformed_data = []
        for x in data:
            transformed_data.extend(
                self.add_past_fields(
                    data=x,
                    mode="test",
                    ts_fields=ts_fields,
                    past_ts_fields=past_ts_fields,
                )
            )
        # STEP 3: Convert the data to a MoiraiTorch object
        self.batched_data = MoiraiTorch(transformed_data)

TinyTimeMixerDataset

Dataset class for TinyTimeMixer model.

TinyTimeMixerDataset(name=None, datetime_col='ds', path=None, boundaries=[0, 0, 0], batch_size=128, mode=None, stride=10, context_len=512, horizon_len=64)

Bases: BaseDataset

Dataset class for TinyTimeMixer model.

Data format: dict with keys:
- input_ts: np.ndarray, historical time series data
- actual_ts: np.ndarray, actual time series data

Source code in Samay\src\samay\dataset.py
def __init__(
    self,
    name=None,
    datetime_col="ds",
    path=None,
    boundaries=[0, 0, 0],
    batch_size=128,
    mode=None,
    stride=10,
    context_len=512,
    horizon_len=64,
):
    super().__init__(
        name=name,
        datetime_col=datetime_col,
        path=path,
        batchsize=batch_size,
        mode=mode,
    )
    # Todo: implement ChronosDataset

    self.context_len = context_len
    self.horizon_len = horizon_len
    self.mode = mode
    self.boundaries = boundaries
    self.stride = stride
    self.batchsize = batch_size
    self.max_col_num = 64

    self.pad = False
    self._read_data()

    self.one_chunk_num = (
        self.length_timeseries - self.context_len - self.horizon_len
    ) // self.stride + 1

BaseDataset

All datasets inherit from the base dataset class:

BaseDataset(name=None, datetime_col=None, path=None, batchsize=8, mode='train', **kwargs)

Parameters:

- name (str): dataset name. Default: None
- target (np.ndarray): target data. Required.
Source code in Samay\src\samay\dataset.py
def __init__(
    self,
    name=None,
    datetime_col=None,
    path=None,
    batchsize=8,
    mode="train",
    **kwargs,
):
    """
    Args:
        name: str, dataset name
        target: np.ndarray, target data
    """
    self.name = name
    self.datetime_col = datetime_col
    self.batchsize = batchsize
    self.mode = mode
    if path:
        self.data_path = path
    else:
        data_func = globals()[f"get_{self.name}_dataset"]
        self.data_path = data_func()
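
In other words, when path is omitted the constructor resolves the file through a module-level helper named get_<name>_dataset. A minimal sketch, assuming such a helper exists for a dataset registered as "ett" (illustrative only):

from samay.dataset import LPTMDataset

# Assumed for illustration: a get_ett_dataset() helper is defined in samay.dataset,
# so the CSV path can be resolved from the name alone.
dataset = LPTMDataset(name="ett", datetime_col="date", mode="train", horizon=192)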

Usage Examples

Loading a Dataset

from samay.dataset import LPTMDataset

train_dataset = LPTMDataset(
    name="ett",
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    horizon=192,
    batchsize=16,
)

Custom Data Splits

# Specify exact boundaries
dataset = LPTMDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    horizon=192,
    boundaries=[0, 10000, 15000],  # Train: 0-10k, Val: 10k-15k, Test: 15k-end
)

Getting Data Loader

# Get PyTorch DataLoader
train_loader = train_dataset.get_data_loader()

for batch in train_loader:
    # Process batch
    pass
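
The exact batch structure depends on the dataset class (the forecasting datasets document input_ts and actual_ts fields), so a structure-agnostic way to inspect one batch is the following sketch:

# Minimal inspection sketch: print what one batch contains, whatever its structure.
batch = next(iter(train_loader))
if isinstance(batch, dict):
    for key, value in batch.items():
        print(key, getattr(value, "shape", type(value)))
else:
    for i, value in enumerate(batch):
        print(i, getattr(value, "shape", type(value)))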

Accessing Dataset Properties

# Dataset length
print(f"Dataset size: {len(dataset)}")

# Get a single item
sample = dataset[0]

# Number of channels
print(f"Number of channels: {dataset.n_channels}")

# Sequence length
print(f"Sequence length: {dataset.seq_len}")

Denormalizing Predictions

# If dataset normalizes data
normalized_preds = model.evaluate(dataset)[2]

# Denormalize for interpretation
denormalized_preds = dataset._denormalize_data(normalized_preds)

Common Parameters

Most dataset classes share these common parameters:

- name (str, default None): Dataset name (for metadata)
- datetime_col (str, default varies): Name of the datetime column in the CSV
- path (str, required): Path to the CSV file
- mode (str, default "train"): Mode: "train" or "test"
- batchsize (int, default varies): Batch size for the DataLoader
- boundaries (list, default [0, 0, 0]): Custom train/val/test split indices
- stride (int, default 10): Stride for the sliding window

Data Format Requirements

CSV Structure

All datasets expect CSV files with:

1. A datetime column (configurable name)
2. One or more value columns

Example:

date,HUFL,HULL,MUFL,MULL,LUFL,LULL,OT
2016-07-01 00:00:00,5.827,2.009,1.599,0.462,5.677,2.009,6.082
2016-07-01 01:00:00,5.693,2.076,1.492,0.426,5.485,1.942,5.947
...
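
For experiments, a file in this layout can be generated with pandas. A small sketch that writes synthetic values under the ETT-style column names shown above (file name and values are illustrative):

import numpy as np
import pandas as pd

# Illustrative only: create a synthetic CSV in the expected layout.
index = pd.date_range("2016-07-01", periods=1000, freq="h")
cols = ["HUFL", "HULL", "MUFL", "MULL", "LUFL", "LULL", "OT"]
values = np.random.default_rng(0).normal(size=(len(index), len(cols)))
df = pd.DataFrame(values, columns=cols)
df.insert(0, "date", index)
df.to_csv("synthetic.csv", index=False)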

Datetime Formats

Supported datetime formats:

- ISO 8601: 2016-07-01 00:00:00
- Date only: 2016-07-01
- Custom formats (parsed by pandas)

Missing Values

  • Some datasets handle missing values automatically
  • Others require preprocessing (a minimal pandas sketch follows this list)
  • Check individual dataset documentation
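
Where preprocessing is required, gaps can be filled before constructing a dataset, for example with pandas interpolation. A minimal sketch, assuming an ETT-style CSV with a "date" column:

import pandas as pd

# Minimal sketch: interpolate missing values, then save a cleaned copy.
df = pd.read_csv("./data/ETTh1.csv", parse_dates=["date"])
value_cols = [c for c in df.columns if c != "date"]
df[value_cols] = df[value_cols].interpolate(limit_direction="both")
df.to_csv("./data/ETTh1_filled.csv", index=False)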

Model-Specific Dataset Features

LPTMDataset

  • Supports forecasting, classification, and detection
  • Configurable sequence length (default: 512)
  • Adaptive segmentation
dataset = LPTMDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    horizon=192,
    seq_len=512,  # Configurable
    task_name="forecasting",
)

TimesfmDataset

  • Frequency specification
  • Optional normalization
  • Patch-based processing
dataset = TimesfmDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    context_len=512,
    horizon_len=192,
    freq="h",  # Frequency
    normalize=True,  # Optional normalization
)

MomentDataset

  • Multi-task support
  • Task-specific preprocessing
  • Label handling for classification
# Forecasting
dataset = MomentDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    horizon_len=192,
    task_name="forecasting",
)

# Classification
dataset = MomentDataset(
    datetime_col="date",
    path="./data/classification.csv",
    mode="train",
    task_name="classification",
    label_col="label",
)

ChronosDataset

  • Tokenization support
  • Configurable vocab size
  • Drop probability for training
from samay.models.chronosforecasting.chronos import ChronosConfig

config = ChronosConfig(
    context_length=512,
    prediction_length=64,
    # ... other configs
)

dataset = ChronosDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    config=config,
)

MoiraiDataset

  • Frequency specification (required)
  • Date range filtering
  • Built-in normalization
dataset = MoiraiDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    freq="h",  # Required
    context_len=128,
    horizon_len=64,
    start_date="2016-01-01",  # Optional
    end_date="2017-12-31",    # Optional
    normalize=True,
)

TinyTimeMixerDataset

  • Large batch support
  • Efficient windowing
  • Fast data loading
dataset = TinyTimeMixerDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    context_len=512,
    horizon_len=96,
    batch_size=128,  # Supports large batches
)

Common Methods

All datasets implement these methods:

__len__()

Returns the number of samples in the dataset.

__getitem__(idx)

Returns a single sample at the given index.

get_data_loader()

Returns a PyTorch DataLoader for the dataset.

Returns: - torch.utils.data.DataLoader

_denormalize_data(data)

Denormalizes data (if normalization was applied).

Parameters: - data (np.ndarray): Normalized data

Returns: - np.ndarray: Denormalized data
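
Taken together, these methods support a workflow like the sketch below (methods as documented above; the denormalization call only applies when the dataset normalizes its data):

from samay.dataset import TimesfmDataset

dataset = TimesfmDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    normalize=True,
)

print(len(dataset))             # number of samples
sample = dataset[0]             # a single sample
loader = dataset.get_data_loader()

# After inference, undo normalization on predictions:
# preds = dataset._denormalize_data(preds)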


Data Split Strategies

Default Split

When boundaries=[0, 0, 0], the dataset falls back to its default split:

- Train: 60% of data
- Validation: 20% of data
- Test: 20% of data

(TimesfmDataset uses a 50%/20%/30% split instead, as shown in its source above.)
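
For a concrete file, the equivalent boundary indices can be computed from the row count, as in this illustrative sketch; the result can be passed directly as the boundaries argument, as in the next example.

import pandas as pd

# Illustrative only: turn split fractions into boundary indices for a CSV.
n = len(pd.read_csv("./data/ETTh1.csv"))
boundaries = [int(n * 0.6), int(n * 0.8), n - 1]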

Custom Split

# Specify exact indices
dataset = LPTMDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    boundaries=[0, 10000, 15000],
    # Train: 0-10000
    # Val: 10000-15000
    # Test: 15000-end
)

Use All Data

# Use entire dataset for training
dataset = LPTMDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    boundaries=[-1, -1, -1],
)

Performance Tips

1. Batch Size

Larger batch sizes improve throughput:

dataset = LPTMDataset(
    batchsize=64,  # Larger batch
    # ...
)

2. Stride

Smaller stride creates more samples but is slower:

# More samples (slower)
dataset = LPTMDataset(
    stride=1,
    # ...
)

# Fewer samples (faster)
dataset = LPTMDataset(
    stride=96,
    # ...
)
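
The effect on sample count follows directly from the sliding-window formula used by the dataset classes (numbers are illustrative):

# Illustrative only: how stride changes the number of training windows.
length_timeseries, seq_len, horizon = 17420, 512, 192

for stride in (1, 10, 96):
    n_windows = (length_timeseries - seq_len - horizon) // stride + 1
    print(stride, n_windows)   # 1 -> 16717, 10 -> 1672, 96 -> 175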

3. Normalization

Enable normalization for better performance:

dataset = TimesfmDataset(
    normalize=True,
    # ...
)


See Also