Horizontal Scaling Guide
Run a single Volley pipeline across several Kubernetes pods when one machine can’t keep up — too many Kafka partitions, too much keyed state, or you need a pod failure to leave processing intact. You don’t change your pipeline code; you add a ClusterConfig, a shared ReadWriteMany checkpoint volume, and a replicas count in the operator CR. Volley handles key distribution, shuffling between pods, and live rescaling.
This page is the deployment checklist. For the design behind it (key groups, Arrow Flight shuffle, leader election), see Horizontal Scaling in Architecture.
Prerequisites
- Kubernetes cluster (v1.26+)
- A ReadWriteMany storage class (AWS EFS, GCP Filestore, Azure Files)
- The Volley K8s operator deployed (see Kubernetes Operator)
- Your pipeline built with the distributed feature enabled
When to Use Horizontal Scaling
Single-node mode handles most workloads. Consider horizontal scaling when:
- Your key space is too large for one node’s memory (state doesn’t fit in RAM + RocksDB)
- You need fault tolerance across nodes (pod failure shouldn’t stop the pipeline)
- Your source has more partitions than one node can consume (e.g., 100+ Kafka partitions)
If your bottleneck is CPU on stateless operators (filter, map), scale vertically first. Horizontal scaling adds network overhead for the key shuffle.
Step 1: Enable the Distributed Feature
Add the distributed feature to your pipeline’s Cargo.toml:
[dependencies]
volley-core = { version = "0.8", features = ["distributed"] }
Step 2: Configure ClusterConfig
In your pipeline code, add with_cluster() to the environment:
use volley_core::prelude::*;

// Runs inside your async entry point; `source`, `sink`, and `state_backend`
// are built exactly as in single-node mode.
// In production, ClusterConfig::from_env() auto-detects the cluster from the
// K8s env vars the operator injects, and returns None when they are absent.
let cluster = ClusterConfig::from_env();

let mut env = StreamExecutionEnvironment::new();
if let Some(config) = cluster {
    env = env.with_cluster(config);
}

env.from_source(source)
    .key_by(col("user_id"))
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    .to_sink(sink)
    .execute("my-pipeline")
    .await?;
The pipeline API is identical — with_cluster() is the only addition. When ClusterConfig is absent (e.g., running locally), the pipeline runs in single-node mode with zero overhead.
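For intuition about what auto-detection from env vars involves, here is a minimal sketch that reads the operator-injected VOLLEY_* variables (listed in Step 4) into a plain struct. It is not Volley's ClusterConfig: the ClusterEnv type and cluster_env_from function are hypothetical, deriving the worker ID from the StatefulSet pod-name ordinal (my-pipeline-2 → 2) is an assumption, and the fallbacks of 256 and 50051 only mirror the defaults shown in the Step 4 CR.

```rust
/// Hypothetical stand-in for the values ClusterConfig::from_env() detects;
/// this is NOT Volley's actual type.
#[derive(Debug, PartialEq)]
struct ClusterEnv {
    worker_id: u32,
    num_workers: u32,
    headless_service: String,
    max_key_groups: u32,
    flight_port: u16,
    app_name: String,
}

/// Returns None when required variables are missing or malformed, which is
/// the cue to stay in single-node mode. `lookup` abstracts std::env::var so
/// the logic can be exercised without touching the process environment.
fn cluster_env_from(
    lookup: impl Fn(&str) -> Option<String>,
    hostname: &str,
) -> Option<ClusterEnv> {
    let num_workers: u32 = lookup("VOLLEY_NUM_WORKERS")?.parse().ok()?;
    let app_name = lookup("VOLLEY_APP_NAME")?;
    let headless_service = lookup("VOLLEY_HEADLESS_SERVICE")?;

    // Assumption: StatefulSet pods are named "<app>-<ordinal>", so the
    // ordinal after the app-name prefix serves as the worker ID.
    let worker_id: u32 = hostname
        .strip_prefix(app_name.as_str())?
        .strip_prefix('-')?
        .parse()
        .ok()?;

    // Optional variables fall back to the defaults shown in the Step 4 CR.
    let max_key_groups: u32 = lookup("VOLLEY_MAX_KEY_GROUPS")
        .and_then(|v| v.parse().ok())
        .unwrap_or(256);
    let flight_port: u16 = lookup("VOLLEY_FLIGHT_PORT")
        .and_then(|v| v.parse().ok())
        .unwrap_or(50051);

    if worker_id >= num_workers {
        return None; // ordinal outside the advertised cluster size
    }
    Some(ClusterEnv {
        worker_id,
        num_workers,
        headless_service,
        max_key_groups,
        flight_port,
        app_name,
    })
}
```

In a pod, lookup could be `|k: &str| std::env::var(k).ok()` and the hostname could come from the HOSTNAME variable, which container runtimes typically set to the pod name.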
Step 3: Create a Shared Checkpoint PVC
All workers need access to the same checkpoint directory. Create a ReadWriteMany PVC (saved here as shared-pvc.yaml):
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
  namespace: my-pipeline
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs  # AWS EFS, or your cloud's RWX storage class
  resources:
    requests:
      storage: 50Gi
kubectl apply -f shared-pvc.yaml
Step 4: Deploy with the K8s Operator
Create a VolleyApplication with the cluster section:
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: my-pipeline
spec:
  image: registry.example.com/my-pipeline:v1.0
  replicas: 3
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"
  cluster:
    maxKeyGroups: 256              # default, power of 2
    flightPort: 50051              # default
    sharedCheckpoints:
      claimName: volley-shared-ckpt  # the PVC from step 3
  state:
    size: 10Gi                     # per-worker local RocksDB
  checkpoints:
    size: 5Gi                      # per-worker local checkpoint staging
The operator automatically:
- Injects VOLLEY_* environment variables (VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME)
- Adds the Arrow Flight port (flightPort, 50051 by default) to the container and the headless service
- Mounts the shared checkpoint PVC at /mnt/volley/checkpoints
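maxKeyGroups (injected as VOLLEY_MAX_KEY_GROUPS) is documented as a power of 2. One likely reason, sketched here under that assumption, is that the modulo mapping a key hash to a key group reduces to a bit mask. FNV-1a is a stand-in hash chosen only because it is short and deterministic; the hash Volley actually applies is not specified on this page, and fnv1a64/key_group are hypothetical helpers.

```rust
/// 64-bit FNV-1a, used here only as a short, deterministic stand-in for
/// whatever hash Volley applies to key_by() values.
fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &byte in bytes {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// Maps a key to one of `max_key_groups` key groups. Because the count is a
/// power of two, `hash % max_key_groups` equals `hash & (max_key_groups - 1)`.
fn key_group(key: &str, max_key_groups: u32) -> u32 {
    assert!(max_key_groups.is_power_of_two(), "maxKeyGroups must be a power of 2");
    let mask = u64::from(max_key_groups) - 1;
    (fnv1a64(key.as_bytes()) & mask) as u32
}
```

Key groups, rather than individual keys, are the unit that gets assigned to workers (the volley_cluster_key_groups_assigned metric counts them per worker), which is why maxKeyGroups can stay fixed while replicas changes.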
Step 5: Verify
# Check pods are running
kubectl get pods -n my-pipeline
# NAME            READY   STATUS    RESTARTS   AGE
# my-pipeline-0   1/1     Running   0          2m
# my-pipeline-1   1/1     Running   0          1m
# my-pipeline-2   1/1     Running   0          30s
# Check env vars
kubectl exec my-pipeline-0 -n my-pipeline -- env | grep VOLLEY
# VOLLEY_NUM_WORKERS=3
# VOLLEY_HEADLESS_SERVICE=my-pipeline-headless.my-pipeline.svc
# VOLLEY_MAX_KEY_GROUPS=256
# VOLLEY_FLIGHT_PORT=50051
# VOLLEY_APP_NAME=my-pipeline
# Check Flight port on headless service
kubectl get svc my-pipeline-headless -n my-pipeline -o jsonpath='{.spec.ports[*].name}'
# health flight
# Check shared checkpoint mount
kubectl exec my-pipeline-0 -n my-pipeline -- ls /mnt/volley/checkpoints
Scaling
Change the replica count to scale:
kubectl patch vapp my-pipeline -n my-pipeline --type merge -p '{"spec":{"replicas":5}}'
The coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
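To see why rescaling goes through a checkpoint on the shared filesystem, the sketch below assigns key groups to workers in contiguous ranges (the scheme mentioned under Troubleshooting) and lists the key groups whose owner changes. The formula key_group * num_workers / max_key_groups is the Flink-style range assignment and is an assumption here, not Volley's documented implementation; both functions are hypothetical.

```rust
/// Assumed contiguous range assignment (Flink-style): key group `kg` belongs
/// to worker floor(kg * num_workers / max_key_groups).
fn worker_for_key_group(key_group: u32, max_key_groups: u32, num_workers: u32) -> u32 {
    (u64::from(key_group) * u64::from(num_workers) / u64::from(max_key_groups)) as u32
}

/// (key_group, old_owner, new_owner) for every key group that changes owner
/// when the cluster goes from `old_workers` to `new_workers`.
fn moved_key_groups(max_key_groups: u32, old_workers: u32, new_workers: u32) -> Vec<(u32, u32, u32)> {
    (0..max_key_groups)
        .filter_map(|kg| {
            let from = worker_for_key_group(kg, max_key_groups, old_workers);
            let to = worker_for_key_group(kg, max_key_groups, new_workers);
            (from != to).then_some((kg, from, to))
        })
        .collect()
}
```

Under this assumption, going from 3 to 5 workers with 256 key groups moves most of the key groups, and each new owner has to pick its state up from the shared checkpoint rather than from local RocksDB.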
Monitoring
Key metrics to watch in distributed mode:
| Metric | What it tells you |
|---|---|
| volley_shuffle_local_records_total | Records staying on this worker |
| volley_shuffle_remote_records_total{target_worker="N"} | Records sent to each remote worker |
| volley_flight_send_errors_total | Transport errors (indicates worker connectivity issues) |
| volley_flight_reconnects_total{target_worker} | Successful reconnects after a HalfOpen probe; a steady trickle points to a peer that keeps flapping, while a single spike lines up with one pod restart |
| volley_circuit_breaker_state{target_worker} | Per remote worker: 0=Closed, 1=Open, 2=HalfOpen |
| volley_circuit_breaker_trips_total{target_worker} | Closed → Open transitions |
| volley_circuit_breaker_probes_succeeded_total{target_worker} | HalfOpen → Closed transitions; divided by the trips counter, it gives the reconnect success rate |
| volley_cluster_worker_id | This worker's ordinal |
| volley_cluster_num_workers | Total cluster size |
| volley_cluster_key_groups_assigned | Key groups owned by this worker |
| volley_barrier_alignment_wait_ms | Time to align barriers across all inputs |
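The three circuit-breaker metrics fit together as a small per-peer state machine. The sketch below is a minimal model under assumptions: the Breaker type, the consecutive-failure threshold, and the explicit on_cooldown_elapsed step are hypothetical; only the 0/1/2 gauge encoding and the meaning of the trips and probes counters come from the table.

```rust
/// Per-peer breaker states; the discriminants match the
/// volley_circuit_breaker_state gauge (0=Closed, 1=Open, 2=HalfOpen).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BreakerState {
    Closed = 0,
    Open = 1,
    HalfOpen = 2,
}

/// Hypothetical model; the failure threshold and the explicit cooldown step
/// are assumptions, not documented Volley behavior.
struct Breaker {
    state: BreakerState,
    consecutive_failures: u32,
    failure_threshold: u32,
    trips_total: u64,            // like volley_circuit_breaker_trips_total
    probes_succeeded_total: u64, // like volley_circuit_breaker_probes_succeeded_total
}

impl Breaker {
    fn new(failure_threshold: u32) -> Self {
        Breaker {
            state: BreakerState::Closed,
            consecutive_failures: 0,
            failure_threshold,
            trips_total: 0,
            probes_succeeded_total: 0,
        }
    }

    /// Value a metrics exporter would publish for the state gauge.
    fn gauge(&self) -> i64 {
        self.state as i64
    }

    fn on_send_ok(&mut self) {
        if self.state == BreakerState::Closed {
            self.consecutive_failures = 0;
        }
    }

    fn on_send_error(&mut self) {
        match self.state {
            BreakerState::Closed => {
                self.consecutive_failures += 1;
                if self.consecutive_failures >= self.failure_threshold {
                    self.state = BreakerState::Open; // Closed → Open counts as a trip
                    self.trips_total += 1;
                }
            }
            // A failed probe reopens the breaker without counting a new trip.
            BreakerState::HalfOpen => self.state = BreakerState::Open,
            BreakerState::Open => {}
        }
    }

    /// After a cooldown, an Open breaker lets one probe through.
    fn on_cooldown_elapsed(&mut self) {
        if self.state == BreakerState::Open {
            self.state = BreakerState::HalfOpen;
        }
    }

    fn on_probe_succeeded(&mut self) {
        if self.state == BreakerState::HalfOpen {
            self.state = BreakerState::Closed; // HalfOpen → Closed
            self.consecutive_failures = 0;
            self.probes_succeeded_total += 1;
        }
    }

    /// probes_succeeded_total / trips_total, the ratio the table describes.
    fn reconnect_success_rate(&self) -> Option<f64> {
        (self.trips_total > 0)
            .then(|| self.probes_succeeded_total as f64 / self.trips_total as f64)
    }
}
```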
Troubleshooting
Pipeline stalls after a pod restart: Check whether the BarrierAligner is waiting on a dead input. The Flight server should clean up stale connections via TCP keepalive (10s). If the stall persists, check volley_flight_send_errors_total for a rising error count. After a peer pod restarts, the HalfOpen probe rebuilds the outbound tonic::Channel, re-registers the shuffle channel, and replays every unacknowledged frame from the client's 8 MiB per-peer outbox onto the fresh stream; the server deduplicates against its last_delivered_seq, so each replayed frame is delivered exactly once. Confirm that volley_flight_reconnects_total{target_worker} increments and that volley_circuit_breaker_state returns to 0 (Closed). Because replay is at-least-once, the epoch in flight at the moment of the crash often recovers without aborting; check that volley_dag_sink_aborts_total stays flat across the restart window. Longer outages, where the reconnect runs past the 2PC deadline, still fall back to a cluster-wide abort, and the next epoch commits cleanly.
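The replay-plus-dedup behavior can be modeled with a sequence-numbered sender outbox and a receiver that remembers its last_delivered_seq. This is a minimal sketch under assumptions: the Sender and Receiver types are hypothetical, ACKs are cumulative and modeled as a direct method call, and where last_delivered_seq is kept across a receiver restart is not modeled.

```rust
use std::collections::VecDeque;

/// Sender side: keeps every frame until the peer acknowledges it.
struct Sender {
    next_seq: u64,
    unacked: VecDeque<(u64, String)>,
}

impl Sender {
    fn new() -> Self {
        Sender { next_seq: 1, unacked: VecDeque::new() }
    }

    /// Assigns the next sequence number and retains the frame for replay.
    fn send(&mut self, payload: &str) -> (u64, String) {
        let frame = (self.next_seq, payload.to_string());
        self.next_seq += 1;
        self.unacked.push_back(frame.clone());
        frame
    }

    /// A cumulative ACK for `seq` trims every frame up to and including it.
    fn ack(&mut self, seq: u64) {
        while self.unacked.front().map_or(false, |(s, _)| *s <= seq) {
            self.unacked.pop_front();
        }
    }

    /// After reconnecting, every unacknowledged frame is sent again (at-least-once).
    fn replay(&self) -> Vec<(u64, String)> {
        self.unacked.iter().cloned().collect()
    }
}

/// Receiver side: drops anything at or below last_delivered_seq, so a
/// replayed frame that already arrived is not delivered twice.
struct Receiver {
    last_delivered_seq: u64,
    delivered: Vec<String>,
}

impl Receiver {
    fn on_frame(&mut self, seq: u64, payload: &str) -> bool {
        if seq <= self.last_delivered_seq {
            return false; // duplicate from replay
        }
        self.last_delivered_seq = seq;
        self.delivered.push(payload.to_string());
        true
    }
}
```

In the scenario the test walks through, frames 1 and 2 arrive, only the ACK for frame 1 reaches the sender, and frame 3 is lost with the old stream; replay resends 2 and 3, the receiver drops 2 and accepts 3.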
Outbox backpressure (sender appears to stall per peer): The client's outbox is byte-bounded (8 MiB per peer by default; see DEFAULT_OUTBOX_MAX_BYTES). When a remote consumer goes silent (network partition, GC pause, OOM), the encoder parks on the byte budget until ACKs trim the backlog. Symptom: volley_shuffle_remote_records_total{target_worker="N"} flatlines while the counters for other targets keep climbing. To find the root cause, look at the peer's volley_flight_recv_messages_total and its drain-task logs; the outbox itself is behaving correctly by pushing back instead of running the worker out of memory.
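A minimal sketch of a byte-bounded outbox, assuming frames are released only by cumulative ACKs. The Outbox type and its try_push/ack methods are hypothetical, and DEFAULT_OUTBOX_MAX_BYTES is redefined locally with the documented 8 MiB value rather than imported from Volley. Handing the frame back on Err stands in for the encoder parking until ACKs trim the backlog.

```rust
use std::collections::VecDeque;

/// Local stand-in mirroring the documented 8 MiB per-peer default.
const DEFAULT_OUTBOX_MAX_BYTES: usize = 8 * 1024 * 1024;

/// Byte-bounded per-peer outbox: frames stay until ACKed, and new frames are
/// refused once the retained bytes would exceed the budget.
struct Outbox {
    frames: VecDeque<(u64, Vec<u8>)>,
    bytes: usize,
    max_bytes: usize,
    next_seq: u64,
}

impl Outbox {
    fn new(max_bytes: usize) -> Self {
        Outbox { frames: VecDeque::new(), bytes: 0, max_bytes, next_seq: 1 }
    }

    /// Ok(seq) if the frame fits; Err(frame) hands it back so the caller can
    /// park (e.g. wait for the next ACK) and retry. Frames larger than the
    /// whole budget would need separate handling, which is not shown.
    fn try_push(&mut self, frame: Vec<u8>) -> Result<u64, Vec<u8>> {
        if self.bytes + frame.len() > self.max_bytes {
            return Err(frame);
        }
        let seq = self.next_seq;
        self.next_seq += 1;
        self.bytes += frame.len();
        self.frames.push_back((seq, frame));
        Ok(seq)
    }

    /// An ACK up to `seq` frees the bytes of every acknowledged frame.
    fn ack(&mut self, seq: u64) {
        while self.frames.front().map_or(false, |(s, _)| *s <= seq) {
            if let Some((_, frame)) = self.frames.pop_front() {
                self.bytes -= frame.len();
            }
        }
    }
}
```

A silent peer never ACKs, so try_push keeps returning Err for that peer only, which is the per-target flatline described above.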
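Uneven record distribution: Check volley_shuffle_remote_records_total per target worker. Key groups are assigned to workers in contiguous ranges, so if your key distribution is skewed, some workers will handle more traffic. Consider increasing maxKeyGroups for finer granularity; this only helps when the skew is spread across many keys, because all records for a single hot key land in the same key group and therefore on the same worker.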
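To reason about skew, the sketch below folds per-key-group record counts into per-worker totals using contiguous ranges. The range formula key_group * num_workers / max_key_groups is an assumption (Flink-style range assignment), not Volley's documented code, and the record counts in the test are made up for illustration. It shows why raising maxKeyGroups cannot split one hot key: all of its records stay in one key group.

```rust
/// Assumed contiguous range assignment of key groups to workers.
fn worker_for_key_group(key_group: u32, max_key_groups: u32, num_workers: u32) -> u32 {
    (u64::from(key_group) * u64::from(num_workers) / u64::from(max_key_groups)) as u32
}

/// Sums per-key-group record counts into per-worker totals.
/// `records_per_key_group.len()` plays the role of maxKeyGroups.
fn load_per_worker(records_per_key_group: &[u64], num_workers: u32) -> Vec<u64> {
    let max_key_groups = records_per_key_group.len() as u32;
    let mut load = vec![0u64; num_workers as usize];
    for (key_group, &records) in records_per_key_group.iter().enumerate() {
        let worker = worker_for_key_group(key_group as u32, max_key_groups, num_workers);
        load[worker as usize] += records;
    }
    load
}
```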
Checkpoint failures: Verify the shared PVC is mounted and writable on all pods (repeat for each pod ordinal):
kubectl exec my-pipeline-0 -n my-pipeline -- touch /mnt/volley/checkpoints/test && echo OK
DNS resolution failures at startup: Workers discover each other via StatefulSet DNS. If pods start before DNS propagates, the runtime retries with exponential backoff (100ms to 5s). Check logs for “Waiting for remote worker” messages.
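A sketch of the two pieces involved, under assumptions: peer addresses follow the standard Kubernetes StatefulSet DNS pattern <pod-name>.<headless-service>, matching the VOLLEY_HEADLESS_SERVICE value shown in Step 5, and the retry delay doubles from 100ms up to the 5s cap. The doubling factor and the absence of jitter are assumptions; peer_address and backoff_delays are hypothetical helpers, not Volley APIs.

```rust
use std::time::Duration;

/// Address of peer `ordinal` under the StatefulSet DNS pattern
/// "<app>-<ordinal>.<headless service>", plus the Flight port.
fn peer_address(app_name: &str, ordinal: u32, headless_service: &str, flight_port: u16) -> String {
    format!("{app_name}-{ordinal}.{headless_service}:{flight_port}")
}

/// Delay before each retry: 100ms doubling per attempt, capped at 5s.
/// Assumption: plain doubling with no jitter.
fn backoff_delays(attempts: usize) -> Vec<Duration> {
    let base = Duration::from_millis(100);
    let cap = Duration::from_secs(5);
    (0..attempts)
        .map(|i| base.saturating_mul(1u32 << i.min(31)).min(cap))
        .collect()
}
```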