Volley Capacity Planning Guide

This guide helps pipeline authors size CPU, memory, and storage for Volley deployments, and provides PromQL queries for ongoing capacity monitoring.


Throughput Reference

Benchmark data from volley-examples/examples/benchmark_sustained.rs on a single node:

| Configuration | Throughput | Notes |
|---|---|---|
| Single record, parallelism=1 | ~850K rec/s | Baseline, no batching |
| Batch=1000, parallelism=1 | ~3.5M rec/s | Batching only |
| Batch=1000, parallelism=4 | ~8.7M rec/s | Batching + parallelism |
| Sustained 10s, batch=1000, parallelism=4 | ~9.4M rec/s | Steady-state throughput |

These numbers reflect in-memory processing with simple operators. Real-world throughput will be lower; how much lower depends on:

  • Operator complexity (aggregations, joins, windowing)
  • State backend I/O (RocksDB reads/writes)
  • Connector latency (Kafka, blob storage)
  • Checkpoint frequency

CPU Sizing

| Throughput target | CPU requests | CPU limits | Notes |
|---|---|---|---|
| < 100K rec/s | 500m | 1 | Light pipelines, simple transforms |
| 100K–1M rec/s | 1 | 2 | Moderate load, stateful operators |
| 1M–5M rec/s | 2 | 4 | High throughput, batched processing |
| > 5M rec/s | 4 | 8 | Maximum throughput, parallelism=4+ |

Volley runs on the tokio async runtime. CPU is primarily consumed by:

  • Record deserialization/serialization
  • Operator logic (map, filter, aggregate, window)
  • RocksDB compaction (background threads)
  • Kafka consumer polling and producer flushing

Recommendation: Start with 1 CPU request and monitor container_cpu_usage_seconds_total. Scale up if sustained utilization exceeds 70%.
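One way to express that 70% check as a query, assuming the standard cAdvisor metric names and kube-state-metrics for the requests series (adjust labels to your setup):

```promql
# Sustained CPU utilization relative to requests (scale up above 0.7).
# Assumes cAdvisor metrics plus kube-state-metrics, joined on "pod".
sum(rate(container_cpu_usage_seconds_total[5m])) by (pod)
/
sum(kube_pod_container_resource_requests{resource="cpu"}) by (pod)
```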


Memory Sizing

| Workload | Memory requests | Memory limits | Notes |
|---|---|---|---|
| Stateless transforms | 256Mi | 512Mi | No state backend |
| Light stateful (< 100K keys) | 512Mi | 1Gi | Small RocksDB footprint |
| Medium stateful (100K–1M keys) | 1Gi | 2Gi | RocksDB block cache |
| Heavy stateful (> 1M keys) | 2Gi | 4Gi | Large working set |

Memory is consumed by:

  • Channel buffers: In-flight records between pipeline stages
  • RocksDB block cache: Cached SST blocks for read performance (defaults to ~50% of available memory)
  • Arrow RecordBatch buffers: Columnar data in transit
  • Kafka consumer buffers: Pre-fetched messages
  • Window pending index: Active (key, window) pairs tracked by window operators (~100 bytes each)
  • Aggregate LRU cache: In-memory accumulator cache for keyed aggregations

Recommendation: Set memory limits to 2x requests. Monitor container_memory_working_set_bytes and watch for OOM kills.
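A corresponding query for spotting containers approaching their limit, assuming the standard cAdvisor metric names:

```promql
# Working set as a fraction of the memory limit (sustained >0.9 risks OOM).
# container_spec_memory_limit_bytes is 0 when no limit is set,
# so the > 0 filter drops those series.
sum(container_memory_working_set_bytes) by (pod)
/
sum(container_spec_memory_limit_bytes > 0) by (pod)
```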

Bounding Stateful Operator Memory

For high-cardinality keyed workloads, use the built-in LRU mechanisms to cap in-memory state:

  • Aggregate operators: keyed_stream.with_max_cache_entries(n) — bounds the in-memory accumulator cache. Evicted entries are flushed to RocksDB.
  • Window operators: windowed_stream.with_max_pending_windows(n) — bounds the in-memory window index. Evicted entries are persisted to the state backend and scanned during watermark advance.

As a rule of thumb, each aggregate cache entry is roughly 200–500 bytes (depending on the accumulator type), and each pending window entry is ~100 bytes. Size the limits against your available memory budget:

aggregate_memory ≈ max_cache_entries × 300 bytes
window_memory    ≈ max_pending_windows × 100 bytes

Monitor volley_state_cache_evictions_total and volley_window_pending_evictions_total — a sustained high eviction rate means the LRU is undersized and incurring excessive RocksDB I/O.
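The budget arithmetic above can be sketched directly; this is a back-of-the-envelope helper using the per-entry estimates from the rule of thumb (the 300-byte figure is the assumed midpoint for aggregate entries):

```python
# Back-of-the-envelope LRU sizing from the per-entry estimates above:
# ~300 B per aggregate cache entry, ~100 B per pending window entry.

AGG_ENTRY_BYTES = 300
WIN_ENTRY_BYTES = 100

def lru_budget_bytes(max_cache_entries: int, max_pending_windows: int) -> int:
    """Approximate memory held by the two LRU structures."""
    return (max_cache_entries * AGG_ENTRY_BYTES
            + max_pending_windows * WIN_ENTRY_BYTES)

# Example: 1M cached accumulators plus 500K pending windows
total = lru_budget_bytes(1_000_000, 500_000)
print(f"{total / 1024**2:.0f} MiB")  # ~334 MiB
```

Work backwards from your memory limit: if the pod has a 2Gi limit and you want the LRUs to use at most a quarter of it, solve for the entry counts and pass them to `with_max_cache_entries` / `with_max_pending_windows`.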


RocksDB Storage Sizing

Growth Formula

state_size = key_count × avg_value_size × amplification_factor

Where:

  • key_count: Number of unique keys in your keyed state
  • avg_value_size: Average serialized value size (JSON encoding adds ~30% overhead)
  • amplification_factor: RocksDB space amplification, typically 1.1–1.5x depending on compaction

Checkpoint Storage

checkpoint_storage = state_size × checkpoint_retention_count

RocksDB checkpoints hardlink SST files, so incremental growth is small while data stays unchanged between checkpoints. As data mutates, however, older checkpoints keep old SST files pinned, and checkpoint storage grows with write churn.
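Putting the two formulas together, a worked example with hypothetical inputs (the 1.3x amplification is a mid-range assumption from the 1.1–1.5x band):

```python
# Sketch of the two sizing formulas above, with hypothetical inputs.

def state_size_bytes(key_count: int, avg_value_size: int,
                     amplification: float = 1.3) -> int:
    # amplification: RocksDB space amplification, typically 1.1-1.5x
    return int(key_count * avg_value_size * amplification)

def checkpoint_storage_bytes(state_size: int, retention: int) -> int:
    # Worst case: every retained checkpoint holds a full state copy;
    # hardlinked SSTs make the real number smaller for cold data.
    return state_size * retention

# 5M keys, 200-byte serialized values, 3 retained checkpoints
state = state_size_bytes(5_000_000, 200)      # 1.3 GB
ckpt = checkpoint_storage_bytes(state, 3)     # 3.9 GB
print(f"state ~ {state / 1e9:.1f} GB, checkpoints ~ {ckpt / 1e9:.1f} GB")
```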

PVC Sizing Recommendations

| State size | State PVC | Checkpoint PVC | Retention |
|---|---|---|---|
| < 1 GB | 5Gi | 5Gi | 3 checkpoints |
| 1–10 GB | 20Gi | 20Gi | 3 checkpoints |
| 10–50 GB | 100Gi | 100Gi | 2 checkpoints |
| > 50 GB | 2× state size | 2× state size | 2 checkpoints |

Recommendation: Provision at least 2× your expected state size for headroom. Monitor actual usage with the PromQL queries below and adjust.

Helm Values Example

state:
  enabled: true
  size: "20Gi"
  storageClassName: gp3   # Use provisioned IOPS for write-heavy workloads

checkpoints:
  enabled: true
  size: "20Gi"

Capacity Monitoring PromQL Queries

Throughput

# Records ingested per second by pipeline
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

# Records processed per second by pipeline
sum(rate(volley_records_processed_total[5m])) by (job_name)

# Processing ratio (should be close to 1.0)
sum(rate(volley_records_processed_total[5m])) by (job_name)
/
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

Backpressure

# Channel utilization by stage (>0.8 = backpressure)
avg(volley_channel_utilization) by (job_name, stage)

# Stages with sustained backpressure
avg_over_time(volley_channel_utilization[15m]) > 0.8

Kafka Consumer Lag

# Total consumer lag by pipeline
sum(volley_kafka_consumer_lag) by (job_name)

# Lag growth rate (positive = falling behind)
deriv(sum(volley_kafka_consumer_lag) by (job_name)[15m:1m])

Latency

# p99 stage latency
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# p50 stage latency
histogram_quantile(0.5, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# Checkpoint duration p99
histogram_quantile(0.99, sum(rate(volley_checkpoint_duration_ms_bucket[5m])) by (le, job_name))

Error Rate

# Error rate by type
sum(rate(volley_errors_total[5m])) by (job_name, error_type)

# Source errors by connector
sum(rate(volley_source_errors_total[5m])) by (job_name, source_id)

# Sink errors by connector
sum(rate(volley_sink_errors_total[5m])) by (job_name, sink_id)

# Error ratio (for SLO tracking)
sum(rate(volley_errors_total[5m])) by (job_name)
/
clamp_min(sum(rate(volley_records_processed_total[5m])) by (job_name), 1)

Resource Utilization

# Watermark lag (event-time vs. wall-clock)
max(volley_watermark_lag_ms) by (job_name)

# Sink flush duration p99
histogram_quantile(0.99, sum(rate(volley_sink_flush_duration_ms_bucket[5m])) by (le, job_name))

KEDA Autoscaling Reference

Example KEDA trigger configurations for the Helm chart:

Scale on Kafka Consumer Lag

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 8
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: "kafka:9092"
        consumerGroup: "my-pipeline"
        topic: "input-topic"
        lagThreshold: "10000"    # Scale up when lag exceeds 10K messages

Scale on CPU

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 4
  triggers:
    - type: cpu
      metadata:
        type: Utilization
        value: "70"              # Scale up at 70% CPU utilization

Scale on Custom Prometheus Metric

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 8
  triggers:
    - type: prometheus
      metadata:
        serverAddress: "http://prometheus:9090"
        query: "sum(volley_channel_utilization{job_name='my-pipeline'}) / count(volley_channel_utilization{job_name='my-pipeline'})"
        threshold: "0.8"         # Scale up when average saturation > 80%

Capacity Planning Checklist

  1. Estimate throughput: What is your expected input rate (records/sec)?
  2. Size CPU/memory: Use the tables above as a starting point
  3. Estimate state size: key_count × avg_value_size × 1.5
  4. Size PVCs: 2× expected state size for both state and checkpoint volumes
  5. Configure autoscaling: Set KEDA triggers based on lag or saturation
  6. Deploy and monitor: Use the PromQL queries above to validate sizing
  7. Iterate: Adjust resources based on actual utilization patterns
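The arithmetic in steps 1–4 can be sketched end to end. This is a hypothetical helper, not part of Volley; the CPU tiers come from the sizing table above and the 1.5x factor is the upper amplification bound from step 3:

```python
# End-to-end sizing sketch for checklist steps 1-4 (hypothetical inputs).

def plan(records_per_sec: int, key_count: int, avg_value_size: int):
    # Step 2: pick the CPU (request, limit) tier from the CPU sizing table
    if records_per_sec < 100_000:
        cpu = ("500m", "1")
    elif records_per_sec < 1_000_000:
        cpu = ("1", "2")
    elif records_per_sec < 5_000_000:
        cpu = ("2", "4")
    else:
        cpu = ("4", "8")
    # Step 3: state estimate with the 1.5x amplification bound
    state_gb = key_count * avg_value_size * 1.5 / 1e9
    # Step 4: 2x headroom for both state and checkpoint PVCs
    pvc_gb = 2 * state_gb
    return cpu, state_gb, pvc_gb

# 500K rec/s, 2M keys, 250-byte values
cpu, state_gb, pvc_gb = plan(500_000, 2_000_000, 250)
print(cpu, f"state ~ {state_gb:.2f} GB", f"PVC >= {pvc_gb:.1f} GB")
```

Treat the output as a starting point for steps 5–7: deploy with these numbers, then let the PromQL queries above drive adjustments.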