# Horizontal Scaling Guide
This guide covers how to run a Volley pipeline across multiple K8s pods for workloads that exceed a single node.
## Prerequisites
- Kubernetes cluster (v1.26+)
- A `ReadWriteMany` storage class (AWS EFS, GCP Filestore, Azure Files)
- The Volley K8s operator deployed (see Kubernetes Operator)
- Your pipeline built with the `distributed` feature enabled
## When to Use Horizontal Scaling
Single-node mode handles most workloads — Volley sustains 9M+ records/sec on one node. Consider horizontal scaling when:
- Your key space is too large for one node’s memory (state doesn’t fit in RAM + RocksDB)
- You need fault tolerance across nodes (pod failure shouldn’t stop the pipeline)
- Your source has more partitions than one node can consume (e.g., 100+ Kafka partitions)
If your bottleneck is CPU on stateless operators (filter, map), scale vertically first. Horizontal scaling adds network overhead for the key shuffle.
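To make the shuffle cost concrete, here is a minimal Python sketch of the routing decision a keyed shuffle typically makes. The function names and hashing scheme are illustrative, not Volley's actual implementation; it assumes each key hashes to one of `maxKeyGroups` key groups and that contiguous ranges of groups are assigned to workers, as described under Troubleshooting. A record only crosses the network when its key group is owned by a different worker:

```python
import hashlib

MAX_KEY_GROUPS = 256  # matches the maxKeyGroups default used later in this guide

def key_group(key: str, max_key_groups: int = MAX_KEY_GROUPS) -> int:
    """Map a key to a stable key group via a deterministic hash."""
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % max_key_groups

def needs_shuffle(key: str, local_worker: int, num_workers: int) -> bool:
    """True if the record must be sent to another worker over the network.

    Assumes contiguous key-group ranges: worker w owns groups
    [w * groups_per_worker, (w + 1) * groups_per_worker).
    """
    groups_per_worker = MAX_KEY_GROUPS // num_workers
    owner = min(key_group(key) // groups_per_worker, num_workers - 1)
    return owner != local_worker
```

Stateless operators never pay this cost, which is why vertical scaling is the better first move when CPU, not state size, is the bottleneck.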
## Step 1: Enable the Distributed Feature
Add the `distributed` feature to your pipeline's `Cargo.toml`:
```toml
[dependencies]
volley-core = { version = "0.8", features = ["distributed"] }
```
## Step 2: Configure ClusterConfig

In your pipeline code, add `with_cluster()` to the environment:
```rust
use volley_core::prelude::*;

// In production, use ClusterConfig::from_env() to auto-detect from K8s env vars
let cluster = ClusterConfig::from_env();

let mut env = StreamExecutionEnvironment::new();
if let Some(config) = cluster {
    env = env.with_cluster(config);
}

env.from_source(source)
    .key_by(col("user_id"))
    .aggregate_expr(sum(col("amount")), state_backend)
    .to_stream()
    .to_sink(sink)
    .execute("my-pipeline")
    .await?;
```
The pipeline API is identical; `with_cluster()` is the only addition. When `ClusterConfig` is absent (e.g., running locally), the pipeline runs in single-node mode with zero overhead.
## Step 3: Create a Shared Checkpoint PVC

All workers need access to the same checkpoint directory. Create a `ReadWriteMany` PVC:
```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
  namespace: my-pipeline
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs  # AWS EFS, or your cloud's RWX storage class
  resources:
    requests:
      storage: 50Gi
```
```bash
kubectl apply -f shared-pvc.yaml
```
## Step 4: Deploy with the K8s Operator

Create a `VolleyApplication` with the `cluster` section:
```yaml
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: my-pipeline
spec:
  image: registry.example.com/my-pipeline:v1.0
  replicas: 3
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"
  cluster:
    maxKeyGroups: 256   # default; must be a power of 2
    flightPort: 50051   # default
    sharedCheckpoints:
      claimName: volley-shared-ckpt  # the PVC from step 3
  state:
    size: 10Gi          # per-worker local RocksDB
  checkpoints:
    size: 5Gi           # per-worker local checkpoint staging
```
The operator automatically:
- Injects `VOLLEY_*` environment variables (`VOLLEY_NUM_WORKERS`, `VOLLEY_HEADLESS_SERVICE`, `VOLLEY_MAX_KEY_GROUPS`, `VOLLEY_FLIGHT_PORT`, `VOLLEY_APP_NAME`)
- Adds the Arrow Flight port (50051) to the container and headless service
- Mounts the shared checkpoint PVC at `/mnt/volley/checkpoints`
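These env vars are what `ClusterConfig::from_env()` reads. As a rough illustration of the discovery they enable (a hypothetical Python sketch, not Volley's actual code), each worker can derive every peer's Arrow Flight address from the StatefulSet ordinal and the headless service name, since pod N of a StatefulSet resolves as `<app>-N.<headless-service>`:

```python
def peer_addresses(env: dict[str, str]) -> list[str]:
    """Derive each worker's Arrow Flight address from the injected VOLLEY_* vars."""
    app = env["VOLLEY_APP_NAME"]
    svc = env["VOLLEY_HEADLESS_SERVICE"]
    port = env["VOLLEY_FLIGHT_PORT"]
    n = int(env["VOLLEY_NUM_WORKERS"])
    # StatefulSet DNS: pod N is reachable at <app>-N.<headless-service>
    return [f"{app}-{i}.{svc}:{port}" for i in range(n)]

env = {
    "VOLLEY_APP_NAME": "my-pipeline",
    "VOLLEY_HEADLESS_SERVICE": "my-pipeline-headless.my-pipeline.svc",
    "VOLLEY_FLIGHT_PORT": "50051",
    "VOLLEY_NUM_WORKERS": "3",
}
print(peer_addresses(env)[0])
# my-pipeline-0.my-pipeline-headless.my-pipeline.svc:50051
```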
## Step 5: Verify
```bash
# Check pods are running
kubectl get pods -n my-pipeline
# NAME            READY   STATUS    RESTARTS   AGE
# my-pipeline-0   1/1     Running   0          2m
# my-pipeline-1   1/1     Running   0          1m
# my-pipeline-2   1/1     Running   0          30s

# Check env vars
kubectl exec my-pipeline-0 -n my-pipeline -- env | grep VOLLEY
# VOLLEY_NUM_WORKERS=3
# VOLLEY_HEADLESS_SERVICE=my-pipeline-headless.my-pipeline.svc
# VOLLEY_MAX_KEY_GROUPS=256
# VOLLEY_FLIGHT_PORT=50051
# VOLLEY_APP_NAME=my-pipeline

# Check Flight port on headless service
kubectl get svc my-pipeline-headless -n my-pipeline -o jsonpath='{.spec.ports[*].name}'
# health flight

# Check shared checkpoint mount
kubectl exec my-pipeline-0 -n my-pipeline -- ls /mnt/volley/checkpoints
```
## Scaling
Change the replica count to scale:
```bash
kubectl patch vapp my-pipeline -n my-pipeline --type merge -p '{"spec":{"replicas":5}}'
```
The coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
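The contiguous assignment makes rescaling cheap to reason about. As an illustrative sketch (the helper below is hypothetical; Volley's coordinator does this internally), here is how 256 key groups could be split into contiguous, near-equal ranges as the worker count changes:

```python
def assign_key_groups(max_key_groups: int, num_workers: int) -> list[range]:
    """Split key groups into contiguous, near-equal ranges, one per worker."""
    base, extra = divmod(max_key_groups, num_workers)
    ranges, start = [], 0
    for w in range(num_workers):
        size = base + (1 if w < extra else 0)  # spread the remainder evenly
        ranges.append(range(start, start + size))
        start += size
    return ranges

# Scaling from 3 to 5 workers: each range shrinks, and only the key groups
# that change owners must be reloaded from the shared checkpoint filesystem.
print([len(r) for r in assign_key_groups(256, 3)])  # [86, 85, 85]
print([len(r) for r in assign_key_groups(256, 5)])  # [52, 51, 51, 51, 51]
```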
## Monitoring
Key metrics to watch in distributed mode:
| Metric | What it tells you |
|---|---|
| `volley_shuffle_local_records_total` | Records staying on this worker |
| `volley_shuffle_remote_records_total{target_worker="N"}` | Records sent to each remote worker |
| `volley_flight_send_errors_total` | Transport errors (indicates worker connectivity issues) |
| `volley_cluster_worker_id` | This worker's ordinal |
| `volley_cluster_num_workers` | Total cluster size |
| `volley_cluster_key_groups_assigned` | Key groups owned by this worker |
| `volley_barrier_alignment_wait_ms` | Time to align barriers across all inputs |
## Troubleshooting
**Pipeline stalls after a pod restart:** Check whether the `BarrierAligner` is waiting for a dead input. The Flight server should clean up stale connections via TCP keepalive (10s). If the stall persists, check `volley_flight_send_errors_total` for rising error counts.
**Uneven record distribution:** Check `volley_shuffle_remote_records_total` per target worker. The key group assignment is contiguous; if your key distribution is skewed, some workers will handle more traffic. Consider increasing `maxKeyGroups` for finer granularity.
**Checkpoint failures:** Verify the shared PVC is mounted and writable on all pods:
```bash
kubectl exec my-pipeline-0 -n my-pipeline -- touch /mnt/volley/checkpoints/test && echo OK
```
**DNS resolution failures at startup:** Workers discover each other via StatefulSet DNS. If pods start before DNS propagates, the runtime retries with exponential backoff (100ms to 5s). Check logs for "Waiting for remote worker" messages.
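The retry schedule described above can be sketched as a standard capped exponential backoff. This is illustrative only: the base, doubling factor, and cap below mirror the documented 100ms-to-5s range, but the exact schedule is an implementation detail of the runtime:

```python
def backoff_schedule(attempts: int, base_ms: int = 100, cap_ms: int = 5000) -> list[int]:
    """Delay doubles on each attempt and is capped at 5s."""
    return [min(base_ms * 2**i, cap_ms) for i in range(attempts)]

print(backoff_schedule(8))
# [100, 200, 400, 800, 1600, 3200, 5000, 5000]
```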