
Benchmark Monitor¤

Real-time pipeline monitoring with GPU profiling and alerting.

Overview¤

This module provides three monitoring components:

  • AdvancedMonitor — Real-time pipeline monitoring with configurable thresholds, background metric collection (CPU, memory, GPU), and trend analysis. Integrates with AlertManager for threshold-based alerts.
  • ProductionMonitor — Extends AdvancedMonitor with production-specific features: performance baseline tracking, per-pipeline execution recording, error rate monitoring, and health reports.
  • AlertManager — Manages threshold-based alerts with configurable severity levels and pluggable alert handlers.

Quick Start¤

from calibrax.monitoring import AdvancedMonitor

monitor = AdvancedMonitor()

# Configure thresholds
monitor.set_threshold("gpu_memory_utilization", 0.85)
monitor.set_threshold("memory_usage_mb", 4000)

# Start background monitoring (collects metrics every 5 seconds)
monitor.start_monitoring(interval=5.0)

# ... run your pipeline ...

# Stop and check results
monitor.stop_monitoring()
summary = monitor.get_monitoring_summary()
print(f"Alerts triggered: {summary['total_alerts']}")
print(f"Critical alerts: {summary['critical_alerts']}")

calibrax.monitoring ¤

Monitoring: alerting, production monitoring, and threshold tracking.

AdvancedMonitor ¤

AdvancedMonitor(alert_manager: AlertManager | None = None, gpu_profiler: GPUProfilerProtocol | None = None, resource_monitor: ResourceMonitor | None = None)

Background resource monitor with threshold-based alerting.

Collects CPU, memory, and optional GPU metrics on a daemon thread. Triggers alerts when thresholds are exceeded.

Parameters:

  • alert_manager (AlertManager | None, default: None): Alert manager for dispatching alerts. Created if not provided.
  • gpu_profiler (GPUProfilerProtocol | None, default: None): Optional GPU profiler for GPU metrics.
  • resource_monitor (ResourceMonitor | None, default: None): Optional ResourceMonitor for background sampling.

alert_manager property ¤

alert_manager: AlertManager

Access the underlying alert manager.

set_threshold ¤

set_threshold(metric_name: str, threshold: float) -> None

Set an alerting threshold for a metric.

Parameters:

  • metric_name (str, required): Name of the metric to monitor.
  • threshold (float, required): Value above which an alert is triggered.
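
The "value above which" wording suggests a strict greater-than comparison per metric. The sketch below is a standalone illustration of that reading, not calibrax's implementation; `find_violations` and the sample values are hypothetical.

```python
def find_violations(
    thresholds: dict[str, float], sample: dict[str, float]
) -> list[tuple[str, float, float]]:
    """Return (metric, value, threshold) for every metric above its threshold."""
    violations = []
    for name, limit in thresholds.items():
        value = sample.get(name)
        # Metrics missing from the sample are skipped; equality is not a violation.
        if value is not None and value > limit:
            violations.append((name, value, limit))
    return violations


thresholds = {"gpu_memory_utilization": 0.85, "memory_usage_mb": 4000}
sample = {"gpu_memory_utilization": 0.91, "memory_usage_mb": 4000}
print(find_violations(thresholds, sample))
```

Under this reading, a metric sitting exactly at its threshold (here `memory_usage_mb`) does not raise an alert.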

start_monitoring ¤

start_monitoring(interval: float = 5.0) -> None

Start background monitoring on a daemon thread.

Parameters:

  • interval (float, default: 5.0): Seconds between metric collection cycles.

stop_monitoring ¤

stop_monitoring() -> None

Stop background monitoring and wait for the thread to finish.
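
A start/stop pair like this is commonly built on a daemon thread plus a `threading.Event`. The following is a minimal sketch under that assumption; `TinyMonitor` and its `collect` callable are hypothetical stand-ins and do not reflect how AdvancedMonitor is actually implemented.

```python
import threading
import time


class TinyMonitor:
    """Hypothetical stand-in that samples a callable on a daemon thread."""

    def __init__(self, collect):
        self._collect = collect  # zero-argument callable returning one sample
        self._stop = threading.Event()
        self._thread = None
        self.samples = []

    def start_monitoring(self, interval: float = 5.0) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, args=(interval,), daemon=True
        )
        self._thread.start()

    def _run(self, interval: float) -> None:
        while True:
            self.samples.append(self._collect())
            # Event.wait sleeps up to `interval` and returns True once stop is set.
            if self._stop.wait(interval):
                break

    def stop_monitoring(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()  # wait for the loop to exit
            self._thread = None


monitor = TinyMonitor(collect=time.monotonic)
monitor.start_monitoring(interval=0.01)
time.sleep(0.05)
monitor.stop_monitoring()
print(f"collected {len(monitor.samples)} samples")
```

Waiting on the event instead of calling `time.sleep` lets `stop_monitoring` return promptly even when the interval is long.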

get_monitoring_summary ¤

get_monitoring_summary() -> dict[str, Any]

Return a summary of current monitoring state.

Returns:

  • dict[str, Any]: Dictionary with thresholds, alert counts, and metric history summaries.

Alert dataclass ¤

Alert(*, message: str, severity: AlertSeverity, metric_name: str, metric_value: float, threshold: float, timestamp: float = time(), metadata: dict[str, Any] = dict())

A single monitoring alert triggered by a threshold violation.

Attributes:

  • message (str): Human-readable description of the alert.
  • severity (AlertSeverity): Alert severity level.
  • metric_name (str): Name of the metric that triggered the alert.
  • metric_value (float): Observed value that triggered the alert.
  • threshold (float): Threshold that was exceeded.
  • timestamp (float): When the alert was triggered.
  • metadata (dict[str, Any]): Additional context about the alert.

message instance-attribute ¤

message: str

severity instance-attribute ¤

severity: AlertSeverity

metric_name instance-attribute ¤

metric_name: str

metric_value instance-attribute ¤

metric_value: float

threshold instance-attribute ¤

threshold: float

timestamp class-attribute instance-attribute ¤

timestamp: float = field(default_factory=time)

metadata class-attribute instance-attribute ¤

metadata: dict[str, Any] = field(default_factory=dict)

to_dict ¤

to_dict() -> dict[str, Any]

Serialize to a JSON-compatible dictionary.
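
To illustrate what a JSON-compatible dictionary can look like for a dataclass of this shape, here is a self-contained sketch. `AlertSketch` and `Severity` are hypothetical stand-ins that mirror the documented fields; the real `to_dict` may format its output differently.

```python
import json
import time
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any


class Severity(str, Enum):
    """Stand-in for AlertSeverity with string-valued members."""

    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class AlertSketch:
    message: str
    severity: Severity
    metric_name: str
    metric_value: float
    threshold: float
    timestamp: float = field(default_factory=time.time)
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        data = asdict(self)
        data["severity"] = self.severity.value  # plain string, not an enum member
        return data


alert = AlertSketch(
    message="GPU memory above threshold",
    severity=Severity.WARNING,
    metric_name="gpu_memory_utilization",
    metric_value=0.91,
    threshold=0.85,
)
payload = json.dumps(alert.to_dict())
print(payload)
```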

AlertManager ¤

AlertManager(max_alerts: int = 1000)

Thread-safe alert storage with callback handlers.

Parameters:

  • max_alerts (int, default: 1000): Maximum number of alerts to retain (oldest dropped first).
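
One common way to get "oldest dropped first" retention with thread safety is a `collections.deque` with `maxlen` guarded by a lock. The sketch below assumes that approach; `BoundedStore` is a hypothetical stand-in, not the AlertManager source.

```python
import threading
from collections import deque


class BoundedStore:
    """Hypothetical stand-in: keeps at most max_items entries, dropping the oldest."""

    def __init__(self, max_items: int = 1000):
        self._items = deque(maxlen=max_items)  # a full deque discards from the left
        self._lock = threading.Lock()

    def add(self, item) -> None:
        with self._lock:
            self._items.append(item)

    def recent(self, count: int = 10) -> list:
        with self._lock:
            newest_first = list(reversed(self._items))
        return newest_first[:count]


store = BoundedStore(max_items=3)
for i in range(5):
    store.add(f"alert-{i}")
print(store.recent())  # alert-0 and alert-1 have been dropped
```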

add_alert_handler ¤

add_alert_handler(handler: Callable[[Alert], None]) -> None

Register a callback invoked on each new alert.

Parameters:

  • handler (Callable[[Alert], None], required): Callable that receives an Alert instance.

trigger_alert ¤

trigger_alert(message: str, severity: AlertSeverity, metric_name: str, metric_value: float, threshold: float, metadata: dict[str, Any] | None = None) -> None

Create and store an alert, notifying all registered handlers.

Parameters:

  • message (str, required): Human-readable alert description.
  • severity (AlertSeverity, required): Severity level.
  • metric_name (str, required): Metric that triggered the alert.
  • metric_value (float, required): Observed metric value.
  • threshold (float, required): Threshold that was exceeded.
  • metadata (dict[str, Any] | None, default: None): Optional additional context.
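
The store-then-notify flow can be sketched in a few lines. Everything here is a hypothetical stand-in (`HandlerRegistry`, dict-based alerts), and the choice to log and continue when a handler raises is an assumption of this sketch, not documented calibrax behavior.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


class HandlerRegistry:
    """Hypothetical stand-in for the store-then-notify flow."""

    def __init__(self) -> None:
        self.alerts: list[dict[str, Any]] = []
        self._handlers: list[Callable[[dict[str, Any]], None]] = []

    def add_alert_handler(self, handler: Callable[[dict[str, Any]], None]) -> None:
        self._handlers.append(handler)

    def trigger_alert(self, message: str, severity: str, **context: Any) -> None:
        alert = {"message": message, "severity": severity, **context}
        self.alerts.append(alert)  # store first
        for handler in self._handlers:  # then notify in registration order
            try:
                handler(alert)
            except Exception:
                # Assumption: one failing handler should not block the others.
                logger.exception("alert handler failed")


registry = HandlerRegistry()
seen = []
registry.add_alert_handler(lambda alert: 1 / 0)  # deliberately broken handler
registry.add_alert_handler(seen.append)
registry.trigger_alert("memory high", "warning", metric_name="memory_usage_mb")
print(seen[0]["message"])
```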

get_recent_alerts ¤

get_recent_alerts(count: int = 10) -> list[Alert]

Return the most recent alerts.

Parameters:

  • count (int, default: 10): Maximum number of alerts to return.

Returns:

  • list[Alert]: List of recent alerts, newest first.

get_alerts_by_severity ¤

get_alerts_by_severity(severity: AlertSeverity) -> list[Alert]

Return all alerts matching the given severity.

Parameters:

  • severity (AlertSeverity, required): Severity level to filter by.

Returns:

  • list[Alert]: List of matching alerts.

clear_alerts ¤

clear_alerts() -> None

Remove all stored alerts.

AlertSeverity ¤

Bases: StrEnum

Severity levels for monitoring alerts.

INFO class-attribute instance-attribute ¤

INFO = 'info'

WARNING class-attribute instance-attribute ¤

WARNING = 'warning'

ERROR class-attribute instance-attribute ¤

ERROR = 'error'

CRITICAL class-attribute instance-attribute ¤

CRITICAL = 'critical'

ProductionMonitor ¤

ProductionMonitor(**kwargs: Any)

Bases: AdvancedMonitor

Extended monitor with pipeline health tracking and performance baselines.

Tracks pipeline execution times, success rates, and detects performance degradation against configured baselines.

alert_manager property ¤

alert_manager: AlertManager

Access the underlying alert manager.

set_threshold ¤

set_threshold(metric_name: str, threshold: float) -> None

Set an alerting threshold for a metric.

Parameters:

  • metric_name (str, required): Name of the metric to monitor.
  • threshold (float, required): Value above which an alert is triggered.

start_monitoring ¤

start_monitoring(interval: float = 5.0) -> None

Start background monitoring on a daemon thread.

Parameters:

  • interval (float, default: 5.0): Seconds between metric collection cycles.

stop_monitoring ¤

stop_monitoring() -> None

Stop background monitoring and wait for the thread to finish.

get_monitoring_summary ¤

get_monitoring_summary() -> dict[str, Any]

Return a summary of current monitoring state.

Returns:

  • dict[str, Any]: Dictionary with thresholds, alert counts, and metric history summaries.

set_performance_baseline ¤

set_performance_baseline(metric_name: str, baseline_value: float) -> None

Set a performance baseline for degradation detection.

Parameters:

  • metric_name (str, required): Metric to track against baseline.
  • baseline_value (float, required): Expected baseline value.
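
Degradation detection against a baseline is often expressed as a relative deviation check. The sketch below shows one such rule; `is_degraded` and its `tolerance` parameter are hypothetical, and the rule calibrax actually applies is not specified here.

```python
def is_degraded(observed: float, baseline: float, tolerance: float = 0.2) -> bool:
    """Return True when observed exceeds baseline by more than `tolerance`.

    Assumes a higher-is-worse metric such as latency. `tolerance` is a
    fraction of the baseline and is a hypothetical knob for this sketch.
    """
    if baseline <= 0:
        raise ValueError("baseline must be positive")
    return (observed - baseline) / baseline > tolerance


print(is_degraded(observed=1.5, baseline=1.0))  # 50% above baseline
print(is_degraded(observed=1.1, baseline=1.0))  # within the 20% tolerance
```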

record_pipeline_execution ¤

record_pipeline_execution(pipeline_name: str, execution_time: float, success: bool, metadata: dict[str, Any] | None = None) -> None

Record a pipeline execution for health tracking.

Parameters:

  • pipeline_name (str, required): Name of the pipeline that executed.
  • execution_time (float, required): Wall-clock execution time in seconds.
  • success (bool, required): Whether the execution succeeded.
  • metadata (dict[str, Any] | None, default: None): Optional additional context.

get_pipeline_health_report ¤

get_pipeline_health_report() -> dict[str, Any]

Generate a health report across all tracked pipelines.

Returns:

  • dict[str, Any]: Dictionary with per-pipeline statistics and overall health status.
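
To give a feel for what per-pipeline statistics might contain, here is a small aggregation over recorded executions. The function `summarize` and the keys `runs`, `mean_time_s`, and `error_rate` are assumptions made for this sketch; the report returned by `get_pipeline_health_report` may use different keys and additional fields.

```python
from collections import defaultdict
from statistics import mean


def summarize(executions: list[tuple[str, float, bool]]) -> dict[str, dict[str, float]]:
    """Aggregate (pipeline_name, execution_time, success) records per pipeline."""
    grouped = defaultdict(list)
    for name, seconds, success in executions:
        grouped[name].append((seconds, success))

    report = {}
    for name, runs in grouped.items():
        failures = sum(1 for _, ok in runs if not ok)
        report[name] = {
            "runs": len(runs),
            "mean_time_s": mean(seconds for seconds, _ in runs),
            "error_rate": failures / len(runs),
        }
    return report


records = [("ingest", 2.0, True), ("ingest", 4.0, False), ("train", 10.0, True)]
print(summarize(records))
```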

monitor ¤

Alert management and background metric monitoring.

Provides threshold-based alerting with configurable handlers and background monitoring of system resources via daemon thread.

logger module-attribute ¤

logger = getLogger(__name__)

AlertSeverity ¤

Bases: StrEnum

Severity levels for monitoring alerts.

INFO class-attribute instance-attribute ¤
INFO = 'info'
WARNING class-attribute instance-attribute ¤
WARNING = 'warning'
ERROR class-attribute instance-attribute ¤
ERROR = 'error'
CRITICAL class-attribute instance-attribute ¤
CRITICAL = 'critical'

Alert dataclass ¤

Alert(*, message: str, severity: AlertSeverity, metric_name: str, metric_value: float, threshold: float, timestamp: float = time(), metadata: dict[str, Any] = dict())

A single monitoring alert triggered by a threshold violation.

Attributes:

  • message (str): Human-readable description of the alert.
  • severity (AlertSeverity): Alert severity level.
  • metric_name (str): Name of the metric that triggered the alert.
  • metric_value (float): Observed value that triggered the alert.
  • threshold (float): Threshold that was exceeded.
  • timestamp (float): When the alert was triggered.
  • metadata (dict[str, Any]): Additional context about the alert.

message instance-attribute ¤
message: str
severity instance-attribute ¤
severity: AlertSeverity
metric_name instance-attribute ¤
metric_name: str
metric_value instance-attribute ¤
metric_value: float
threshold instance-attribute ¤
threshold: float
timestamp class-attribute instance-attribute ¤
timestamp: float = field(default_factory=time)
metadata class-attribute instance-attribute ¤
metadata: dict[str, Any] = field(default_factory=dict)
to_dict ¤
to_dict() -> dict[str, Any]

Serialize to a JSON-compatible dictionary.

AlertManager ¤

AlertManager(max_alerts: int = 1000)

Thread-safe alert storage with callback handlers.

Parameters:

  • max_alerts (int, default: 1000): Maximum number of alerts to retain (oldest dropped first).

add_alert_handler ¤
add_alert_handler(handler: Callable[[Alert], None]) -> None

Register a callback invoked on each new alert.

Parameters:

  • handler (Callable[[Alert], None], required): Callable that receives an Alert instance.

trigger_alert ¤
trigger_alert(message: str, severity: AlertSeverity, metric_name: str, metric_value: float, threshold: float, metadata: dict[str, Any] | None = None) -> None

Create and store an alert, notifying all registered handlers.

Parameters:

  • message (str, required): Human-readable alert description.
  • severity (AlertSeverity, required): Severity level.
  • metric_name (str, required): Metric that triggered the alert.
  • metric_value (float, required): Observed metric value.
  • threshold (float, required): Threshold that was exceeded.
  • metadata (dict[str, Any] | None, default: None): Optional additional context.

get_recent_alerts ¤
get_recent_alerts(count: int = 10) -> list[Alert]

Return the most recent alerts.

Parameters:

  • count (int, default: 10): Maximum number of alerts to return.

Returns:

  • list[Alert]: List of recent alerts, newest first.

get_alerts_by_severity ¤
get_alerts_by_severity(severity: AlertSeverity) -> list[Alert]

Return all alerts matching the given severity.

Parameters:

  • severity (AlertSeverity, required): Severity level to filter by.

Returns:

  • list[Alert]: List of matching alerts.

clear_alerts ¤
clear_alerts() -> None

Remove all stored alerts.

AdvancedMonitor ¤

AdvancedMonitor(alert_manager: AlertManager | None = None, gpu_profiler: GPUProfilerProtocol | None = None, resource_monitor: ResourceMonitor | None = None)

Background resource monitor with threshold-based alerting.

Collects CPU, memory, and optional GPU metrics on a daemon thread. Triggers alerts when thresholds are exceeded.

Parameters:

  • alert_manager (AlertManager | None, default: None): Alert manager for dispatching alerts. Created if not provided.
  • gpu_profiler (GPUProfilerProtocol | None, default: None): Optional GPU profiler for GPU metrics.
  • resource_monitor (ResourceMonitor | None, default: None): Optional ResourceMonitor for background sampling.

alert_manager property ¤
alert_manager: AlertManager

Access the underlying alert manager.

set_threshold ¤
set_threshold(metric_name: str, threshold: float) -> None

Set an alerting threshold for a metric.

Parameters:

  • metric_name (str, required): Name of the metric to monitor.
  • threshold (float, required): Value above which an alert is triggered.

start_monitoring ¤
start_monitoring(interval: float = 5.0) -> None

Start background monitoring on a daemon thread.

Parameters:

  • interval (float, default: 5.0): Seconds between metric collection cycles.

stop_monitoring ¤
stop_monitoring() -> None

Stop background monitoring and wait for the thread to finish.

get_monitoring_summary ¤
get_monitoring_summary() -> dict[str, Any]

Return a summary of current monitoring state.

Returns:

  • dict[str, Any]: Dictionary with thresholds, alert counts, and metric history summaries.

production ¤

Production-grade monitoring with pipeline health tracking.

Extends AdvancedMonitor with performance baselines, pipeline execution tracking, and health report generation.

logger module-attribute ¤

logger = getLogger(__name__)

ProductionMonitor ¤

ProductionMonitor(**kwargs: Any)

Bases: AdvancedMonitor

Extended monitor with pipeline health tracking and performance baselines.

Tracks pipeline execution times, success rates, and detects performance degradation against configured baselines.

alert_manager property ¤
alert_manager: AlertManager

Access the underlying alert manager.

set_performance_baseline ¤
set_performance_baseline(metric_name: str, baseline_value: float) -> None

Set a performance baseline for degradation detection.

Parameters:

  • metric_name (str, required): Metric to track against baseline.
  • baseline_value (float, required): Expected baseline value.

record_pipeline_execution ¤
record_pipeline_execution(pipeline_name: str, execution_time: float, success: bool, metadata: dict[str, Any] | None = None) -> None

Record a pipeline execution for health tracking.

Parameters:

  • pipeline_name (str, required): Name of the pipeline that executed.
  • execution_time (float, required): Wall-clock execution time in seconds.
  • success (bool, required): Whether the execution succeeded.
  • metadata (dict[str, Any] | None, default: None): Optional additional context.

get_pipeline_health_report ¤
get_pipeline_health_report() -> dict[str, Any]

Generate a health report across all tracked pipelines.

Returns:

  • dict[str, Any]: Dictionary with per-pipeline statistics and overall health status.

set_threshold ¤
set_threshold(metric_name: str, threshold: float) -> None

Set an alerting threshold for a metric.

Parameters:

  • metric_name (str, required): Name of the metric to monitor.
  • threshold (float, required): Value above which an alert is triggered.

start_monitoring ¤
start_monitoring(interval: float = 5.0) -> None

Start background monitoring on a daemon thread.

Parameters:

  • interval (float, default: 5.0): Seconds between metric collection cycles.

stop_monitoring ¤
stop_monitoring() -> None

Stop background monitoring and wait for the thread to finish.

get_monitoring_summary ¤
get_monitoring_summary() -> dict[str, Any]

Return a summary of current monitoring state.

Returns:

  • dict[str, Any]: Dictionary with thresholds, alert counts, and metric history summaries.