Architecture Overview

A Volley pipeline is a graph of stages: one source, some operators, and one or more sinks. You describe the graph in Rust, and Volley runs it as concurrent Tokio tasks connected by bounded channels. This page explains what happens once you call .execute().

Pipeline Builder (Compile-Time)

You build the graph with a fluent API. The order of calls matches the order of data flow:

StreamExecutionEnvironment::new()
    .from_source(...)
    .filter_expr(...)
    .key_by(...)
    .window(...)
    .aggregate_expr(..., state_backend)
    .to_sink(...)
    .execute("job-name")

Runtime Execution (Tokio Tasks)

At runtime, each stage runs as an independent Tokio task. Stages communicate via bounded tokio::sync::mpsc channels. Backpressure propagates naturally: when a downstream channel is full, the upstream task's send waits until capacity frees up, so a slow stage throttles every stage upstream of it.
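The effect of a bounded channel can be seen in a small standalone sketch. It is not Volley code: it uses std::sync::mpsc::sync_channel and OS threads as a stand-in for tokio::sync::mpsc and Tokio tasks, and the function names are hypothetical. The blocking send here plays the role that an awaited send plays in an async task.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Runs a source thread and a sink thread joined by a bounded channel,
/// and returns everything the sink received, in order.
/// (Hypothetical helper; std threads stand in for Tokio tasks.)
fn run_bounded_pipeline(capacity: usize, records: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = sync_channel::<u64>(capacity);

    let source = thread::spawn(move || {
        for record in records {
            // Blocks while the channel already holds `capacity` unread items.
            tx.send(record).expect("sink hung up early");
        }
        // `tx` is dropped here, which closes the channel.
    });

    // The sink drains until the channel is closed and empty.
    let sink = thread::spawn(move || rx.into_iter().collect::<Vec<u64>>());

    source.join().expect("source panicked");
    sink.join().expect("sink panicked")
}

/// Fills a channel that nobody reads from and reports whether one more
/// non-blocking send is rejected because the channel is full.
fn rejects_send_when_full(capacity: usize) -> bool {
    let (tx, _rx) = sync_channel::<u64>(capacity);
    for i in 0..capacity as u64 {
        tx.try_send(i).expect("should fit within capacity");
    }
    matches!(tx.try_send(0), Err(TrySendError::Full(_)))
}

fn main() {
    let received = run_bounded_pipeline(2, (1..=5).collect());
    println!("sink received {:?}", received);
    println!("extra send rejected: {}", rejects_send_when_full(4));
}
```

No record is dropped when the channel fills; the producer simply cannot get ahead of the consumer by more than the channel capacity.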

+--------+    channel    +----------+    channel    +------+
| Source |  --------->   | Operator |  --------->   | Sink |
| Task   |  RecordBatch  | Task     |  RecordBatch  | Task |
+--------+               +----------+               +------+
     |                       |                         |
     | CheckpointBarrier     | on_checkpoint()         | on_checkpoint()
     | (in-band)             | flush state             | commit writes
     v                       v                         v
+----------------------------------------------------------+
|                  Checkpoint Coordinator                  |
|  Injects barriers -> collects acks -> snapshots state   |
+----------------------------------------------------------+

Checkpoint barriers and watermarks flow in-band alongside data records through the same channels. This is how Volley achieves aligned checkpoints without pausing the pipeline.
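One way to picture in-band control messages is a single channel item type with a variant per kind of message. The sketch below is an illustration under assumptions, not Volley's actual types: StreamElement and run_operator are hypothetical names, and an i64 stands in for a RecordBatch. The operator keeps a running sum, and when a barrier arrives it snapshots that state and forwards the barrier in the same position in the stream.

```rust
/// Hypothetical item type: data and control messages share one channel.
#[derive(Debug, Clone, PartialEq)]
enum StreamElement {
    Record(i64),    // stand-in for a RecordBatch
    Watermark(u64), // event-time progress marker
    Barrier(u64),   // checkpoint id
}

/// Processes elements in arrival order. Records update a running sum;
/// a barrier snapshots the sum as of that point and is then forwarded.
/// Returns the downstream output and the snapshots taken, as
/// (checkpoint id, state) pairs.
fn run_operator(input: Vec<StreamElement>) -> (Vec<StreamElement>, Vec<(u64, i64)>) {
    let mut sum = 0;
    let mut output = Vec::new();
    let mut snapshots = Vec::new();

    for element in input {
        match element {
            StreamElement::Record(value) => {
                sum += value;
                output.push(StreamElement::Record(sum));
            }
            // Watermarks pass through untouched in this sketch.
            StreamElement::Watermark(ts) => output.push(StreamElement::Watermark(ts)),
            StreamElement::Barrier(id) => {
                // State covers exactly the records that preceded the barrier.
                snapshots.push((id, sum));
                output.push(StreamElement::Barrier(id));
            }
        }
    }
    (output, snapshots)
}

fn main() {
    let input = vec![
        StreamElement::Record(1),
        StreamElement::Record(2),
        StreamElement::Barrier(1),
        StreamElement::Record(3),
    ];
    let (output, snapshots) = run_operator(input);
    println!("output: {:?}", output);
    println!("snapshots: {:?}", snapshots);
}
```

Because the barrier travels in order with the data, the snapshot boundary is defined by position in the stream, so the operator never has to stop reading its input to take it.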

Intra-Node Parallelism

After .key_by(), records are hash-partitioned across N parallel operator instances. Each instance has its own state namespace. Checkpoint barriers are aligned across all partitions before snapshot.

                          +-- Partition 0 --> [Operator Instance 0] --+
Source --> HashPartitioner -- Partition 1 --> [Operator Instance 1] --> Merge --> Sink
                          +-- Partition 2 --> [Operator Instance 2] --+

Parallelism is set via .with_parallelism(N) on the stream builder.
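A minimal sketch of hash partitioning, assuming the partition index is the key's hash modulo N. The hasher (std's DefaultHasher) and the function names are illustrative choices, not Volley's implementation. The property that matters is that every record with the same key lands in the same operator instance.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a key to one of `parallelism` partitions: hash(key) % N.
fn partition_for<K: Hash>(key: &K, parallelism: usize) -> usize {
    assert!(parallelism > 0, "parallelism must be at least 1");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % parallelism as u64) as usize
}

/// Splits keyed records into `parallelism` buckets, one per operator instance.
fn partition_records<'a>(
    records: &[(&'a str, i64)],
    parallelism: usize,
) -> Vec<Vec<(&'a str, i64)>> {
    let mut partitions = vec![Vec::new(); parallelism];
    for &(key, value) in records {
        partitions[partition_for(&key, parallelism)].push((key, value));
    }
    partitions
}

fn main() {
    let records = [("a", 1), ("b", 2), ("a", 3), ("c", 4)];
    for (index, partition) in partition_records(&records, 3).iter().enumerate() {
        println!("partition {}: {:?}", index, partition);
    }
}
```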

Distributed Execution (Horizontal Scaling)

For workloads that exceed a single node, Volley supports distributed execution across multiple K8s pods. Enable it by passing a ClusterConfig:

StreamExecutionEnvironment::new()
    .with_cluster(ClusterConfig::new(worker_id, num_workers, headless_service, app_name))
    .from_source(source)
    // ... same pipeline API ...
    .execute("job").await?;

In distributed mode, keyed operators use a two-level hashing scheme:

  1. Key group: hash(key) % max_key_groups (256 virtual partitions by default)
  2. Worker assignment: contiguous key group ranges assigned to each worker
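The two levels can be sketched as follows. The first function follows the formula given above. The second and third are assumptions: the text says only that ranges are contiguous, so this sketch uses the split group * num_workers / max_key_groups, and Volley's actual split may differ. All names are hypothetical, and DefaultHasher stands in for whatever hash Volley uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::ops::Range;

/// Default number of virtual partitions, as stated in the text.
const MAX_KEY_GROUPS: u64 = 256;

/// Level 1: key -> key group. Independent of the number of workers.
fn key_group<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % MAX_KEY_GROUPS
}

/// Level 2: key group -> worker, using contiguous ranges.
/// (Assumed formula; not confirmed as Volley's.)
fn worker_for_key_group(group: u64, num_workers: u64) -> u64 {
    group * num_workers / MAX_KEY_GROUPS
}

/// The contiguous range of key groups owned by `worker_id`
/// under the formula above.
fn key_group_range(worker_id: u64, num_workers: u64) -> Range<u64> {
    // Ceiling division gives the first group that maps to each worker.
    let start = (worker_id * MAX_KEY_GROUPS + num_workers - 1) / num_workers;
    let end = ((worker_id + 1) * MAX_KEY_GROUPS + num_workers - 1) / num_workers;
    start..end
}

fn main() {
    let group = key_group(&"user-42");
    for num_workers in [2, 4] {
        println!(
            "{} workers: key group {} -> worker {}",
            num_workers,
            group,
            worker_for_key_group(group, num_workers)
        );
    }
    println!("worker 0 of 4 owns key groups {:?}", key_group_range(0, 4));
}
```

Changing the worker count changes only the second mapping. A key's group stays fixed, so with a scheme like this, state organized by key group can be handed to a different worker without rehashing individual keys.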

Records destined for remote workers are shuffled via Arrow Flight DoExchange. Non-keyed operators (filter, map) run locally with no shuffle.

Worker 0                                    Worker 1
┌──────────────────────┐                    ┌──────────────────────┐
│ Source               │                    │ Source               │
│   ↓                  │                    │   ↓                  │
│ ShuffleRouter ──────►│── Arrow Flight ──►│◄── ShuffleRouter     │
│   ↓                  │◄── Arrow Flight ──│                      │
│ BarrierAligner       │                    │ BarrierAligner       │
│   ↓                  │                    │   ↓                  │
│ Sink                 │                    │ Sink                 │
└──────────────────────┘                    └──────────────────────┘
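Barrier alignment, as performed by the BarrierAligner stage in the diagram and across partitions in the intra-node case, is commonly implemented like this: once an input has delivered the barrier, its later records are held back until every other input has delivered the barrier too. The sketch below shows that general technique under simplifying assumptions (one checkpoint in flight at a time, i64 in place of RecordBatch). The type and method names are hypothetical, and it is not a description of Volley's internals.

```rust
use std::collections::{HashSet, VecDeque};

#[derive(Debug, Clone, PartialEq)]
enum Element {
    Record(i64),  // stand-in for a RecordBatch
    Barrier(u64), // checkpoint id
}

/// Hypothetical aligner for an operator with several input channels.
struct BarrierAligner {
    num_inputs: usize,
    arrived: HashSet<usize>,     // inputs that already delivered the barrier
    buffered: VecDeque<Element>, // records held back from those inputs
}

impl BarrierAligner {
    fn new(num_inputs: usize) -> Self {
        BarrierAligner {
            num_inputs,
            arrived: HashSet::new(),
            buffered: VecDeque::new(),
        }
    }

    /// Feeds one element read from `input` and returns the elements that
    /// may be passed downstream right now.
    fn on_element(&mut self, input: usize, element: Element) -> Vec<Element> {
        match element {
            Element::Record(_) if self.arrived.contains(&input) => {
                // This input is already past the barrier: hold the record.
                self.buffered.push_back(element);
                Vec::new()
            }
            Element::Record(_) => vec![element],
            Element::Barrier(id) => {
                self.arrived.insert(input);
                if self.arrived.len() < self.num_inputs {
                    return Vec::new(); // still waiting for other inputs
                }
                // Aligned: forward the barrier, then release held records.
                self.arrived.clear();
                let mut out = vec![Element::Barrier(id)];
                out.extend(self.buffered.drain(..));
                out
            }
        }
    }
}

fn main() {
    let mut aligner = BarrierAligner::new(2);
    println!("{:?}", aligner.on_element(0, Element::Record(1)));
    println!("{:?}", aligner.on_element(0, Element::Barrier(7)));
    println!("{:?}", aligner.on_element(0, Element::Record(2)));
    println!("{:?}", aligner.on_element(1, Element::Record(3)));
    println!("{:?}", aligner.on_element(1, Element::Barrier(7)));
}
```

In the usage above, the record that arrives on input 0 after its barrier is released only after input 1 delivers the same barrier, so a snapshot taken at the aligned barrier never includes post-barrier data.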

K8s primitives handle coordination:

  • Leader election: K8s Lease API — one worker becomes coordinator
  • Checkpoint metadata: stored in K8s ConfigMaps (no OwnerReference, survives redeployment)
  • Checkpoint data: shared filesystem (EFS/Filestore/Azure Files) via ReadWriteMany PVC
  • Worker discovery: StatefulSet headless service DNS
  • Live rescaling: checkpoint + reassignment, no state migration needed (shared FS)

Single-node mode remains the default with zero overhead when ClusterConfig is absent.

Graceful Shutdown

On shutdown, the source stops producing, in-flight data drains through the pipeline, and then each stage shuts down in order.

Source Task --> channel(capacity) --> Operator Task --> channel(capacity) --> Sink Task
                    ^                                        ^
                    |                                        |
              backpressure                            backpressure
              (bounded send)                          (bounded send)
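A drain-then-stop sequence falls out naturally when channel closure is the shutdown signal. In the sketch below, which uses std threads and std::sync::mpsc as a stand-in for Tokio tasks and channels (all names are hypothetical), each stage finishes only after its input channel is both closed and empty, so nothing in flight is lost and stages exit in pipeline order. Whether Volley signals shutdown this way is an assumption.

```rust
use std::sync::mpsc::{channel, sync_channel};
use std::thread;

/// Runs source -> operator -> sink over bounded channels and shuts down by
/// closing channels. Returns the sink's output and the order in which the
/// stages finished.
fn run_and_shut_down(records: Vec<i64>, capacity: usize) -> (Vec<i64>, Vec<&'static str>) {
    let (src_tx, op_rx) = sync_channel::<i64>(capacity);
    let (op_tx, sink_rx) = sync_channel::<i64>(capacity);
    // Unbounded side channel used only to record the shutdown order.
    let (log_tx, log_rx) = channel::<&'static str>();

    let log = log_tx.clone();
    let source = thread::spawn(move || {
        for record in records {
            src_tx.send(record).expect("operator hung up early");
        }
        log.send("source").unwrap();
        drop(src_tx); // closing the channel is the shutdown signal
    });

    let log = log_tx.clone();
    let operator = thread::spawn(move || {
        // The loop ends only when the input is closed AND fully drained.
        for record in op_rx {
            op_tx.send(record * 2).expect("sink hung up early");
        }
        log.send("operator").unwrap();
        drop(op_tx);
    });

    let sink = thread::spawn(move || {
        let received: Vec<i64> = sink_rx.into_iter().collect();
        log_tx.send("sink").unwrap();
        received
    });

    source.join().unwrap();
    operator.join().unwrap();
    let received = sink.join().unwrap();
    (received, log_rx.into_iter().collect())
}

fn main() {
    let (received, order) = run_and_shut_down(vec![1, 2, 3], 1);
    println!("sink received {:?}", received);
    println!("shutdown order {:?}", order);
}
```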

Deep Dives