
Architecture Overview

Volley is a dataflow engine. You declare a pipeline as a graph of stages (source -> operators -> sink), and Volley executes it as concurrent Tokio tasks connected by bounded channels.

Pipeline Builder (Compile-Time)

StreamExecutionEnvironment::new()
    .from_source(...)
    .filter(...)
    .key_by(...)
    .window(...)
    .aggregate(...)
    .to_sink(...)
    .execute("job-name").await?

Runtime Execution (Tokio Tasks)

At runtime, each stage runs as an independent Tokio task. Stages communicate via bounded tokio::sync::mpsc channels. Backpressure propagates naturally: when a downstream channel is full, the upstream task blocks on send.
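The blocking-on-full behavior can be sketched with the standard library's bounded `sync_channel` (Volley's stages use `tokio::sync::mpsc`, whose bounded `send().await` behaves analogously in async code); the stage names here are illustrative:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

/// A producer sends `items` records through a channel of `capacity`;
/// a deliberately slow consumer drains them. The producer can never run
/// more than `capacity` records ahead of the consumer.
fn run_pipeline(capacity: usize, items: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);

    let producer = thread::spawn(move || {
        for i in 0..items {
            tx.send(i).unwrap(); // blocks while the channel is full
        }
    });

    let mut received = Vec::new();
    for record in rx {
        thread::sleep(Duration::from_millis(5)); // slow consumer
        received.push(record);
    }
    producer.join().unwrap();
    received
}

fn main() {
    println!("drained: {:?}", run_pipeline(2, 5));
}
```

Because the channel is bounded, the slow consumer paces the producer: no stage can buffer unboundedly, and pressure propagates upstream hop by hop.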

+--------+    channel    +----------+    channel    +------+
| Source |  --------->   | Operator |  --------->   | Sink |
| Task   |  RecordBatch  | Task     |  RecordBatch  | Task |
+--------+               +----------+               +------+
     |                       |                         |
     | CheckpointBarrier     | on_checkpoint()         | on_checkpoint()
     | (in-band)             | flush state             | commit writes
     v                       v                         v
+---------------------------------------------------------+
|              Checkpoint Coordinator                      |
|  Injects barriers -> collects acks -> snapshots state   |
+---------------------------------------------------------+

Checkpoint barriers and watermarks flow in-band alongside data records through the same channels. This is how Volley achieves aligned checkpoints without pausing the pipeline.
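A hypothetical shape of such an in-band message (the names here are illustrative, not Volley's actual types): because data, barriers, and watermarks travel through the same channel, a barrier can never overtake the records that precede it.

```rust
#[derive(Debug)]
enum StreamMessage {
    Batch(Vec<u64>), // stand-in for an Arrow RecordBatch
    Watermark { event_time_ms: i64 },
    Barrier { checkpoint_id: u64 },
}

/// Each operator pattern-matches every in-band message: process data,
/// advance event time, or snapshot state when a barrier arrives.
fn handle(msg: &StreamMessage) -> &'static str {
    match msg {
        StreamMessage::Batch(_) => "process rows",
        StreamMessage::Watermark { .. } => "advance event time",
        StreamMessage::Barrier { .. } => "snapshot state, forward barrier",
    }
}

fn main() {
    let in_band = [
        StreamMessage::Batch(vec![1, 2, 3]),
        StreamMessage::Watermark { event_time_ms: 1_000 },
        StreamMessage::Barrier { checkpoint_id: 7 },
    ];
    for msg in &in_band {
        println!("{:?} -> {}", msg, handle(msg));
    }
}
```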

Intra-Node Parallelism

After .key_by(), records are hash-partitioned across N parallel operator instances. Each instance has its own state namespace. Checkpoint barriers are aligned across all partitions before snapshot.

                             +-- Partition 0 --> [Operator Instance 0] --+
Source --> HashPartitioner --+-- Partition 1 --> [Operator Instance 1] --+--> Merge --> Sink
                             +-- Partition 2 --> [Operator Instance 2] --+

Parallelism is set via .with_parallelism(N) on the stream builder.
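A minimal sketch of this routing (illustrative only, not Volley's internals): records go to an instance chosen by key hash, and each instance keeps its own state map, so all updates for a given key land in exactly one namespace.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Route a record to one of `parallelism` operator instances by key hash.
fn partition_for(key: &str, parallelism: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % parallelism
}

fn main() {
    let parallelism = 3;
    // One independent state namespace per operator instance.
    let mut states: Vec<HashMap<String, u64>> = vec![HashMap::new(); parallelism];

    for key in ["a", "b", "a", "c", "a"] {
        let p = partition_for(key, parallelism);
        *states[p].entry(key.to_string()).or_insert(0) += 1;
    }

    // All updates for key "a" landed in a single instance's state.
    let holders = states.iter().filter(|s| s.contains_key("a")).count();
    assert_eq!(holders, 1);
}
```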

Distributed Execution (Horizontal Scaling)

For workloads that exceed a single node, Volley supports distributed execution across multiple K8s pods. Enable it by passing a ClusterConfig:

StreamExecutionEnvironment::new()
    .with_cluster(ClusterConfig::new(worker_id, num_workers, headless_service, app_name))
    .from_source(source)
    // ... same pipeline API ...
    .execute("job").await?;

In distributed mode, keyed operators use a two-level hashing scheme:

  1. Key group: hash(key) % max_key_groups (256 virtual partitions by default)
  2. Worker assignment: contiguous key group ranges assigned to each worker

Records destined for remote workers are shuffled via Arrow Flight DoExchange. Non-keyed operators (filter, map) run locally with no shuffle.
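The two-level scheme can be sketched as follows (function names and the hasher are illustrative; assumes the default of 256 key groups). Level 1 is independent of cluster size, so rescaling only reassigns key-group ranges to workers rather than rehashing every key:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const MAX_KEY_GROUPS: u64 = 256;

/// Level 1: key -> virtual partition (key group), stable across rescales.
fn key_group<K: Hash>(key: &K) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % MAX_KEY_GROUPS
}

/// Level 2: contiguous key-group ranges assigned to workers.
fn worker_for_group(group: u64, num_workers: u64) -> u64 {
    let per_worker = (MAX_KEY_GROUPS + num_workers - 1) / num_workers; // ceiling
    group / per_worker
}

fn main() {
    let g = key_group(&"user-42");
    println!("user-42 -> key group {g} -> worker {}", worker_for_group(g, 3));
}
```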

Worker 0                                     Worker 1
┌──────────────────────┐                     ┌──────────────────────┐
│ Source               │                     │ Source               │
│   ↓                  │                     │   ↓                  │
│ ShuffleRouter        │ ── Arrow Flight ──► │ ShuffleRouter        │
│   ↓                  │ ◄── Arrow Flight ── │   ↓                  │
│ BarrierAligner       │                     │ BarrierAligner       │
│   ↓                  │                     │   ↓                  │
│ Sink                 │                     │ Sink                 │
└──────────────────────┘                     └──────────────────────┘

K8s primitives handle coordination:

  • Leader election: K8s Lease API — one worker becomes coordinator
  • Checkpoint metadata: stored in K8s ConfigMaps (no OwnerReference, survives redeployment)
  • Checkpoint data: shared filesystem (EFS/Filestore/Azure Files) via ReadWriteMany PVC
  • Worker discovery: StatefulSet headless service DNS
  • Live rescaling: checkpoint + reassignment, no state migration needed (shared FS)

Single-node mode remains the default with zero overhead when ClusterConfig is absent.

Graceful Shutdown

On shutdown, the source stops producing, in-flight data drains through the pipeline, and each stage then shuts down in order.
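The drain pattern can be sketched with std channels (Volley runs Tokio tasks, but the mechanism is the same): the source dropping its sender closes the channel, the downstream stage drains whatever is in flight, then exits.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// The source emits `items` records and then stops; the downstream stage
/// receives everything still in flight before shutting down.
fn shutdown_drain(items: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(4);

    let source = thread::spawn(move || {
        for i in 0..items {
            tx.send(i).unwrap();
        }
        // `tx` is dropped here: signals downstream that no more data is coming.
    });

    // Iterating the receiver drains all in-flight records, then ends once
    // the sender side is gone; the stage can then shut down cleanly.
    let drained: Vec<u32> = rx.into_iter().collect();
    source.join().unwrap();
    drained
}

fn main() {
    assert_eq!(shutdown_drain(3), vec![0, 1, 2]);
}
```

No record is lost: channel closure is observed only after every buffered record has been received, which is what lets each stage shut down strictly after its upstream.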

Source Task --> channel(capacity) --> Operator Task --> channel(capacity) --> Sink Task
                    ^                                        ^
                    |                                        |
              backpressure                            backpressure
              (bounded send)                          (bounded send)

Deep Dives