
Horizontal Scaling Guide

This guide covers how to run a Volley pipeline across multiple K8s pods for workloads that exceed a single node.

Prerequisites

  • Kubernetes cluster (v1.26+)
  • A ReadWriteMany storage class (AWS EFS, GCP Filestore, Azure Files)
  • The Volley K8s operator deployed (see Kubernetes Operator)
  • Your pipeline built with the distributed feature enabled

When to Use Horizontal Scaling

Single-node mode handles most workloads — Volley sustains 9M+ records/sec on one node. Consider horizontal scaling when:

  • Your key space is too large for one node’s memory (state doesn’t fit in RAM + RocksDB)
  • You need fault tolerance across nodes (pod failure shouldn’t stop the pipeline)
  • Your source has more partitions than one node can consume (e.g., 100+ Kafka partitions)

If your bottleneck is CPU on stateless operators (filter, map), scale vertically first. Horizontal scaling adds network overhead for the key shuffle.

Step 1: Enable the Distributed Feature

Add the distributed feature to your pipeline’s Cargo.toml:

[dependencies]
volley-core = { version = "0.8", features = ["distributed"] }

Step 2: Configure ClusterConfig

In your pipeline code, add with_cluster() to the environment:

use volley_core::prelude::*;

// In production, use ClusterConfig::from_env() to auto-detect from K8s env vars
let mut env = StreamExecutionEnvironment::new();
if let Some(config) = ClusterConfig::from_env() {
    env = env.with_cluster(config);
}

env.from_source(source)
    .key_by(col("user_id"))
    .aggregate_expr(sum(col("amount")), state_backend)
    .to_stream()
    .to_sink(sink)
    .execute("my-pipeline")
    .await?;

The pipeline API is identical — with_cluster() is the only addition. When ClusterConfig is absent (e.g., running locally), the pipeline runs in single-node mode with zero overhead.
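As a concrete (hypothetical) illustration of env-var-based detection, the sketch below shows how a `from_env`-style constructor could read the `VOLLEY_*` variables the operator injects. `ClusterInfo` and its fields are invented for this example and are not Volley's actual `ClusterConfig` API:

```rust
use std::env;

/// Hypothetical stand-in for ClusterConfig, for illustration only.
#[derive(Debug, PartialEq)]
struct ClusterInfo {
    num_workers: usize,
    headless_service: String,
    max_key_groups: usize,
    flight_port: u16,
}

impl ClusterInfo {
    /// Returns None when the required VOLLEY_* variables are absent
    /// (e.g., a local run), so the caller falls back to single-node mode.
    fn from_env() -> Option<Self> {
        Some(ClusterInfo {
            num_workers: env::var("VOLLEY_NUM_WORKERS").ok()?.parse().ok()?,
            headless_service: env::var("VOLLEY_HEADLESS_SERVICE").ok()?,
            max_key_groups: env::var("VOLLEY_MAX_KEY_GROUPS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(256), // operator default
            flight_port: env::var("VOLLEY_FLIGHT_PORT")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(50051), // operator default
        })
    }
}

fn main() {
    // Simulate what the operator injects into each pod.
    env::set_var("VOLLEY_NUM_WORKERS", "3");
    env::set_var("VOLLEY_HEADLESS_SERVICE", "my-pipeline-headless.my-pipeline.svc");
    let info = ClusterInfo::from_env().expect("cluster detected");
    assert_eq!(info.num_workers, 3);
    assert_eq!(info.max_key_groups, 256); // default when unset
    assert_eq!(info.flight_port, 50051); // default when unset
}
```

Keying detection off the environment is what lets the same binary run unchanged both locally and in the cluster.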

Step 3: Create a Shared Checkpoint PVC

All workers need access to the same checkpoint directory. Create a ReadWriteMany PVC:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
  namespace: my-pipeline
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs    # AWS EFS, or your cloud's RWX storage class
  resources:
    requests:
      storage: 50Gi

Apply it:

kubectl apply -f shared-pvc.yaml

Step 4: Deploy with the K8s Operator

Create a VolleyApplication with the cluster section:

apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: my-pipeline
spec:
  image: registry.example.com/my-pipeline:v1.0
  replicas: 3

  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"

  cluster:
    maxKeyGroups: 256              # default, power of 2
    flightPort: 50051              # default
    sharedCheckpoints:
      claimName: volley-shared-ckpt  # the PVC from step 3

  state:
    size: 10Gi                     # per-worker local RocksDB

  checkpoints:
    size: 5Gi                      # per-worker local checkpoint staging

The operator automatically:

  • Injects VOLLEY_* environment variables (VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME)
  • Adds the Arrow Flight port (50051) to the container and headless service
  • Mounts the shared checkpoint PVC at /mnt/volley/checkpoints

Step 5: Verify

# Check pods are running
kubectl get pods -n my-pipeline
# NAME             READY   STATUS    RESTARTS   AGE
# my-pipeline-0    1/1     Running   0          2m
# my-pipeline-1    1/1     Running   0          1m
# my-pipeline-2    1/1     Running   0          30s

# Check env vars
kubectl exec my-pipeline-0 -n my-pipeline -- env | grep VOLLEY
# VOLLEY_NUM_WORKERS=3
# VOLLEY_HEADLESS_SERVICE=my-pipeline-headless.my-pipeline.svc
# VOLLEY_MAX_KEY_GROUPS=256
# VOLLEY_FLIGHT_PORT=50051
# VOLLEY_APP_NAME=my-pipeline

# Check Flight port on headless service
kubectl get svc my-pipeline-headless -n my-pipeline -o jsonpath='{.spec.ports[*].name}'
# health flight

# Check shared checkpoint mount
kubectl exec my-pipeline-0 -n my-pipeline -- ls /mnt/volley/checkpoints

Scaling

Change the replica count to scale:

kubectl patch vapp my-pipeline -n my-pipeline --type merge -p '{"spec":{"replicas":5}}'

The coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
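To make the reassignment concrete, here is a minimal sketch of contiguous key-group assignment, the Flink-style scheme the behavior above suggests. The hash function and helper names are illustrative, not Volley's actual implementation:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a key to one of `max_key_groups` groups.
/// A key's group never changes, regardless of worker count.
fn key_group_for<K: Hash>(key: &K, max_key_groups: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % max_key_groups
}

/// Half-open range [start, end) of key groups owned by `worker`
/// out of `num_workers`. Ranges are contiguous and cover all groups.
fn key_group_range(worker: u64, num_workers: u64, max_key_groups: u64) -> (u64, u64) {
    let start = worker * max_key_groups / num_workers;
    let end = (worker + 1) * max_key_groups / num_workers;
    (start, end)
}

fn main() {
    // Rescaling 3 -> 5 workers only moves range boundaries; a key's
    // group is stable, so whichever worker now owns that group can
    // load its state from the shared checkpoint.
    let g = key_group_for(&"user-42", 256);
    assert!(g < 256);
    assert_eq!(key_group_range(0, 3, 256), (0, 85));
    assert_eq!(key_group_range(2, 3, 256), (170, 256));
    assert_eq!(key_group_range(4, 5, 256), (204, 256));
}
```

This is also why skewed keys cause uneven load: groups are assigned as contiguous blocks, so a hot group stays pinned to one worker.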

Monitoring

Key metrics to watch in distributed mode:

| Metric | What it tells you |
| --- | --- |
| `volley_shuffle_local_records_total` | Records staying on this worker |
| `volley_shuffle_remote_records_total{target_worker="N"}` | Records sent to each remote worker |
| `volley_flight_send_errors_total` | Transport errors (indicates worker connectivity issues) |
| `volley_cluster_worker_id` | This worker's ordinal |
| `volley_cluster_num_workers` | Total cluster size |
| `volley_cluster_key_groups_assigned` | Key groups owned by this worker |
| `volley_barrier_alignment_wait_ms` | Time to align barriers across all inputs |

Troubleshooting

Pipeline stalls after a pod restart: Check if the BarrierAligner is waiting for a dead input. The Flight server should clean up stale connections via TCP keepalive (10s). If the stall persists, check volley_flight_send_errors_total for rising error counts.

Uneven record distribution: Check volley_shuffle_remote_records_total per target worker. The key group assignment is contiguous — if your key distribution is skewed, some workers will handle more traffic. Consider increasing maxKeyGroups for finer granularity.

Checkpoint failures: Verify the shared PVC is mounted and writable on all pods:

kubectl exec my-pipeline-0 -- touch /mnt/volley/checkpoints/test && echo OK

DNS resolution failures at startup: Workers discover each other via StatefulSet DNS. If pods start before DNS propagates, the runtime retries with exponential backoff (100ms to 5s). Check logs for “Waiting for remote worker” messages.
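A minimal sketch of such a retry schedule, assuming simple doubling from 100ms with a 5s cap (the exact curve Volley uses may differ):

```rust
use std::time::Duration;

/// Illustrative backoff: 100ms * 2^attempt, capped at 5s.
/// Not Volley's actual code.
fn backoff_delay(attempt: u32) -> Duration {
    let base_ms: u64 = 100;
    let cap_ms: u64 = 5_000;
    let ms = base_ms.saturating_mul(1u64 << attempt.min(16));
    Duration::from_millis(ms.min(cap_ms))
}

fn main() {
    assert_eq!(backoff_delay(0), Duration::from_millis(100));
    assert_eq!(backoff_delay(1), Duration::from_millis(200));
    assert_eq!(backoff_delay(3), Duration::from_millis(800));
    // 100ms * 2^6 = 6400ms, so attempts 6+ hit the 5s cap.
    assert_eq!(backoff_delay(6), Duration::from_secs(5));
    assert_eq!(backoff_delay(20), Duration::from_secs(5));
}
```

In practice this means a pod that starts a few seconds before DNS propagates converges on its peers without operator intervention.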