Horizontal Scaling

Volley supports distributed execution across multiple K8s pods. Single-node mode remains the default with zero overhead. Horizontal scaling is opt-in via ClusterConfig.
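A minimal sketch of what the opt-in config holds, based on the Key Types table at the end of this page (field names here are assumptions, not the exact API):

```rust
// Illustrative sketch of ClusterConfig; field names are inferred from the
// Key Types table and may differ from the real definition. When no such
// config is provided, Volley stays in single-node mode.
#[derive(Debug, Clone)]
pub struct ClusterConfig {
    pub worker_id: u32,           // this pod's StatefulSet ordinal
    pub num_workers: u32,         // current replica count
    pub headless_service: String, // headless service name for peer DNS discovery
    pub app_name: String,         // prefix for checkpoint ConfigMap names
    pub flight_port: u16,         // Arrow Flight shuffle port
}

fn main() {
    let cfg = ClusterConfig {
        worker_id: 0,
        num_workers: 3,
        headless_service: "myapp-headless".into(),
        app_name: "myapp".into(),
        flight_port: 50051,
    };
    assert_eq!(cfg.num_workers, 3);
    println!("{cfg:?}");
}
```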

Design Principles

  • Symmetric workers — every pod runs the same binary. No dedicated driver node.
  • K8s-native — uses Lease API for leader election, ConfigMaps for checkpoint metadata, StatefulSet DNS for discovery. No ZooKeeper.
  • Single-node is free — when ClusterConfig is absent, no distributed code paths execute.
  • Shared filesystem — checkpoint data on EFS/Filestore/Azure Files. No state migration needed for rescaling.

Two-Level Key Hashing

Records are partitioned across workers using a two-level scheme:

hash(key) → key_group → worker
  1. Key group: hash(key) % max_key_groups (default 256). This is a virtual partition that never changes.
  2. Worker assignment: contiguous ranges of key groups are assigned to workers. Rescaling reassigns ranges without rehashing.
For example, with 256 key groups and 3 workers:

Worker 0: key groups [0, 86)    — 86 groups
Worker 1: key groups [86, 171)  — 85 groups
Worker 2: key groups [171, 256) — 85 groups

This design, borrowed from Apache Flink’s KeyGroupRangeAssignment, minimizes data movement during rescaling. Scaling from 2 to 3 workers moves roughly 1/3 of key groups.
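The two levels can be sketched as follows, assuming Flink-style ceil-division range boundaries (function names are illustrative, not Volley's API):

```rust
// Sketch of the two-level scheme, mirroring Flink's KeyGroupRangeAssignment.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Level 1: a key's group never changes, regardless of worker count.
fn key_group_of<K: Hash>(key: &K, max_key_groups: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % max_key_groups
}

/// Level 2: worker `i` owns the contiguous range
/// [ceil(i * max / p), ceil((i + 1) * max / p)).
fn key_group_range(worker: u64, num_workers: u64, max_key_groups: u64) -> (u64, u64) {
    let start = (worker * max_key_groups).div_ceil(num_workers);
    let end = ((worker + 1) * max_key_groups).div_ceil(num_workers);
    (start, end)
}

fn main() {
    // Reproduces the 256-group / 3-worker example above.
    assert_eq!(key_group_range(0, 3, 256), (0, 86));
    assert_eq!(key_group_range(1, 3, 256), (86, 171));
    assert_eq!(key_group_range(2, 3, 256), (171, 256));
    // Level 1 output is stable and always in range.
    assert!(key_group_of(&"user-42", 256) < 256);
}
```

Only the level-2 mapping changes on rescale, which is why no record ever needs rehashing.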

Arrow Flight Shuffle

When a keyed operator runs in distributed mode, records are shuffled between workers via Apache Arrow Flight DoExchange:

Worker 0                                    Worker 1
┌──────────────────────┐                    ┌──────────────────────┐
│ Source               │                    │ Source               │
│   ↓                  │                    │   ↓                  │
│ ShuffleRouter        │   Arrow Flight     │ ShuffleRouter        │
│   ├─ local partition │                    │   ├─ local partition │
│   └─ remote → Flight ┼──────────────────→ │   └─ remote → Flight │
│                      │ ←───────────────── │                      │
│ BarrierAligner       │                    │ BarrierAligner       │
│   (local + remote)   │                    │   (local + remote)   │
│   ↓                  │                    │   ↓                  │
│ Sink                 │                    │ Sink                 │
└──────────────────────┘                    └──────────────────────┘
  • ShuffleRouter replaces PartitionRouter for keyed operators. Routes via key_group → worker → local mpsc or Flight sender.
  • Non-keyed operators (filter, map, flat_map) run locally — no shuffle.
  • BarrierAligner merges both local partition outputs and remote Flight inputs.
  • Barriers and watermarks are broadcast to all local partitions AND all remote workers.
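The routing decision the bullets describe can be sketched like this (all names hypothetical; the owner computation is the inverse of the contiguous range assignment):

```rust
// Illustrative ShuffleRouter decision: key_group -> owning worker ->
// local channel index or remote worker id. Not Volley's exact API.
#[derive(Debug, PartialEq)]
enum Destination {
    Local(usize), // index of a local mpsc partition
    Remote(u32),  // worker id to reach via Arrow Flight
}

/// Which worker owns this key group (inverse of the ceil-division ranges).
fn owner_of(key_group: u32, num_workers: u32, max_key_groups: u32) -> u32 {
    key_group * num_workers / max_key_groups
}

fn route(
    key_group: u32,
    my_worker: u32,
    num_workers: u32,
    max_key_groups: u32,
    local_partitions: usize,
) -> Destination {
    let owner = owner_of(key_group, num_workers, max_key_groups);
    if owner == my_worker {
        Destination::Local(key_group as usize % local_partitions)
    } else {
        Destination::Remote(owner)
    }
}

fn main() {
    // With 256 groups and 3 workers, group 100 lives on worker 1.
    assert_eq!(route(100, 1, 3, 256, 4), Destination::Local(0));
    assert_eq!(route(100, 0, 3, 256, 4), Destination::Remote(1));
}
```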

Wire Format

StreamElement variants are encoded as FlightData with a discriminator byte in app_metadata[0]:

Byte   Variant     data_body                app_metadata[1..]
0x01   Data        Arrow IPC RecordBatch    JSON record metadata
0x02   Barrier     empty                    JSON CheckpointBarrier
0x03   Watermark   empty                    JSON Watermark
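A sketch of how an encoder could place the discriminator byte (the enum shape and JSON payloads are illustrative; only the byte layout comes from the table above):

```rust
// Illustrative encoder for the wire format: the variant byte goes in
// app_metadata[0], any JSON payload follows it, and data_body carries
// the Arrow IPC batch only for the Data variant.
enum StreamElement {
    Data { ipc_batch: Vec<u8>, meta_json: Vec<u8> },
    Barrier { barrier_json: Vec<u8> },
    Watermark { watermark_json: Vec<u8> },
}

/// Returns (data_body, app_metadata).
fn encode(elem: &StreamElement) -> (Vec<u8>, Vec<u8>) {
    match elem {
        StreamElement::Data { ipc_batch, meta_json } => {
            let mut meta = vec![0x01];
            meta.extend_from_slice(meta_json);
            (ipc_batch.clone(), meta)
        }
        StreamElement::Barrier { barrier_json } => {
            let mut meta = vec![0x02];
            meta.extend_from_slice(barrier_json);
            (Vec::new(), meta) // data_body is empty for barriers
        }
        StreamElement::Watermark { watermark_json } => {
            let mut meta = vec![0x03];
            meta.extend_from_slice(watermark_json);
            (Vec::new(), meta) // data_body is empty for watermarks
        }
    }
}

fn main() {
    let barrier = StreamElement::Barrier { barrier_json: br#"{"epoch":42}"#.to_vec() };
    let (body, meta) = encode(&barrier);
    assert!(body.is_empty());
    assert_eq!(meta[0], 0x02);
}
```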

K8s-Native Coordination

Leader Election

One worker acquires a K8s Lease (coordination.k8s.io/v1) and becomes the coordinator. The coordinator handles checkpoint injection and key group assignment. It still processes data normally — coordination is lightweight.

If the leader dies, the lease expires and another worker takes over.

Checkpoint Metadata

Completed checkpoint metadata is stored in K8s ConfigMaps:

  • {app}-ckpt-epoch-{N} — per-epoch metadata (workers, state paths, source offsets)
  • {app}-ckpt-latest — pointer to the latest completed epoch

ConfigMaps have no OwnerReference, so they survive CRD deletion. This allows recovery after redeployment.
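A per-epoch ConfigMap might look roughly like this (only the naming scheme is documented above; the `data` keys are illustrative):

```yaml
# Hypothetical shape of a per-epoch checkpoint ConfigMap.
apiVersion: v1
kind: ConfigMap
metadata:
  name: myapp-ckpt-epoch-42
  # Deliberately no ownerReferences: the ConfigMap outlives the custom resource.
data:
  epoch: "42"
  workers: "3"
  state_path: /mnt/volley/checkpoints/epoch-42
```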

Shared Filesystem Checkpoints

Checkpoint data is written to a shared filesystem (AWS EFS, GCP Filestore, Azure Files) mounted as a ReadWriteMany PVC. All workers write to and read from the same mount.

/mnt/volley/checkpoints/
  epoch-42/
    worker-0/     ← RocksDB SST files
    worker-1/
    worker-2/

Recovery is near-instant: the pod reads the checkpoint directory directly. No download step.

Worker Discovery

Workers discover each other via StatefulSet headless service DNS:

{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}
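A trivial helper expanding that pattern (illustrative, not the exact API) shows why no external registry is needed: each worker can derive every peer's address from the app name and replica count alone.

```rust
// Illustrative expansion of the StatefulSet headless-service DNS pattern.
fn peer_addr(app: &str, ordinal: u32, namespace: &str, flight_port: u16) -> String {
    format!("{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}")
}

fn main() {
    assert_eq!(
        peer_addr("myapp", 2, "default", 50051),
        "myapp-2.myapp-headless.default.svc:50051"
    );
}
```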

Live Rescaling

Since all workers share the same filesystem, rescaling doesn’t require state migration:

  1. Coordinator detects replica count change
  2. Triggers a checkpoint — all workers flush in-memory state to shared FS
  3. Computes new KeyGroupAssignment and diffs against the old one
  4. Broadcasts RescaleEvent via watch channel
  5. Workers stop processing lost key groups, load state for gained ones from the shared mount
  6. Resume — no restart, no data transfer
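Step 3 can be sketched by diffing one worker's old and new key group range (function names illustrative; the ceil-division ranges follow the scheme described earlier):

```rust
// Sketch of the rescale diff: which key groups this worker gains (state to
// load from the shared mount) and loses (state to stop serving).
fn key_group_range(worker: u64, num_workers: u64, max_key_groups: u64) -> (u64, u64) {
    let start = (worker * max_key_groups).div_ceil(num_workers);
    let end = ((worker + 1) * max_key_groups).div_ceil(num_workers);
    (start, end)
}

/// Key groups in `new` but not `old` (gained), and in `old` but not `new` (lost).
fn diff(old: (u64, u64), new: (u64, u64)) -> (Vec<u64>, Vec<u64>) {
    let gained = (new.0..new.1).filter(|g| !(old.0..old.1).contains(g)).collect();
    let lost = (old.0..old.1).filter(|g| !(new.0..new.1).contains(g)).collect();
    (gained, lost)
}

fn main() {
    // Worker 1 while scaling from 2 to 3 workers with 256 key groups:
    let old = key_group_range(1, 2, 256); // [128, 256)
    let new = key_group_range(1, 3, 256); // [86, 171)
    let (gained, lost) = diff(old, new);
    assert_eq!(gained, (86..128).collect::<Vec<u64>>());
    assert_eq!(lost, (171..256).collect::<Vec<u64>>());
}
```

Because the state for every key group already sits on the shared mount, "loading" a gained group is just opening its files, which is what makes step 6 transfer-free.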

Key Types

Type                               Location                                        Purpose
ClusterConfig                      cluster::config                                 Worker ID, num workers, headless service, app name, flight port
KeyGroupAssignment                 cluster::assignment                             Maps key groups to workers with contiguous range assignment
KeyGroupRange                      cluster::assignment                             A [start, end) range of key groups owned by one worker
ShuffleRouter                      cluster::shuffle                                Routes records to local or remote destinations
ShuffleFlightService               cluster::flight::server                         Arrow Flight DoExchange server
ShuffleFlightClient                cluster::flight::client                         Arrow Flight DoExchange client with connection pooling
LeaseLeaderElector                 cluster::coordinator::leader                    K8s Lease-based leader election
DistributedCheckpointCoordinator   cluster::coordinator::checkpoint_coordinator    ConfigMap-based checkpoint metadata
RescaleCoordinator                 cluster::rescale                                Checkpoint + key group reassignment