Sampler Protocol
Core protocol for data sampling.
See Also
- Core Overview - All core protocols
- Samplers - Sampler implementations
- Shuffle Sampler - Random sampling
- Data Sources Guide
datarax.core.sampler
Base module for sampler components in Datarax.
This module defines the base class for all Datarax sampler components that use flax.nnx.Module for state management and JAX transformation compatibility.
SamplerModule
SamplerModule(config: StructuralConfig, *, rngs: Rngs | None = None, name: str | None = None)
Bases: StructuralModule
Enhanced base module for all Datarax sampler components.
A SamplerModule determines the order in which records are accessed and processed. It handles global data transformations like shuffling and epoch management.
This class extends StructuralModule for non-parametric structural processing. Concrete samplers define their own config classes extending StructuralConfig.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `StructuralConfig` | StructuralConfig or subclass with sampler-specific parameters | required |
| `rngs` | `Rngs \| None` | Random number generators (required if stochastic=True) | `None` |
| `name` | `str \| None` | Optional name for the sampler | `None` |
Examples:
```python
from dataclasses import dataclass

from flax import nnx

from datarax.core.config import StructuralConfig
from datarax.core.sampler import SamplerModule


class SequentialSamplerConfig(StructuralConfig):
    num_records: int = 100
    num_epochs: int = 1


# Apply the frozen dataclass transform to the config class.
SequentialSamplerConfig = dataclass(frozen=True)(SequentialSamplerConfig)


class SequentialSamplerModule(SamplerModule):
    def process(self, dataset_size):
        # Emit indices in order, capped at the configured record count.
        return list(range(min(self.config.num_records, dataset_size)))

    def __iter__(self):
        yield from self.process(100)


config = SequentialSamplerConfig(stochastic=False, num_records=10)
sampler = SequentialSamplerModule(config, rngs=nnx.Rngs(0))
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `StructuralConfig` | Sampler configuration (already validated) | required |
| `rngs` | `Rngs \| None` | Random number generators (required if stochastic=True) | `None` |
| `name` | `str \| None` | Optional sampler name | `None` |
requires_rng_streams
sample
Return a list of sampled indices.
This method returns all indices that would be yielded by the iterator, collected into a list. This is useful when you need all indices upfront rather than iterating through them one by one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `n` | `int` | The number of indices to sample (typically the dataset size). | required |
Returns:
| Type | Description |
|---|---|
| `list[int]` | A list of sampled indices. |
Note
The default implementation simply collects all indices from the iterator. Subclasses may override this for more efficient implementations.
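The default behaviour described in the note can be sketched with a standalone toy class. `ToySampler` is a hypothetical stand-in, not the Datarax class, and taking at most `n` indices from the iterator is an assumption about how `n` is used.

```python
from collections.abc import Iterator
from itertools import islice


class ToySampler:
    """Hypothetical stand-in for a sampler; not the Datarax implementation."""

    def __init__(self, num_records: int) -> None:
        self.num_records = num_records

    def __iter__(self) -> Iterator[int]:
        # Yield indices one at a time, in sequential order.
        yield from range(self.num_records)

    def sample(self, n: int) -> list[int]:
        # Assumed default: collect up to n indices from the iterator.
        return list(islice(iter(self), n))


sampler = ToySampler(num_records=5)
indices = sampler.sample(10)
```

A subclass that can compute its indices directly (for example with a single `range` call) could override `sample` and skip the iterator entirely.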
get_state
set_state
reset
reset(seed: int | None = None) -> None
Reset the sampler state, typically used to start a new epoch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `seed` | `int \| None` | Optional seed to use for shuffling or other random operations. If None, the sampler should use its default or previously set seed. | `None` |
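One possible reading of the `seed` semantics is sketched below with a hypothetical `ToyShuffleSampler`. It is not the Datarax shuffle sampler; the `position` and `order` names are illustrative assumptions, and the standard library `random` module stands in for whatever RNG the real sampler uses.

```python
from __future__ import annotations

import random


class ToyShuffleSampler:
    """Hypothetical shuffle sampler; not the Datarax implementation."""

    def __init__(self, num_records: int, seed: int = 0) -> None:
        self.num_records = num_records
        self.seed = seed
        self.position = 0

    def reset(self, seed: int | None = None) -> None:
        # Keep the previously set seed when none is given, then rewind.
        if seed is not None:
            self.seed = seed
        self.position = 0

    def order(self) -> list[int]:
        # The permutation depends only on the current seed.
        indices = list(range(self.num_records))
        random.Random(self.seed).shuffle(indices)
        return indices


sampler = ToyShuffleSampler(num_records=8, seed=1)
first_epoch = sampler.order()
sampler.reset()  # no seed given: the previous seed is reused
repeat_epoch = sampler.order()
sampler.reset(seed=2)  # explicit seed for the next epoch
```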
index_spec
index_spec() -> Any
Return a jax.ShapeDtypeStruct (or PyTree thereof) describing emitted indices.
The default implementation returns a scalar int32 spec, matching the
common case of one-index-per-call samplers (sequential, shuffle, range).
Specialized samplers (SlidingWindowSampler, BufferSampler)
override this to declare windowed or vectorized index shapes.
Returns:
| Type | Description |
|---|---|
| `Any` | A `jax.ShapeDtypeStruct` (or PyTree thereof) describing the shape and dtype of one emitted index. |
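For orientation, a scalar int32 spec of the kind described above can be built directly with `jax.ShapeDtypeStruct`. The windowed variant is only a guess at what a specialized sampler might declare; the window length of 4 is a hypothetical value, not one taken from Datarax.

```python
import jax
import jax.numpy as jnp

# Scalar int32 spec: one index emitted per call.
scalar_spec = jax.ShapeDtypeStruct(shape=(), dtype=jnp.int32)

# Hypothetical windowed spec: a window of 4 indices emitted per call.
window_spec = jax.ShapeDtypeStruct(shape=(4,), dtype=jnp.int32)
```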
get_operation_stats
reset_operation_stats
Reset operation statistics to zero.
Note: Creates new JAX arrays to reset the counters.
compute_statistics
Compute statistics from data using batch_stats_fn.
If batch_stats_fn is not configured, returns None. Computed statistics are cached in _computed_stats.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `Any` | Input data to compute statistics from | required |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any] \| None` | Dictionary of statistics, or None if no batch_stats_fn configured |
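The compute-and-cache behaviour can be illustrated with a small standalone sketch. `ToyStatsModule` is hypothetical; only the names `batch_stats_fn` and `_computed_stats` come from the description above, and passing the function through the constructor is an assumption.

```python
from __future__ import annotations

from typing import Any, Callable


class ToyStatsModule:
    """Hypothetical module mirroring the described caching behaviour."""

    def __init__(
        self, batch_stats_fn: Callable[[Any], dict[str, Any]] | None = None
    ) -> None:
        self.batch_stats_fn = batch_stats_fn
        self._computed_stats: dict[str, Any] | None = None

    def compute_statistics(self, data: Any) -> dict[str, Any] | None:
        # Without a configured function there is nothing to compute.
        if self.batch_stats_fn is None:
            return None
        # Cache the result so it can be read back later.
        self._computed_stats = self.batch_stats_fn(data)
        return self._computed_stats


def mean_stats(values: list[float]) -> dict[str, float]:
    return {"mean": sum(values) / len(values)}


with_fn = ToyStatsModule(batch_stats_fn=mean_stats)
stats = with_fn.compute_statistics([1.0, 2.0, 3.0])
without_fn = ToyStatsModule()
no_stats = without_fn.compute_statistics([1.0, 2.0, 3.0])
```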
get_statistics
set_statistics
reset_statistics
Reset all statistics to None.
This clears the computed statistics and sets an internal flag so that precomputed_stats is ignored. After a reset, get_statistics() returns None until new statistics are set or computed.
copy
copy(*, config: DataraxModuleConfig | None = None, rngs: Rngs | None = None, name: str | None = None) -> DataraxModule
Create a copy of this module with optional config/parameter changes.
This allows creating a new module instance with modified configuration while preserving other attributes. Useful for hyperparameter tuning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `DataraxModuleConfig \| None` | New config (if None, uses current config) | `None` |
| `rngs` | `Rngs \| None` | New RNG state (if None, uses current rngs) | `None` |
| `name` | `str \| None` | New name (if None, uses current name) | `None` |
Returns:
| Type | Description |
|---|---|
| `DataraxModule` | New module instance with updated parameters |
Examples:
```python
# Change configuration
new_config = DataraxModuleConfig(cacheable=True)
new_module = module.copy(config=new_config)

# Change name only
renamed = module.copy(name="new_name")
```
Note
Subclasses can override this method to provide more fine-grained control over copying, such as allowing individual config field updates without requiring dataclass replace().
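The kind of override mentioned in the note might look like the sketch below, where individual config fields are passed as keyword arguments and `dataclasses.replace` is applied internally. `ToyConfig`, `ToyModule`, and the `**field_updates` parameter are all hypothetical and not part of the Datarax API.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyConfig:
    """Hypothetical frozen config with two illustrative fields."""

    cacheable: bool = False
    batch_size: int = 32


class ToyModule:
    """Hypothetical module; not a DataraxModule."""

    def __init__(self, config, name=None):
        self.config = config
        self.name = name

    def copy(self, *, config=None, name=None, **field_updates):
        # Start from the given config, or fall back to the current one.
        base = config if config is not None else self.config
        # Frozen dataclasses cannot be mutated, so build a new instance.
        new_config = replace(base, **field_updates) if field_updates else base
        new_name = name if name is not None else self.name
        return ToyModule(new_config, name=new_name)


module = ToyModule(ToyConfig(), name="original")
tuned = module.copy(batch_size=64)
renamed = module.copy(name="new_name")
```

Because the config is frozen, the original module is left unchanged, which makes this pattern convenient for sweeping over hyperparameters.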
clone
clone() -> DataraxModule
Create a new instance with the same state as this module.
Uses NNX's clone function for proper deep cloning of all state.
Returns:
| Type | Description |
|---|---|
| `DataraxModule` | A new module instance with the same state. |
ensure_rng_streams
Ensure that the required RNG streams are available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `stream_names` | `list[str]` | A list of available RNG stream names. | required |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If a required RNG stream is not available. |
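The check itself can be sketched as a standalone function. Here `check_rng_streams` and its `required` parameter are hypothetical; how the real method determines which streams are required is not shown in this reference.

```python
def check_rng_streams(required: list[str], stream_names: list[str]) -> None:
    """Hypothetical standalone version of the availability check."""
    # Collect every required stream that is absent from the available names.
    missing = [name for name in required if name not in stream_names]
    if missing:
        raise ValueError(f"Required RNG streams not available: {missing}")


# Passes silently: the only required stream is available.
check_rng_streams(required=["shuffle"], stream_names=["shuffle", "default"])

try:
    check_rng_streams(required=["shuffle", "augment"], stream_names=["shuffle"])
except ValueError as exc:
    error_message = str(exc)
```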
process
Process input structure.
This method transforms the structure/organization of input data without modifying the data values themselves.
Subclasses MUST implement this method.
The input/output types depend on the specific structural processor:
- Batcher: list[Element] -> list[Batch]
- Sampler: int -> list[int]
- Sharder: Batch -> Sharded[Batch]
- Splitter: Dataset -> tuple[Dataset, Dataset]
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `input` | `Any` | Input to process (type varies by processor) | required |
| `*args` | `Any` | Additional positional arguments (processor-specific) | `()` |
| `**kwargs` | `Any` | Additional keyword arguments (processor-specific) | `{}` |
Returns:
| Type | Description |
|---|---|
| `Any` | Processed output (type varies by processor) |
Examples:
Batcher implementation:
```python
def process(self, elements: list[Element]) -> list[Batch]:
    batches = []
    # Slice the elements into consecutive chunks of batch_size.
    for i in range(0, len(elements), self.config.batch_size):
        batch_elements = elements[i:i + self.config.batch_size]
        batches.append(Batch.from_elements(batch_elements))
    return batches
```
Sampler implementation (deterministic):
```python
def process(self, dataset_size: int) -> list[int]:
    # Emit indices in order, capped at the configured sample count.
    return list(range(min(self.config.num_samples, dataset_size)))
```
Sampler implementation (stochastic):