
Checkpoint Iterators

Save and restore iterator state for resumable iteration.

datarax.checkpoint.iterators

Iterator checkpoint functionality for Datarax.

This module provides checkpoint handlers for data iterators and streams. They save and restore iterator state so that iteration can be resumed.

logger module-attribute

logger = getLogger(__name__)

T_co module-attribute

T_co = TypeVar('T_co', covariant=True)

IteratorCheckpoint

IteratorCheckpoint(base_dir: str | Path, handler: OrbaxCheckpointHandler | None = None, async_checkpointing: bool = False)

Handler for checkpointing iterators.

This class provides methods for saving and restoring the state of iterators that implement the CheckpointableIterator interface.

Parameters:

base_dir (str | Path, required): Base directory in which to store checkpoints.

handler (OrbaxCheckpointHandler | None, default None): Optional custom checkpoint handler. If None, a default OrbaxCheckpointHandler is created.

async_checkpointing (bool, default False): Whether to enable asynchronous checkpointing.

base_dir instance-attribute

base_dir = Path(base_dir)

handler instance-attribute

handler = handler or OrbaxCheckpointHandler(async_checkpointing=async_checkpointing)
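According to the Raises sections of the methods below, an iterator must provide get_state before it can be saved and set_state before it can be restored. The sketch below is a hypothetical RangeStream that implements both, so its position survives a save/restore round trip. The class name, the dict-shaped state, and the assumption that these two methods are all CheckpointableIterator requires are illustrative only. They are not Datarax's definition of the interface.

```python
from typing import Any, Iterator


class RangeStream:
    """Hypothetical iterator exposing get_state()/set_state().

    Assumption: these two methods are what a checkpointable iterator needs;
    the real CheckpointableIterator interface may require more.
    """

    def __init__(self, n: int) -> None:
        self.n = n
        self.position = 0  # index of the next element to yield

    def __iter__(self) -> Iterator[int]:
        return self

    def __next__(self) -> int:
        if self.position >= self.n:
            raise StopIteration
        value = self.position
        self.position += 1
        return value

    def get_state(self) -> dict[str, Any]:
        # Everything needed to resume; here that is just the cursor.
        return {"position": self.position}

    def set_state(self, state: dict[str, Any]) -> None:
        self.position = int(state["position"])


stream = RangeStream(10)
first = [next(stream) for _ in range(3)]  # consumes 0, 1, 2
state = stream.get_state()                # {"position": 3}

resumed = RangeStream(10)                 # fresh iterator, e.g. after a restart
resumed.set_state(state)
rest = list(resumed)                      # continues from 3
```

The state is deliberately small and plain, so any serializer can store it. A real data stream would also need to capture things such as shuffle seeds or shard offsets.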

save_to_directory

save_to_directory(iterator: CheckpointableIterator[T_co], step: int | None = None, keep: int = 1, overwrite: bool = False, metadata: dict[str, int | str | float | bool] | None = None) -> str

Save the state of an iterator to the checkpoint directory.

Parameters:

iterator (CheckpointableIterator[T_co], required): The iterator to checkpoint.

step (int | None, default None): Optional step number used to version the checkpoint.

keep (int, default 1): Number of checkpoints to keep.

overwrite (bool, default False): Whether to overwrite existing checkpoints.

metadata (dict[str, int | str | float | bool] | None, default None): Optional metadata to save with the checkpoint.

Returns:

str: Path to the saved checkpoint.

Raises:

ValueError: If the iterator does not implement get_state.

restore

restore(iterator: CheckpointableIterator[T_co], step: int | None = None) -> CheckpointableIterator[T_co]

Restore the state of an iterator from a checkpoint.

Parameters:

iterator (CheckpointableIterator[T_co], required): The iterator to restore state into.

step (int | None, default None): Optional step number to restore from. If None, the latest checkpoint is used.

Returns:

CheckpointableIterator[T_co]: The restored iterator.

Raises:

ValueError: If the iterator does not implement set_state, or if the checkpoint cannot be restored.

restore_latest

restore_latest(iterator: CheckpointableIterator[T_co]) -> CheckpointableIterator[T_co]

Restore from the latest available checkpoint.

Parameters:

iterator (CheckpointableIterator[T_co], required): The iterator to restore state into.

Returns:

CheckpointableIterator[T_co]: The restored iterator.

Raises:

ValueError: If no checkpoints are found or restoration fails.

get_latest_step

get_latest_step() -> int | None

Get the latest checkpoint step.

Returns:

int | None: The latest checkpoint step, or None if no checkpoints are found.

list_checkpoints

list_checkpoints() -> dict[int, str]

List all available checkpoints.

Returns:

dict[int, str]: A dictionary mapping step numbers to checkpoint paths.

has_checkpoint

has_checkpoint() -> bool

Check whether any checkpoints exist.

Returns:

bool: True if at least one checkpoint exists, False otherwise.

PipelineCheckpoint

PipelineCheckpoint(base_dir: str | Path, handler: OrbaxCheckpointHandler | None = None, async_checkpointing: bool = False)

Bases: IteratorCheckpoint

Specialized checkpoint handler for Pipeline objects.

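This class extends IteratorCheckpoint with Pipeline-specific functionality, such as saving only at fixed step intervals. It inherits all attributes and methods of IteratorCheckpoint (base_dir, handler, save_to_directory, restore, restore_latest, get_latest_step, list_checkpoints, and has_checkpoint), which are documented above.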

Parameters:

base_dir (str | Path, required): Base directory in which to store checkpoints.

handler (OrbaxCheckpointHandler | None, default None): Optional custom checkpoint handler. If None, a default OrbaxCheckpointHandler is created.

async_checkpointing (bool, default False): Whether to enable asynchronous checkpointing.

save_to_step

save_to_step(data_stream: CheckpointableIterator[T_co], step: int, interval: int = 1000, keep: int = 5, overwrite: bool = False, metadata: dict[str, int | str | float | bool] | None = None) -> str | None

Save a checkpoint conditionally based on the step.

Only saves when the step is a multiple of the interval.

Parameters:

data_stream (CheckpointableIterator[T_co], required): The data stream to checkpoint.

step (int, required): Current step number.

interval (int, default 1000): Step interval between saves.

keep (int, default 5): Number of checkpoints to keep.

overwrite (bool, default False): Whether to overwrite existing checkpoints.

metadata (dict[str, int | str | float | bool] | None, default None): Optional metadata to store with the checkpoint.

Returns:

str | None: Path to the saved checkpoint, or None if no checkpoint was saved.
