Datasets API Reference
This page provides detailed API documentation for all dataset classes in Samay.
LPTMDataset
Dataset class for LPTM model.
LPTMDataset
Bases: BaseDataset
Dataset class for LPTM model
Data Format:
Dict with keys:
input_ts: np.ndarray, historical time series data
actual_ts: np.ndarray, actual time series data
Source code in Samay/src/samay/dataset.py
def __init__(
self,
name=None,
datetime_col=None,
path=None,
batchsize=16,
mode="train",
boundaries=[0, 0, 0],
horizon=0,
task_name="forecasting",
label_col=None,
stride=10,
seq_len=512,
**kwargs,
):
super().__init__(
name=name,
datetime_col=datetime_col,
path=path,
batchsize=batchsize,
mode=mode,
)
self.task_name = task_name
self.label_col = "label" if label_col is None else label_col
self.seq_len = seq_len
self.stride = stride
self.forecast_horizon = horizon
self.boundaries = boundaries
self.max_col_num = 64
self.pad = False
self._read_data()
self.one_chunk_num = (
self.length_timeseries - self.seq_len - self.forecast_horizon
) // self.stride + 1
TimesfmDataset
Dataset class for TimesFM model.
TimesfmDataset
Bases: BaseDataset
Dataset class for TimesFM model
Data Format:
Dict with keys:
input_ts: np.ndarray, historical time series data
actual_ts: np.ndarray, actual time series data
Source code in Samay/src/samay/dataset.py
def __init__(
self,
name=None,
datetime_col="ds",
path=None,
batchsize=4,
mode="train",
boundaries=(0, 0, 0),
context_len=128,
horizon_len=32,
freq="h",
normalize=False,
stride=10,
**kwargs,
):
super().__init__(
name=name,
datetime_col=datetime_col,
path=path,
batchsize=batchsize,
mode=mode,
)
self.context_len = context_len
self.horizon_len = horizon_len
self.freq = freq
self.normalize = normalize
self.stride = stride
self.data = pd.read_csv(self.data_path)
if boundaries == (0, 0, 0):
# Default boundaries: train 50%, val 20%, test 30%
self.boundaries = [
int(len(self.data) * 0.5),
int(len(self.data) * 0.7),
len(self.data) - 1,
]
elif boundaries == (-1, -1, -1):
# use all data for training
self.boundaries = [0, 0, len(self.data) - 1]
else:
self.boundaries = boundaries
self.horizon_len = min(self.horizon_len, int(0.3 * len(self.data) + 1))
self.ts_cols = [col for col in self.data.columns if col != self.datetime_col]
tfdtl = TimeSeriesdata(
data_path=self.data_path,
datetime_col=self.datetime_col,
num_cov_cols=None,
cat_cov_cols=None,
ts_cols=np.array(self.ts_cols),
train_range=[0, self.boundaries[0]],
val_range=[self.boundaries[0], self.boundaries[1]],
test_range=[self.boundaries[1], self.boundaries[2]],
hist_len=self.context_len,
pred_len=self.horizon_len,
batch_size=64,
freq=self.freq,
normalize=self.normalize,
epoch_len=None,
holiday=False,
permute=False,
)
if self.normalize:
self.scaler = tfdtl.scaler
self.num_ts = len(self.ts_cols)
if self.mode == "train":
tfset = tfdtl.torch_dataset(mode="train", shift=self.stride)
else:
tfset = tfdtl.torch_dataset(mode="test", shift=self.horizon_len)
self.dataset = tfset
MomentDataset
Dataset class for MOMENT model supporting multiple tasks.
MomentDataset
Bases: BaseDataset
Dataset class for Moment model
Data Format:
Dict with keys:
input_ts: np.ndarray, historical time series data
actual_ts: np.ndarray, actual time series data
Source code in Samay/src/samay/dataset.py
def __init__(
self,
name=None,
datetime_col=None,
path=None,
batchsize=64,
mode="train",
boundaries=[0, 0, 0],
horizon_len=0,
task_name="forecasting",
label_col=None,
stride=10,
**kwargs,
):
super().__init__(
name=name,
datetime_col=datetime_col,
path=path,
batchsize=batchsize,
mode=mode,
)
self.task_name = task_name
self.label_col = "label" if label_col is None else label_col
self.mode = mode
self.seq_len = 512
self.stride = (
stride if (self.mode == "train" or horizon_len == 0) else horizon_len
)
self.forecast_horizon = horizon_len
self.boundaries = boundaries
self.max_col_num = 64
self.pad = False
self._read_data()
self.one_chunk_num = (
self.length_timeseries - self.seq_len - self.forecast_horizon
) // self.stride + 1
ChronosDataset
Dataset class for Chronos model with tokenization.
ChronosDataset
Bases: BaseDataset
Dataset class for Chronos model
Data Format:
Dict with keys:
input_ts: np.ndarray, historical time series data
actual_ts: np.ndarray, actual time series data
Source code in Samay/src/samay/dataset.py
def __init__(
self,
name=None,
datetime_col="ds",
path=None,
boundaries=[0, 0, 0],
batch_size=16,
mode=None,
stride=10,
tokenizer_class="MeanScaleUniformBins",
drop_prob=0.2,
min_past=64,
np_dtype=np.float32,
config=None,
):
super().__init__(
name=name,
datetime_col=datetime_col,
path=path,
batchsize=batch_size,
mode=mode,
)
# Todo: implement ChronosDataset
assert tokenizer_class is not None, "Tokenizer is required for ChronosDataset"
if not config:
self.config = ChronosConfig(
tokenizer_class="MeanScaleUniformBins",
tokenizer_kwargs={"low_limit": -15.0, "high_limit": 15.0},
n_tokens=4096,
n_special_tokens=2,
pad_token_id=0,
eos_token_id=1,
use_eos_token=True,
model_type="seq2seq",
context_length=512,
prediction_length=64,
num_samples=20,
temperature=1.0,
top_k=50,
top_p=1.0,
)
else:
self.config = ChronosConfig(**config)
assert type(self.config) == ChronosConfig, (
"Config must be an instance of ChronosConfig"
)
assert self.config.model_type in ("seq2seq", "causal"), (
"Model type must be either 'seq2seq' or 'causal'"
)
self.context_len = self.config.context_length
self.horizon_len = self.config.prediction_length
self.drop_prob = drop_prob if self.config.model_type == "seq2seq" else 0.0
self.min_past = min_past or self.config.prediction_length
self.model_type = self.config.model_type
self.mode = mode
self.np_dtype = np_dtype
self.boundaries = boundaries
self.stride = stride
self.batchsize = batch_size
self.max_col_num = 16
self.pad = False
self._read_data()
self.preprocess()
self.one_chunk_num = (
self.length_timeseries - self.context_len - self.horizon_len
) // self.stride + 1
MoiraiDataset
Dataset class for MOIRAI model with frequency support.
MoiraiDataset
Bases: BaseDataset
Dataset class for Moirai model.
It ingests data in the form of a (num_variates x num_timesteps) matrix.
Source code in Samay/src/samay/dataset.py
def __init__(
self,
name=None,
datetime_col="date",
path=None,
boundaries=(0, 0, 0),
context_len=128,
horizon_len=32,
patch_size=16,
batch_size=16,
freq=None,
start_date=None,
end_date=None,
operation="mean",
normalize=True,
mode="train",
htune=False, # hyperparameter tuning
data_config=None,
**kwargs,
):
super().__init__(
name=name,
datetime_col=datetime_col,
path=path,
batchsize=batch_size,
mode=mode,
)
self.context_len = context_len
self.horizon_len = horizon_len
self.patch_size = patch_size
self.batch_size = batch_size
self.mode = mode
self.htune = htune
self.boundaries = boundaries
self.normalize = normalize
self.kwargs = kwargs
if data_config:
self.target_dim = data_config.get("target_dim", 1)
self.feat_dynamic_real_dim = data_config.get("feat_dynamic_real_dim", 0)
self.past_feat_dynamic_real_dim = data_config.get(
"past_feat_dynamic_real_dim", 0
)
else:
self.target_dim = 1
self.feat_dynamic_real_dim = 0
self.past_feat_dynamic_real_dim = 0
self._read_data() # read from path into a pandas dataframe
# Preprocess the data - infer freq, take subset or normalize
self._preprocess(
start_date=start_date, end_date=end_date, freq=freq, operation=operation
)
self.start_date = self.dataset.index[0]
self.train_transforms = self.default_transforms()
self.test_transforms = self.default_transforms()
# Split the dataset into train, val, test
if self.mode == "train": # no windowing
self.dataset = self.dataset[: self.boundaries[0]]
self.gen_train_val_data()
elif self.mode == "val": # no windowing
self.dataset = self.dataset[self.boundaries[0] : self.boundaries[1]]
self.gen_train_val_data()
elif self.mode == "test":
# whole dataset sent
self.gen_test_data()
else:
raise ValueError(f"Unsupported mode: {self.mode}")
add_past_fields
Add the following fields:
(a) past_target: The past target data
(b) past_observed_target: The past target data with missing values indicator
(c) past_is_pad: Indicates if the added value was a padding value
(d) past_feat_dynamic_real: The past dynamic real features
(e) past_observed_feat_dynamic_real: The past dynamic real features with missing values indicator
Source code in Samay/src/samay/dataset.py
def add_past_fields(
self,
data: dict,
ts_fields: list = [],
past_ts_fields: list = [],
dummy_val: float = 0.0,
lead_time: int = 0,
target_field: str = "target",
is_pad_field: str = "is_pad",
observed_value_field: str = "observed_target",
start_field: str = "start",
forecast_start_field: str = "forecast_start",
output_NTC: bool = True,
mode="train",
):
"""Add the following fields:
(a) past_target: The past target data
(b) past_observed_target: The past target data with missing values indicator
(c) past_is_pad: Indicates if the added value was a padding value
(d) past_feat_dynamic_real: The past dynamic real features
(e) past_observed_feat_dynamic_real: The past dynamic real features with missing values indicator
"""
pred_len = self.horizon_len
target = data[target_field]
num_windows = 1 + ((target.shape[-1] - self.past_length) // pred_len)
# Sample indices from the target field using the instance sampler
if mode == "train":
sampled_indices = [
self.past_length + i * pred_len for i in range(num_windows + 1)
]
elif mode == "test":
sampled_indices = custom_train_instance_split(target)
else:
raise ValueError(f"Unsupported mode: {mode}")
# Columns to be sliced
slice_cols = ts_fields + past_ts_fields + [target_field, observed_value_field]
transformed_data = []
# Iterate over the sampled indices
for i in range(len(sampled_indices)):
idx = sampled_indices[i]
# Calculate the padding length if the index is less than past_length
d = data.copy()
pad_length = max(
0,
self.past_length
- d[target_field][..., (idx - self.past_length) : idx].shape[-1],
)
# Iterate over the fields to be sliced
for field in slice_cols:
# Slice the past piece of the field
if pad_length == 0:
past_piece = d[field][..., (idx - self.past_length) : idx]
else:
pad_block = np.full(
shape=d[field].shape[:-1] + (pad_length,),
fill_value=dummy_val,
dtype=d[field].dtype,
)
past_piece = np.concatenate(
[pad_block, d[field][..., (idx - self.past_length) : idx]],
axis=-1,
)
# # Slice the future piece of the field
# future_piece = d[field][..., (idx + lead_time) : (idx + lead_time + pred_len)]
future_piece = np.full(
shape=d[field].shape[:-1] + (pred_len,),
fill_value=dummy_val,
dtype=d[field].dtype,
)
# If the field is in time series fields, concatenate past and future pieces
if field in ts_fields:
piece = np.concatenate([past_piece, future_piece], axis=-1)
if output_NTC:
piece = piece.transpose()
d[field] = piece
else:
if output_NTC:
past_piece = past_piece.transpose()
# future_piece = future_piece.transpose()
if field not in past_ts_fields:
d["past_" + field] = past_piece
# d["future_" + field] = future_piece
del d[field]
else:
d[field] = past_piece
# Create a padding indicator for the past piece
pad_indicator = np.zeros(self.past_length)
if pad_length > 0:
pad_indicator[:pad_length] = 1
d["past_" + (is_pad_field)] = pad_indicator
# Set the forecast start field
d[forecast_start_field] = (d[start_field] + idx + lead_time).to_timestamp()
# Append the transformed data
transformed_data.append(d)
# Return the transformed data
return transformed_data
default_transforms
Default transformations for the dataset
Source code in Samay/src/samay/dataset.py
def default_transforms(self) -> transforms.Compose:
"""Default transformations for the dataset"""
transforms_list = []
# Convert the target data to numpy array
transforms_list.append(
AsNumpy(
field="target",
expected_ndim=1 if self.target_dim == 1 else 2,
dtype=np.float32,
)
)
if self.target_dim == 1:
# Fix missing values
transforms_list.append(
AddObservedValues(
target_field="target",
output_field="observed_target",
imputation_method=CausalMeanNaNFix(),
dtype=bool,
)
)
# Add dimension to target
transforms_list.append(ArrExpandDims(field="target", axis=0))
transforms_list.append(ArrExpandDims(field="observed_target", axis=0))
else:
transforms_list.append(
AddObservedValues(
target_field="target",
output_field="observed_target",
dtype=bool,
)
)
if self.feat_dynamic_real_dim > 0:
transforms_list.append(
AsNumpy(
field="feat_dynamic_real",
expected_ndim=2,
dtype=np.float32,
)
)
transforms_list.append(
AddObservedValues(
target_field="feat_dynamic_real",
output_field="observed_feat_dynamic_real",
dtype=bool,
)
)
if self.past_feat_dynamic_real_dim > 0:
transforms_list.append(
AsNumpyArray(
field="past_feat_dynamic_real",
expected_ndim=2,
dtype=np.float32,
)
)
transforms_list.append(
AddObservedValuesIndicator(
target_field="past_feat_dynamic_real",
output_field="past_observed_feat_dynamic_real",
dtype=bool,
)
)
# Convert list of tranforms to a single transformation
comp_transform = transforms.Compose(transforms_list)
return comp_transform
gen_test_data
Generates test data based on the boundaries
Returns:
| Type | Description |
| --- | --- |
| np.ndarray | Test data |
Source code in Samay/src/samay/dataset.py
def gen_test_data(self):
"""Generates test data based on the boundaries
Returns:
np.ndarray: Test data
"""
data = []
num_windows = (
1
if (self.dataset.shape[0] - self.boundaries[1]) < self.horizon_len + self.context_len
else (self.dataset.shape[0] - self.boundaries[1] - self.context_len) // self.horizon_len
)
for i in range(self.dataset.shape[1]):
for j in range(num_windows):
if j == num_windows - 1:
start_idx = self.dataset.shape[0] - self.horizon_len
else:
start_idx = self.boundaries[1] + self.context_len + j * self.horizon_len
end_idx = start_idx + self.horizon_len
data.append(
(
{ # input
"start": Period(self.start_date, freq=freq_mapping(self.freq)),
"target": self.dataset.iloc[max(0, start_idx-self.context_len):start_idx, i].values,
"item_id": self.dataset.columns[i],
},
{ # label
"start": Period(self.start_date, freq=freq_mapping(self.freq)),
"target": self.dataset.iloc[start_idx:end_idx, i].values,
"item_id": self.dataset.columns[i],
},
)
)
self.dataset = MoiraiTorch(data)
self.data = data
gen_train_val_data
Generates training and validation data based on the boundaries
Returns:
| Type | Description |
| --- | --- |
| np.ndarray | Training and Validation data |
Source code in Samay/src/samay/dataset.py
def gen_train_val_data(self):
"""Generates training and validation data based on the boundaries
Returns:
np.ndarray: Training and Validation data
"""
data = []
# Each column is a separate time series
# Each time series is appended to the data list
for i in range(self.dataset.shape[1]):
data.append(
{
"start": Period(self.start_date, freq=freq_mapping(self.freq)),
"target": self.dataset.iloc[:, i].values,
"item_id": self.dataset.columns[i],
}
)
self.dataset = MoiraiTorch(data)
self.data = data
get_dataloader
Returns the iterator for data batches for the dataset based on the mode
Returns:
| Type | Description |
| --- | --- |
| torch.utils.data.DataLoader | Depends on the mode |
Source code in Samay/src/samay/dataset.py
def get_dataloader(self):
"""Returns the iterator for data batches for the dataset based on the mode
Returns:
torch.utils.data.DataLoader: Depends on the mode
"""
if self.mode == "train":
self.prep_train_test_data(mode="train")
if self.kwargs:
batch_size = self.kwargs.get("batch_size", self.batch_size)
num_workers = self.kwargs.get("num_workers", 0)
pin_memory = self.kwargs.get("pin_memory", False)
persistent_workers = self.kwargs.get("persistent_workers", False)
return DataLoader(
self.batched_data,
batch_size=batch_size,
shuffle=True,
num_workers=num_workers,
pin_memory=pin_memory,
persistent_workers=persistent_workers,
)
return DataLoader(
self.batched_data, batch_size=self.batch_size, shuffle=True
)
else:
self.prep_train_test_data(mode="test")
return DataLoader(
self.batched_data, batch_size=self.batch_size, shuffle=False
)
prep_train_test_data
Apply transforms on the data and add the past fields (past target, past observed target, etc)
Source code in Samay/src/samay/dataset.py
def prep_train_test_data(self, mode="train"):
"""Apply transforms on the data and add the past fields (past target, past observed target, etc)"""
ts_fields = []
if self.feat_dynamic_real_dim > 0:
ts_fields.append("feat_dynamic_real")
ts_fields.append("observed_feat_dynamic_real")
past_ts_fields = []
if self.past_feat_dynamic_real_dim > 0:
past_ts_fields.append("past_feat_dynamic_real")
past_ts_fields.append("past_observed_feat_dynamic_real")
if mode == "train":
# STEP 1: Apply the transforms on the data
while self.train_transforms.transforms:
t = self.train_transforms.transforms.pop(0)
self.data = [t(x) for x in self.data]
# STEP 2: Linearize the data and add the required fields
transformed_data = []
for x in self.data:
transformed_data.extend(
self.add_past_fields(
data=x,
mode="train",
ts_fields=ts_fields,
past_ts_fields=past_ts_fields,
)
)
self.data = transformed_data
# STEP 3: Convert the data to a MoiraiTorch object
self.batched_data = MoiraiTorch(self.data)
elif mode == "test":
# STEP 1: Apply the transforms on the data
data = [x[0] for x in self.data] # only input part
while self.test_transforms.transforms:
t = self.test_transforms.transforms.pop(0)
data = [t(x) for x in data]
# STEP 2: Linearize the data and add the required fields
transformed_data = []
for x in data:
transformed_data.extend(
self.add_past_fields(
data=x,
mode="test",
ts_fields=ts_fields,
past_ts_fields=past_ts_fields,
)
)
# STEP 3: Convert the data to a MoiraiTorch object
self.batched_data = MoiraiTorch(transformed_data)
TinyTimeMixerDataset
Dataset class for TinyTimeMixer model.
TinyTimeMixerDataset
Bases: BaseDataset
Dataset class for TinyTimeMixer model
Data Format:
Dict with keys:
input_ts: np.ndarray, historical time series data
actual_ts: np.ndarray, actual time series data
Source code in Samay/src/samay/dataset.py
def __init__(
self,
name=None,
datetime_col="ds",
path=None,
boundaries=[0, 0, 0],
batch_size=128,
mode=None,
stride=10,
context_len=512,
horizon_len=64,
):
super().__init__(
name=name,
datetime_col=datetime_col,
path=path,
batchsize=batch_size,
mode=mode,
)
# Todo: implement ChronosDataset
self.context_len = context_len
self.horizon_len = horizon_len
self.mode = mode
self.boundaries = boundaries
self.stride = stride
self.batchsize = batch_size
self.max_col_num = 64
self.pad = False
self._read_data()
self.one_chunk_num = (
self.length_timeseries - self.context_len - self.horizon_len
) // self.stride + 1
BaseDataset
All datasets inherit from the base dataset class:
BaseDataset
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Dataset name | None |
| target | np.ndarray | Target data | required |
Source code in Samay/src/samay/dataset.py
def __init__(
self,
name=None,
datetime_col=None,
path=None,
batchsize=8,
mode="train",
**kwargs,
):
"""
Args:
name: str, dataset name
target: np.ndarray, target data
"""
self.name = name
self.datetime_col = datetime_col
self.batchsize = batchsize
self.mode = mode
if path:
self.data_path = path
else:
data_func = globals()[f"get_{self.name}_dataset"]
self.data_path = data_func()
Usage Examples
Loading a Dataset
from samay.dataset import LPTMDataset
train_dataset = LPTMDataset(
name="ett",
datetime_col="date",
path="./data/ETTh1.csv",
mode="train",
horizon=192,
batchsize=16,
)
Custom Data Splits
# Specify exact boundaries
dataset = LPTMDataset(
datetime_col="date",
path="./data/ETTh1.csv",
mode="train",
horizon=192,
boundaries=[0, 10000, 15000], # Train: 0-10k, Val: 10k-15k, Test: 15k-end
)
Getting Data Loader
# Get PyTorch DataLoader
train_loader = train_dataset.get_data_loader()
for batch in train_loader:
# Process batch
pass
Accessing Dataset Properties
# Dataset length
print(f"Dataset size: {len(dataset)}")
# Get a single item
sample = dataset[0]
# Number of channels
print(f"Number of channels: {dataset.n_channels}")
# Sequence length
print(f"Sequence length: {dataset.seq_len}")
Denormalizing Predictions
# If dataset normalizes data
normalized_preds = model.evaluate(dataset)[2]
# Denormalize for interpretation
denormalized_preds = dataset._denormalize_data(normalized_preds)
Common Parameters
Most dataset classes share these common parameters:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | None | Dataset name (for metadata) |
| datetime_col | str | Varies | Name of the datetime column in CSV |
| path | str | Required | Path to CSV file |
| mode | str | "train" | Mode: "train" or "test" |
| batchsize | int | Varies | Batch size for DataLoader |
| boundaries | list | [0, 0, 0] | Custom train/val/test split indices |
| stride | int | 10 | Stride for sliding window |
CSV Structure
All datasets expect CSV files with:
1. A datetime column (configurable name)
2. One or more value columns
Example:
date,HUFL,HULL,MUFL,MULL,LUFL,LULL,OT
2016-07-01 00:00:00,5.827,2.009,1.599,0.462,5.677,2.009,6.082
2016-07-01 01:00:00,5.693,2.076,1.492,0.426,5.485,1.942,5.947
...
Supported datetime formats:
- ISO 8601: 2016-07-01 00:00:00
- Date only: 2016-07-01
- Custom formats (parsed by pandas)
Missing Values
- Some datasets handle missing values automatically
- Others require preprocessing before loading (see the sketch below)
- Check individual dataset documentation
Model-Specific Dataset Features
LPTMDataset
- Supports forecasting, classification, and detection
- Configurable sequence length (default: 512)
- Adaptive segmentation
dataset = LPTMDataset(
datetime_col="date",
path="./data/ETTh1.csv",
mode="train",
horizon=192,
seq_len=512, # Configurable
task_name="forecasting",
)
TimesfmDataset
- Frequency specification
- Optional normalization
- Patch-based processing
dataset = TimesfmDataset(
datetime_col="date",
path="./data/ETTh1.csv",
mode="train",
context_len=512,
horizon_len=192,
freq="h", # Frequency
normalize=True, # Optional normalization
)
MomentDataset
- Multi-task support
- Task-specific preprocessing
- Label handling for classification
# Forecasting
dataset = MomentDataset(
datetime_col="date",
path="./data/ETTh1.csv",
mode="train",
horizon_len=192,
task_name="forecasting",
)
# Classification
dataset = MomentDataset(
datetime_col="date",
path="./data/classification.csv",
mode="train",
task_name="classification",
label_col="label",
)
ChronosDataset
- Tokenization support
- Configurable vocab size
- Drop probability for training
# config is unpacked into ChronosConfig internally (ChronosConfig(**config)),
# so pass a dict of ChronosConfig keyword arguments
config = {
    "context_length": 512,
    "prediction_length": 64,
    # ... other ChronosConfig fields
}
dataset = ChronosDataset(
    datetime_col="date",
    path="./data/ETTh1.csv",
    mode="train",
    config=config,
)
MoiraiDataset
- Frequency specification (inferred from the datetime column if not provided)
- Date range filtering
- Built-in normalization
dataset = MoiraiDataset(
datetime_col="date",
path="./data/ETTh1.csv",
mode="train",
freq="h", # Required
context_len=128,
horizon_len=64,
start_date="2016-01-01", # Optional
end_date="2017-12-31", # Optional
normalize=True,
)
TinyTimeMixerDataset
- Large batch support
- Efficient windowing
- Fast data loading
dataset = TinyTimeMixerDataset(
datetime_col="date",
path="./data/ETTh1.csv",
mode="train",
context_len=512,
horizon_len=96,
batch_size=128, # Supports large batches
)
Common Methods
All datasets implement these methods:
__len__()
Returns the number of samples in the dataset.
__getitem__(idx)
Returns a single sample at the given index.
get_data_loader()
Returns a PyTorch DataLoader for the dataset.
Returns:
- torch.utils.data.DataLoader
_denormalize_data(data)
Denormalizes data (if normalization was applied).
Parameters:
- data (np.ndarray): Normalized data
Returns:
- np.ndarray: Denormalized data
Data Split Strategies
Default Split
When boundaries=[0, 0, 0], most dataset classes fall back to a default split of:
- Train: 60% of data
- Validation: 20% of data
- Test: 20% of data
TimesfmDataset instead uses 50% / 20% / 30% (see its source above); check the class source for the exact boundaries.
Custom Split
# Specify exact indices
dataset = LPTMDataset(
boundaries=[0, 10000, 15000],
# Train: 0-10000
# Val: 10000-15000
# Test: 15000-end
)
Use All Data
# Use entire dataset for training
dataset = LPTMDataset(
boundaries=[-1, -1, -1],
)
Performance Tips
1. Batch Size
Larger batch sizes improve throughput:
dataset = LPTMDataset(
batchsize=64, # Larger batch
# ...
)
2. Stride
Smaller stride creates more samples but is slower:
# More samples (slower)
dataset = LPTMDataset(
stride=1,
# ...
)
# Fewer samples (faster)
dataset = LPTMDataset(
stride=96,
# ...
)
3. Normalization
Enable normalization so each series is standardized before it reaches the model:
dataset = TimesfmDataset(
normalize=True,
# ...
)
See Also