Epoch Aware Sampler
Track epochs and reshuffle at epoch boundaries.
See Also
- Samplers Overview - All sampling strategies
- Shuffle Sampler - Random shuffling
- Checkpointing - Save/restore state
- Monitoring - Track progress
datarax.samplers.epoch_aware_sampler
Epoch-aware sampler with per-epoch callbacks and index tracking.
EpochAwareSamplerConfig (dataclass)
EpochAwareSamplerConfig(cacheable: bool = False, batch_stats_fn: Callable | Module | None = None, precomputed_stats: dict[str, Any] | None = None, stochastic: bool = False, stream_name: str | None = None, num_records: int | None = None, num_epochs: int = 1, shuffle: bool = True, seed: int = 42)
Bases: StructuralConfig
Configuration for EpochAwareSamplerModule.
Attributes:
| Name | Type | Description |
|---|---|---|
| `num_records` | `int \| None` | Total number of records in the dataset (required) |
| `num_epochs` | `int` | Number of epochs to iterate (default: 1; -1 for infinite) |
| `shuffle` | `bool` | Whether to shuffle indices per epoch (default: True) |
| `seed` | `int` | Base seed for shuffling (default: 42; only used if shuffle=True) |
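The way these four fields interact can be sketched in plain Python. This is an illustration only, not the library's implementation: the `epoch_indices` helper is hypothetical, deriving each epoch's seed as `seed + epoch` is an assumption, and Python's `random` module stands in for whatever RNG the module actually uses.

```python
import random
from collections.abc import Iterator


def epoch_indices(
    num_records: int,
    num_epochs: int = 1,
    shuffle: bool = True,
    seed: int = 42,
) -> Iterator[tuple[int, int]]:
    """Yield (epoch, index) pairs. Hypothetical stand-in, not library code."""
    epoch = 0
    # num_epochs == -1 means "iterate forever", as in the attribute table.
    while num_epochs == -1 or epoch < num_epochs:
        order = list(range(num_records))
        if shuffle:
            # Assumption: each epoch derives its own seed from the base seed.
            random.Random(seed + epoch).shuffle(order)
        for index in order:
            yield epoch, index
        epoch += 1


pairs = list(epoch_indices(num_records=4, num_epochs=2))
epoch0 = [i for e, i in pairs if e == 0]
epoch1 = [i for e, i in pairs if e == 1]
```

Each epoch visits every record exactly once, and with `shuffle=False` the order is simply `0..num_records-1` in every epoch.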
precomputed_stats (class-attribute, instance-attribute)
EpochAwareSamplerModule
EpochAwareSamplerModule(config: EpochAwareSamplerConfig, *, rngs: Rngs | None = None, name: str | None = None)
Bases: SamplerModule
Sampler with explicit epoch boundary handling.
Manages epochs with different shuffling per epoch while maintaining state for checkpointing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `EpochAwareSamplerConfig` | Configuration specifying num_records, epochs, shuffle, and seed. | required |
| `rngs` | `Rngs \| None` | Optional Flax NNX random number generators. | None |
| `name` | `str \| None` | Optional module name for identification. | None |
epoch_complete_callbacks (instance-attribute)
add_epoch_callback
add_epoch_callback(callback: Callable) -> None
Add callback for epoch completion.
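A minimal sketch of the registration pattern follows. The `EpochCallbackSketch` class is a hypothetical stand-in, and the callback signature (a single epoch number) is an assumption, since the documented type is only `Callable`.

```python
from collections.abc import Callable, Iterator


class EpochCallbackSketch:
    """Hypothetical stand-in showing callback registration, not the library class."""

    def __init__(self, num_records: int, num_epochs: int) -> None:
        self.num_records = num_records
        self.num_epochs = num_epochs
        self.epoch_complete_callbacks: list[Callable[[int], None]] = []

    def add_epoch_callback(self, callback: Callable[[int], None]) -> None:
        self.epoch_complete_callbacks.append(callback)

    def __iter__(self) -> Iterator[int]:
        for epoch in range(self.num_epochs):
            yield from range(self.num_records)
            # Assumption: callbacks receive the number of the epoch that just ended.
            for callback in self.epoch_complete_callbacks:
                callback(epoch)


finished: list[int] = []
sampler = EpochCallbackSketch(num_records=3, num_epochs=2)
sampler.add_epoch_callback(finished.append)
indices = list(sampler)
```

After the loop, `finished` holds one entry per completed epoch, which is the kind of hook one might use for logging or checkpointing at epoch boundaries.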
reset
reset(seed: int | None = None) -> None
Reset the sampler to initial state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `seed` | `int \| None` | Optional new base seed. If provided, changes the shuffle order for subsequent iterations. | None |
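The reset behavior described above can be illustrated with a small stand-in. `ResettableSamplerSketch`, its `next_index` method, and its `position` field are hypothetical names, and Python's `random` module replaces the module's real RNG handling.

```python
import random
from typing import Optional


class ResettableSamplerSketch:
    """Hypothetical stand-in for reset semantics, not the library class."""

    def __init__(self, num_records: int, seed: int = 42) -> None:
        self.num_records = num_records
        self.seed = seed
        self.position = 0
        self._order = self._shuffled()

    def _shuffled(self) -> list[int]:
        order = list(range(self.num_records))
        random.Random(self.seed).shuffle(order)
        return order

    def next_index(self) -> int:
        index = self._order[self.position]
        self.position += 1
        return index

    def reset(self, seed: Optional[int] = None) -> None:
        # A new seed, if given, replaces the base seed before reshuffling.
        if seed is not None:
            self.seed = seed
        self.position = 0
        self._order = self._shuffled()


sampler = ResettableSamplerSketch(num_records=5)
first_pass = [sampler.next_index() for _ in range(5)]
sampler.reset()  # same seed: the same order is replayed
second_pass = [sampler.next_index() for _ in range(5)]
sampler.reset(seed=7)  # new base seed for later iterations
third_pass = [sampler.next_index() for _ in range(5)]
```

Resetting without a seed replays the original order, while passing a seed switches the base seed used from then on.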
get_operation_stats
reset_operation_stats
Reset operation statistics to zero.
Note: Creates new JAX arrays to reset the counters.
compute_statistics
Compute statistics from data using batch_stats_fn.
If batch_stats_fn is not configured, returns None. Computed statistics are cached in _computed_stats.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Any` | Input data to compute statistics from | required |

Returns:

| Type | Description |
|---|---|
| `dict[str, Any] \| None` | Dictionary of statistics, or None if no batch_stats_fn configured |
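The compute-and-cache behavior can be sketched as follows. `StatsSketch` and `mean_stats` are hypothetical names used for illustration; only `batch_stats_fn`, `compute_statistics`, and `_computed_stats` come from the description above.

```python
from typing import Any, Callable, Optional


class StatsSketch:
    """Hypothetical stand-in for the compute-and-cache behavior."""

    def __init__(
        self, batch_stats_fn: Optional[Callable[[Any], dict[str, Any]]] = None
    ) -> None:
        self.batch_stats_fn = batch_stats_fn
        self._computed_stats: Optional[dict[str, Any]] = None

    def compute_statistics(self, data: Any) -> Optional[dict[str, Any]]:
        # Without a configured function there is nothing to compute.
        if self.batch_stats_fn is None:
            return None
        # Cache the result so later lookups can reuse it.
        self._computed_stats = self.batch_stats_fn(data)
        return self._computed_stats


def mean_stats(values: list[float]) -> dict[str, float]:
    return {"mean": sum(values) / len(values)}


no_fn = StatsSketch()
with_fn = StatsSketch(batch_stats_fn=mean_stats)
result = with_fn.compute_statistics([1.0, 2.0, 3.0])
```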
get_statistics
set_statistics
reset_statistics
Reset all statistics to None.
This clears the computed statistics and sets an internal flag so that precomputed_stats is ignored. After reset, get_statistics() returns None until new statistics are set or computed.
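One way the statistics accessors could fit together is sketched below. The flag name `_ignore_precomputed` is hypothetical, and the rule that computed or explicitly set statistics take precedence over `precomputed_stats` is an assumption rather than documented behavior.

```python
from typing import Any, Optional


class StatsStoreSketch:
    """Hypothetical stand-in for get/set/reset of statistics."""

    def __init__(self, precomputed_stats: Optional[dict[str, Any]] = None) -> None:
        self.precomputed_stats = precomputed_stats
        self._computed_stats: Optional[dict[str, Any]] = None
        self._ignore_precomputed = False  # hypothetical name for the internal flag

    def get_statistics(self) -> Optional[dict[str, Any]]:
        # Assumption: explicitly set or computed statistics win over precomputed ones.
        if self._computed_stats is not None:
            return self._computed_stats
        if self._ignore_precomputed:
            return None
        return self.precomputed_stats

    def set_statistics(self, stats: dict[str, Any]) -> None:
        self._computed_stats = stats

    def reset_statistics(self) -> None:
        self._computed_stats = None
        self._ignore_precomputed = True


store = StatsStoreSketch(precomputed_stats={"mean": 0.5})
before = store.get_statistics()
store.reset_statistics()
after_reset = store.get_statistics()
store.set_statistics({"mean": 1.5})
after_set = store.get_statistics()
```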
copy
copy(*, config: DataraxModuleConfig | None = None, rngs: Rngs | None = None, name: str | None = None) -> DataraxModule
Create a copy of this module with optional config/parameter changes.
This allows creating a new module instance with modified configuration while preserving other attributes. Useful for hyperparameter tuning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `DataraxModuleConfig \| None` | New config (if None, uses current config) | None |
| `rngs` | `Rngs \| None` | New RNG state (if None, uses current rngs) | None |
| `name` | `str \| None` | New name (if None, uses current name) | None |

Returns:

| Type | Description |
|---|---|
| `DataraxModule` | New module instance with updated parameters |
Examples:

    # Change configuration
    new_config = DataraxModuleConfig(cacheable=True)
    new_module = module.copy(config=new_config)

    # Change name only
    renamed = module.copy(name="new_name")
Note
Subclasses can override this method to provide more fine-grained control over copying, such as allowing individual config field updates without requiring dataclass replace().
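The note refers to the standard-library `dataclasses.replace()` pattern for changing a single field of a dataclass config. The sketch below shows that pattern with stand-in types: `ConfigSketch` and `ModuleSketch` are hypothetical and only mimic the documented `copy(config=..., name=...)` keywords.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ConfigSketch:
    """Hypothetical config with a few fields named after the real one."""

    cacheable: bool = False
    num_epochs: int = 1
    seed: int = 42


class ModuleSketch:
    """Hypothetical module exposing a copy() with the documented keywords."""

    def __init__(self, config: ConfigSketch, name: Optional[str] = None) -> None:
        self.config = config
        self.name = name

    def copy(
        self, *, config: Optional[ConfigSketch] = None, name: Optional[str] = None
    ) -> "ModuleSketch":
        # Fall back to the current values for anything not overridden.
        return ModuleSketch(
            config=self.config if config is None else config,
            name=self.name if name is None else name,
        )


module = ModuleSketch(ConfigSketch(), name="sampler")
# replace() builds a new config with one field changed and the rest preserved.
tuned = module.copy(config=replace(module.config, num_epochs=5))
renamed = module.copy(name="new_name")
```

The original module and its config are left untouched, which is what makes this pattern convenient for hyperparameter sweeps.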
get_state
set_state
clone
clone() -> DataraxModule
Create a new instance with the same state as this module.
Uses NNX's clone function for proper deep cloning of all state.
Returns:
| Type | Description |
|---|---|
| `DataraxModule` | A new module instance with the same state. |
requires_rng_streams
ensure_rng_streams
Ensure that the required RNG streams are available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `stream_names` | `list[str]` | A list of available RNG stream names. | required |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If a required RNG stream is not available. |
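The check can be pictured as a comparison between required and available stream names. In this sketch, `RngCheckSketch` is a hypothetical class, the assumption that `requires_rng_streams` returns a list of names is not confirmed by this page, and the error message wording is invented.

```python
class RngCheckSketch:
    """Hypothetical stand-in for the RNG stream validation."""

    def __init__(self, required: list[str]) -> None:
        self._required = required

    def requires_rng_streams(self) -> list[str]:
        # Assumption: the required streams are reported as a list of names.
        return list(self._required)

    def ensure_rng_streams(self, stream_names: list[str]) -> None:
        missing = [s for s in self.requires_rng_streams() if s not in stream_names]
        if missing:
            raise ValueError(f"Missing required RNG streams: {missing}")


checker = RngCheckSketch(required=["shuffle"])
checker.ensure_rng_streams(["shuffle", "augment"])  # passes silently

try:
    checker.ensure_rng_streams(["augment"])
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```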
process
Process input structure.
This method transforms the structure/organization of input data without modifying the data values themselves.
Subclasses MUST implement this method.
The input/output types depend on the specific structural processor:
- Batcher: list[Element] -> list[Batch]
- Sampler: int -> list[int]
- Sharder: Batch -> Sharded[Batch]
- Splitter: Dataset -> tuple[Dataset, Dataset]
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `input` | `Any` | Input to process (type varies by processor) | required |
| `*args` | `Any` | Additional positional arguments (processor-specific) | `()` |
| `**kwargs` | `Any` | Additional keyword arguments (processor-specific) | `{}` |

Returns:

| Type | Description |
|---|---|
| `Any` | Processed output (type varies by processor) |
Examples:
Batcher implementation:

    def process(self, elements: list[Element]) -> list[Batch]:
        batches = []
        for i in range(0, len(elements), self.config.batch_size):
            batch_elements = elements[i:i + self.config.batch_size]
            batches.append(Batch.from_elements(batch_elements))
        return batches

Sampler implementation (deterministic):

    def process(self, dataset_size: int) -> list[int]:
        return list(range(min(self.config.num_samples, dataset_size)))
Sampler implementation (stochastic):
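A stochastic sampler's `process` might look like the following sketch. It is not taken from the library: the `StochasticSamplerSketch` class, its `num_samples` and `seed` arguments, and the use of Python's `random` module in place of Flax NNX RNG streams are all assumptions made for illustration.

```python
import random


class StochasticSamplerSketch:
    """Hypothetical stochastic sampler, mirroring the deterministic example."""

    def __init__(self, num_samples: int, seed: int = 42) -> None:
        self.num_samples = num_samples
        # Assumption: a seeded stdlib RNG stands in for an NNX RNG stream.
        self._rng = random.Random(seed)

    def process(self, dataset_size: int) -> list[int]:
        count = min(self.num_samples, dataset_size)
        # Draw `count` distinct indices without replacement.
        return self._rng.sample(range(dataset_size), count)


picked = StochasticSamplerSketch(num_samples=3).process(10)
capped = StochasticSamplerSketch(num_samples=8).process(5)
```

As in the deterministic example, the number of returned indices is capped at the dataset size.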
sample
Return a list of sampled indices.
This method returns all indices that would be yielded by the iterator, collected into a list. This is useful when you need all indices upfront rather than iterating through them one by one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `n` | `int` | The number of indices to sample (typically the dataset size). | required |

Returns:

| Type | Description |
|---|---|
| `list[int]` | A list of sampled indices. |
Note
The default implementation simply collects all indices from the iterator. Subclasses may override this for more efficient implementations.
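The "collect from the iterator" default can be sketched like this. `SequentialSamplerSketch` is a hypothetical class, and treating `n` as an upper bound on the number of collected indices is an assumption; the page only says that `n` is typically the dataset size.

```python
from collections.abc import Iterator
from itertools import islice


class SequentialSamplerSketch:
    """Hypothetical sampler whose sample() just drains its own iterator."""

    def __init__(self, num_records: int) -> None:
        self.num_records = num_records

    def __iter__(self) -> Iterator[int]:
        return iter(range(self.num_records))

    def sample(self, n: int) -> list[int]:
        # Assumption: n caps how many indices are collected from the iterator.
        return list(islice(iter(self), n))


sampler = SequentialSamplerSketch(num_records=5)
all_indices = sampler.sample(5)
first_three = sampler.sample(3)
```

A subclass with a cheaper way to produce all indices at once could override `sample` and skip the iterator entirely.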
index_spec
index_spec() -> Any
Return a jax.ShapeDtypeStruct (or PyTree thereof) describing emitted indices.
The default implementation returns a scalar int32 spec, matching the common case of one-index-per-call samplers (sequential, shuffle, range). Specialized samplers (SlidingWindowSampler, BufferSampler) override this to declare windowed or vectorized index shapes.
Returns:
| Type | Description |
|---|---|
| `Any` | A jax.ShapeDtypeStruct (or PyTree thereof) describing the shape and dtype of one emitted index. |