# Observability

Volley provides built-in observability via the `volley_core::observability` module: Prometheus metrics, OpenTelemetry tracing, structured logging, and health reporting. Observability auto-initializes at pipeline startup with zero-config defaults.
## Metrics

Volley exposes 15+ metrics in the Prometheus exposition format. All metrics include `node_id` and `job_name` labels.
| Metric | Type | Labels | Description |
|---|---|---|---|
| `volley.source.records_polled` | Counter | `source_id` | Records read from source |
| `volley.records.processed` | Counter | `operator_id` | Records processed per operator |
| `volley.stage.latency_ms` | Histogram | `stage` | Per-operator processing latency |
| `volley.checkpoint.duration_ms` | Histogram | — | Checkpoint completion time |
| `volley.checkpoint.epoch` | Gauge | — | Current checkpoint epoch |
| `volley.checkpoint.failures` | Counter | — | Checkpoint failure count |
| `volley.channel.utilization` | Gauge | `stage` | Backpressure indicator (0.0–1.0) |
| `volley.watermark.lag_ms` | Gauge | — | Event-time lag |
| `volley.sink.records_written` | Counter | `sink_id` | Records written per sink |
| `volley.sink.flush.duration_ms` | Histogram | `sink_id` | Sink flush latency |
| `volley.pipeline.uptime_seconds` | Gauge | — | Pipeline uptime |
| `volley.pipeline.health` | Gauge | — | Health state (0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown) |
| `volley.errors` | Counter | `error_type` | Errors by type |
| `volley.source.errors` | Counter | `source_id` | Per-connector source errors |
| `volley.sink.errors` | Counter | `sink_id` | Per-connector sink errors |
| `volley.kafka.consumer_lag` | Gauge | — | Kafka consumer group lag |
| `volley.kafka.commit.duration_ms` | Histogram | — | Kafka commit latency |
| `volley.kafka.commit.failures` | Counter | — | Kafka commit failures |
| `volley.blob.objects_read` | Counter | — | Blob source throughput |
| `volley.blob.bytes_read` | Counter | — | Blob I/O volume |
| `volley.state.cache_evictions` | Counter | `operator_id` | Aggregate LRU cache evictions |
| `volley.window.pending_evictions` | Counter | — | Window pending index LRU evictions |
| `volley.window.pending_size` | Gauge | — | In-memory pending window entry count |
| `volley.checkpoint.gc_cleaned` | Counter | — | Old checkpoints cleaned by GC |
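When scraped, metric names are rendered in the Prometheus exposition format: dots become underscores and counters gain a `_total` suffix (matching the names used in the PromQL queries below). An illustrative scrape excerpt, with hypothetical label values:

```text
volley_records_processed_total{node_id="node-0",job_name="my-job",operator_id="operator-1"} 48210
volley_channel_utilization{node_id="node-0",job_name="my-job",stage="operator-1"} 0.37
volley_pipeline_health{node_id="node-0",job_name="my-job"} 1
```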
### ML Inference Metrics

When using `volley-ml` operators (`.infer()`, `.embed()`, `.classify()`, `.infer_remote()`), additional inference-specific metrics are emitted:
| Metric | Type | Labels | Description |
|---|---|---|---|
| `volley.ml.inference.duration_ms` | Histogram | `backend`, `model_name` | Per-batch inference latency (backend call only) |
| `volley.ml.inference.batch_rows` | Histogram | `backend`, `model_name` | Rows per inference batch (batch utilization) |
| `volley.ml.model_load.duration_ms` | Histogram | `backend`, `model_name` | Model load time (emitted once at startup) |
| `volley.ml.inference.errors` | Counter | `backend`, `model_name`, `error_type` | Inference errors by backend and type |
| `volley.ml.http.request.duration_ms` | Histogram | `endpoint` | HTTP round-trip latency for remote model servers |
| `volley.ml.http.queue_depth` | Gauge | `endpoint` | Available concurrency permits (inverse of queue depth) |
| `volley.ml.http.errors` | Counter | `endpoint`, `status_code` | HTTP errors from remote model servers |
The `backend` label is `"onnx"` or `"candle"`. The `model_name` label is the model path or HuggingFace repo ID passed to the operator config.
### Golden Signals PromQL

```promql
# Latency (p99)
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# Traffic (records/sec)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

# Errors (rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)

# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)

# Saturation (channel utilization)
avg(volley_channel_utilization) by (job_name, stage)

# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)
```
### ML Inference PromQL

```promql
# Inference latency (p99) by backend
histogram_quantile(0.99, sum(rate(volley_ml_inference_duration_ms_bucket[5m])) by (le, backend))

# Inference throughput (rows/sec)
sum(rate(volley_ml_inference_batch_rows_sum[5m])) by (backend)

# Inference error rate
sum(rate(volley_ml_inference_errors_total[5m])) by (backend, error_type)

# HTTP model server latency (p95)
histogram_quantile(0.95, sum(rate(volley_ml_http_request_duration_ms_bucket[5m])) by (le, endpoint))
```
### State & Window PromQL

```promql
# Aggregate cache eviction rate (high = LRU undersized)
rate(volley_state_cache_evictions_total[5m])

# Window pending index eviction rate
rate(volley_window_pending_evictions_total[5m])

# In-memory pending window count
volley_window_pending_size

# Checkpoint GC cleanup rate
rate(volley_checkpoint_gc_cleaned_total[5m])
```
## Tracing

OpenTelemetry integration via OTLP gRPC export with the `tracing-opentelemetry` bridge:

- Resource attributes include `service.name` and `node.id`
- Graceful fallback if the OTLP endpoint is unavailable
- Trace-log correlation: `trace_id` and `span_id` injected into JSON log lines when a span is active
Configure the OTLP endpoint:

```yaml
# In the VolleyApplication CRD
spec:
  observability:
    otlpEndpoint: http://otel-collector.monitoring:4317
```
### Per-Record Tracing
Volley supports per-record trace propagation with configurable sampling. When enabled, individual records flowing through the pipeline carry OpenTelemetry trace context. Each operator creates a child span showing processing time, input/output row counts, and optional payload previews.
#### Configuration

```rust
use volley_core::observability::{TracingConfig, SamplingStrategy};

StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(TracingConfig {
        sampling: SamplingStrategy::Ratio(0.01), // trace 1% of records
        max_payload_bytes: 1024,                 // max JSON preview size
        capture_payload: true,                   // enable input/output previews
    })
    .filter(|r| true)
    .map(|r| Ok(r))
    .to_sink(sink)
    .execute("my-job")
    .await?;
```
#### Sampling Strategies

| Strategy | Description | Use case |
|---|---|---|
| `Always` | Trace every record | Development, debugging |
| `Never` | Disable tracing | Production default |
| `Ratio(f64)` | Trace a random fraction | Production with sampling (e.g., 0.01 = 1%) |
Non-sampled records pay near-zero overhead (a single branch check).
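The single-branch overhead can be seen in a minimal sketch of a ratio sampler. This is a hypothetical illustration, not Volley's implementation; it is made deterministic here by hashing a record key into [0, 1) rather than drawing a random number:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical sketch; Volley's actual sampler may differ.
enum SamplingStrategy {
    Always,
    Never,
    Ratio(f64),
}

impl SamplingStrategy {
    /// One branch per record; `Ratio` maps a hashed record key into [0, 1).
    fn should_sample(&self, record_key: u64) -> bool {
        match self {
            SamplingStrategy::Always => true,
            SamplingStrategy::Never => false,
            SamplingStrategy::Ratio(p) => {
                let mut h = DefaultHasher::new();
                record_key.hash(&mut h);
                (h.finish() as f64 / u64::MAX as f64) < *p
            }
        }
    }
}

fn main() {
    let sampler = SamplingStrategy::Ratio(0.01);
    let sampled = (0..100_000u64).filter(|k| sampler.should_sample(*k)).count();
    // Roughly 1% of keys end up sampled.
    println!("sampled {sampled} of 100000 records");
}
```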
#### Span Hierarchy

For a pipeline `source -> map -> filter -> sink`, a sampled record produces:

```text
volley.record (root span)
├── volley.source — source output metadata
├── volley.operator [0] — map operator (input/output rows, timing, preview)
├── volley.operator [1] — filter operator
└── volley.sink — sink write (timing, errors)
```
Each span includes attributes:

| Attribute | Description |
|---|---|
| `volley.operator.name` | Auto-generated operator name (e.g., `operator-0`) |
| `volley.operator.index` | Position in the operator chain |
| `volley.record.event_time_ms` | Record's event timestamp |
| `volley.record.input_rows` | Input batch row count |
| `volley.record.output_rows` | Output batch row count |
When `capture_payload` is enabled, input and output JSON previews are attached as span events.
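A preview bounded by `max_payload_bytes` amounts to a byte-capped truncation of the serialized record. A hypothetical helper sketch (not Volley's code), which also respects UTF-8 character boundaries:

```rust
/// Truncate a JSON preview to at most `max_bytes` bytes, backing up to
/// a valid UTF-8 character boundary. Hypothetical helper for illustration.
fn payload_preview(json: &str, max_bytes: usize) -> String {
    if json.len() <= max_bytes {
        return json.to_string();
    }
    let mut end = max_bytes;
    while !json.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &json[..end])
}

fn main() {
    let row = r#"{"user_id": 42, "payload": "a long field that exceeds the cap"}"#;
    println!("{}", payload_preview(row, 24));
}
```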
#### Window and Aggregate Outputs

Window and aggregate operators accumulate multiple input records and emit new result records. These outputs get a new trace with span links back to the input traces:

```text
volley.window.output (new root, links to input traces)
├── volley.operator [next]
└── volley.sink
```

The `volley.window.input_trace_count` attribute shows how many sampled input records contributed to the aggregation.
### Kafka Trace Context Propagation

When using the Kafka source, Volley automatically extracts W3C `traceparent` headers from consumed messages. If an upstream service traced a request that produced a Kafka message, the trace continues through Volley.

Similarly, the Kafka sink injects `traceparent` headers into produced messages, propagating trace context to downstream consumers.

This enables full end-to-end distributed tracing across service boundaries:

```text
upstream service → Kafka → Volley pipeline → Kafka → downstream service
  trace-id=abc             trace-id=abc            trace-id=abc
```
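The `traceparent` header has a fixed text layout defined by the W3C Trace Context specification: `version-traceid-spanid-flags`. A minimal parsing sketch under that layout (a hypothetical helper, not Volley's internals):

```rust
/// Parse a W3C `traceparent` header: `version-traceid-spanid-flags`.
/// Hypothetical helper for illustration; Volley's internals may differ.
fn parse_traceparent(header: &str) -> Option<(String, String)> {
    let parts: Vec<&str> = header.split('-').collect();
    // Expect 2-hex version, 32-hex trace-id, 16-hex parent-id, 2-hex flags.
    if parts.len() != 4 || parts[1].len() != 32 || parts[2].len() != 16 {
        return None;
    }
    Some((parts[1].to_string(), parts[2].to_string()))
}

fn main() {
    let header = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01";
    if let Some((trace_id, span_id)) = parse_traceparent(header) {
        println!("continuing trace {trace_id}, parent span {span_id}");
    }
}
```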
## Structured Logging

`ObservabilityConfig` supports a `log_format` field with `Text` (default) and `Json` modes:

```rust
ObservabilityConfig::new()
    .with_json_logging()
    .init();
```
JSON mode produces structured output compatible with Loki, Elasticsearch, and CloudWatch Logs. When OpenTelemetry is active, each log line includes `trace_id` and `span_id` fields for cross-referencing with distributed traces.
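An illustrative JSON log line with trace correlation active (field names here are representative, not guaranteed to match Volley's exact schema):

```json
{"timestamp":"2024-01-01T00:00:00Z","level":"INFO","target":"volley_core::checkpoint","message":"checkpoint completed","trace_id":"0af7651916cd43dd8448eb211c80319c","span_id":"b7ad6b7169203331"}
```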
### Useful RUST_LOG Filters

```shell
# Default (info): lifecycle events, checkpoint completions, source/sink creation
RUST_LOG=info

# Debug state backend, window, and watermark behavior
RUST_LOG=info,volley_core::state=debug,volley_core::operators::window=debug

# Debug file decoding (Parquet, JSON Lines, CSV)
RUST_LOG=info,volley_connectors::blob_store::decoders=debug

# Deep debugging: per-record RocksDB get/put/delete
RUST_LOG=info,volley_core::state::rocks=trace

# Debug protobuf decode with batch context
RUST_LOG=info,volley_core::operators::protobuf=debug
```
## Health Reporting

The `HealthReporter` tracks pipeline state with atomic transitions:

| State | Liveness | Readiness | Description |
|---|---|---|---|
| `Starting` | healthy | not ready | Pipeline initializing |
| `Running` | healthy | ready | Normal operation |
| `Degraded` | healthy | not ready | Partial failure (with reason) |
| `Failed` | unhealthy | not ready | Complete failure (with reason) |
| `ShuttingDown` | unhealthy | not ready | Graceful shutdown in progress |
The `volley.pipeline.health` gauge emits the state as a numeric value on every transition, enabling Prometheus alerting.
## SLO Recording Rules

Prometheus recording rules are provided at `deploy/prometheus/recording-rules.yaml`:

- `volley:availability:ratio_rate5m` — availability SLI ratio
- `volley:latency_sli:ratio_rate5m` — latency SLI (fraction under 100ms)
- `volley:error_budget:remaining` — remaining error budget against a 99.9% target
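As an illustration, the availability ratio can be derived from the error-ratio query shown in the Golden Signals section. A sketch of what such a rule might look like (the shipped `recording-rules.yaml` is authoritative):

```yaml
groups:
  - name: volley-slo
    rules:
      - record: volley:availability:ratio_rate5m
        expr: |
          1 - (
            sum(rate(volley_errors_total[5m])) by (job_name)
            /
            sum(rate(volley_records_processed_total[5m])) by (job_name)
          )
```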
## Alerting Rules

Burn-rate alerting rules at `deploy/prometheus/alerting-rules.yaml`:

- `VolleyHighErrorBurnRate` (critical) — 14.4x burn over 1h
- `VolleySlowErrorBurnRate` (warning) — 1x burn over 3d
- `VolleyPipelineFailed` (critical) — health==3 for 1m
- `VolleyPipelineDegraded` (warning) — health==2 for 5m
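For example, a pipeline-failure rule keyed on the health gauge (value 3 = Failed) could be expressed roughly as follows; this is a sketch, and the shipped `alerting-rules.yaml` is authoritative:

```yaml
groups:
  - name: volley-health
    rules:
      - alert: VolleyPipelineFailed
        expr: volley_pipeline_health == 3
        for: 1m
        labels:
          severity: critical
```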
## Kubernetes Integration

The K8s operator automatically creates a `ServiceMonitor` for Prometheus scraping. See Kubernetes Operator for `ServiceMonitor` configuration.