
Observability

Volley provides built-in observability via the volley_core::observability module: Prometheus metrics, OpenTelemetry tracing, structured logging, and health reporting. Observability auto-initializes at pipeline startup with zero-config defaults.

Metrics

Volley exposes the following metrics in the Prometheus exposition format. All metrics include the node_id and job_name labels; the Labels column lists additional per-metric labels, where present.

| Metric | Type | Labels | Description |
|---|---|---|---|
| volley.source.records_polled | Counter | source_id | Records read from source |
| volley.records.processed | Counter | operator_id | Records processed per operator |
| volley.stage.latency_ms | Histogram | stage | Per-operator processing latency |
| volley.checkpoint.duration_ms | Histogram | | Checkpoint completion time |
| volley.checkpoint.epoch | Gauge | | Current checkpoint epoch |
| volley.checkpoint.failures | Counter | | Checkpoint failure count |
| volley.channel.utilization | Gauge | stage | Backpressure indicator (0.0-1.0) |
| volley.watermark.lag_ms | Gauge | | Event-time lag |
| volley.sink.records_written | Counter | sink_id | Records written per sink |
| volley.sink.flush.duration_ms | Histogram | sink_id | Sink flush latency |
| volley.pipeline.uptime_seconds | Gauge | | Pipeline uptime |
| volley.pipeline.health | Gauge | | Health state (0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown) |
| volley.errors | Counter | error_type | Errors by type |
| volley.source.errors | Counter | source_id | Per-connector source errors |
| volley.sink.errors | Counter | sink_id | Per-connector sink errors |
| volley.kafka.consumer_lag | Gauge | | Kafka consumer group lag |
| volley.kafka.commit.duration_ms | Histogram | | Kafka commit latency |
| volley.kafka.commit.failures | Counter | | Kafka commit failures |
| volley.blob.objects_read | Counter | | Blob source throughput |
| volley.blob.bytes_read | Counter | | Blob I/O volume |
| volley.state.cache_evictions | Counter | operator_id | Aggregate LRU cache evictions |
| volley.window.pending_evictions | Counter | | Window pending index LRU evictions |
| volley.window.pending_size | Gauge | | In-memory pending window entry count |
| volley.checkpoint.gc_cleaned | Counter | | Old checkpoints cleaned by GC |
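In the exposition format, dots in metric names become underscores, counters gain a _total suffix, and histograms expand into _bucket/_sum/_count series. A scrape might therefore contain lines like these (all label and sample values are illustrative):

```text
volley_source_records_polled_total{node_id="node-0",job_name="orders",source_id="kafka-orders"} 128934
volley_stage_latency_ms_bucket{node_id="node-0",job_name="orders",stage="operator-1",le="10"} 4021
volley_channel_utilization{node_id="node-0",job_name="orders",stage="operator-1"} 0.35
volley_pipeline_health{node_id="node-0",job_name="orders"} 1
```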

ML Inference Metrics

When using volley-ml operators (.infer(), .embed(), .classify(), .infer_remote()), additional inference-specific metrics are emitted:

| Metric | Type | Labels | Description |
|---|---|---|---|
| volley.ml.inference.duration_ms | Histogram | backend, model_name | Per-batch inference latency (backend call only) |
| volley.ml.inference.batch_rows | Histogram | backend, model_name | Rows per inference batch (batch utilization) |
| volley.ml.model_load.duration_ms | Histogram | backend, model_name | Model load time (emitted once at startup) |
| volley.ml.inference.errors | Counter | backend, model_name, error_type | Inference errors by backend and type |
| volley.ml.http.request.duration_ms | Histogram | endpoint | HTTP round-trip latency for remote model servers |
| volley.ml.http.queue_depth | Gauge | endpoint | Available concurrency permits (inverse of queue depth) |
| volley.ml.http.errors | Counter | endpoint, status_code | HTTP errors from remote model servers |

The backend label is "onnx" or "candle". The model_name label is the model path or HuggingFace repo ID passed to the operator config.

Golden Signals PromQL

# Latency (p99)
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# Traffic (records/sec)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

# Errors (rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)

# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)

# Saturation (channel utilization)
avg(volley_channel_utilization) by (job_name, stage)

# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)

ML Inference PromQL

# Inference latency (p99) by backend
histogram_quantile(0.99, sum(rate(volley_ml_inference_duration_ms_bucket[5m])) by (le, backend))

# Inference throughput (rows/sec)
sum(rate(volley_ml_inference_batch_rows_sum[5m])) by (backend)

# Inference error rate
sum(rate(volley_ml_inference_errors_total[5m])) by (backend, error_type)

# HTTP model server latency (p95)
histogram_quantile(0.95, sum(rate(volley_ml_http_request_duration_ms_bucket[5m])) by (le, endpoint))

State & Window PromQL

# Aggregate cache eviction rate (high = LRU undersized)
rate(volley_state_cache_evictions_total[5m])

# Window pending index eviction rate
rate(volley_window_pending_evictions_total[5m])

# In-memory pending window count
volley_window_pending_size

# Checkpoint GC cleanup rate
rate(volley_checkpoint_gc_cleaned_total[5m])

Tracing

OpenTelemetry is integrated via OTLP gRPC export, bridged through tracing-opentelemetry:

  • Resource attributes include service.name and node.id
  • Graceful fallback if OTLP endpoint is unavailable
  • Trace-log correlation: trace_id and span_id injected into JSON log lines when a span is active

Configure the OTLP endpoint:

# In VolleyApplication CRD
spec:
  observability:
    otlpEndpoint: http://otel-collector.monitoring:4317

Per-Record Tracing

Volley supports per-record trace propagation with configurable sampling. When enabled, individual records flowing through the pipeline carry OpenTelemetry trace context. Each operator creates a child span showing processing time, input/output row counts, and optional payload previews.

Configuration

use volley_core::observability::{TracingConfig, SamplingStrategy};

StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(TracingConfig {
        sampling: SamplingStrategy::Ratio(0.01), // trace 1% of records
        max_payload_bytes: 1024,                 // max JSON preview size
        capture_payload: true,                   // enable input/output previews
    })
    .filter(|r| true)
    .map(|r| Ok(r))
    .to_sink(sink)
    .execute("my-job")
    .await?;

Sampling Strategies

| Strategy | Description | Use case |
|---|---|---|
| Always | Trace every record | Development, debugging |
| Never | Disable tracing | Production default |
| Ratio(f64) | Trace a random fraction | Production with sampling (e.g., 0.01 = 1%) |

Non-sampled records pay near-zero overhead (a single branch check).
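The decision can be modeled as a single match on the strategy, with randomness touched only on the Ratio path. This is an illustrative sketch (the enum variants match the documented SamplingStrategy, but the Sampler type and its xorshift PRNG are hypothetical, not Volley's implementation):

```rust
// Hypothetical per-record sampler: variant names follow the docs,
// everything else is illustrative.
enum SamplingStrategy {
    Always,
    Never,
    Ratio(f64),
}

struct Sampler {
    strategy: SamplingStrategy,
    rng_state: u64, // must be non-zero for xorshift
}

impl Sampler {
    fn should_sample(&mut self) -> bool {
        match self.strategy {
            SamplingStrategy::Always => true,
            // Non-sampled path: one branch, no RNG call, no allocation.
            SamplingStrategy::Never => false,
            SamplingStrategy::Ratio(r) => {
                // xorshift64: cheap pseudo-randomness, adequate for sampling.
                self.rng_state ^= self.rng_state << 13;
                self.rng_state ^= self.rng_state >> 7;
                self.rng_state ^= self.rng_state << 17;
                (self.rng_state as f64 / u64::MAX as f64) < r
            }
        }
    }
}

fn main() {
    let mut always = Sampler { strategy: SamplingStrategy::Always, rng_state: 1 };
    let mut never = Sampler { strategy: SamplingStrategy::Never, rng_state: 1 };
    assert!(always.should_sample());
    assert!(!never.should_sample());

    // Ratio(0.01) samples roughly 1% of records.
    let mut ratio = Sampler { strategy: SamplingStrategy::Ratio(0.01), rng_state: 42 };
    let sampled = (0..100_000).filter(|_| ratio.should_sample()).count();
    println!("sampled {sampled} of 100000");
}
```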

Span Hierarchy

For a pipeline source -> map -> filter -> sink, a sampled record produces:

volley.record (root span)
├── volley.source        — source output metadata
├── volley.operator [0]  — map operator (input/output rows, timing, preview)
├── volley.operator [1]  — filter operator
└── volley.sink          — sink write (timing, errors)

Each span includes attributes:

| Attribute | Description |
|---|---|
| volley.operator.name | Auto-generated operator name (e.g., operator-0) |
| volley.operator.index | Position in the operator chain |
| volley.record.event_time_ms | Record's event timestamp |
| volley.record.input_rows | Input batch row count |
| volley.record.output_rows | Output batch row count |

When capture_payload is enabled, input and output JSON previews are attached as span events.

Window and Aggregate Outputs

Window and aggregate operators accumulate multiple input records and emit new result records. These outputs get a new trace with span links back to the input traces:

volley.window.output (new root, links to input traces)
├── volley.operator [next]
└── volley.sink

The volley.window.input_trace_count attribute shows how many sampled input records contributed to the aggregation.

Kafka Trace Context Propagation

When using the Kafka source, Volley automatically extracts W3C traceparent headers from consumed messages. If an upstream service traced a request that produced a Kafka message, the trace continues through Volley.

Similarly, the Kafka sink injects traceparent headers into produced messages, propagating trace context to downstream consumers.

This enables full end-to-end distributed tracing across service boundaries:

upstream service → Kafka → Volley pipeline → Kafka → downstream service
     trace-id=abc          trace-id=abc              trace-id=abc
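The traceparent header format is defined by the W3C Trace Context spec: `00-{trace-id}-{parent-id}-{flags}` with fixed-width lowercase hex fields. A self-contained sketch of building and reading that header value (helper names are illustrative, not Volley APIs):

```rust
// W3C traceparent: version "00", 16-byte trace-id, 8-byte parent span id,
// 1-byte flags (bit 0 = sampled). Validation here is minimal for brevity.
fn format_traceparent(trace_id: u128, span_id: u64, sampled: bool) -> String {
    let flags: u8 = if sampled { 0x01 } else { 0x00 };
    format!("00-{trace_id:032x}-{span_id:016x}-{flags:02x}")
}

fn parse_traceparent(header: &str) -> Option<(u128, u64, bool)> {
    let mut parts = header.split('-');
    if parts.next()? != "00" {
        return None; // only version 00 handled in this sketch
    }
    let trace_id = u128::from_str_radix(parts.next()?, 16).ok()?;
    let span_id = u64::from_str_radix(parts.next()?, 16).ok()?;
    let flags = u8::from_str_radix(parts.next()?, 16).ok()?;
    Some((trace_id, span_id, flags & 0x01 == 0x01))
}

fn main() {
    let header = format_traceparent(0xabc, 0x123, true);
    assert_eq!(
        header,
        "00-00000000000000000000000000000abc-0000000000000123-01"
    );
    assert_eq!(parse_traceparent(&header), Some((0xabc, 0x123, true)));
}
```

On the source side this value would be read from a Kafka message header; on the sink side, written into one.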

Structured Logging

ObservabilityConfig supports a log_format field with Text (default) and Json modes:

ObservabilityConfig::new()
    .with_json_logging()
    .init();

JSON mode produces structured output compatible with Loki, Elasticsearch, and CloudWatch Logs. When OpenTelemetry is active, each log line includes trace_id and span_id fields for cross-referencing with distributed traces.
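For example, a checkpoint-completion line in JSON mode might look like the following (the trace_id and span_id fields come from the active span; all other field names and values are illustrative and depend on the subscriber configuration):

```json
{"timestamp":"2025-01-01T12:00:00.000Z","level":"INFO","target":"volley_core::checkpoint","message":"checkpoint complete","epoch":42,"trace_id":"4bf92f3577b34da6a3ce929d0e0e4736","span_id":"00f067aa0ba902b7"}
```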

Useful RUST_LOG filters

# Default (info): lifecycle events, checkpoint completions, source/sink creation
RUST_LOG=info

# Debug state backend, window, and watermark behavior
RUST_LOG=info,volley_core::state=debug,volley_core::operators::window=debug

# Debug file decoding (Parquet, JSON Lines, CSV)
RUST_LOG=info,volley_connectors::blob_store::decoders=debug

# Deep debugging: per-record RocksDB get/put/delete
RUST_LOG=info,volley_core::state::rocks=trace

# Debug protobuf decode with batch context
RUST_LOG=info,volley_core::operators::protobuf=debug

Health Reporting

The HealthReporter tracks pipeline state with atomic transitions:

| State | Liveness | Readiness | Description |
|---|---|---|---|
| Starting | healthy | not ready | Pipeline initializing |
| Running | healthy | ready | Normal operation |
| Degraded | healthy | not ready | Partial failure (with reason) |
| Failed | unhealthy | not ready | Complete failure (with reason) |
| ShuttingDown | unhealthy | not ready | Graceful shutdown in progress |

The volley.pipeline.health gauge emits the state as a numeric value on every transition, enabling Prometheus alerting.
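The table's semantics can be modeled as follows (an illustrative model: the state names match the docs, but the functions are hypothetical, not the HealthReporter API):

```rust
// Hypothetical model of the documented health states and probe semantics.
#[derive(Clone, Copy)]
enum HealthState {
    Starting,     // gauge value 0
    Running,      // 1
    Degraded,     // 2
    Failed,       // 3
    ShuttingDown, // 4
}

// Numeric value emitted on the volley.pipeline.health gauge.
fn gauge_value(state: HealthState) -> u8 {
    match state {
        HealthState::Starting => 0,
        HealthState::Running => 1,
        HealthState::Degraded => 2,
        HealthState::Failed => 3,
        HealthState::ShuttingDown => 4,
    }
}

// Liveness: only Failed and ShuttingDown report unhealthy.
fn is_live(state: HealthState) -> bool {
    !matches!(state, HealthState::Failed | HealthState::ShuttingDown)
}

// Readiness: only Running serves traffic.
fn is_ready(state: HealthState) -> bool {
    matches!(state, HealthState::Running)
}

fn main() {
    // Degraded stays live (no restart) but is pulled from service endpoints.
    assert!(is_live(HealthState::Degraded) && !is_ready(HealthState::Degraded));
    assert_eq!(gauge_value(HealthState::Failed), 3);
}
```

This split is what makes Degraded useful: Kubernetes keeps the pod running while routing traffic away until the reason clears.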

SLO Recording Rules

Prometheus recording rules are provided at deploy/prometheus/recording-rules.yaml:

  • volley:availability:ratio_rate5m — availability SLI ratio
  • volley:latency_sli:ratio_rate5m — latency SLI (fraction under 100ms)
  • volley:error_budget:remaining — remaining error budget against 99.9% target

Alerting Rules

Burn-rate alerting rules at deploy/prometheus/alerting-rules.yaml:

  • VolleyHighErrorBurnRate (critical) — 14.4x burn over 1h
  • VolleySlowErrorBurnRate (warning) — 1x burn over 3d
  • VolleyPipelineFailed (critical) — health==3 for 1m
  • VolleyPipelineDegraded (warning) — health==2 for 5m
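The 14.4x figure follows the standard multiwindow burn-rate convention (Google SRE workbook). A sketch of the arithmetic, not Volley code:

```rust
// Burn rate: how many times faster than "exactly on budget" errors arrive.
// burn_rate = observed error ratio / allowed error ratio (1 - SLO target).
fn burn_rate(error_ratio: f64, slo_target: f64) -> f64 {
    error_ratio / (1.0 - slo_target)
}

// Fraction of a 30-day error budget consumed while burning at `rate`
// for `hours` hours.
fn budget_consumed(rate: f64, hours: f64) -> f64 {
    rate * hours / (30.0 * 24.0)
}

fn main() {
    let slo = 0.999; // 99.9% target => 0.1% error budget

    // A 14.4x burn means 1.44% of records are failing...
    assert!((burn_rate(0.0144, slo) - 14.4).abs() < 1e-9);

    // ...and one hour at that rate consumes 2% of the monthly budget,
    // which is why 14.4x over 1h pages as critical.
    assert!((budget_consumed(14.4, 1.0) - 0.02).abs() < 1e-9);
}
```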

Kubernetes Integration

The K8s operator automatically creates a ServiceMonitor for Prometheus scraping. See Kubernetes Operator for ServiceMonitor configuration.