Horizontal Scaling Guide

Run a single Volley pipeline across several Kubernetes pods when one machine can't keep up. Typical reasons are too many Kafka partitions, too much keyed state, or a requirement that a pod failure leave processing intact. Your pipeline logic stays the same. You enable the distributed feature, add a ClusterConfig, provision a shared ReadWriteMany checkpoint volume, and set a replicas count in the operator CR. Volley handles key distribution, shuffling between pods, and live rescaling.

This page is the deployment checklist. For the design behind it (key groups, Arrow Flight shuffle, leader election), see Horizontal Scaling in Architecture.

Prerequisites

  • Kubernetes cluster (v1.26+)
  • A ReadWriteMany storage class (AWS EFS, GCP Filestore, Azure Files)
  • The Volley K8s operator deployed (see Kubernetes Operator)
  • Your pipeline built with the distributed feature enabled

When to Use Horizontal Scaling

Single-node mode handles most workloads. Consider horizontal scaling when:

  • Your keyed state is too large for one node (it no longer fits in that node's memory plus its local RocksDB storage)
  • You need fault tolerance across nodes (pod failure shouldn’t stop the pipeline)
  • Your source has more partitions than one node can consume (e.g., 100+ Kafka partitions)

If your bottleneck is CPU on stateless operators (filter, map), scale vertically first. Horizontal scaling adds network overhead for the key shuffle.
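Here is a rough estimate of that shuffle overhead. Suppose keys are spread uniformly over N workers and each worker reads an arbitrary slice of the input. A record then stays local only when its key is owned by the worker that read it, which happens with probability 1/N. The sketch below is back-of-the-envelope arithmetic under those assumptions (uniform keys, sources not co-located with key owners). It is not a Volley API.

```rust
/// Expected fraction of keyed records sent to a remote worker, assuming keys
/// are uniformly distributed and sources are not co-located with key owners.
fn expected_remote_fraction(num_workers: u32) -> f64 {
    assert!(num_workers > 0, "need at least one worker");
    let n = f64::from(num_workers);
    // A record stays local with probability 1/N, so (N - 1)/N is shuffled.
    (n - 1.0) / n
}

fn main() {
    for n in [1, 2, 3, 5, 10] {
        println!(
            "{n} worker(s): ~{:.0}% of keyed records cross the network",
            expected_remote_fraction(n) * 100.0
        );
    }
}
```

Under this model, going from one worker to two already sends half of the keyed records over the network. That is why stateless, CPU-bound pipelines are usually better served by a bigger machine.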

Step 1: Enable the Distributed Feature

Add the distributed feature to your pipeline’s Cargo.toml:

[dependencies]
volley-core = { version = "0.8", features = ["distributed"] }

Step 2: Configure ClusterConfig

In your pipeline code, add with_cluster() to the environment:

use volley_core::prelude::*;

// `source`, `sink`, and `state_backend` are your existing pipeline pieces.
// This snippet runs inside an async fn that returns a Result (e.g. your async main).

// In production, ClusterConfig::from_env() auto-detects the cluster from the
// VOLLEY_* env vars the operator injects; it yields None when they are absent.
let cluster = ClusterConfig::from_env();

let mut env = StreamExecutionEnvironment::new();
if let Some(config) = cluster {
    env = env.with_cluster(config);
}

env.from_source(source)
    .key_by(col("user_id"))
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    .to_sink(sink)
    .execute("my-pipeline")
    .await?;

The pipeline API is identical — with_cluster() is the only addition. When ClusterConfig is absent (e.g., running locally), the pipeline runs in single-node mode with zero overhead.
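The env-driven fallback boils down to a simple pattern, shown here as a minimal plain-Rust sketch. ClusterSettings and cluster_from_vars are hypothetical names, not volley_core API. Which variables are treated as required is also an assumption. The variable names and the 256 / 50051 defaults come from this page. The lookup is passed in as a closure, so the logic can be exercised without touching the real environment.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the values a cluster config is built from.
/// The real ClusterConfig lives in volley_core and may look different.
#[derive(Debug, PartialEq)]
struct ClusterSettings {
    num_workers: u32,
    headless_service: String,
    max_key_groups: u32,
    flight_port: u16,
}

/// Returns None (single-node mode) when the required variables are missing or
/// malformed. `lookup` abstracts over std::env::var for testability.
fn cluster_from_vars(lookup: impl Fn(&str) -> Option<String>) -> Option<ClusterSettings> {
    let num_workers = lookup("VOLLEY_NUM_WORKERS")?.parse::<u32>().ok()?;
    let headless_service = lookup("VOLLEY_HEADLESS_SERVICE")?;
    // Optional settings fall back to the defaults documented for the CR.
    let max_key_groups = lookup("VOLLEY_MAX_KEY_GROUPS")
        .and_then(|v| v.parse::<u32>().ok())
        .unwrap_or(256);
    let flight_port = lookup("VOLLEY_FLIGHT_PORT")
        .and_then(|v| v.parse::<u16>().ok())
        .unwrap_or(50051);
    Some(ClusterSettings { num_workers, headless_service, max_key_groups, flight_port })
}

fn main() {
    // Inside a pod the operator sets these; on a laptop this is usually None.
    println!("{:?}", cluster_from_vars(|k| std::env::var(k).ok()));

    // Simulated pod environment with only the required variables set.
    let vars = HashMap::from([
        ("VOLLEY_NUM_WORKERS", "3"),
        ("VOLLEY_HEADLESS_SERVICE", "my-pipeline-headless.my-pipeline.svc"),
    ]);
    println!("{:?}", cluster_from_vars(|k| vars.get(k).map(|v| v.to_string())));
}
```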

Step 3: Create a Shared Checkpoint PVC

All workers need access to the same checkpoint directory. Create a ReadWriteMany PVC:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
  namespace: my-pipeline
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs    # AWS EFS, or your cloud's RWX storage class
  resources:
    requests:
      storage: 50Gi

Save the manifest as shared-pvc.yaml and apply it:

kubectl apply -f shared-pvc.yaml

Step 4: Deploy with the K8s Operator

Create a VolleyApplication with the cluster section:

apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: my-pipeline
spec:
  image: registry.example.com/my-pipeline:v1.0
  replicas: 3

  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"

  cluster:
    maxKeyGroups: 256              # default, power of 2
    flightPort: 50051              # default
    sharedCheckpoints:
      claimName: volley-shared-ckpt  # the PVC from step 3

  state:
    size: 10Gi                     # per-worker local RocksDB

  checkpoints:
    size: 5Gi                      # per-worker local checkpoint staging

The operator automatically:

  • Injects VOLLEY_* environment variables (VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME)
  • Adds the Arrow Flight port (flightPort, 50051 by default) to the container and headless service
  • Mounts the shared checkpoint PVC at /mnt/volley/checkpoints
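From that environment, each worker needs two things: its own ordinal and a stable address for every peer's Flight endpoint. StatefulSet pods are named <name>-<ordinal>. Behind a headless service, each pod resolves at <pod>.<service>. The sketch below derives both values from the injected variables. The helper names are hypothetical, and it is an assumption that Volley builds peer addresses exactly this way.

```rust
/// StatefulSet pods are named <statefulset>-<ordinal>, e.g. "my-pipeline-2".
/// Hypothetical helper: parse the ordinal off the end of the pod name.
fn ordinal_from_pod_name(pod_name: &str) -> Option<u32> {
    let (_, ordinal) = pod_name.rsplit_once('-')?;
    ordinal.parse().ok()
}

/// Hypothetical helper: one Flight address per worker, using the stable DNS
/// name each StatefulSet pod gets behind the headless service.
fn peer_flight_addrs(app_name: &str, num_workers: u32, headless_service: &str, port: u16) -> Vec<String> {
    (0..num_workers)
        .map(|i| format!("{app_name}-{i}.{headless_service}:{port}"))
        .collect()
}

fn main() {
    // Kubernetes sets HOSTNAME to the pod name; fall back to an example value.
    let pod = std::env::var("HOSTNAME").unwrap_or_else(|_| "my-pipeline-0".to_string());
    println!("worker ordinal: {:?}", ordinal_from_pod_name(&pod));

    for addr in peer_flight_addrs("my-pipeline", 3, "my-pipeline-headless.my-pipeline.svc", 50051) {
        println!("{addr}");
    }
}
```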

Step 5: Verify

# Check pods are running
kubectl get pods -n my-pipeline
# NAME             READY   STATUS    RESTARTS   AGE
# my-pipeline-0    1/1     Running   0          2m
# my-pipeline-1    1/1     Running   0          1m
# my-pipeline-2    1/1     Running   0          30s

# Check env vars
kubectl exec my-pipeline-0 -n my-pipeline -- env | grep VOLLEY
# VOLLEY_NUM_WORKERS=3
# VOLLEY_HEADLESS_SERVICE=my-pipeline-headless.my-pipeline.svc
# VOLLEY_MAX_KEY_GROUPS=256
# VOLLEY_FLIGHT_PORT=50051
# VOLLEY_APP_NAME=my-pipeline

# Check Flight port on headless service
kubectl get svc my-pipeline-headless -n my-pipeline -o jsonpath='{.spec.ports[*].name}'
# health flight

# Check shared checkpoint mount
kubectl exec my-pipeline-0 -n my-pipeline -- ls /mnt/volley/checkpoints

Scaling

Change the replica count to scale:

kubectl patch vapp my-pipeline -n my-pipeline --type merge -p '{"spec":{"replicas":5}}'

The coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. Existing workers keep running; the pipeline does not need a restart.
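This sketch gives a feel for how much state moves during a rescale. It assumes key groups are split into contiguous, near-equal ranges: worker w owns [w*G/N, (w+1)*G/N) with integer division. It then counts how many of the 256 default key groups change owner. The split formula and the helper names are assumptions, and Volley's exact rounding may differ, so treat the numbers as illustrative.

```rust
use std::ops::Range;

/// Contiguous key-group range owned by `worker` (assumed split, see above).
fn key_group_range(worker: u32, num_workers: u32, max_key_groups: u32) -> Range<u32> {
    let start = worker * max_key_groups / num_workers;
    let end = (worker + 1) * max_key_groups / num_workers;
    start..end
}

/// Worker that owns `key_group` under the assumed split.
fn owner(key_group: u32, num_workers: u32, max_key_groups: u32) -> u32 {
    (0..num_workers)
        .find(|&w| key_group_range(w, num_workers, max_key_groups).contains(&key_group))
        .expect("the ranges cover every key group")
}

/// Number of key groups whose owner changes when rescaling old_n -> new_n.
fn moved_key_groups(old_n: u32, new_n: u32, max_key_groups: u32) -> usize {
    (0..max_key_groups)
        .filter(|&g| owner(g, old_n, max_key_groups) != owner(g, new_n, max_key_groups))
        .count()
}

fn main() {
    let g = 256;
    for w in 0..5 {
        println!("worker {w}: key groups {:?}", key_group_range(w, 5, g));
    }
    println!("moved when scaling 3 -> 5: {} of {g}", moved_key_groups(3, 5, g));
}
```

With this split, scaling from 3 to 5 workers reassigns 188 of the 256 key groups. Under this model, only those groups' state has to be picked up from the shared checkpoint by a new owner.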

Monitoring

Key metrics to watch in distributed mode:

| Metric | What it tells you |
|---|---|
| volley_shuffle_local_records_total | Records staying on this worker |
| volley_shuffle_remote_records_total{target_worker="N"} | Records sent to each remote worker |
| volley_flight_send_errors_total | Transport errors (indicates worker connectivity issues) |
| volley_flight_reconnects_total{target_worker} | Successful reconnects after a HalfOpen probe. A steady trickle points to a peer that keeps flapping; a single spike lines up with one pod restart |
| volley_circuit_breaker_state{target_worker} | Breaker state per remote worker: 0=Closed, 1=Open, 2=HalfOpen |
| volley_circuit_breaker_trips_total{target_worker} | Closed → Open transitions |
| volley_circuit_breaker_probes_succeeded_total{target_worker} | HalfOpen → Closed transitions; its ratio to the trips counter is the reconnect success rate |
| volley_cluster_worker_id | This worker's ordinal |
| volley_cluster_num_workers | Total cluster size |
| volley_cluster_key_groups_assigned | Key groups owned by this worker |
| volley_barrier_alignment_wait_ms | Time to align barriers across all inputs |
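The three circuit-breaker metrics move together. The toy state machine below reproduces the gauge encoding from the table (0=Closed, 1=Open, 2=HalfOpen) and the two counters, so you can see which transition bumps which metric. Breaker and its methods are hypothetical. The exact transition rules are also assumptions, not Volley's documented behavior; for example, the sketch has a failed probe re-open the breaker without counting as a new trip.

```rust
/// Gauge values match the table: 0=Closed, 1=Open, 2=HalfOpen.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BreakerState {
    Closed = 0,
    Open = 1,
    HalfOpen = 2,
}

/// Hypothetical per-peer breaker with the two counters from the table.
#[derive(Debug)]
struct Breaker {
    state: BreakerState,
    trips_total: u64,            // Closed -> Open
    probes_succeeded_total: u64, // HalfOpen -> Closed
}

impl Breaker {
    fn new() -> Self {
        Self { state: BreakerState::Closed, trips_total: 0, probes_succeeded_total: 0 }
    }

    /// A send error trips a closed breaker; a failed probe re-opens a half-open one.
    fn on_send_error(&mut self) {
        match self.state {
            BreakerState::Closed => {
                self.state = BreakerState::Open;
                self.trips_total += 1;
            }
            BreakerState::HalfOpen => self.state = BreakerState::Open,
            BreakerState::Open => {}
        }
    }

    /// After a cool-down, an open breaker lets one probe through.
    fn on_cooldown_elapsed(&mut self) {
        if self.state == BreakerState::Open {
            self.state = BreakerState::HalfOpen;
        }
    }

    /// A successful probe closes the breaker.
    fn on_probe_success(&mut self) {
        if self.state == BreakerState::HalfOpen {
            self.state = BreakerState::Closed;
            self.probes_succeeded_total += 1;
        }
    }

    /// Value a volley_circuit_breaker_state-style gauge would export.
    fn gauge(&self) -> i64 {
        self.state as i64
    }

    /// probes_succeeded / trips, the reconnect success rate from the table.
    fn reconnect_success_rate(&self) -> Option<f64> {
        (self.trips_total > 0).then(|| self.probes_succeeded_total as f64 / self.trips_total as f64)
    }
}

fn main() {
    let mut b = Breaker::new();
    b.on_send_error(); // peer pod dies
    println!("after error: gauge={}", b.gauge());
    b.on_cooldown_elapsed();
    println!("probing: gauge={}", b.gauge());
    b.on_probe_success(); // peer is back
    println!("recovered: gauge={} rate={:?}", b.gauge(), b.reconnect_success_rate());
}
```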

Troubleshooting

Pipeline stalls after a pod restart: Check whether the BarrierAligner is waiting for a dead input. The Flight server should clean up stale connections via TCP keepalive (10s). If the stall persists, check volley_flight_send_errors_total for rising error counts.

After a peer pod restart, the HalfOpen probe does three things. It rebuilds the outbound tonic::Channel, re-registers the shuffle channel, and replays every unacknowledged frame from the client's 8 MiB per-peer outbox onto the fresh stream. The server deduplicates against its last_delivered_seq, so every frame lands exactly once. Confirm that volley_flight_reconnects_total{target_worker} increments and that volley_circuit_breaker_state returns to 0 (Closed).

Replay is at-least-once and the server drops duplicates. As a result, the epoch in flight at the moment of the crash often recovers without aborting; check that volley_dag_sink_aborts_total stays flat across the restart window. Longer outages (a reconnect that runs past the 2PC deadline) still fall back to a cluster-wide abort, and the next epoch commits cleanly.
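The replay-plus-dedup handshake can be modeled in a few lines. The sender keeps unacknowledged frames, trims them on cumulative ACKs, and replays the rest on a fresh stream. The receiver drops anything at or below its last_delivered_seq. Only that idea comes from the text above. Sender, Receiver, and the cumulative-ACK semantics are hypothetical. The model also assumes that the receiver's dedup state survives the reconnect and that frames on one stream arrive in order.

```rust
use std::collections::VecDeque;

struct Frame {
    seq: u64,
    payload: &'static str,
}

/// Sender side: keeps every frame until a cumulative ACK covers it.
struct Sender {
    next_seq: u64,
    unacked: VecDeque<Frame>,
}

impl Sender {
    fn send(&mut self, payload: &'static str) -> u64 {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.unacked.push_back(Frame { seq, payload });
        seq
    }

    /// Cumulative ACK: every frame with seq <= `acked` was delivered.
    fn ack(&mut self, acked: u64) {
        while self.unacked.front().map_or(false, |f| f.seq <= acked) {
            self.unacked.pop_front();
        }
    }

    /// Frames to resend on a fresh stream after a reconnect.
    fn replay(&self) -> impl Iterator<Item = &Frame> {
        self.unacked.iter()
    }
}

/// Receiver side: delivers each sequence number at most once.
struct Receiver {
    last_delivered_seq: Option<u64>,
    delivered: Vec<&'static str>,
}

impl Receiver {
    /// Returns false for a duplicate (seq at or below last_delivered_seq).
    fn on_frame(&mut self, frame: &Frame) -> bool {
        if self.last_delivered_seq.map_or(false, |last| frame.seq <= last) {
            return false;
        }
        self.last_delivered_seq = Some(frame.seq);
        self.delivered.push(frame.payload);
        true
    }
}

fn main() {
    let mut tx = Sender { next_seq: 0, unacked: VecDeque::new() };
    let mut rx = Receiver { last_delivered_seq: None, delivered: Vec::new() };

    let first = tx.send("a");
    tx.send("b");
    tx.send("c");

    // Old stream: "a" and "b" arrive, but only the ACK for "a" gets back
    // before the connection drops, and "c" never reaches the receiver.
    for frame in tx.unacked.iter().take(2) {
        rx.on_frame(frame);
    }
    tx.ack(first);

    // Fresh stream: replay "b" and "c"; the duplicate "b" is dropped.
    for frame in tx.replay() {
        let accepted = rx.on_frame(frame);
        println!("replayed seq {} accepted={accepted}", frame.seq);
    }
    println!("delivered: {:?}", rx.delivered);
}
```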

Outbox backpressure (sender appears to stall per peer): The client's outbox is byte-bounded (8 MiB per peer by default; see DEFAULT_OUTBOX_MAX_BYTES). When a remote consumer goes silent (network partition, GC pause, OOM), the encoder parks on the byte budget until ACKs trim the backlog. Symptom: volley_shuffle_remote_records_total{target_worker="N"} flatlines while the counters for other targets keep climbing. To find the root cause, look at the peer's volley_flight_recv_messages_total and its drain-task logs. The outbox itself is working as intended: it pushes back instead of running the worker out of memory.
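The backpressure behavior follows directly from a byte budget. Below is a simplified, synchronous sketch. Here try_push refuses a frame that would exceed the budget, at the point where the real encoder would park until ACKs arrive. A cumulative ACK then releases the bytes held by acknowledged frames. Only the 8 MiB figure comes from the text; Outbox, try_push, and the ACK semantics are hypothetical.

```rust
use std::collections::VecDeque;

/// Per-peer budget; 8 MiB matches the default quoted above.
const OUTBOX_MAX_BYTES: usize = 8 * 1024 * 1024;

/// Hypothetical byte-bounded outbox holding (seq, encoded size) per frame.
struct Outbox {
    max_bytes: usize,
    used_bytes: usize,
    frames: VecDeque<(u64, usize)>,
}

impl Outbox {
    fn new(max_bytes: usize) -> Self {
        Self { max_bytes, used_bytes: 0, frames: VecDeque::new() }
    }

    /// Err(free_bytes) means "no room: apply backpressure". This sketch does
    /// not special-case a single frame larger than the whole budget.
    fn try_push(&mut self, seq: u64, size: usize) -> Result<(), usize> {
        if self.used_bytes + size > self.max_bytes {
            return Err(self.max_bytes - self.used_bytes);
        }
        self.used_bytes += size;
        self.frames.push_back((seq, size));
        Ok(())
    }

    /// A cumulative ACK frees the bytes of every frame with seq <= `acked`.
    fn ack(&mut self, acked: u64) {
        while let Some(&(seq, size)) = self.frames.front() {
            if seq > acked {
                break;
            }
            self.used_bytes -= size;
            self.frames.pop_front();
        }
    }
}

fn main() {
    const MIB: usize = 1024 * 1024;
    let mut outbox = Outbox::new(OUTBOX_MAX_BYTES);

    // A silent peer: eight 1 MiB frames fill the budget, the ninth is refused.
    for seq in 0..8 {
        outbox.try_push(seq, MIB).expect("fits in budget");
    }
    println!("9th frame: {:?}", outbox.try_push(8, MIB));

    // ACKs resume: frames 0..=2 are acknowledged, freeing 3 MiB.
    outbox.ack(2);
    println!("after ack: {} MiB in flight", outbox.used_bytes / MIB);
    println!("9th frame retried: {:?}", outbox.try_push(8, MIB));
}
```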

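Uneven record distribution: Check volley_shuffle_remote_records_total per target worker. Key groups are assigned to workers in contiguous ranges, so if your key distribution is skewed, some workers handle more traffic. Consider increasing maxKeyGroups for finer granularity. This spreads skew across many keys more evenly. However, it cannot split a single hot key, because all records for one key land in the same key group.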
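The sketch below shows why skew shows up per worker. It sums per-key-group record counts over each worker's contiguous range. The split is assumed: worker w owns key groups [w*G/N, (w+1)*G/N) with integer division. The helper names and the imbalance ratio (busiest worker divided by the mean) are illustrative choices, not Volley metrics.

```rust
/// Per-worker record counts, assuming worker w owns the contiguous key-group
/// range [w*G/N, (w+1)*G/N) where G = counts.len() (hypothetical split).
fn per_worker_load(counts: &[u64], num_workers: usize) -> Vec<u64> {
    let g = counts.len();
    (0..num_workers)
        .map(|w| counts[w * g / num_workers..(w + 1) * g / num_workers].iter().sum::<u64>())
        .collect()
}

/// Busiest worker relative to the mean; 1.0 means perfectly even.
fn imbalance(loads: &[u64]) -> f64 {
    if loads.is_empty() {
        return 1.0;
    }
    let max = *loads.iter().max().unwrap() as f64;
    let mean = loads.iter().sum::<u64>() as f64 / loads.len() as f64;
    if mean == 0.0 { 1.0 } else { max / mean }
}

fn main() {
    // 8 key groups, 2 workers: one hot key makes group 0 dominate, and no
    // number of key groups can split that single key.
    let hot: [u64; 8] = [1000, 1, 1, 1, 1, 1, 1, 1];
    let loads = per_worker_load(&hot, 2);
    println!("hot key:  loads {loads:?}, imbalance {:.2}", imbalance(&loads));

    // For comparison, traffic spread evenly across key groups.
    let even: [u64; 8] = [126; 8];
    let loads = per_worker_load(&even, 2);
    println!("even:     loads {loads:?}, imbalance {:.2}", imbalance(&loads));
}
```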

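Checkpoint failures: Verify the shared PVC is mounted and writable on every pod (adjust the ordinals to your replica count):

for i in 0 1 2; do kubectl exec my-pipeline-$i -n my-pipeline -- touch /mnt/volley/checkpoints/test && echo "my-pipeline-$i OK"; done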

DNS resolution failures at startup: Workers discover each other via StatefulSet DNS. If pods start before DNS propagates, the runtime retries with exponential backoff (100ms to 5s). Check logs for “Waiting for remote worker” messages.
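The retry loop can be sketched with the standard library alone. The 100 ms to 5 s bounds come from the text. Several details are assumptions: doubling between attempts, no jitter, the attempt limit, and the blocking std::thread::sleep (an async runtime would use its own timer). The names backoff_schedule and resolve_with_retry are hypothetical.

```rust
use std::net::{SocketAddr, ToSocketAddrs};
use std::thread::sleep;
use std::time::Duration;

/// Delays between lookup attempts: 100 ms, doubling, capped at 5 s.
fn backoff_schedule(attempts: usize) -> Vec<Duration> {
    let cap = Duration::from_secs(5);
    let mut delay = Duration::from_millis(100);
    let mut out = Vec::with_capacity(attempts);
    for _ in 0..attempts {
        out.push(delay);
        delay = (delay * 2).min(cap);
    }
    out
}

/// Resolve a peer's "host:port" until DNS returns at least one address.
fn resolve_with_retry(addr: &str, max_attempts: usize) -> Option<Vec<SocketAddr>> {
    let delays = backoff_schedule(max_attempts);
    for (attempt, delay) in delays.iter().enumerate() {
        match addr.to_socket_addrs() {
            Ok(resolved) => {
                let addrs: Vec<SocketAddr> = resolved.collect();
                if !addrs.is_empty() {
                    return Some(addrs);
                }
            }
            Err(e) => eprintln!("lookup of {addr} failed (attempt {}): {e}", attempt + 1),
        }
        // Only wait if another attempt follows.
        if attempt + 1 < delays.len() {
            sleep(*delay);
        }
    }
    None
}

fn main() {
    let ms: Vec<u128> = backoff_schedule(8).iter().map(Duration::as_millis).collect();
    println!("backoff (ms): {ms:?}");
    // In a cluster this would be a peer such as
    // my-pipeline-1.my-pipeline-headless.my-pipeline.svc:50051.
    println!("{:?}", resolve_with_retry("localhost:50051", 3));
}
```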