# Architecture Overview
Volley is a dataflow engine. You declare a pipeline as a graph of stages (source -> operators -> sink), and Volley executes it as concurrent Tokio tasks connected by bounded channels.
## Pipeline Builder (Compile-Time)

```rust
StreamExecutionEnvironment::new()
    .from_source(...)
    .filter(...)
    .key_by(...)
    .window(...)
    .aggregate(...)
    .to_sink(...)
    .execute("job-name")
```
## Runtime Execution (Tokio Tasks)
At runtime, each stage runs as an independent Tokio task. Stages communicate via bounded tokio::sync::mpsc channels. Backpressure propagates naturally: when a downstream channel is full, the upstream task blocks on send.
```text
+--------+   channel   +----------+   channel   +------+
| Source | ----------> | Operator | ----------> | Sink |
|  Task  | RecordBatch |   Task   | RecordBatch | Task |
+--------+             +----------+             +------+
    |                       |                       |
    | CheckpointBarrier     | on_checkpoint()       | on_checkpoint()
    | (in-band)             | flush state           | commit writes
    v                       v                       v
+---------------------------------------------------------+
|                  Checkpoint Coordinator                  |
|   Injects barriers -> collects acks -> snapshots state   |
+---------------------------------------------------------+
```
Checkpoint barriers and watermarks flow in-band alongside data records through the same channels. This is how Volley achieves aligned checkpoints without pausing the pipeline.
## Intra-Node Parallelism
After .key_by(), records are hash-partitioned across N parallel operator instances. Each instance has its own state namespace. Checkpoint barriers are aligned across all partitions before snapshot.
```text
                             +-- Partition 0 --> [Operator Instance 0] --+
Source --> HashPartitioner --+-- Partition 1 --> [Operator Instance 1] --+--> Merge --> Sink
                             +-- Partition 2 --> [Operator Instance 2] --+
```
Parallelism is set via .with_parallelism(N) on the stream builder.
## Distributed Execution (Horizontal Scaling)
For workloads that exceed a single node, Volley supports distributed execution across multiple K8s pods. Enable it by passing a ClusterConfig:
```rust
StreamExecutionEnvironment::new()
    .with_cluster(ClusterConfig::new(worker_id, num_workers, headless_service, app_name))
    .from_source(source)
    // ... same pipeline API ...
    .execute("job").await?;
```
In distributed mode, keyed operators use a two-level hashing scheme:
- Key group: hash(key) % max_key_groups (256 virtual partitions by default)
- Worker assignment: contiguous key group ranges assigned to each worker
Records destined for remote workers are shuffled via Arrow Flight DoExchange. Non-keyed operators (filter, map) run locally with no shuffle.
```text
       Worker 0                               Worker 1
┌──────────────────────┐             ┌──────────────────────┐
│ Source               │             │ Source               │
│   ↓                  │   Arrow     │   ↓                  │
│ ShuffleRouter        │──Flight───► │ ShuffleRouter        │
│   ↓                  │ ◄──Flight───│   ↓                  │
│ BarrierAligner       │             │ BarrierAligner       │
│   ↓                  │             │   ↓                  │
│ Sink                 │             │ Sink                 │
└──────────────────────┘             └──────────────────────┘
```
K8s primitives handle coordination:
- Leader election: K8s Lease API — one worker becomes coordinator
- Checkpoint metadata: stored in K8s ConfigMaps (no OwnerReference, survives redeployment)
- Checkpoint data: shared filesystem (EFS/Filestore/Azure Files) via ReadWriteMany PVC
- Worker discovery: StatefulSet headless service DNS
- Live rescaling: checkpoint + reassignment, no state migration needed (shared FS)
Single-node mode remains the default with zero overhead when ClusterConfig is absent.
## Graceful Shutdown
The source stops producing, in-flight data drains through the pipeline, and each stage then shuts down in order.
```text
Source Task --> channel(capacity) --> Operator Task --> channel(capacity) --> Sink Task
                       ^                                       ^
                       |                                       |
                  backpressure                            backpressure
                 (bounded send)                          (bounded send)
```
## Deep Dives
- Data Model — Arrow RecordBatch, StreamRecord, StreamElement
- Pipeline Builder — type-state pattern and compile-time safety
- Exactly-Once Semantics — checkpoint barriers and recovery
- State Management — write-behind cache, RocksDB, and checkpoints