# Horizontal Scaling
Volley supports distributed execution across multiple K8s pods. Single-node mode remains the default with zero overhead. Horizontal scaling is opt-in via ClusterConfig.
## Design Principles
- Symmetric workers — every pod runs the same binary. No dedicated driver node.
- K8s-native — uses Lease API for leader election, ConfigMaps for checkpoint metadata, StatefulSet DNS for discovery. No ZooKeeper.
- Single-node is free — when `ClusterConfig` is absent, no distributed code paths execute.
- Shared filesystem — checkpoint data lives on EFS/Filestore/Azure Files. No state migration needed for rescaling.
## Two-Level Key Hashing
Records are partitioned across workers using a two-level scheme:
```
hash(key) → key_group → worker
```
- Key group: `hash(key) % max_key_groups` (default 256). This is a virtual partition that never changes.
- Worker assignment: contiguous ranges of key groups are assigned to workers. Rescaling reassigns ranges without rehashing.
Example — 256 key groups, 3 workers:

```
Worker 0: key groups [0, 86)    — 86 groups
Worker 1: key groups [86, 171)  — 85 groups
Worker 2: key groups [171, 256) — 85 groups
```
This design, borrowed from Apache Flink’s `KeyGroupRangeAssignment`, limits data movement during rescaling. Scaling from 2 to 3 workers moves roughly half of the key groups (ranges [86, 128) and [171, 256)), versus about two-thirds under a naive `hash(key) % num_workers` scheme, and every move is a contiguous range rather than a scattering of individual keys.
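The two-level scheme can be sketched in Rust. This is a minimal sketch: `DefaultHasher` stands in for whatever hash function Volley actually uses, the function names are hypothetical, and the range formula follows Flink's `KeyGroupRangeAssignment` (which reproduces the 3-worker example above):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const MAX_KEY_GROUPS: u32 = 256;

/// Level 1: key -> key group. A stable virtual partition that never changes.
fn key_group_for<K: Hash>(key: &K) -> u32 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % MAX_KEY_GROUPS as u64) as u32
}

/// Level 2: contiguous [start, end) range of key groups owned by one worker,
/// using Flink's KeyGroupRangeAssignment formula (ceiling division).
fn range_for_worker(worker: u32, num_workers: u32, max_key_groups: u32) -> (u32, u32) {
    let start = (worker * max_key_groups + num_workers - 1) / num_workers;
    let end = ((worker + 1) * max_key_groups + num_workers - 1) / num_workers;
    (start, end)
}

/// Inverse lookup: which worker owns a given key group.
fn worker_for_key_group(key_group: u32, num_workers: u32, max_key_groups: u32) -> u32 {
    key_group * num_workers / max_key_groups
}

fn main() {
    for w in 0..3 {
        let (s, e) = range_for_worker(w, 3, MAX_KEY_GROUPS);
        println!("Worker {w}: key groups [{s}, {e}) ({} groups)", e - s);
    }
    let kg = key_group_for(&"user-42");
    println!("key group {kg} -> worker {}", worker_for_key_group(kg, 3, MAX_KEY_GROUPS));
}
```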
## Arrow Flight Shuffle
When a keyed operator runs in distributed mode, records are shuffled between workers via Apache Arrow Flight DoExchange:
```
        Worker 0                                  Worker 1
┌──────────────────────┐                  ┌──────────────────────┐
│ Source               │                  │ Source               │
│   ↓                  │                  │   ↓                  │
│ ShuffleRouter        │   Arrow Flight   │ ShuffleRouter        │
│  ├─ local partition ─┼─────────────────→│  ├─ local partition  │
│  └─ remote → Flight ─┼─────────────────→│  └─ remote → Flight  │
│                      │←─────────────────│                      │
│ BarrierAligner       │                  │ BarrierAligner       │
│  (local + remote)    │                  │  (local + remote)    │
│   ↓                  │                  │   ↓                  │
│ Sink                 │                  │ Sink                 │
└──────────────────────┘                  └──────────────────────┘
```
- ShuffleRouter replaces `PartitionRouter` for keyed operators. Routes via `key_group` → worker → local mpsc or Flight sender.
- Non-keyed operators (filter, map, flat_map) run locally — no shuffle.
- BarrierAligner merges both local partition outputs and remote Flight inputs.
- Barriers and watermarks are broadcast to all local partitions AND all remote workers.
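The routing and broadcast rules above can be sketched as follows. This is a minimal sketch: `Destination`, `route`, and `broadcast_targets` are hypothetical names, and the owner lookup assumes the contiguous range assignment described in the key-hashing section:

```rust
/// Hypothetical destination type; the real ShuffleRouter would hold an
/// mpsc sender per local partition and a Flight client per remote peer.
#[derive(Debug, PartialEq)]
enum Destination {
    Local,       // deliver through an in-process mpsc channel
    Remote(u32), // send to this worker over Arrow Flight
}

/// Data records: key group -> owning worker -> local or remote.
fn route(key_group: u32, my_worker: u32, num_workers: u32, max_key_groups: u32) -> Destination {
    // Inverse of the contiguous range assignment.
    let owner = key_group * num_workers / max_key_groups;
    if owner == my_worker {
        Destination::Local
    } else {
        Destination::Remote(owner)
    }
}

/// Barriers and watermarks are not keyed: they fan out to every local
/// partition and every remote worker.
fn broadcast_targets(my_worker: u32, num_workers: u32) -> Vec<Destination> {
    (0..num_workers)
        .map(|w| if w == my_worker { Destination::Local } else { Destination::Remote(w) })
        .collect()
}

fn main() {
    println!("{:?}", route(200, 0, 3, 256)); // key group 200 is owned by worker 2
    println!("{:?}", broadcast_targets(0, 3)); // barrier fan-out from worker 0
}
```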
### Wire Format
`StreamElement` variants are encoded as `FlightData` with a discriminator byte in `app_metadata[0]`:
| Byte | Variant | data_body | app_metadata[1..] |
|---|---|---|---|
| 0x01 | Data | Arrow IPC RecordBatch | JSON record metadata |
| 0x02 | Barrier | empty | JSON CheckpointBarrier |
| 0x03 | Watermark | empty | JSON Watermark |
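A sketch of the encoding. The struct below is a pared-down stand-in for `arrow_flight::FlightData` (only the two fields the wire format uses), and the `StreamElement` payloads are placeholder byte vectors, not Volley's actual types:

```rust
/// Discriminator bytes written to app_metadata[0].
const TAG_DATA: u8 = 0x01;
const TAG_BARRIER: u8 = 0x02;
const TAG_WATERMARK: u8 = 0x03;

/// Stand-in for arrow_flight::FlightData; the real type has more fields.
#[derive(Debug, Default)]
struct FlightData {
    app_metadata: Vec<u8>,
    data_body: Vec<u8>,
}

/// Hypothetical in-memory form of the three variants.
enum StreamElement {
    Data { batch_ipc: Vec<u8>, meta_json: Vec<u8> }, // Arrow IPC RecordBatch + JSON metadata
    Barrier { barrier_json: Vec<u8> },               // JSON CheckpointBarrier, empty body
    Watermark { watermark_json: Vec<u8> },           // JSON Watermark, empty body
}

fn encode(el: &StreamElement) -> FlightData {
    let (tag, meta, body) = match el {
        StreamElement::Data { batch_ipc, meta_json } => (TAG_DATA, meta_json, batch_ipc.clone()),
        StreamElement::Barrier { barrier_json } => (TAG_BARRIER, barrier_json, Vec::new()),
        StreamElement::Watermark { watermark_json } => (TAG_WATERMARK, watermark_json, Vec::new()),
    };
    let mut app_metadata = Vec::with_capacity(1 + meta.len());
    app_metadata.push(tag);               // discriminator in app_metadata[0]
    app_metadata.extend_from_slice(meta); // JSON payload in app_metadata[1..]
    FlightData { app_metadata, data_body: body }
}

fn main() {
    let fd = encode(&StreamElement::Barrier { barrier_json: br#"{"epoch":42}"#.to_vec() });
    println!("tag={:#04x}, body_len={}", fd.app_metadata[0], fd.data_body.len());
}
```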
## K8s-Native Coordination

### Leader Election
One worker acquires a K8s Lease (coordination.k8s.io/v1) and becomes the coordinator. The coordinator handles checkpoint injection and key group assignment. It still processes data normally — coordination is lightweight.
If the leader dies, the lease expires and another worker takes over.
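The takeover rule can be sketched as a pure decision function. The names and fields here are hypothetical simplifications; real workers would read and renew the Lease object through the K8s API rather than hold it in memory:

```rust
use std::time::{Duration, Instant};

/// Simplified stand-in for the fields of a coordination.k8s.io/v1 Lease
/// that matter for election.
struct Lease {
    holder: String,
    renew_time: Instant,
    lease_duration: Duration,
}

/// A worker may take (or keep) the lease when it is unheld, already its
/// own, or expired because the holder stopped renewing it.
fn may_acquire(lease: Option<&Lease>, me: &str, now: Instant) -> bool {
    match lease {
        None => true,
        Some(l) => l.holder == me || now.duration_since(l.renew_time) > l.lease_duration,
    }
}

fn main() {
    let now = Instant::now();
    let lease = Lease {
        holder: "volley-0".into(),
        renew_time: now,
        lease_duration: Duration::from_secs(15),
    };
    // volley-1 cannot take a freshly renewed lease held by volley-0.
    println!("{}", may_acquire(Some(&lease), "volley-1", now));
}
```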
### Checkpoint Metadata
Completed checkpoint metadata is stored in K8s ConfigMaps:
- `{app}-ckpt-epoch-{N}` — per-epoch metadata (workers, state paths, source offsets)
- `{app}-ckpt-latest` — pointer to the latest completed epoch
ConfigMaps have no OwnerReference, so they survive CRD deletion. This allows recovery after redeployment.
## Shared Filesystem Checkpoints
Checkpoint data is written to a shared filesystem (AWS EFS, GCP Filestore, Azure Files) mounted as a ReadWriteMany PVC. All workers write to and read from the same mount.
```
/mnt/volley/checkpoints/
  epoch-42/
    worker-0/   ← RocksDB SST files
    worker-1/
    worker-2/
```
Recovery is near-instant: the pod reads the checkpoint directory directly. No download step.
## Worker Discovery
Workers discover each other via StatefulSet headless service DNS:
```
{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}
```
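Building that address is a string format over the cluster configuration. In this sketch the parameter names mirror the placeholders in the pattern above; the actual `ClusterConfig` field names are assumptions:

```rust
/// Flight endpoint for worker `ordinal`, derived from StatefulSet
/// headless-service DNS. No registry or gossip is needed: the ordinal
/// alone determines the address.
fn flight_addr(app: &str, ordinal: u32, namespace: &str, flight_port: u16) -> String {
    format!("{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}")
}

fn main() {
    // Worker 2 of app "volley" in namespace "streaming":
    println!("{}", flight_addr("volley", 2, "streaming", 50051));
}
```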
## Live Rescaling
Since all workers share the same filesystem, rescaling doesn’t require state migration:
1. Coordinator detects a replica count change
2. Triggers a checkpoint — all workers flush in-memory state to the shared FS
3. Computes a new `KeyGroupAssignment` and diffs it against the old one
4. Broadcasts a `RescaleEvent` via a `watch` channel
5. Workers stop processing lost key groups and load state for gained ones from the shared mount
6. Resume — no restart, no data transfer
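The assignment diff at the heart of this flow can be sketched as follows. Helper names are hypothetical; the range formula follows Flink's `KeyGroupRangeAssignment`:

```rust
/// A [start, end) range of key groups owned by one worker.
#[derive(Clone, Copy)]
struct KeyGroupRange {
    start: u32,
    end: u32,
}

impl KeyGroupRange {
    fn contains(&self, kg: u32) -> bool {
        kg >= self.start && kg < self.end
    }
}

/// Contiguous range assignment (Flink-style ceiling-division formula).
fn range_for(worker: u32, num_workers: u32, max_key_groups: u32) -> KeyGroupRange {
    KeyGroupRange {
        start: (worker * max_key_groups + num_workers - 1) / num_workers,
        end: ((worker + 1) * max_key_groups + num_workers - 1) / num_workers,
    }
}

/// Per-worker diff of old vs new assignment: `lost` groups stop being
/// processed, `gained` groups have their state loaded from the shared mount.
fn diff(old: KeyGroupRange, new: KeyGroupRange, max_key_groups: u32) -> (Vec<u32>, Vec<u32>) {
    let lost = (0..max_key_groups).filter(|&kg| old.contains(kg) && !new.contains(kg)).collect();
    let gained = (0..max_key_groups).filter(|&kg| !old.contains(kg) && new.contains(kg)).collect();
    (lost, gained)
}

fn main() {
    // Worker 0 during a 2 -> 3 rescale: its range shrinks from [0, 128) to [0, 86).
    let (lost, gained) = diff(range_for(0, 2, 256), range_for(0, 3, 256), 256);
    println!("worker 0 loses {} groups, gains {}", lost.len(), gained.len());
}
```

Because the state for every key group already sits on the shared mount, "loading a gained group" is a directory read, not a network transfer.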
## Key Types
| Type | Location | Purpose |
|---|---|---|
| `ClusterConfig` | `cluster::config` | Worker ID, num workers, headless service, app name, Flight port |
| `KeyGroupAssignment` | `cluster::assignment` | Maps key groups to workers with contiguous range assignment |
| `KeyGroupRange` | `cluster::assignment` | A [start, end) range of key groups owned by one worker |
| `ShuffleRouter` | `cluster::shuffle` | Routes records to local or remote destinations |
| `ShuffleFlightService` | `cluster::flight::server` | Arrow Flight DoExchange server |
| `ShuffleFlightClient` | `cluster::flight::client` | Arrow Flight DoExchange client with connection pooling |
| `LeaseLeaderElector` | `cluster::coordinator::leader` | K8s Lease-based leader election |
| `DistributedCheckpointCoordinator` | `cluster::coordinator::checkpoint_coordinator` | ConfigMap-based checkpoint metadata |
| `RescaleCoordinator` | `cluster::rescale` | Checkpoint + key group reassignment |