Skip to content

Monitoring¤

Metrics collection and observability for data pipelines. Track throughput, latency, and custom metrics during training and inference.

Components¤

Component Purpose Use Case
Metrics Collect measurements Throughput, latency, custom
Callbacks Event hooks Log on batch/epoch end
Reporters Output backends Console, file, TensorBoard

★ Insight ─────────────────────────────────────

  • Metrics are lightweight - minimal overhead
  • Use callbacks for automated logging
  • Reporters support multiple backends simultaneously

─────────────────────────────────────────────────

Quick Start¤

from datarax.monitoring import MetricsCollector

collector = MetricsCollector()

# Track metrics during training
for step, batch in enumerate(pipeline):
    loss = train_step(batch)
    collector.record_metric("loss", float(loss))
    collector.record_metric("throughput", batch_size / elapsed)

# Inspect collected records
for record in collector.get_metrics():
    print(record.name, record.value)

Modules¤

  • metrics - Core metrics collection and storage
  • callbacks - Event-driven monitoring callbacks
  • reporters - Output backends (console, file, etc.)

Callback Example¤

from datarax.monitoring import (
    CallbackRegistry,
    ConsoleReporter,
    MetricsCollector,
    MetricsObserver,
)
from datarax.monitoring.metrics import MetricRecord

collector = MetricsCollector()
registry = CallbackRegistry()
registry.register(ConsoleReporter(report_interval=1.0))


class LossPrinter(MetricsObserver):
    def update(self, metrics: list[MetricRecord]) -> None:
        for record in metrics:
            if record.name == "loss":
                print(f"step {record.metadata.get('step')}: loss = {record.value:.4f}")


registry.register(LossPrinter())

for step, batch in enumerate(pipeline):
    loss = train_step(batch)
    collector.record_metric("loss", float(loss), metadata={"step": step})
    registry.notify(collector.get_metrics())

See Also¤