Introduction

Volley is a Rust library for building streaming pipelines — read from Kafka or a bucket, filter and aggregate with SQL-style expressions, write to Kafka, Delta, Iceberg, or a database, and have every record counted exactly once even when things crash. Pipelines are single Rust binaries: no JVM, no separate job server, and every operator is checked by the compiler before it ever runs.

If you’ve used Flink or Kafka Streams, the shape is familiar — a fluent DataStream builder, event-time windows, checkpoints. If you’ve used DataFusion, Polars, or DuckDB, the operators are familiar — expressions over Arrow RecordBatches. Volley is the place those two worlds meet.

The API is type-safe at compile time, the docs are structured for both humans and AI coding agents, and every example below is runnable out of the box.

Key Features

  • High Performance — Native Arrow batching with write-behind state caching. Validated via NEXMark benchmarks
  • Exactly-Once Semantics — Checkpoint-based fault tolerance with aligned barrier snapshotting
  • Flink-like API — Fluent DataStream API with typestate-enforced compile-time safety
  • Expression Engine — Vectorized filter_expr(), select_expr(), aggregate_expr() powered by DataFusion. You write the same kinds of predicates and projections you’d write in SQL (col("price").gt(lit(100)), sum(col("amount"))) and they run column-at-a-time on native Arrow types
  • Event-Time Windowing — Tumbling, sliding, and session windows with watermark tracking
  • State Management — RocksDB backend with write-behind cache and hardlink checkpoints
  • Intra-Node Parallelism — Hash-partitioned parallel operator instances
  • Kafka Integration — Source and sink with exactly-once transactional semantics
  • Cloud Blob Sources — S3+SQS and Azure Blob+Queue with Parquet/JSON/CSV/Avro
  • Cloud Blob Sinks — S3, Azure Blob, and GCS with Parquet/JSON Lines/CSV
  • Table Format Sinks — Delta Lake and Apache Iceberg with exactly-once commits
  • Kubernetes Operator — VolleyApplication CRD with autoscaling, health checks, and Prometheus

Built On

| Component | Purpose |
|---|---|
| Apache Arrow | In-memory columnar format |
| DataFusion | Query engine and expression evaluation |
| RocksDB | State management and checkpointing |
| Tokio | Async runtime for task-per-stage execution |

Architecture

+-----------------------------------------------------+
|  Stream Processing Layer (Rust)                     |
|  +- Fluent Builder API (DataStream, KeyedStream)    |
|  +- Multi-Row RecordBatch Operators                 |
|  +- Watermarks & Event Time                         |
|  +- Windowing (Tumbling, Sliding, Session)          |
|  +- Intra-Node Parallelism (hash-partitioned)       |
|  +- Checkpoint Coordinator                          |
+-----------------------+-----------------------------+
                        |
+-----------------------v-----------------------------+
|  Async Runtime (Tokio)                              |
|  +- Task-per-Stage Execution                        |
|  +- Bounded Channels (Backpressure)                 |
|  +- Graceful Shutdown                               |
+-----------------------+-----------------------------+
                        |
+-----------------------v-----------------------------+
|  State Management (RocksDB)                         |
|  +- Write-Behind Accumulator Cache                  |
|  +- BatchStateBackend (multi_get / WriteBatch)      |
|  +- Per-Key State Isolation                         |
|  +- Hardlink Checkpoints                            |
+-----------------------+-----------------------------+
                        |
+-----------------------v-----------------------------+
|  Connectors                                         |
|  +- Kafka (exactly-once, feature-gated)             |
|  +- Cloud Blob Stores (S3, Azure, GCS)              |
|  +- Delta Lake, Iceberg (exactly-once sinks)        |
|  +- Memory (batched source, counting sink)          |
+-----------------------------------------------------+

Performance

Volley uses the NEXMark benchmark suite, a widely used benchmark for stream processing systems, for performance evaluation. See volley-benchmark/ for the methodology, results, and a Flink comparison baseline.

Connectors

Sources

| Connector | Formats | Status |
|---|---|---|
| Kafka | JSON, Protobuf, Avro (Schema Registry) | Available (feature-gated) |
| AWS S3 + SQS | Parquet, JSON, CSV, Avro | Available |
| Azure Blob + Queue | Parquet, JSON, CSV, Avro | Available |
| Memory/Iterator | Arrow RecordBatch | Available |

Sinks

| Connector | Formats | Status |
|---|---|---|
| Kafka | JSON, Protobuf, Avro (Schema Registry) | Available (feature-gated, transactional) |
| AWS S3 · Azure Blob · GCS | Parquet, JSON Lines, CSV | Available (BufferedBlobSink) |
| Delta Lake | Parquet | Available (exactly-once) |
| Apache Iceberg | Parquet | Available (REST catalog) |
| Memory (collect) | Arrow RecordBatch | Available |


Installation

Rust Toolchain

Volley requires a stable Rust toolchain. Install via rustup:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
rustup default stable

System Dependencies

Most crates build with a standard Rust toolchain. Some connectors need additional system packages:

| Dependency | Required by | macOS | Ubuntu |
|---|---|---|---|
| cmake | volley-connector-kafka (rdkafka) | brew install cmake | apt install cmake |
| protobuf-compiler | volley-scheduler | brew install protobuf | apt install protobuf-compiler |

Note: libcurl and OpenSSL are vendored and statically linked by rdkafka — no system packages needed.

Run volley doctor to check your environment for missing dependencies.

Install the CLI

curl -fsSL https://raw.githubusercontent.com/volley-streams/volley/main/install.sh | sh

Or with Cargo:

cargo install --git https://github.com/volley-streams/volley volley-cli

Adding Volley to a Project

Scaffold a new pipeline

# Interactive mode — prompts for source, sink, and processing pattern
volley new my-pipeline

# Non-interactive with explicit flags (comma-separate to chain operators)
volley new --source kafka --sink s3 --sink-format delta --processing filter my-pipeline
volley new --source kafka --sink kafka --processing filter,ml_inference my-enriched

# Built-in templates bypass source/sink prompts
volley new --template observability-demo my-demo
volley new --template ml-pipeline my-classifier

cd my-pipeline
volley doctor
cargo build && cargo run

Built-in templates: observability-demo, ml-pipeline. Without --template, the CLI prompts for source (kafka, s3, azure-blob), sink (kafka, s3, azure-blob), and processing operators (filter, keyed_agg, windowed_agg, ml_inference, adbc_enrichment, custom). Select multiple operators to chain them in the generated pipeline.

Add to an existing project

Add Volley crates to your Cargo.toml:

[dependencies]
volley-core = { git = "https://github.com/volley-streams/volley", features = ["state-rocksdb"] }
volley-connector-kafka = { git = "https://github.com/volley-streams/volley" }

Note: Volley currently uses git dependencies. crates.io publishing is planned — see the roadmap.

Feature Flags

The volley-connectors crate uses feature flags to control which connectors are compiled:

| Feature | Connectors included |
|---|---|
| state-rocksdb | RocksDB state backend (default, ~1.5 GB C++ build) |
| blob-store | Cloud blob store abstraction |
| blob-store-aws | AWS S3 + SQS source/sink |
| blob-store-azure | Azure Blob + Queue source/sink |
| blob-store-gcs | GCS sink (source removed in v1.1.0) |
| file-formats | Parquet, JSON, CSV, Avro decoders |
| table-formats | Delta Lake + Iceberg sinks |

Tip: If you don’t need the RocksDB state backend (e.g. you’re only developing a Kafka connector), use default-features = false and enable only the features you need to save ~1.5 GB in build artifacts.

Troubleshooting

cmake not found

error: failed to run custom build command for `rdkafka-sys`
--- stderr
CMake not found

Install cmake: brew install cmake (macOS) or apt install cmake (Ubuntu).

RocksDB compilation slow or failing

RocksDB is a C++ dependency that takes ~3–5 minutes to compile on first build. If you don’t need state management (e.g., stateless transformations only), disable it:

volley-core = { git = "...", default-features = false }

protobuf-compiler not found

Only needed if building volley-scheduler. Install: brew install protobuf (macOS) or apt install protobuf-compiler (Ubuntu).

volley doctor reports issues

volley doctor checks for system dependencies, Docker availability, and toolchain versions. Address each reported issue individually. Common fixes:

  • Docker not running: Start Docker Desktop or the Docker daemon
  • Rust toolchain outdated: rustup update stable
  • Missing system packages: See the System Dependencies table above

Verify Your Setup

# Build the workspace
cargo build --workspace

# Run tests
cargo test --workspace --exclude volley-python

# Run a quick example
cargo run --example in_memory_pipeline -p volley-examples

Quick Start

Build your first Volley pipeline in 5 minutes.

Basic In-Memory Pipeline

This pipeline reads sample trade data from memory, filters high-value trades, and collects results:

use std::sync::Arc;

use arrow::array::{Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use volley_connectors::memory::{MemorySink, MemorySource};
use volley_core::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("symbol", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
        Field::new("quantity", DataType::Int64, false),
    ]));

    let records = vec![
        StreamRecord::new(RecordBatch::try_new(schema.clone(), vec![
            Arc::new(StringArray::from(vec!["AAPL"])),
            Arc::new(Float64Array::from(vec![150.0])),
            Arc::new(Int64Array::from(vec![100])),
        ]).unwrap()),
        StreamRecord::new(RecordBatch::try_new(schema.clone(), vec![
            Arc::new(StringArray::from(vec!["GOOG"])),
            Arc::new(Float64Array::from(vec![280.0])),
            Arc::new(Int64Array::from(vec![50])),
        ]).unwrap()),
    ];

    let sink = MemorySink::new();
    // `handle()` returns a shared reference to the sink's buffer that
    // stays live after `to_sink()` consumes the sink itself.
    let output = sink.handle();

    let report = StreamExecutionEnvironment::new()
        .from_source(MemorySource::new(records))
        .filter_expr(col("price").gt(lit(200.0)))
        .to_sink(sink)
        .execute("filter-example")
        .await?;

    // `execute` returns an `ExecutionReport { records_written, epochs_committed }`.
    // The filtered rows come back via the sink handle.
    println!(
        "{} rows, {} epochs",
        report.records_written, report.epochs_committed
    );
    println!("kept {} records", output.lock().unwrap().len());
    Ok(())
}

Run the full in-memory example from the repo:

cargo run --example in_memory_pipeline -p volley-examples

Expected output (abridged):

=== Volley in-memory DAG pipeline ===

Input: 5 trades

Execution report: 0 records written, 1 epochs committed

Filtered output (3 records):
  1. GOOG @ $280 x 50
  2. MSFT @ $310 x 75
  3. GOOG @ $290 x 150

Windowed Aggregation

Add keyed windowing and expression-based aggregation (requires the state-rocksdb feature):

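use std::sync::Arc;
use std::time::Duration;

use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use volley_connectors::memory::{MemorySink, MemorySource};
use volley_core::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Sample input, built the same way as in the in-memory example above.
    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Utf8, false),
        Field::new("amount", DataType::Int64, false),
    ]));
    let records = vec![StreamRecord::new(RecordBatch::try_new(schema, vec![
        Arc::new(StringArray::from(vec!["alice", "bob", "alice"])),
        Arc::new(Int64Array::from(vec![150, 50, 250])),
    ]).unwrap())];

    // Scratch directory for RocksDB state (uses the `tempfile` crate).
    // `RocksDbBackend` implements `BatchStateBackend` directly; no
    // `KeyedStateBackend` wrapper is needed at the builder API.
    let tmp = tempfile::tempdir().unwrap();

    StreamExecutionEnvironment::new()
        .from_source(MemorySource::new(records))
        .filter_expr(col("amount").gt(lit(100)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(300)))
        .aggregate_expr(
            vec![sum(col("amount"))],
            RocksDbBackend::open(tmp.path().join("state")).unwrap(),
        )
        .to_sink(MemorySink::new())
        .execute("aggregation-job")
        .await?;

    Ok(())
}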
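Conceptually, `.window(TumblingWindows::of(...))` followed by `.aggregate_expr(vec![sum(...)])` groups rows by (key, window) and sums each group. The plain-Rust sketch below models only that grouping and uses no Volley or Arrow types. It assumes windows are aligned to multiples of the window size from the Unix epoch. That is the usual convention for tumbling windows, but it is an assumption about Volley. The sketch also ignores watermarks, so it treats every window as already closed. The helpers `window_start` and `windowed_sums` are hypothetical.

```rust
use std::collections::BTreeMap;

/// Start of the tumbling window that contains `ts_ms`, assuming windows
/// are aligned to multiples of `size_ms` from the Unix epoch.
fn window_start(ts_ms: i64, size_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(size_ms)
}

/// Sums `amount` per (user_id, window_start) for rows passing the same
/// `amount > 100` filter as the pipeline above.
fn windowed_sums(rows: &[(&str, i64, i64)], size_ms: i64) -> BTreeMap<(String, i64), i64> {
    let mut sums = BTreeMap::new();
    for &(user_id, amount, ts_ms) in rows {
        if amount > 100 {
            let key = (user_id.to_string(), window_start(ts_ms, size_ms));
            *sums.entry(key).or_insert(0) += amount;
        }
    }
    sums
}

fn main() {
    // (user_id, amount, event_time_ms); 300_000 ms is the 5-minute window.
    let rows = [
        ("alice", 150, 10_000),
        ("alice", 250, 290_000),
        ("alice", 120, 310_000), // lands in the next window
        ("bob", 50, 20_000),     // removed by the filter
        ("bob", 500, 30_000),
    ];
    for ((user, start), total) in windowed_sums(&rows, 300_000) {
        println!("{user} window@{start}ms sum={total}");
    }
}
```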

Kafka Source to Kafka Sink

A real-world pipeline reading from Kafka, aggregating, and writing back to Kafka:

use std::sync::Arc;
use std::time::Duration;
use arrow::datatypes::{DataType, Field, Schema};
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Utf8, false),
        Field::new("amount", DataType::Float64, false),
    ]));

    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema);
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");

    let tmp = tempfile::tempdir().unwrap();

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter_expr(col("amount").gt(lit(0.0)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate_expr(
            vec![sum(col("amount"))],
            RocksDbBackend::open(tmp.path().join("state")).unwrap(),
        )
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;

    Ok(())
}

For end-to-end exactly-once delivery (Kafka EOS), where source offsets are committed inside the sink's Kafka transaction, swap the source/sink pair:

let env = StreamExecutionEnvironment::new();
let (stream, cgm) = env.from_kafka_exactly_once(source_config).await?;
stream
    // ... operators ...
    .to_kafka_exactly_once(KafkaSinkConfig::new_transactional(
        "localhost:9092", "output", "my-pipeline-txn",
    ), cgm)
    .await?
    .execute("eos-job")
    .await?;

Note: Kafka connectors require cmake installed on your system. See Installation for details.

What’s Happening

  1. StreamExecutionEnvironment::new() — Creates the pipeline builder
  2. .from_source() / .from_kafka() — Attaches a source, producing a DataStream
  3. .filter_expr() / .select_expr() — Applies vectorized operators to the stream
  4. .key_by() — Partitions the stream by key, producing a KeyedStream
  5. .window() — Assigns records to time windows, producing a WindowedKeyedStream
  6. .aggregate_expr() — Computes an aggregation per window per key
  7. .to_kafka() / .to_sink() — Attaches a sink (transitions to DataStream<HasSink>)
  8. .execute() — Starts the pipeline as concurrent Tokio tasks

The type system enforces valid construction at compile time. You can only call .aggregate_expr() on a KeyedStream or a WindowedKeyedStream, not on a raw DataStream, and you can only call .execute() on a DataStream<HasSink>.


Architecture Overview

A Volley pipeline is a graph of stages — one source, some operators, one or more sinks. You describe the graph in Rust, and Volley runs it as concurrent Tokio tasks connected by bounded channels. This page explains what happens once you call .execute().

Pipeline Builder (Compile-Time)

You build the graph with a fluent API. The order of calls matches the order of data flow:

StreamExecutionEnvironment::new()
    .from_source(...)
    .filter_expr(...)
    .key_by(...)
    .window(...)
    .aggregate_expr(..., state_backend)
    .to_sink(...)
    .execute("job-name")

Runtime Execution (Tokio Tasks)

At runtime, each stage runs as an independent Tokio task. Stages communicate via bounded tokio::sync::mpsc channels. Backpressure propagates naturally: when a downstream channel is full, the upstream task's send().await suspends until the downstream stage frees a slot, so a slow stage throttles everything upstream of it.
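You can reproduce the same backpressure behavior with the standard library alone. This sketch is a stand-in, not Volley's runtime. It uses OS threads and `std::sync::mpsc::sync_channel` instead of Tokio tasks and `tokio::sync::mpsc`, so a full channel blocks a thread where Volley's tasks would suspend on `send().await`. The `run_pipeline` helper and the `> 200` filter are hypothetical and were chosen to mirror the Quick Start.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::thread;

/// Source -> operator -> sink as three threads joined by bounded channels
/// of `capacity` slots. A full channel makes `send` block, so a slow stage
/// throttles every stage upstream of it.
fn run_pipeline(input: Vec<i64>, capacity: usize) -> Vec<i64> {
    let (src_tx, op_rx): (SyncSender<i64>, Receiver<i64>) = sync_channel(capacity);
    let (op_tx, sink_rx) = sync_channel::<i64>(capacity);

    let source = thread::spawn(move || {
        for v in input {
            src_tx.send(v).expect("operator hung up"); // blocks while full
        }
        // src_tx is dropped here, which ends the operator's loop.
    });
    let operator = thread::spawn(move || {
        for v in op_rx {
            if v > 200 {
                op_tx.send(v).expect("sink hung up");
            }
        }
    });
    let sink = thread::spawn(move || sink_rx.into_iter().collect::<Vec<_>>());

    source.join().unwrap();
    operator.join().unwrap();
    sink.join().unwrap()
}

fn main() {
    // With one slot and no consumer, a second send reports Full instead of
    // buffering without limit: that is the backpressure signal.
    let (tx, _rx) = sync_channel::<i64>(1);
    tx.try_send(1).unwrap();
    assert!(matches!(tx.try_send(2), Err(TrySendError::Full(2))));

    println!("{:?}", run_pipeline(vec![150, 280, 310, 90, 290], 2));
}
```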

+--------+    channel    +----------+    channel    +------+
| Source |  --------->   | Operator |  --------->   | Sink |
| Task   |  RecordBatch  | Task     |  RecordBatch  | Task |
+--------+               +----------+               +------+
     |                       |                         |
     | CheckpointBarrier     | on_checkpoint()         | on_checkpoint()
     | (in-band)             | flush state             | commit writes
     v                       v                         v
+---------------------------------------------------------+
|                 Checkpoint Coordinator                  |
|  Injects barriers -> collects acks -> snapshots state   |
+---------------------------------------------------------+

Checkpoint barriers and watermarks flow in-band alongside data records through the same channels. This is how Volley achieves aligned checkpoints without pausing the pipeline.

Intra-Node Parallelism

After .key_by(), records are hash-partitioned across N parallel operator instances. Each instance has its own state namespace. Checkpoint barriers are aligned across all partitions before snapshot.

                           +-- Partition 0 --> [Operator Instance 0] --+
Source --> HashPartitioner +-- Partition 1 --> [Operator Instance 1] --+--> Merge --> Sink
                           +-- Partition 2 --> [Operator Instance 2] --+

Parallelism is set via .with_parallelism(N) on the stream builder.

Distributed Execution (Horizontal Scaling)

For workloads that exceed a single node, Volley supports distributed execution across multiple K8s pods. Enable it by passing a ClusterConfig:

// worker_id, num_workers, headless_service, and app_name are placeholders
// for values from your deployment.
StreamExecutionEnvironment::new()
    .with_cluster(ClusterConfig::new(worker_id, num_workers, headless_service, app_name))
    .from_source(source)
    // ... same pipeline API ...
    .execute("job").await?;

In distributed mode, keyed operators use a two-level hashing scheme:

  1. Key group: hash(key) % max_key_groups (256 virtual partitions by default)
  2. Worker assignment: contiguous key group ranges assigned to each worker
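The two levels fit in a few lines of Rust. This is an illustration under assumptions, not Volley's code. The hash function, the helper names, and the rule for spreading leftover groups across workers are all hypothetical. The hash used here is std's `DefaultHasher`, whose output is not guaranteed stable across Rust releases, so a real system would pin a stable hash. Only the 256-group default comes from the text above.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::ops::Range;

/// Level 1: map a key onto one of `max_key_groups` virtual partitions.
fn key_group<K: Hash>(key: &K, max_key_groups: u32) -> u32 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % max_key_groups as u64) as u32
}

/// Level 2: the contiguous range of key groups owned by `worker_id`.
/// The first `max_key_groups % num_workers` workers take one extra group.
fn key_group_range(worker_id: u32, num_workers: u32, max_key_groups: u32) -> Range<u32> {
    let base = max_key_groups / num_workers;
    let extra = max_key_groups % num_workers;
    let start = worker_id * base + worker_id.min(extra);
    let len = base + if worker_id < extra { 1 } else { 0 };
    start..start + len
}

/// The worker whose range contains `group`.
fn owner(group: u32, num_workers: u32, max_key_groups: u32) -> u32 {
    (0..num_workers)
        .find(|&w| key_group_range(w, num_workers, max_key_groups).contains(&group))
        .expect("every key group has an owner")
}

fn main() {
    let (max_groups, workers) = (256, 3);
    for w in 0..workers {
        println!("worker {w}: key groups {:?}", key_group_range(w, workers, max_groups));
    }
    let g = key_group(&"user-42", max_groups);
    println!("user-42 -> group {g} -> worker {}", owner(g, workers, max_groups));
}
```

Each worker owns a contiguous range of key groups. Rescaling therefore only moves the range boundaries, and a key never changes its key group.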

Records destined for remote workers are shuffled via Arrow Flight DoExchange. Non-keyed operators (filter, map) run locally with no shuffle.

Worker 0                                    Worker 1
┌──────────────────────┐                    ┌──────────────────────┐
│ Source               │                    │ Source               │
│   ↓                  │                    │   ↓                  │
│ ShuffleRouter ──────►│── Arrow Flight ──►│◄── ShuffleRouter     │
│   ↓                  │◄── Arrow Flight ──│                      │
│ BarrierAligner       │                    │ BarrierAligner       │
│   ↓                  │                    │   ↓                  │
│ Sink                 │                    │ Sink                 │
└──────────────────────┘                    └──────────────────────┘

K8s primitives handle coordination:

  • Leader election: K8s Lease API — one worker becomes coordinator
  • Checkpoint metadata: stored in K8s ConfigMaps (no OwnerReference, survives redeployment)
  • Checkpoint data: shared filesystem (EFS/Filestore/Azure Files) via ReadWriteMany PVC
  • Worker discovery: StatefulSet headless service DNS
  • Live rescaling: checkpoint + reassignment, no state migration needed (shared FS)

Single-node mode remains the default with zero overhead when ClusterConfig is absent.

Graceful Shutdown

The source stops producing, in-flight data drains through the pipeline, and then each stage shuts down in order, from source to sink.

Source Task --> channel(capacity) --> Operator Task --> channel(capacity) --> Sink Task
                    ^                                        ^
                    |                                        |
              backpressure                            backpressure
              (bounded send)                          (bounded send)
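One common way to get drain-then-stop behavior is to let channel closure carry the shutdown signal. When a stage exits, it drops its sender, and the next stage keeps receiving buffered records until the channel is both closed and empty. Whether Volley signals shutdown exactly this way is an assumption. The sketch below uses std threads and `std::sync::mpsc` as a stand-in for Tokio; `tokio::sync::mpsc::Receiver::recv` has the same drain-then-`None` behavior. `shutdown_demo` is a hypothetical helper.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Shuts a three-stage pipeline down by letting the source finish.
/// Returns (records that reached the sink, order in which stages exited).
fn shutdown_demo(n: u32) -> (Vec<u32>, Vec<&'static str>) {
    let exits = Arc::new(Mutex::new(Vec::new()));
    let (tx1, rx1) = sync_channel::<u32>(4);
    let (tx2, rx2) = sync_channel::<u32>(4);

    let e = Arc::clone(&exits);
    let source = thread::spawn(move || {
        for i in 0..n {
            tx1.send(i).unwrap();
        }
        e.lock().unwrap().push("source");
        // tx1 is dropped when this closure returns: "stop producing".
    });
    let e = Arc::clone(&exits);
    let operator = thread::spawn(move || {
        // recv() keeps returning buffered items after the sender is gone and
        // only fails once the channel is closed *and* empty.
        while let Ok(v) = rx1.recv() {
            tx2.send(v * 10).unwrap();
        }
        e.lock().unwrap().push("operator");
    });
    let e = Arc::clone(&exits);
    let sink = thread::spawn(move || {
        let got: Vec<u32> = rx2.iter().collect();
        e.lock().unwrap().push("sink");
        got
    });

    source.join().unwrap();
    operator.join().unwrap();
    let delivered = sink.join().unwrap();
    let order = exits.lock().unwrap().clone();
    (delivered, order)
}

fn main() {
    let (delivered, order) = shutdown_demo(5);
    println!("delivered {delivered:?}, exit order {order:?}");
}
```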

Deep Dives

Data Model

Volley moves your data in batches, not one row at a time. A batch is an Arrow RecordBatch: a chunk of rows stored in columnar form, passed between stages without copying. Operators see whole batches so they can work on many rows at once — the same technique a query engine uses to keep the CPU busy.

If you’ve used DataFusion, Polars, or DuckDB, this is the same RecordBatch you already know. If not: think “a page of a columnar table” — each column is a contiguous array, and operators like filter and sum work over the whole column in one pass.
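Here is the column-at-a-time idea in miniature, with plain `Vec`s as a hypothetical stand-in for Arrow arrays. A filter like `filter_expr(col("price").gt(lit(200.0)))` first evaluates the predicate over the whole `price` column to build a boolean mask, then applies that mask to every column. Real Arrow kernels operate on typed buffers with validity bitmaps and are heavily optimized, so this sketch shows only the shape of the computation. `Batch`, `take`, and `filter_price_gt` are invented for illustration.

```rust
/// A toy columnar batch: one Vec per column, all the same length.
#[derive(Debug, PartialEq)]
struct Batch {
    symbol: Vec<&'static str>,
    price: Vec<f64>,
    quantity: Vec<i64>,
}

/// Keeps the values of `col` whose mask entry is true.
fn take<T: Copy>(col: &[T], mask: &[bool]) -> Vec<T> {
    col.iter()
        .zip(mask)
        .filter(|&(_, &keep)| keep)
        .map(|(&v, _)| v)
        .collect()
}

/// Pass 1 evaluates the predicate over the whole `price` column;
/// pass 2 applies the resulting mask to every column.
fn filter_price_gt(batch: &Batch, threshold: f64) -> Batch {
    let mask: Vec<bool> = batch.price.iter().map(|&p| p > threshold).collect();
    Batch {
        symbol: take(&batch.symbol, &mask),
        price: take(&batch.price, &mask),
        quantity: take(&batch.quantity, &mask),
    }
}

fn main() {
    let batch = Batch {
        symbol: vec!["AAPL", "GOOG", "MSFT"],
        price: vec![150.0, 280.0, 310.0],
        quantity: vec![100, 50, 75],
    };
    println!("{:?}", filter_price_gt(&batch, 200.0));
}
```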

StreamRecord

The StreamRecord wrapper adds metadata to each batch:

struct StreamRecord {
    batch: RecordBatch,                  // columnar Arrow data (1 or more rows)
    event_time_ms: Option<i64>,          // event timestamp (for windowing)
    trace: Option<RecordTraceContext>,   // per-record trace context (for distributed tracing)
}

  • batch — the actual payload, a columnar Arrow RecordBatch
  • event_time_ms — event timestamp used by window assignment and watermark tracking
  • trace — optional trace context for per-record distributed tracing. Set by the runtime’s sampler or extracted from source message headers (e.g., Kafka traceparent). When present, the runtime creates OpenTelemetry child spans at each pipeline stage.

StreamElement

Records share channels with control signals. The StreamElement enum wraps all possible messages:

enum StreamElement {
    Data(StreamRecord),
    Barrier(CheckpointBarrier),
    Watermark(Watermark),
}

  • Data — normal data records flowing through the pipeline
  • Barrier — checkpoint coordination signals injected by the Checkpoint Coordinator
  • Watermark — event-time progress markers for window triggering

Barriers and watermarks flow in-band alongside data records through the same bounded channels. This is what enables aligned checkpoints without pausing the pipeline.
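Here is a sketch of what "in-band" means for an operator's run loop. A single `match` handles each element: data is transformed, and control signals are forwarded in the order they arrived. The enum below is a simplified, hypothetical mirror of `StreamElement`. A batch is a `Vec<i64>`, a barrier is just its epoch, and a watermark is a timestamp in milliseconds. The filter threshold is arbitrary.

```rust
#[derive(Debug, Clone, PartialEq)]
enum StreamElement {
    Data(Vec<i64>), // a batch of values
    Barrier(u64),   // checkpoint epoch
    Watermark(i64), // event-time progress, in ms
}

/// A filter operator's run loop: data is filtered; barriers and watermarks
/// pass through unchanged and keep their position relative to the data.
fn run_filter(input: Vec<StreamElement>, threshold: i64) -> Vec<StreamElement> {
    let mut out = Vec::new();
    for element in input {
        match element {
            StreamElement::Data(batch) => {
                let kept: Vec<i64> = batch.into_iter().filter(|&v| v > threshold).collect();
                if !kept.is_empty() {
                    out.push(StreamElement::Data(kept));
                }
            }
            // A stateful operator would snapshot its state here, then forward.
            StreamElement::Barrier(epoch) => out.push(StreamElement::Barrier(epoch)),
            StreamElement::Watermark(ts) => out.push(StreamElement::Watermark(ts)),
        }
    }
    out
}

fn main() {
    let input = vec![
        StreamElement::Data(vec![150, 280]),
        StreamElement::Watermark(1_000),
        StreamElement::Barrier(7),
        StreamElement::Data(vec![90]),
        StreamElement::Data(vec![310]),
    ];
    for e in run_filter(input, 200) {
        println!("{e:?}");
    }
}
```

The barrier keeps its position in the stream. Everything before `Barrier(7)` belongs to checkpoint 7 and everything after it belongs to the next checkpoint, so the source never has to pause.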

Why Arrow?

  • Columnar format — efficient for analytical operations (aggregations, filters)
  • Zero-copy — data passes between pipeline stages without serialization
  • Multi-row batching — operators process many rows at once, amortizing per-record overhead
  • Ecosystem interop — Arrow is the standard for DataFusion, Parquet, Delta Lake, Iceberg, and most analytics tooling

Pipeline Builder

You build a Volley pipeline with a fluent chain: source first, then operators, then a sink. The builder enforces the ordering rules at compile time. You must partition a stream with .key_by() before windowing it, and you must attach a sink before calling .execute(). If you try to window an unkeyed stream or execute a pipeline that still has no sink, the code simply won't compile.

This compile-time safety is built on Rust’s type system (a pattern called “typestate” — see the details below if you’re curious). Most of the time you don’t need to think about it; you just follow the chain.

The chain

StreamExecutionEnvironment  -->  DataStream<NoSink>  -->  KeyedStream  -->  WindowedKeyedStream
        |                           |                        |                      |
    from_source()              filter_expr()            key_by()             window()
    from_kafka()               select_expr()                                aggregate_expr()
                               filter()
                               map()
                               flat_map()
                               apply_operator()

DataStream<NoSink>  -->  DataStream<HasSink>  -->  execute()
        |                       |
    to_sink()               execute()
    to_kafka()

Each step returns a different type, and each type exposes only the operations that make sense at that point:

| Type | What you have | What you can do next |
|---|---|---|
| DataStream<NoSink> | An unkeyed stream | filter_expr, select_expr, filter, map, flat_map, apply_operator, key_by, to_sink, to_kafka, with_tracing, with_observability |
| KeyedStream | A stream partitioned by a key | window, aggregate_expr, to_sink, to_kafka, with_tracing, with_observability |
| WindowedKeyedStream | Keyed and windowed | aggregate_expr |
| DataStream<HasSink> | A sink is attached, ready to run | execute |

aggregate_expr() works on both KeyedStream (global aggregation across all records) and WindowedKeyedStream (per-window aggregation). The closure-based operators (filter, map, flat_map) still live on DataStream<NoSink> for cases where you need per-record logic that doesn’t fit a DataFusion expression.

Example

StreamExecutionEnvironment::new()
    // Returns DataStream<NoSink>
    .from_source(source)
    // Still DataStream<NoSink> — vectorized filter
    .filter_expr(col("amount").gt(lit(100)))
    // Returns KeyedStream
    .key_by(col("user_id"))
    // Returns WindowedKeyedStream
    .window(TumblingWindows::of(Duration::from_secs(60)))
    // `aggregate_expr` takes a Vec of aggregates + a `BatchStateBackend`
    // (`RocksDbBackend` implements the trait directly — no wrapper needed).
    // Consumes WindowedKeyedStream, returns DataStream<NoSink>.
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    // Returns DataStream<HasSink>
    .to_sink(sink)
    // `execute` returns `ExecutionReport { records_written, epochs_committed }`.
    .execute("my-job")
    .await?;

Parallelism

Set the parallelism level on the environment. After .key_by(), records are hash-partitioned across N parallel operator instances:

StreamExecutionEnvironment::new()
    .set_parallelism(4)
    .from_source(...)
    // ...

Each parallel instance has its own state namespace. Checkpoint barriers are aligned across all partitions.
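Aligning barriers means holding back data from any input that has already delivered barrier N until every other input delivers it as well. Only then is the barrier forwarded, once, followed by the held data. The sketch below borrows the `BarrierAligner` name used elsewhere in this book, but its fields, the `on_message` method, and the `Msg` type are hypothetical. It also assumes only one checkpoint is in flight at a time.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Data(&'static str),
    Barrier(u64),
}

/// Aligns barriers from `n` parallel inputs feeding one downstream stage.
struct BarrierAligner {
    n: usize,
    seen: HashSet<usize>, // inputs that already delivered the current barrier
    held: Vec<Msg>,       // post-barrier data from those inputs
}

impl BarrierAligner {
    fn new(n: usize) -> Self {
        Self { n, seen: HashSet::new(), held: Vec::new() }
    }

    /// Feeds one message from `input`; returns what may go downstream now.
    fn on_message(&mut self, input: usize, msg: Msg) -> Vec<Msg> {
        match msg {
            Msg::Data(_) if self.seen.contains(&input) => {
                // This input is already past the barrier: hold its data.
                self.held.push(msg);
                Vec::new()
            }
            Msg::Data(_) => vec![msg],
            Msg::Barrier(epoch) => {
                self.seen.insert(input);
                if self.seen.len() < self.n {
                    return Vec::new();
                }
                // All inputs aligned: forward the barrier once, then release.
                self.seen.clear();
                let mut out = vec![Msg::Barrier(epoch)];
                out.extend(self.held.drain(..));
                out
            }
        }
    }
}

fn main() {
    let mut aligner = BarrierAligner::new(2);
    let events = [
        (0, Msg::Data("a1")),
        (0, Msg::Barrier(1)),
        (0, Msg::Data("a2")), // after input 0's barrier: held
        (1, Msg::Data("b1")), // input 1 has not reached the barrier: passes
        (1, Msg::Barrier(1)), // alignment completes
    ];
    for (input, msg) in events {
        for out in aligner.on_message(input, msg) {
            println!("{out:?}");
        }
    }
}
```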

Typestate mechanics

The type progression above is an application of the “typestate” pattern: each builder method consumes self and returns a new type that only exposes the next set of valid operations. DataStream<NoSink> and DataStream<HasSink> share the same struct with a marker type parameter; .to_sink() flips the marker from NoSink to HasSink, which is the only variant that has an .execute() method. You can read the compile errors as a guide — if the compiler says a method doesn’t exist on your stream, you probably need to add (or remove) a step earlier in the chain.
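A stripped-down version of the pattern fits in a few dozen lines. Everything here is hypothetical, including the `stages` field, the string "expressions", and `from_source` as a free function. The sketch only shows how a zero-sized marker held in a `PhantomData` field decides which methods exist.

```rust
use std::marker::PhantomData;

/// Zero-sized marker types: they exist only in the type system.
struct NoSink;
struct HasSink;

/// A toy builder; the real DataStream carries far more than stage names.
struct DataStream<S> {
    stages: Vec<String>,
    _state: PhantomData<S>,
}

fn from_source(name: &str) -> DataStream<NoSink> {
    DataStream { stages: vec![format!("source:{name}")], _state: PhantomData }
}

impl DataStream<NoSink> {
    /// Consumes `self` and keeps the NoSink marker: more operators may follow.
    fn filter_expr(mut self, expr: &str) -> Self {
        self.stages.push(format!("filter:{expr}"));
        self
    }

    /// Consumes `self` and flips the marker from NoSink to HasSink.
    fn to_sink(mut self, name: &str) -> DataStream<HasSink> {
        self.stages.push(format!("sink:{name}"));
        DataStream { stages: self.stages, _state: PhantomData }
    }
}

impl DataStream<HasSink> {
    /// Only defined once a sink is attached.
    fn execute(self, job: &str) -> String {
        format!("{job}: {}", self.stages.join(" -> "))
    }
}

fn main() {
    let plan = from_source("trades")
        .filter_expr("price > 200")
        .to_sink("memory")
        .execute("filter-example");
    println!("{plan}");

    // Does not compile: no method `execute` on DataStream<NoSink>.
    // from_source("trades").execute("oops");
}
```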

Exactly-Once Semantics

Exactly-once means a record affects your output exactly one time, no matter what goes wrong — a pod gets killed, the network drops, a broker restarts. Replayed records don’t show up twice. Dropped records don’t disappear.

Volley gets there with periodic checkpoints. A coordinator injects a marker (a “barrier”) into the source; the barrier flows through the pipeline alongside your data, and every operator and sink records its state when the barrier passes. If something fails, Volley restarts from the last completed checkpoint and each sink uses an idempotent commit strategy so replayed writes don’t duplicate. The rest of this page walks through that protocol.

Checkpoint Barrier Protocol

1. Coordinator injects CheckpointBarrier(epoch=N) into source
2. Barrier flows through pipeline in-band with data
3. Each operator receives barrier:
   a. Calls on_checkpoint() hook (flush caches, prepare state)
   b. Forwards barrier downstream
   c. State backend snapshots via RocksDB hardlink
4. Sink receives barrier:
   a. Commits buffered writes
   b. Acknowledges barrier to coordinator
5. Coordinator marks epoch N complete when all sinks ack

The key insight: barriers flow in-band alongside data records. No global pause is needed. Each operator processes the barrier when it arrives, then continues processing data.
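On the coordinator's side, steps 1 and 5 come down to bookkeeping. The coordinator remembers which sinks have acknowledged each open epoch and marks an epoch complete once all of them have. The sketch below is hypothetical: the struct, its fields, and the sink names are invented for illustration. It leaves out barrier delivery and state snapshots.

```rust
use std::collections::{BTreeMap, HashSet};

struct CheckpointCoordinator {
    sinks: HashSet<&'static str>,                  // every sink that must ack
    pending: BTreeMap<u64, HashSet<&'static str>>, // epoch -> sinks that acked
    next_epoch: u64,
    last_complete: Option<u64>,
}

impl CheckpointCoordinator {
    fn new(sinks: &[&'static str]) -> Self {
        Self {
            sinks: sinks.iter().copied().collect(),
            pending: BTreeMap::new(),
            next_epoch: 1,
            last_complete: None,
        }
    }

    /// Step 1: open epoch N (this is where the barrier would be injected).
    fn inject_barrier(&mut self) -> u64 {
        let epoch = self.next_epoch;
        self.next_epoch += 1;
        self.pending.insert(epoch, HashSet::new());
        epoch
    }

    /// Steps 4b and 5: record one sink's ack. Returns true when this ack
    /// completes the epoch, i.e. every sink has now acknowledged it.
    fn ack(&mut self, epoch: u64, sink: &'static str) -> bool {
        let acked = match self.pending.get_mut(&epoch) {
            Some(acked) => acked,
            None => return false, // unknown or already-complete epoch
        };
        acked.insert(sink);
        if !acked.is_superset(&self.sinks) {
            return false;
        }
        self.pending.remove(&epoch);
        self.last_complete = self.last_complete.max(Some(epoch));
        true
    }
}

fn main() {
    let mut coord = CheckpointCoordinator::new(&["kafka", "delta"]);
    let epoch = coord.inject_barrier();
    println!("kafka ack completes epoch? {}", coord.ack(epoch, "kafka"));
    println!("delta ack completes epoch? {}", coord.ack(epoch, "delta"));
    println!("last complete epoch: {:?}", coord.last_complete);
}
```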

Per-Sink Commit Strategies

Each sink type uses a different mechanism to achieve exactly-once:

| Sink | Commit strategy | Recovery |
|---|---|---|
| Kafka | Transaction commit on barrier (+ send_offsets_to_transaction for Kafka↔Kafka EOS) | Abort + replay on recovery |
| Delta Lake | Epoch tag in commit metadata | Skip already-committed epochs |
| Iceberg | Snapshot commit via REST catalog | Epoch tracking in catalog |

Kafka↔Kafka End-to-End Exactly-Once

When both source and sink are Kafka, Volley supports the full Flink-style EOS protocol via librdkafka’s send_offsets_to_transaction:

let (stream, cgm) = StreamExecutionEnvironment::new()
    .from_kafka_exactly_once(source_config).await?;

stream
    .filter_expr(col("status").eq(lit("active")))
    .to_kafka_exactly_once(sink_config, cgm).await?
    .execute("eos-pipeline").await?;

How It Works

  1. Setup: from_kafka_exactly_once() returns a (DataStream, ConsumerGroupMetadata) tuple. The ConsumerGroupMetadata (CGM) is a handle to the Kafka consumer group that the source belongs to. Pass it to to_kafka_exactly_once() so the sink can commit consumer offsets atomically with produced records.

  2. Barrier injection: When the checkpoint coordinator injects a barrier, the source records its current Kafka partition offsets in barrier.source_offsets.

  3. Barrier alignment: If the pipeline has parallel branches, the BarrierAligner merges source offsets from all branches before forwarding the barrier to the sink. (The BarrierAligner is an internal component that ensures all parallel inputs to an operator have delivered the same barrier epoch before proceeding.)

  4. Atomic commit: When the sink receives the barrier, it:

    • Calls producer.send_offsets_to_transaction(offsets, cgm) — this stages the consumer group offset advance inside the producer’s open transaction
    • Calls producer.commit_transaction() — this atomically commits both the produced output records and the consumer offset advance
  5. No auto-commit: In EOS mode, the sink disables batch-size and deadline-driven auto-commits, so the only commit trigger is a checkpoint barrier. CheckpointConfig::interval must therefore be shorter than the producer's transaction.timeout.ms (60 s by default in librdkafka). Otherwise the broker aborts the open transaction and fences the producer.

  6. Recovery: On restart, the consumer group’s last committed offset (written atomically with the transaction) determines where to resume. The runtime skips file-based offset restoration for transactional sources.
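The atomicity in step 4 can be modeled without a broker. This sketch is an in-memory simulation, not the rdkafka API. A transaction stages output records and consumer offsets together; commit makes both visible at once, and abort discards both. `Broker`, `Transaction`, and their methods are hypothetical. `send_offsets` and `commit` only play the roles of `send_offsets_to_transaction` and `commit_transaction`.

```rust
use std::collections::HashMap;

/// What read_committed consumers and the consumer group can observe.
#[derive(Default, Debug)]
struct Broker {
    committed_output: Vec<String>,
    group_offsets: HashMap<u32, i64>, // partition -> next offset to read
}

/// A producer transaction: output records and offset advances are staged together.
#[derive(Default)]
struct Transaction {
    records: Vec<String>,
    offsets: HashMap<u32, i64>,
}

impl Transaction {
    fn send(&mut self, record: &str) {
        self.records.push(record.to_string());
    }
    /// Plays the role of send_offsets_to_transaction: staged, not yet visible.
    fn send_offsets(&mut self, offsets: &HashMap<u32, i64>) {
        self.offsets.extend(offsets.iter().map(|(&p, &o)| (p, o)));
    }
    /// Plays the role of commit_transaction: both effects become visible together.
    fn commit(self, broker: &mut Broker) {
        broker.committed_output.extend(self.records);
        broker.group_offsets.extend(self.offsets);
    }
    /// Abort (e.g. after a crash): neither effect becomes visible.
    fn abort(self) {}
}

fn main() {
    let mut broker = Broker::default();

    // Epoch 1: consumed up to offset 3 on partition 0, produced one result.
    let mut txn = Transaction::default();
    txn.send("alice:400");
    txn.send_offsets(&HashMap::from([(0, 3)]));
    txn.commit(&mut broker);

    // Epoch 2: the process crashes before committing.
    let mut txn = Transaction::default();
    txn.send("bob:500");
    txn.send_offsets(&HashMap::from([(0, 5)]));
    txn.abort();

    // The group offset is still 3, so a restart re-reads the input behind
    // "bob:500" and produces it again in a fresh transaction.
    println!("{broker:?}");
}
```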

Recovery

When a failure occurs, records may be in various stages of processing:

Checkpoint N          In-flight records           Failure
     |                                                |
     v                                                v
  [committed] ---[r1]---[r2]---[r3]---[r4]---[r5]--- X
              ^                                       ^
         known good                       lost, then replayed

The RecoveryCoordinator restores to the last known-good state:

  1. Find last complete checkpoint — the most recent epoch where all sinks acknowledged their commit
  2. Restore source offsets — rewind sources to the offsets recorded at that checkpoint (e.g., Kafka partition offsets, S3 file positions)
  3. Restore operator state — load RocksDB hardlink snapshots from the checkpoint directory, restoring window state, aggregation accumulators, and any custom operator state
  4. Resume processing — records between the checkpoint and the failure (r1–r5 above) are replayed from the source

Replayed records don’t produce duplicates because each sink type has an idempotent commit strategy:

  • Kafka: The incomplete transaction from the failed run is aborted. Replayed records are written in a new transaction. Consumers with isolation.level=read_committed never see the aborted records.
  • Delta Lake: The sink records the epoch in each commit's metadata under its pipeline_id. On replay it reads the Delta log, and if the current epoch was already committed, the write is skipped entirely.
  • Iceberg: The sink reads the latest snapshot’s epoch from catalog metadata. Already-committed epochs are skipped.
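The epoch-skip idea behind the Delta Lake and Iceberg sinks can be sketched in plain Rust under a few hypothetical assumptions. The table stores only its last committed epoch, and each epoch holds exactly two source records. Replay after recovery reproduces the same epoch boundaries, and the skip is only safe when a replayed epoch contains the same data as the original. The scenario is a crash that happens after the sink committed epoch 2 but before checkpoint 2 was marked complete. Recovery therefore restarts from checkpoint 1 and replays epoch 2.

```rust
/// A table sink that keeps the last committed epoch next to the data.
#[derive(Default)]
struct EpochTaggedTable {
    rows: Vec<i64>,
    last_epoch: Option<u64>,
}

impl EpochTaggedTable {
    /// Commits `rows` for `epoch` unless that epoch is already recorded.
    /// Returns whether anything was written.
    fn commit(&mut self, epoch: u64, rows: &[i64]) -> bool {
        if self.last_epoch.map_or(false, |last| epoch <= last) {
            return false; // replayed epoch: skip entirely
        }
        self.rows.extend_from_slice(rows);
        self.last_epoch = Some(epoch);
        true
    }
}

/// Processes `source` from `offset` onward, two records per epoch.
fn run_from(table: &mut EpochTaggedTable, source: &[i64], mut offset: usize, mut epoch: u64) {
    while offset < source.len() {
        let end = (offset + 2).min(source.len());
        table.commit(epoch, &source[offset..end]);
        offset = end;
        epoch += 1;
    }
}

fn main() {
    let source = [10, 20, 30, 40, 50, 60];
    let mut table = EpochTaggedTable::default();

    // First run: epochs 1 and 2 reach the table, then the process dies
    // before checkpoint 2 is marked complete.
    table.commit(1, &source[0..2]);
    table.commit(2, &source[2..4]);

    // Recovery: the last complete checkpoint is epoch 1 (source offset 2),
    // so processing resumes at offset 2 as epoch 2.
    run_from(&mut table, &source, 2, 2);

    println!("{:?} (last epoch {:?})", table.rows, table.last_epoch);
}
```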

Operator Lifecycle

The on_checkpoint() hook gives operators a chance to prepare for the snapshot:

fn on_checkpoint(&mut self, epoch: u64) {
    // Flush any buffered state to RocksDB.
    // The state backend will snapshot after this returns.
}

For stateful operators using the write-behind cache, on_checkpoint() flushes dirty keys from the in-memory HashMap to RocksDB via BatchStateBackend::write_batch().

State Management

Aggregations, windows, and any custom operator you write can keep per-key state that survives restarts. Volley stores that state in RocksDB on disk, but during normal processing your operator reads and writes an in-memory cache. RocksDB is written only when a checkpoint fires and is read only on a cache miss. The result: stateful operators run at in-memory speed and still recover exactly where they left off after a crash.

The rest of this page walks through how the cache, the backend, and the checkpoint snapshot fit together.

Architecture

+------------------------------------+
|         AggregateOperator          |
|  +------------------------------+  |
|  | Write-Behind Cache (HashMap) |  |
|  | key -> Accumulator           |  |
|  +------------------------------+  |
|           | on_checkpoint()        |
|           v                        |
|  +------------------------------+  |
|  | BatchStateBackend (RocksDB)  |  |
|  | WriteBatch for dirty keys    |  |
|  | multi_get for bulk reads     |  |
|  +------------------------------+  |
|           | hardlink snapshot      |
|           v                        |
|  +------------------------------+  |
|  | Checkpoint Directory         |  |
|  | /checkpoints/epoch-N/        |  |
|  +------------------------------+  |
+------------------------------------+

Write-Behind Cache

During normal processing, state reads and writes go to an in-memory HashMap cache that holds accumulator values keyed by partition key. In steady state this keeps RocksDB I/O off the hot path entirely. The backend is read only on cache misses (for example, right after recovery) and written only at checkpoints.

BatchStateBackend

When a checkpoint barrier arrives:

  1. on_checkpoint() is called on the operator
  2. Dirty keys from the cache are flushed to RocksDB via WriteBatch (a single atomic write)
  3. RocksDB creates a hardlink snapshot in the checkpoint directory, which is nearly instant because hardlinks don't copy data

Key operations:

  • write_batch() — Atomic batch write of dirty keys
  • multi_get() — Bulk read for cache misses (rare during steady-state)
  • Hardlink checkpoints — O(1) snapshot via filesystem hardlinks to SST files
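The cache-plus-flush cycle can be sketched with the standard library alone. This minimal sketch makes two substitutions: a `BTreeMap` stands in for RocksDB and a `u64` counter stands in for an accumulator. `WriteBehindCache` and its methods are illustrative names, not Volley's `BatchStateBackend` API.

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

/// Illustrative write-behind cache: updates hit `cache`; only keys in
/// `dirty` are written to the backend when a checkpoint fires.
struct WriteBehindCache {
    cache: HashMap<String, u64>,
    dirty: HashSet<String>,
    backend: BTreeMap<String, u64>, // stand-in for RocksDB
    backend_writes: usize,          // number of keys written to the backend
}

impl WriteBehindCache {
    fn new() -> Self {
        Self {
            cache: HashMap::new(),
            dirty: HashSet::new(),
            backend: BTreeMap::new(),
            backend_writes: 0,
        }
    }

    /// Hot path: update in memory, reading the backend only on a miss.
    fn add(&mut self, key: &str, delta: u64) {
        let backend = &self.backend;
        let acc = self
            .cache
            .entry(key.to_string())
            .or_insert_with(|| backend.get(key).copied().unwrap_or(0));
        *acc += delta;
        self.dirty.insert(key.to_string());
    }

    /// Checkpoint path: flush all dirty keys in one batch (like a WriteBatch).
    fn on_checkpoint(&mut self) {
        let cache = &self.cache;
        let batch: Vec<(String, u64)> = self
            .dirty
            .drain()
            .map(|k| {
                let v = cache[&k]; // dirty keys are always cached
                (k, v)
            })
            .collect();
        self.backend_writes += batch.len();
        self.backend.extend(batch);
    }
}
```

A thousand updates to one key cost one backend write at the next checkpoint, and a checkpoint with nothing dirty writes nothing.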

Per-Key State Isolation

After .key_by(), each parallel operator instance has its own state namespace. Keys are hash-partitioned across instances, so state never conflicts between parallel operators.

Checkpoint Storage

/checkpoints/
  epoch-1/    # hardlink snapshot
  epoch-2/    # hardlink snapshot
  epoch-3/    # latest checkpoint

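RocksDB checkpoints use hardlinks to SST files. An SST file that hasn't changed is shared by every checkpoint that links to it, so it adds no extra storage. As data mutates and compaction rewrites SST files, the old files stay on disk for as long as an older checkpoint still links to them.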
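Both storage properties follow from how hardlinks work. A checkpoint is a directory of extra names for the same SST files. The minimal sketch below uses `std::fs::hard_link`. The flat directory layout and the `.sst` filter are assumptions for illustration; RocksDB's built-in checkpoint facility does the real version of this.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Illustrative checkpoint: hardlink every `.sst` file from `db_dir` into
/// `checkpoint_dir`. No file contents are copied.
fn hardlink_checkpoint(db_dir: &Path, checkpoint_dir: &Path) -> io::Result<usize> {
    fs::create_dir_all(checkpoint_dir)?;
    let mut linked = 0;
    for entry in fs::read_dir(db_dir)? {
        let path = entry?.path();
        if path.extension().map_or(false, |ext| ext == "sst") {
            // A second directory entry pointing at the same data on disk.
            fs::hard_link(&path, checkpoint_dir.join(path.file_name().unwrap()))?;
            linked += 1;
        }
    }
    Ok(linked)
}
```

Deleting the live file, as compaction eventually does, leaves the checkpoint's link readable. That is why old SST files survive for as long as an older checkpoint references them.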

See the Capacity Planning guide for storage sizing recommendations.

Window State LRU

Window operators (WindowOperator, ExprWindowOperator) maintain an in-memory index called pending_windows that tracks active (key, window) pairs. Unlike the aggregate cache (which is a hot copy of data also in RocksDB), this index is the only record of which windows exist and need to fire on watermark advance.

To bound memory without data loss, the window LRU uses an evict-to-backend strategy:

+----------------------------------+
|        WindowOperator            |
|  +----------------------------+  |
|  | pending_windows (LRU)      |  |
|  | key:window_start -> (key,  |  |
|  |                    window)  |  |
|  +----------------------------+  |
|         | on eviction            |
|         v                        |
|  +----------------------------+  |
|  | State Backend (RocksDB)    |  |
|  | pending_index/{pending_key}|  |
|  +----------------------------+  |
|         | on watermark advance   |
|         v                        |
|  Scan persisted entries for      |
|  windows ready to fire           |
+----------------------------------+
  1. When the LRU is full, evicted entries are persisted to a pending_index/ namespace in the state backend
  2. On watermark advance, persisted entries are scanned (only when evictions have occurred, tracked by a has_evicted flag)
  3. Fired windows are deleted from both the LRU and the state backend

This ensures no window is orphaned — every pending window eventually fires or is flushed at end-of-stream, regardless of whether it was evicted from the in-memory cache.
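The three eviction steps fit in a few dozen lines. This minimal sketch rests on three assumptions:

  • entries are `(key, window_end)` pairs, where the text above uses window_start
  • a `BTreeSet` stands in for the `pending_index/` namespace
  • recency is a simple tick counter rather than a real LRU structure

`PendingWindows` is an illustrative name, not Volley's type.

```rust
use std::collections::{BTreeSet, HashMap};

/// Illustrative bounded index of pending (key, window_end_ms) entries.
struct PendingWindows {
    max_entries: usize,
    lru: HashMap<(String, u64), u64>, // entry -> last-touched tick
    tick: u64,
    persisted: BTreeSet<(String, u64)>, // stand-in for pending_index/
    has_evicted: bool,
}

impl PendingWindows {
    fn new(max_entries: usize) -> Self {
        Self { max_entries, lru: HashMap::new(), tick: 0, persisted: BTreeSet::new(), has_evicted: false }
    }

    /// Record that (key, window_end) has pending state.
    fn touch(&mut self, key: &str, window_end: u64) {
        self.tick += 1;
        let entry = (key.to_string(), window_end);
        self.persisted.remove(&entry); // re-activated: keep a single copy
        if !self.lru.contains_key(&entry) && self.lru.len() >= self.max_entries {
            // Evict the least recently touched entry to the backend
            // (O(n) scan for clarity; a real LRU keeps an ordered list).
            let oldest = self.lru.iter().min_by_key(|(_, t)| **t).map(|(e, _)| e.clone());
            if let Some(oldest) = oldest {
                self.lru.remove(&oldest);
                self.persisted.insert(oldest);
                self.has_evicted = true;
            }
        }
        self.lru.insert(entry, self.tick);
    }

    /// Fire windows with end <= watermark. Persisted entries are scanned only
    /// if an eviction happened; fired entries are removed from both places.
    fn advance_watermark(&mut self, watermark: u64) -> Vec<(String, u64)> {
        let mut fired: Vec<(String, u64)> =
            self.lru.keys().filter(|(_, end)| *end <= watermark).cloned().collect();
        for e in &fired {
            self.lru.remove(e);
        }
        if self.has_evicted {
            let ready: Vec<(String, u64)> =
                self.persisted.iter().filter(|(_, end)| *end <= watermark).cloned().collect();
            for e in &ready {
                self.persisted.remove(e);
            }
            fired.extend(ready);
            self.has_evicted = !self.persisted.is_empty();
        }
        fired.sort();
        fired
    }
}
```

The evicted window still fires on the watermark that covers it. Once the persisted set is empty, the flag switches the backend scan off again.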

Configure via with_max_pending_windows(n) on WindowedKeyedStream. When unset, the index is unbounded (default, no behavior change from prior releases).

See the design document for full details on trade-offs and the evict-to-backend strategy.

Recovery

On pipeline restart from a checkpoint, the RecoveryCoordinator restores source offsets and opens the RocksDB backend from the checkpoint directory. Stateful operators then lazily recover their in-memory structures on the first process() call:

  • Aggregate operators (ExprAggregateOperator, AggregateOperator) use StateBackend::scan_prefix() (backed by RocksDB’s prefix_iterator) to enumerate all keys with persisted accumulators. The cache and pending_keys maps are rebuilt from the scan results.
  • Window operators (WindowOperator, ExprWindowOperator) persist a WindowOperatorMeta struct (containing pending_windows and current_watermark) to a well-known state key during on_checkpoint(). On recovery, this metadata is loaded to restore window tracking state and watermark progress.

Recovery is lazy rather than eager: fresh pipelines (no state in RocksDB) pay no cost, and recovery happens only once before the first record is processed.
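Lazy recovery amounts to a one-time prefix scan guarded by a flag. This minimal sketch assumes a `BTreeMap<String, u64>` stands in for RocksDB and accumulator keys share an `agg/` prefix. That key layout and the `LazyAggregate` type are hypothetical. The ordered range scan plays the role of `scan_prefix()`.

```rust
use std::collections::{BTreeMap, HashMap};

/// Stand-in for scan_prefix(): an ordered range scan that stops at the
/// first key without the prefix.
fn scan_prefix<'a>(
    db: &'a BTreeMap<String, u64>,
    prefix: &'a str,
) -> impl Iterator<Item = (&'a String, &'a u64)> + 'a {
    db.range(prefix.to_string()..)
        .take_while(move |(k, _)| k.starts_with(prefix))
}

struct LazyAggregate {
    recovered: bool,
    cache: HashMap<String, u64>,
}

impl LazyAggregate {
    fn process(&mut self, db: &BTreeMap<String, u64>, key: &str, delta: u64) {
        if !self.recovered {
            // Runs once, before the first record. A fresh pipeline scans an
            // empty range and pays almost nothing.
            for (k, v) in scan_prefix(db, "agg/") {
                self.cache.insert(k["agg/".len()..].to_string(), *v);
            }
            self.recovered = true;
        }
        *self.cache.entry(key.to_string()).or_insert(0) += delta;
    }
}
```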

Horizontal Scaling

When a pipeline outgrows a single node — too many Kafka partitions, too much keyed state to fit in one pod’s RAM, or you need a pod failure not to stop processing — you can run the same pipeline across multiple Kubernetes pods. The code doesn’t change: you add a ClusterConfig and deploy more replicas. Volley hashes keys to consistent “key groups,” assigns ranges of groups to each worker, and shuffles records between workers over Apache Arrow Flight. If you’re not using it, single-node mode stays the default with zero overhead.

This page covers the design. The step-by-step deployment guide lives in Horizontal Scaling Guide.

Design Principles

  • Symmetric workers — every pod runs the same binary. No dedicated driver node.
  • K8s-native — uses Lease API for leader election, ConfigMaps for checkpoint metadata, StatefulSet DNS for discovery. No ZooKeeper.
  • Single-node is free — when ClusterConfig is absent, no distributed code paths execute.
  • Shared filesystem — checkpoint data on EFS/Filestore/Azure Files. No state migration needed for rescaling.

Two-Level Key Hashing

Records are partitioned across workers using a two-level scheme:

hash(key) → key_group → worker
  1. Key group: hash(key) % max_key_groups (default 256). This is a virtual partition that never changes.
  2. Worker assignment: contiguous ranges of key groups are assigned to workers. Rescaling reassigns ranges without rehashing.

256 key groups, 3 workers:

Worker 0: key groups [0, 86)    — 86 groups
Worker 1: key groups [86, 171)  — 85 groups
Worker 2: key groups [171, 256) — 85 groups

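This design is borrowed from Apache Flink's KeyGroupRangeAssignment, and it keeps rescaling cheap: ownership moves in whole key groups and no key is ever rehashed. Scaling from 2 to 3 workers gives the new worker about a third of the key groups. With the contiguous ranges shown above, 127 of the 256 groups (about half) change owner in total, because worker 1's range shifts as well.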
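The two-level mapping reduces to a few integer formulas. This minimal sketch assumes the same range arithmetic as Flink's `KeyGroupRangeAssignment`, which reproduces the 256-group, 3-worker table above. The standard library's `DefaultHasher` is a placeholder for whatever hash Volley actually applies to keys.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const MAX_KEY_GROUPS: u64 = 256;

/// Level 1: key -> key group. DefaultHasher is a placeholder; it is not
/// guaranteed stable across Rust releases, so a real system pins its hash.
fn key_group<K: Hash>(key: &K) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % MAX_KEY_GROUPS
}

/// Contiguous range [start, end) of key groups owned by `worker`.
fn key_group_range(worker: u64, num_workers: u64) -> (u64, u64) {
    let start = (worker * MAX_KEY_GROUPS + num_workers - 1) / num_workers;
    let end = ((worker + 1) * MAX_KEY_GROUPS - 1) / num_workers + 1;
    (start, end)
}

/// Level 2: key group -> worker (consistent with key_group_range).
fn worker_for_key_group(group: u64, num_workers: u64) -> u64 {
    group * num_workers / MAX_KEY_GROUPS
}
```

Going from 2 to 3 workers with these formulas changes the owner of 127 groups:

  • groups 86 to 127 move from worker 0 to worker 1
  • groups 171 to 255 move from worker 1 to worker 2

No key's group ever changes.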

Arrow Flight Shuffle

When a keyed operator runs in distributed mode, records are shuffled between workers via Apache Arrow Flight DoExchange:

Worker 0                                    Worker 1
┌──────────────────────┐                    ┌──────────────────────┐
│ Source               │                    │ Source               │
│   ↓                  │                    │   ↓                  │
│ ShuffleRouter        │   Arrow Flight     │ ShuffleRouter        │
│   ├─ local partition ┼──────────────────→ │   ├─ local partition │
│   └─ remote → Flight ┼──────────────────→ │   └─ remote → Flight │
│                      │ ←────────────────  │                      │
│ BarrierAligner       │                    │ BarrierAligner       │
│   (local + remote)   │                    │   (local + remote)   │
│   ↓                  │                    │   ↓                  │
│ Sink                 │                    │ Sink                 │
└──────────────────────┘                    └──────────────────────┘
  • ShuffleRouter replaces PartitionRouter for keyed operators. Routes via key_group → worker → local mpsc or Flight sender.
  • Non-keyed operators (filter, map, flat_map) run locally — no shuffle.
  • BarrierAligner merges both local partition outputs and remote Flight inputs.
  • Barriers and watermarks are broadcast to all local partitions AND all remote workers.

Wire Format

StreamElement variants are encoded as FlightData with a discriminator byte in app_metadata[0]:

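| Byte | Variant | data_body | app_metadata[1..] |
|------|---------|-----------|-------------------|
| 0x01 | Data | Arrow IPC RecordBatch | JSON record metadata |
| 0x02 | Barrier | empty | JSON CheckpointBarrier |
| 0x03 | Watermark | empty | JSON Watermark |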

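Decoding comes down to a match on `app_metadata[0]`. The sketch below covers only the `app_metadata` framing. It assumes the bytes after the discriminator are UTF-8 JSON and leaves out the Arrow IPC `data_body`. `Kind` and both helpers are illustrative names; real code would read these bytes from Arrow Flight's `FlightData` message.

```rust
#[derive(Debug, PartialEq)]
enum Kind {
    Data,
    Barrier,
    Watermark,
}

/// Prefix the JSON metadata with the variant's discriminator byte.
fn encode_app_metadata(kind: &Kind, json: &str) -> Vec<u8> {
    let tag = match kind {
        Kind::Data => 0x01,
        Kind::Barrier => 0x02,
        Kind::Watermark => 0x03,
    };
    let mut out = Vec::with_capacity(1 + json.len());
    out.push(tag);
    out.extend_from_slice(json.as_bytes());
    out
}

/// Split app_metadata back into the variant and its JSON payload.
fn decode_app_metadata(bytes: &[u8]) -> Result<(Kind, &str), String> {
    let (&tag, rest) = bytes.split_first().ok_or("empty app_metadata")?;
    let kind = match tag {
        0x01 => Kind::Data,
        0x02 => Kind::Barrier,
        0x03 => Kind::Watermark,
        other => return Err(format!("unknown discriminator 0x{other:02x}")),
    };
    let json = std::str::from_utf8(rest).map_err(|e| e.to_string())?;
    Ok((kind, json))
}
```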
K8s-Native Coordination

Leader Election

One worker acquires a K8s Lease (coordination.k8s.io/v1) and becomes the coordinator. The coordinator handles checkpoint injection and key group assignment. It still processes data normally — coordination is lightweight.

If the leader dies, the lease expires and another worker takes over.
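The takeover rule can be modelled with three fields a Kubernetes Lease carries: holder identity, renew time, and lease duration. This minimal sketch models a single in-memory lease with millisecond timestamps. It ignores the `resourceVersion` check the API server uses to make acquisition atomic. `Lease` and `try_acquire` are illustrative names.

```rust
/// Illustrative model of a coordination.k8s.io/v1 Lease.
struct Lease {
    holder: Option<String>,
    renew_time_ms: u64,
    duration_ms: u64,
}

impl Lease {
    /// Returns true if `candidate` holds the lease after the call. The
    /// holder renews; others may take over only once the lease has expired.
    fn try_acquire(&mut self, candidate: &str, now_ms: u64) -> bool {
        let expired = now_ms >= self.renew_time_ms + self.duration_ms;
        let is_holder = self.holder.as_deref() == Some(candidate);
        if self.holder.is_none() || is_holder || expired {
            self.holder = Some(candidate.to_string());
            self.renew_time_ms = now_ms;
            true
        } else {
            false
        }
    }
}
```

A leader that keeps renewing holds the lease indefinitely. Once renewals stop, any worker that calls `try_acquire` after the duration has elapsed becomes the coordinator.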

Checkpoint Metadata

Completed checkpoint metadata is stored in K8s ConfigMaps:

  • {app}-ckpt-epoch-{N} — per-epoch metadata (workers, state paths, source offsets)
  • {app}-ckpt-latest — pointer to the latest completed epoch

ConfigMaps have no OwnerReference, so they survive CRD deletion. This allows recovery after redeployment.

Shared Filesystem Checkpoints

Checkpoint data is written to a shared filesystem (AWS EFS, GCP Filestore, Azure Files) mounted as a ReadWriteMany PVC. All workers write to and read from the same mount.

/mnt/volley/checkpoints/
  epoch-42/
    worker-0/     ← RocksDB SST files
    worker-1/
    worker-2/

Recovery is near-instant: the pod reads the checkpoint directory directly. No download step.

Worker Discovery

Workers discover each other via StatefulSet headless service DNS:

{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}
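StatefulSet pod names are predictable, so each worker can compute its peers' Flight addresses from the template above without a lookup service. A minimal sketch; the example values (`volley-app`, `streaming`, port `50051`) are placeholders, not defaults.

```rust
/// Builds `{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}`.
fn peer_address(app: &str, ordinal: u32, namespace: &str, flight_port: u16) -> String {
    format!("{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}")
}

/// Addresses of every worker except `self_ordinal`.
fn peer_addresses(app: &str, replicas: u32, self_ordinal: u32, namespace: &str, port: u16) -> Vec<String> {
    (0..replicas)
        .filter(|&i| i != self_ordinal)
        .map(|i| peer_address(app, i, namespace, port))
        .collect()
}
```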

Live Rescaling

Since all workers share the same filesystem, rescaling doesn’t require state migration:

  1. Coordinator detects replica count change
  2. Triggers a checkpoint — all workers flush in-memory state to shared FS
  3. Computes new KeyGroupAssignment and diffs against the old one
  4. Broadcasts RescaleEvent via watch channel
  5. Workers stop processing lost key groups, load state for gained ones from the shared mount
  6. Resume — no restart, no data transfer
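The diff in step 3 compares each worker's old and new ranges. This minimal sketch assumes the contiguous range arithmetic described under Two-Level Key Hashing. `RescaleDiff` and `diff_for_worker` are illustrative names, not the RescaleCoordinator API.

```rust
use std::collections::BTreeSet;

const MAX_KEY_GROUPS: u64 = 256;

fn owned_groups(worker: u64, num_workers: u64) -> BTreeSet<u64> {
    if worker >= num_workers {
        return BTreeSet::new(); // worker does not exist at this scale
    }
    let start = (worker * MAX_KEY_GROUPS + num_workers - 1) / num_workers;
    let end = ((worker + 1) * MAX_KEY_GROUPS - 1) / num_workers + 1;
    (start..end).collect()
}

#[derive(Debug, PartialEq)]
struct RescaleDiff {
    lost: BTreeSet<u64>,   // stop processing these
    gained: BTreeSet<u64>, // load state for these from the shared mount
}

fn diff_for_worker(worker: u64, old_workers: u64, new_workers: u64) -> RescaleDiff {
    let old = owned_groups(worker, old_workers);
    let new = owned_groups(worker, new_workers);
    RescaleDiff {
        lost: old.difference(&new).copied().collect(),
        gained: new.difference(&old).copied().collect(),
    }
}
```

For a 2 to 3 scale-up, worker 0 only loses groups and the new worker 2 only gains them. Worker 1 does both.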

Key Types

| Type | Location | Purpose |
|------|----------|---------|
| ClusterConfig | cluster::config | Worker ID, num workers, headless service, app name, flight port |
| KeyGroupAssignment | cluster::assignment | Maps key groups to workers with contiguous range assignment |
| KeyGroupRange | cluster::assignment | A [start, end) range of key groups owned by one worker |
| ShuffleRouter | cluster::shuffle | Routes records to local or remote destinations |
| ShuffleFlightService | cluster::flight::server | Arrow Flight DoExchange server |
| ShuffleFlightClient | cluster::flight::client | Arrow Flight DoExchange client with connection pooling |
| LeaseLeaderElector | cluster::coordinator::leader | K8s Lease-based leader election |
| DistributedCheckpointCoordinator | cluster::coordinator::checkpoint_coordinator | ConfigMap-based checkpoint metadata |
| RescaleCoordinator | cluster::rescale | Checkpoint + key group reassignment |

Writing Operators

Operators are the steps in the middle of your pipeline — they filter, reshape, and aggregate records as they flow from source to sink. Most of what you need is already built in. This guide shows you the built-in operators to reach for first, and how to drop down to closures or a custom Operator trait when the built-ins can’t express what you need.

Expression operators use DataFusion expressions for vectorized, multi-row processing. These are the default choice — they operate on whole Arrow RecordBatches at once instead of one row at a time:

| Method | Description |
|--------|-------------|
| .filter_expr(expr) | Keep records matching an expression (e.g., col("price").gt(lit(100))) |
| .select_expr(exprs) | Project/rename columns (e.g., vec![col("user_id"), col("total").alias("sum")]) |
| .key_by(expr) | Partition by a key column (e.g., col("user_id")); transitions to KeyedStream |
| .window(window_type) | Assign records to time windows (on KeyedStream); transitions to WindowedKeyedStream |
| .aggregate_expr(aggs, state_backend) | Windowed aggregation. First arg is a Vec<AggregateExprDef>; second is any BatchStateBackend (e.g., RocksDbBackend directly, with no wrapper needed). |

Example

use std::time::Duration;
use volley_core::prelude::*;

// source: impl Source, sink: impl DynSink (see the Quick Start for full examples)

let tmp = tempfile::tempdir().unwrap();

StreamExecutionEnvironment::new()
    .from_source(source)
    .filter_expr(col("price").gt(lit(100)))
    .select_expr(vec![col("user_id"), col("price"), col("quantity")])
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .aggregate_expr(
        vec![sum(col("price"))],
        RocksDbBackend::open(tmp.path().join("state")).unwrap(),
    )
    .to_sink(sink)
    .execute("expr-pipeline")
    .await?;

Available Aggregation Functions

| Function | Description |
|----------|-------------|
| sum(expr) | Sum of values |
| count(expr) | Count of values |
| avg(expr) | Average of values |
| min(expr) | Minimum value |
| max(expr) | Maximum value |

All are imported via volley_core::prelude::*. Expression helpers col() and lit() are also in the prelude.

Closure-Based Operators

Use closure operators when you need access to StreamRecord metadata (like event_time_ms or the per-record trace context) or when the logic doesn’t fit a DataFusion expression. They receive one StreamRecord at a time:

| Method | Description |
|--------|-------------|
| .filter(fn) | Keep records matching a predicate |
| .map(fn) | Transform each record |
| .flat_map(fn) | Transform each record into zero or more records |
| .apply_operator(op) | Apply a custom Operator implementation |

// Drop records whose event time is older than a cutoff. This is a good fit
// for a closure because `event_time_ms` lives on StreamRecord, not the batch.
stream.filter(move |r| r.event_time_ms.map_or(false, |t| t > cutoff))

Note: Closure operators process one StreamRecord at a time. For bulk column operations on the record payload, expression operators (filter_expr, select_expr) are more efficient because they use DataFusion’s vectorized kernels on the full RecordBatch.

Custom Operators

Implement the Operator trait from volley-core:

use volley_core::prelude::*;

struct MyOperator;

impl Operator for MyOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Transform the record
        // Return zero or more output records
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Optional: flush any buffered state before checkpoint
    }
}

Apply it to a stream:

StreamExecutionEnvironment::new()
    .from_source(source)
    .apply_operator(MyOperator)
    .to_sink(sink)
    .execute("my-job")
    .await?;

Multi-Row RecordBatch Processing

Operators receive StreamRecord which wraps an Arrow RecordBatch. A batch may contain one or more rows. Your operator should handle multi-row batches correctly:

fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
    let batch = &record.batch;
    let num_rows = batch.num_rows();
    // Process all rows, not just row 0
    // ...
}

Stateful Operators

For operators that maintain state across records, use the on_checkpoint() hook to flush state before a checkpoint snapshot:

use std::collections::HashMap;
use arrow::array::{Array, StringArray};
use volley_core::prelude::*;

struct CountingOperator {
    counts: HashMap<String, u64>,
}

impl Operator for CountingOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Count every row in the batch, not just row 0
        // (assumes a Utf8 "key" column; null keys are skipped)
        if let Some(keys) = record.batch
            .column_by_name("key")
            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        {
            for key in keys.iter().flatten() {
                *self.counts.entry(key.to_string()).or_default() += 1;
            }
        }
        vec![record]
    }

    fn on_checkpoint(&mut self, _epoch: u64) {
        // Flush state to durable storage here
        // The state backend will snapshot after this returns
    }
}

See State Management for details on the write-behind cache and RocksDB backend.

Type-State Safety

The Pipeline Builder enforces valid operator chains at compile time. You can only call .aggregate_expr() on a WindowedKeyedStream, not on a plain DataStream. You can only call .execute() on a DataStream<HasSink>.

Writing Connectors

Connectors are how Volley reads from and writes to external systems. Volley provides built-in connectors for Kafka, cloud blob stores, Delta Lake, and Iceberg. You can also write your own.

Source and Sink Traits

// Source: produces StreamRecords
trait Source: Send {
    async fn poll(&mut self) -> Option<StreamRecord>;
    fn connector_id(&self) -> &str;
}

// Sink: consumes StreamRecords
trait Sink: Send {
    async fn write(&mut self, record: StreamRecord) -> Result<()>;
    async fn flush(&mut self) -> Result<()>;
    fn on_checkpoint(&mut self, epoch: u64);
    fn connector_id(&self) -> &str;
}

Sources produce StreamRecord values via poll(). Sinks consume them via write() and flush(), and take part in the checkpoint lifecycle via on_checkpoint().

Cloud Blob Store Pattern

All cloud blob store connectors follow a shared architecture:

Cloud Queue/Topic          Cloud Storage
(SQS / Azure Queue)       (S3 / Azure Blob)
      |                        |
      v                        v
NotificationSource         BlobReader
      |                        |
      +--------+   +-----------+
               |   |
               v   v
          BlobSource (volley-connector-blob-store)
               |
               v
          Decoder (Parquet / JSON / CSV / Avro)
               |
               v
          RecordBatch --> Pipeline

To add a new cloud backend, implement two traits:

  • NotificationSource — polls a queue for new object notifications
  • BlobReader — reads object data from cloud storage

The decoder layer (Parquet, JSON, CSV, Avro -> Arrow RecordBatch) is shared across all cloud backends.

Feature Flags

The volley-connectors umbrella crate uses feature flags:

| Feature | What it includes |
|---------|------------------|
| kafka | KafkaSource, KafkaSink |
| blob-store | BlobSource, decoders |
| blob-store-aws | S3Source, SqsNotificationSource, S3BlobWriter |
| blob-store-azure | AzureBlobSource, AzureQueueNotificationSource, AzureBlobWriter |
| blob-store-gcs | GcsBlobWriter (sink only; source removed in v1.1.0) |
| file-formats | Parquet, JSON, CSV, Avro decoders |
| table-formats | Delta Lake + Iceberg sinks |

Existing Connectors

  • Kafka: source and sink with exactly-once semantics
  • Delta Lake: sink with epoch-tagged commits
  • Iceberg: sink via REST catalog
  • AWS S3: source via SQS notifications
  • Blob Store: Azure Blob source via Azure Queue notifications, plus blob sinks for S3, Azure Blob, and GCS (the GCS source was removed in v1.1.0)

Windowing

Streams never end, so “the average price” only makes sense over a time range: the last minute, the last hour, this user’s current session. Windowing is how you slice an unbounded stream into finite chunks that aggregations can actually compute over.

Volley gives you three shapes of window — tumbling, sliding, and session — all driven by event time (the time the record happened) rather than wall-clock time (when it arrived). This means late or out-of-order data still lands in the right window.

The shortest path: .key_by(col("user_id")).window(TumblingWindows::of(Duration::from_secs(60))).aggregate_expr(vec![sum(col("amount"))], state_backend). Read on for how to pick the window shape and tune watermarks and late data.

Window Types

Tumbling Windows

Fixed-size, non-overlapping windows:

.window(TumblingWindows::of(Duration::from_secs(60)))

Each record belongs to exactly one window.

Sliding Windows

Fixed-size, overlapping windows:

.window(SlidingWindows::of(
    Duration::from_secs(60),  // window size
    Duration::from_secs(10),  // slide interval
))

Each record may belong to multiple windows.
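Both assignments are integer arithmetic on the event time. This minimal sketch assumes millisecond `u64` timestamps and windows aligned to time zero. The function names are illustrative, not the internals of TumblingWindows or SlidingWindows.

```rust
/// Tumbling: the single window [start, start + size) containing `t`.
fn tumbling_window(t: u64, size: u64) -> (u64, u64) {
    let start = t - t % size;
    (start, start + size)
}

/// Sliding: every window [start, start + size) containing `t`, where
/// window starts are multiples of `slide` (newest window first).
fn sliding_windows(t: u64, size: u64, slide: u64) -> Vec<(u64, u64)> {
    let mut windows = Vec::new();
    let mut start = t - t % slide; // latest window start at or before t
    while start + size > t {
        windows.push((start, start + size));
        if start < slide {
            break; // no earlier non-negative start
        }
        start -= slide;
    }
    windows
}
```

With a 60 s window and a 10 s slide, a record normally lands in 60 / 10 = 6 windows. Only records in the first minute after time zero fall in fewer, because negative window starts are skipped.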

Session Windows

Dynamic windows that merge on activity:

.window(SessionWindows::with_gap(Duration::from_secs(30)))

A new session starts when the gap between records exceeds the threshold.
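Session assignment over already-sorted timestamps is a single pass. This minimal sketch follows the convention in the SessionWindows(gap=3) example under Event-Time Semantics, where a gap greater than or equal to the threshold starts a new session. A streaming operator instead merges sessions incrementally as out-of-order events arrive, which this sketch does not attempt.

```rust
/// Groups ascending event times into sessions, returning
/// (first_event_time, last_event_time) for each session.
fn sessions(sorted_ts: &[u64], gap: u64) -> Vec<(u64, u64)> {
    let mut out: Vec<(u64, u64)> = Vec::new();
    for &t in sorted_ts {
        if let Some(last) = out.last_mut() {
            if t - last.1 < gap {
                last.1 = t; // within the gap: extend the current session
                continue;
            }
        }
        out.push((t, t)); // first event, or gap >= threshold: new session
    }
    out
}
```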

Event-Time Semantics

Event stream:  --[e1 t=1]--[e2 t=3]--[e3 t=7]--[watermark t=10]--[e4 t=5]-->

TumblingWindows(size=5):
  Window [0, 5):  e1, e2           -- fires at watermark t=10 (window end <= 10)
  Window [5, 10): e3               -- fires at watermark t=10
  e4 (t=5):       late data        -- routed to side output if allowed_lateness passed

SessionWindows(gap=3):
  Session 1: e1(t=1), e2(t=3)     -- gap(3-1)=2 < 3, merged
  Session 2: e3(t=7)              -- gap(7-3)=4 >= 3, new session

Windows fire when a watermark advances past the window’s end time.

Watermarks

Watermarks track event-time progress. The BoundedOutOfOrdernessGenerator allows configurable out-of-orderness before advancing the watermark:

.with_watermark_generator(
    BoundedOutOfOrdernessGenerator::new(Duration::from_secs(5))
)

With a 5-second bound, the watermark trails the highest event time seen so far by 5 seconds. A record can therefore arrive up to 5 seconds out of order before the watermark passes it.
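Bounded out-of-orderness is a running maximum minus a fixed delay. This minimal sketch assumes millisecond timestamps and defines the watermark as `max_event_time - max_out_of_orderness`. Some engines (Flink, for one) subtract an extra millisecond, and this page doesn't state Volley's exact convention. `BoundedOutOfOrderness` is an illustrative name.

```rust
/// Illustrative bounded out-of-orderness watermark generator.
struct BoundedOutOfOrderness {
    max_delay_ms: u64,
    max_seen_ms: Option<u64>,
}

impl BoundedOutOfOrderness {
    fn new(max_delay_ms: u64) -> Self {
        Self { max_delay_ms, max_seen_ms: None }
    }

    /// Track the highest event time seen so far.
    fn on_event(&mut self, event_time_ms: u64) {
        self.max_seen_ms = Some(self.max_seen_ms.map_or(event_time_ms, |m| m.max(event_time_ms)));
    }

    /// Current watermark. It never moves backwards because the max never does.
    fn watermark(&self) -> Option<u64> {
        self.max_seen_ms.map(|m| m.saturating_sub(self.max_delay_ms))
    }
}
```

After events at 10 s, 17 s, and 13 s, the watermark sits at 12 s. The 13 s record arrived out of order but is still ahead of the watermark.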

Late Data

Records that arrive after the watermark has passed their window are considered late. Configure side_output_late_data() with a tag and attach a side output sink via with_late_data_sink() to route late events to a separate destination (e.g., a dead-letter queue):

let late_tag = OutputTag::new("late-events");

StreamExecutionEnvironment::new()
    .from_source(source)
    .with_watermarks(WatermarkConfig::new(Duration::from_secs(5)))
    .with_late_data_sink(late_events_sink)  // side output sink
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .allowed_lateness(Duration::from_secs(10))
    .side_output_late_data(late_tag)
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    .to_sink(main_sink)
    .execute("job")
    .await?;

Late events are written to the side output sink as StreamRecords with the schema from late_event_schema(): event_data (JSON string), event_time, watermark, window_start, window_end, lateness_ms. The side output sink participates in checkpoint barriers for exactly-once semantics.

Without with_late_data_sink(), late events are logged and discarded.
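The per-record decision compares the watermark with the window end plus allowed_lateness. This minimal sketch assumes Flink-style semantics, where a window keeps accepting records until the watermark passes `window_end + allowed_lateness`; this page doesn't spell that rule out for Volley. The `Disposition` enum is illustrative.

```rust
#[derive(Debug, PartialEq)]
enum Disposition {
    /// Window hasn't fired yet: aggregate normally.
    OnTime,
    /// Window already fired but the record is within allowed lateness.
    LateAccepted,
    /// Past allowed lateness: side output, or log and discard when no
    /// late-data sink is configured.
    SideOutput,
}

fn classify(window_end_ms: u64, watermark_ms: u64, allowed_lateness_ms: u64) -> Disposition {
    if watermark_ms < window_end_ms {
        Disposition::OnTime
    } else if watermark_ms < window_end_ms + allowed_lateness_ms {
        Disposition::LateAccepted
    } else {
        Disposition::SideOutput
    }
}
```

In the event-time example above, e4 (t=5) belongs to window [5, 10), which has already fired at watermark 10. With no allowed lateness it goes to the side output; with a lateness of 10 it would still be accepted.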

Batch Processing Performance

The window operator is designed to exploit Arrow’s columnar batch processing. When a RecordBatch arrives with N rows, the operator avoids per-row state backend I/O through two mechanisms:

Write-behind accumulator cache

Accumulator state (partial aggregation results per key per window) is cached in an in-memory HashMap. During process(), the operator reads and updates accumulators entirely in memory. State is only flushed to RocksDB when a checkpoint barrier arrives — the same write-behind pattern used by the non-windowed aggregate_expr() operator.

This means a batch of 1000 rows touching 50 unique (key, window) pairs does zero RocksDB I/O during processing. The only state backend calls happen on cache misses after recovery, or when the pending_windows LRU evicts entries.

Group-then-update strategy

Instead of processing each row individually, the operator groups row indices by (key, window) and then calls update_batch() once per group with a multi-row Arrow array:

Batch: 4000 rows, 100 keys, SlidingWindows(10s, 2s) → 5 windows per row

Step 1: assign and group
  For each row, compute (key, window) pairs → HashMap<(key, window), [row indices]>
  Result: ~500 groups (100 keys × 5 windows), ~40 rows per group
  (4000 rows × 5 windows = 20,000 row-window pairs spread over 500 groups)

Step 2: batch update
  For each group:
    arrow::compute::take(value_column, group_indices) → multi-row array
    accumulator.update_batch([multi_row_array])       → one call, not 40

This reduces 20,000 update_batch() calls (4000 rows × 5 windows) to 500 calls with ~40-row arrays each.
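The grouping step can be reproduced with a `HashMap` from `(key, window_start)` to row indices. This minimal sketch makes two substitutions. Rows arrive as parallel `keys`/`times`/`values` slices rather than Arrow columns. A plain sum stands in for a DataFusion accumulator's `update_batch()`. `group_rows` and `update_groups` are illustrative names.

```rust
use std::collections::HashMap;

/// Step 1: map each row to every sliding window containing it and collect
/// row indices per (key, window_start).
fn group_rows(keys: &[u32], times_ms: &[u64], size_ms: u64, slide_ms: u64) -> HashMap<(u32, u64), Vec<usize>> {
    let mut groups: HashMap<(u32, u64), Vec<usize>> = HashMap::new();
    for (row, (&key, &t)) in keys.iter().zip(times_ms).enumerate() {
        let mut start = t - t % slide_ms;
        while start + size_ms > t {
            groups.entry((key, start)).or_default().push(row);
            if start < slide_ms {
                break;
            }
            start -= slide_ms;
        }
    }
    groups
}

/// Step 2: gather each group's values (the role of arrow::compute::take)
/// and apply one update per group. Returns the number of update calls.
fn update_groups(groups: &HashMap<(u32, u64), Vec<usize>>, values: &[f64], acc: &mut HashMap<(u32, u64), f64>) -> usize {
    let mut calls = 0;
    for (group, rows) in groups {
        let taken: Vec<f64> = rows.iter().map(|&r| values[r]).collect();
        *acc.entry(*group).or_insert(0.0) += taken.iter().sum::<f64>();
        calls += 1;
    }
    calls
}
```

Feeding it 4000 rows over 100 keys, with every event time inside one 2-second slide of a 10 s / 2 s sliding window, yields 500 groups. Those groups cover 20,000 row-window pairs and take 500 update calls.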

Batch-level event time fast path: When all rows share the same event time (no timestamp_column configured), window assignment is computed once for the entire batch rather than per-row. The operator then only groups by key, which is even cheaper.

Session window fallback: Session windows require per-row sequential processing because each event can trigger a merge of overlapping sessions. The batch path is used for tumbling and sliding windows only.

Memory Management

The window operator tracks active (key, window) pairs in an in-memory index called pending_windows. With high-cardinality keys or wide/sliding windows, this index can grow large.

Bounding Window Memory with LRU

Use with_max_pending_windows() to cap the in-memory index size. Evicted entries are persisted to the state backend and scanned during watermark advancement, so no data is lost:

stream
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .with_max_pending_windows(100_000)  // bound in-memory window index
    .aggregate_expr(vec![sum(col("amount"))], state_backend)

When unset (the default), the index is unbounded — identical to previous releases.

Sizing guidance:

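| Key cardinality | Windows per key | Recommended limit | Approx. memory |
|-----------------|-----------------|-------------------|----------------|
| < 10K | Any | Unbounded (default) | < 10 MB |
| 10K–100K | 1 (tumbling) | 100,000 | ~10 MB |
| 100K–1M | 1 (tumbling) | 500,000 | ~50 MB |
| Any | 10+ (sliding) | Scale the limit by windows per key | Varies |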

Each pending entry is ~100 bytes. Evicted entries add a small RocksDB I/O cost on watermark advance.

Note: with_max_pending_windows() is not recommended with session windows. Session window merging requires scanning the pending index for overlapping sessions, and evicted entries may cause additional state backend I/O during merges.

Monitoring

Monitor the volley_window_pending_size gauge and volley_window_pending_evictions_total counter to tune the limit. A high eviction rate indicates the LRU is too small for the workload. See Observability for the full metrics reference.

Per-Record Tracing

Volley can trace individual records as they flow through a pipeline, creating OpenTelemetry spans at each processing stage. This gives you a complete picture of what happened to a record: which operators processed it, how long each step took, what the input and output looked like, and whether any errors occurred.

Quick Start

use volley_core::prelude::*;
use volley_core::observability::{TracingConfig, SamplingStrategy};

let results = StreamExecutionEnvironment::new()
    .from_source(my_source)
    .with_tracing(TracingConfig {
        sampling: SamplingStrategy::Ratio(0.01),
        ..Default::default()
    })
    .map(|record| {
        // Your transformation
        Ok(record)
    })
    .to_sink(my_sink)
    .execute("traced-pipeline")
    .await?;

Open Jaeger or Grafana Tempo to see traces. Each sampled record appears as a trace with child spans for source, operators, and sink.

How It Works

  1. Sampling decision happens at the source. Based on your SamplingStrategy, the runtime decides whether to trace each record.
  2. Root span is created for sampled records (volley.record). The span’s context is attached to the StreamRecord.trace field.
  3. Child spans are created automatically at each pipeline stage (operators, sink). No code changes needed in your operators.
  4. Payload previews (optional) capture a JSON snapshot of the first row at each operator’s input and output, size-capped to max_payload_bytes.
  5. Cleanup happens at pipeline exit. Any root spans for records that were filtered or lost to errors are ended.

Sampling

At production throughput (thousands of records per second), tracing every record would overwhelm your tracing backend. Use Ratio sampling:

| Throughput | Suggested ratio | Traces/sec |
|------------|-----------------|------------|
| 1K rec/s | 0.10 (10%) | ~100 |
| 10K rec/s | 0.01 (1%) | ~100 |
| 100K rec/s | 0.001 (0.1%) | ~100 |

Use Always only during development or debugging.
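A ratio sampler makes one uniform draw and compares it with the ratio, so the decision itself is cheap. This minimal sketch uses a hand-rolled xorshift64 generator to avoid external crates. The local enum mirrors the Never/Ratio/Always variants named on this page. The generator and the `Sampler` type are assumptions, not Volley's implementation.

```rust
#[derive(Clone, Copy)]
enum SamplingStrategy {
    Never,
    Ratio(f64),
    Always,
}

/// Tiny xorshift64 PRNG (not cryptographic; adequate for sampling).
struct Sampler {
    state: u64,
}

impl Sampler {
    fn new(seed: u64) -> Self {
        Self { state: seed.max(1) } // xorshift state must be non-zero
    }

    fn next_f64(&mut self) -> f64 {
        let mut x = self.state;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.state = x;
        (x >> 11) as f64 / (1u64 << 53) as f64 // uniform in [0, 1)
    }

    fn should_trace(&mut self, strategy: SamplingStrategy) -> bool {
        match strategy {
            SamplingStrategy::Never => false,
            SamplingStrategy::Always => true,
            SamplingStrategy::Ratio(r) => self.next_f64() < r,
        }
    }
}
```

At Ratio(0.01), about 1,000 of every 100,000 records are traced, which matches the 100K rec/s × 0.001 ≈ 100 traces/s arithmetic in the table scaled to this ratio.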

Window and Aggregate Traces

Window and aggregate operators accumulate input records into new output records. The output gets its own trace, linked back to the input traces via OpenTelemetry span links. This preserves lineage without creating misleading parent-child relationships.

The volley.window.input_trace_count attribute tells you how many sampled records contributed to each aggregation result.

Kafka Trace Propagation

When using Kafka connectors, trace context propagates automatically via W3C traceparent headers:

  • Kafka source extracts traceparent from message headers
  • Kafka sink injects traceparent into produced messages

A traced request in an upstream service continues its trace through Volley and into downstream consumers. The runtime respects upstream sampling decisions (parent-based sampling).
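A `traceparent` value has the W3C Trace Context form `version-trace_id-parent_id-trace_flags`. Parent-based sampling honours the sampled bit (0x01) in `trace_flags`. This minimal parsing sketch handles version `00` only. `TraceParent` and `parse_traceparent` are illustrative names; real code would use an OpenTelemetry propagator. The example header in the checks is the one from the W3C specification.

```rust
#[derive(Debug, PartialEq)]
struct TraceParent {
    trace_id: String,  // 32 lowercase hex characters
    parent_id: String, // 16 lowercase hex characters
    sampled: bool,     // bit 0x01 of trace-flags
}

fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Parses a version-00 `traceparent` header value.
fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let mut fields = header.trim().split('-');
    let (version, trace_id, parent_id, flags) =
        (fields.next()?, fields.next()?, fields.next()?, fields.next()?);
    if fields.next().is_some() || version != "00" {
        return None; // version 00 has exactly four fields
    }
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(parent_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    // All-zero trace and parent IDs are invalid.
    if trace_id.bytes().all(|b| b == b'0') || parent_id.bytes().all(|b| b == b'0') {
        return None;
    }
    let flags = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent {
        trace_id: trace_id.to_string(),
        parent_id: parent_id.to_string(),
        sampled: (flags & 0x01) != 0,
    })
}
```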

Configuration Reference

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| sampling | SamplingStrategy | Never | Which records to trace |
| max_payload_bytes | usize | 1024 | Max JSON preview size per span event |
| capture_payload | bool | true | Whether to attach input/output previews |
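Capping a preview at max_payload_bytes has one subtlety. JSON text is UTF-8, so cutting at an arbitrary byte can split a multi-byte character. This minimal sketch backs off to the nearest character boundary. `truncate_preview` is an illustrative helper, not Volley's function.

```rust
/// Returns the longest prefix of `json` that fits in `max_bytes` without
/// splitting a UTF-8 character.
fn truncate_preview(json: &str, max_bytes: usize) -> &str {
    if json.len() <= max_bytes {
        return json;
    }
    let mut end = max_bytes;
    while !json.is_char_boundary(end) {
        end -= 1; // at most 3 steps back, since UTF-8 characters are <= 4 bytes
    }
    &json[..end]
}
```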

Viewing Traces

Volley emits traces via the OTLP gRPC exporter configured in ObservabilityConfig. Point it at your collector:

.with_observability(ObservabilityConfig::new()
    .with_node_id("node-1")
    .with_otlp_endpoint("http://otel-collector:4317"))
.with_tracing(TracingConfig {
    sampling: SamplingStrategy::Ratio(0.01),
    ..Default::default()
})

Compatible backends: Jaeger, Grafana Tempo, Datadog, Honeycomb, or any OTLP-compatible collector.

Error Handling

When a record fails during processing (e.g., a transformation returns an error), the record’s trace span is:

  1. Marked with otel.status_code = ERROR
  2. Annotated with the error message via span.record_error()
  3. Ended at the point of failure

Failed records are visible in your tracing backend by filtering on otel.status_code = ERROR. This makes tracing a useful debugging tool even when records are dropped or filtered.

Performance Impact

Tracing adds minimal overhead when sampling is configured appropriately:

| Sampling | Overhead | Notes |
|----------|----------|-------|
| Never | ~0% | No spans created; trace context still propagated for Kafka |
| Ratio(0.001) | < 1% | Recommended for high-throughput production |
| Ratio(0.01) | ~1-2% | Good for staging or moderate-throughput production |
| Always | ~5-10% | Development and debugging only |

The overhead comes from span creation and OTLP export, not from sampling decisions (which are a single random number comparison).

Troubleshooting

No traces appearing in the backend

  • Verify with_observability() is configured with the correct otlp_endpoint
  • Check that the OTLP collector is reachable from your application
  • Confirm sampling is not set to Never (the default)
  • Check collector logs for rejected spans (often caused by resource limits)

Traces appear but are missing operators

  • Ensure with_tracing() is called before operators are added to the pipeline
  • Custom operators using apply_operator() create spans automatically; no manual instrumentation needed

Kafka traces not connecting to upstream services

  • The upstream producer must inject W3C traceparent headers into Kafka message headers
  • If the upstream doesn’t send traceparent, Volley creates a new root trace (no parent linkage)

ML Inference

Enrich every record with a model prediction — sentiment scores on chat messages, fraud scores on transactions, embeddings on product descriptions — without leaving your pipeline. The volley-ml crate gives you stream operators that call ONNX or Candle models in-process, or forward batches to an external model server (vLLM, Triton, LiteLLM). The ergonomics match the rest of Volley: a config builder, a method on your stream, and the model output becomes extra Arrow columns downstream.

Backend Configuration

Configure the ML backend once and share it across operators:

use std::sync::Arc;
use volley_ml::prelude::*;

// CPU-only ONNX (simplest)
let ml = Arc::new(MlBackendConfig::onnx_cpu());

// ONNX on CUDA GPU with limits
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)
    .with_num_threads(4)
    .with_memory_limit(4 * 1024 * 1024 * 1024));

// Candle on CPU (pure Rust, no C++ toolchain)
let ml = Arc::new(MlBackendConfig::candle_cpu());

// Candle on Metal (macOS GPU)
let ml = Arc::new(MlBackendConfig::candle_metal());

// ONNX on macOS Metal / Neural Engine via CoreML
let ml = Arc::new(MlBackendConfig::onnx_coreml());

// CoreML with specific compute units and model format
let ml = Arc::new(MlBackendConfig::onnx_coreml()
    .with_coreml_compute_units(CoreMLComputeUnits::CpuAndGpu)
    .with_coreml_model_format(CoreMLModelFormat::MLProgram));

// Candle with explicit architecture and HF revision
let ml = Arc::new(MlBackendConfig::candle_cpu()
    .with_architecture(CandleArchitecture::ModernBert)
    .with_hf_revision("v1.0"));

Loading ONNX Models from HuggingFace Hub

Pass a HuggingFace repo ID as the model path — the backend downloads and caches the ONNX model and tokenizer automatically:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cpu());

stream
    .embed(EmbeddingConfig::new(
        "sentence-transformers/all-MiniLM-L6-v2",  // HF repo ID
        "text",
        ml.clone(),
    ))
    .await?
}

Models are cached in ~/.cache/huggingface/hub/ and reused across runs.
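For reference, the Hugging Face Hub cache keeps each repository in its own folder named models--{org}--{name} under that directory. The helper below is a hypothetical sketch of that naming rule, assuming Volley relies on the standard Hub cache layout. It does not model the HF_HOME or HF_HUB_CACHE overrides.

```rust
use std::path::PathBuf;

/// Hypothetical helper: where a Hub repo lands in the default cache.
/// The Hub cache replaces "/" in the repo id with "--" and prefixes "models--".
/// `home` is passed in explicitly so the function is easy to test.
fn hub_repo_cache_dir(home: &str, repo_id: &str) -> PathBuf {
    let folder = format!("models--{}", repo_id.replace('/', "--"));
    PathBuf::from(home)
        .join(".cache")
        .join("huggingface")
        .join("hub")
        .join(folder)
}
```

Deleting that folder forces a fresh download on the next run.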

Revision Pinning

Pin a specific revision with @:

#![allow(unused)]
fn main() {
EmbeddingConfig::new("user/model@v1.0", "text", ml.clone())
}

Or set it on the config:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cpu()
    .with_hf_revision("abc123"));
}
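The repo@revision form splits at the @ sign. The hypothetical split_revision below sketches that parsing. It is not Volley's parser, and it does not decide how an @ suffix interacts with with_hf_revision().

```rust
/// Hypothetical parser for "repo@revision" model paths.
/// Without a usable "@revision" suffix, the whole string is the repo id and
/// the revision is None (the Hub's default branch).
fn split_revision(model: &str) -> (&str, Option<&str>) {
    match model.split_once('@') {
        Some((repo, rev)) if !repo.is_empty() && !rev.is_empty() => (repo, Some(rev)),
        _ => (model, None),
    }
}
```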

ONNX Model Variants

Many repos include optimized variants. Select one with with_onnx_model_file():

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cpu()
    .with_onnx_model_file("onnx/model_O2.onnx"));  // O2 optimized
}

Common variants:

  • onnx/model.onnx — standard (default)
  • onnx/model_O2.onnx — O2 optimized
  • onnx/model_qint8_arm64.onnx — quantized for ARM64

Operators

Embedding (.embed())

Generate vector embeddings from a text column:

#![allow(unused)]
fn main() {
stream
    .embed(EmbeddingConfig::new("models/e5-small.onnx", "text", ml.clone())
        .with_output_column("embedding")
        .with_embedding_dim(384))
    .await?
}

Appends a FixedSizeList<Float32> column to the output.
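An Arrow FixedSizeList<Float32> column stores every embedding in one contiguous child buffer of rows × dim floats, with row i at offsets i*dim up to (i+1)*dim. The std-only sketch below (hypothetical helper names, no Arrow dependency) shows that layout and why every row must have exactly the configured dimension.

```rust
/// Flatten per-row embeddings into the single buffer that backs a
/// FixedSizeList<Float32> column. Returns None if any row has the wrong width,
/// because a fixed-size list requires exactly `dim` values per row.
fn flatten_embeddings(rows: &[Vec<f32>], dim: usize) -> Option<Vec<f32>> {
    let mut values = Vec::with_capacity(rows.len() * dim);
    for r in rows {
        if r.len() != dim {
            return None;
        }
        values.extend_from_slice(r);
    }
    Some(values)
}

/// Read row `i` back out of the flat buffer.
fn embedding_row(values: &[f32], dim: usize, i: usize) -> &[f32] {
    &values[i * dim..(i + 1) * dim]
}
```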

Classification (.classify())

Classify records and get label + confidence score:

#![allow(unused)]
fn main() {
stream
    .classify(ClassifyConfig::new("models/fraud.onnx", "features", ml.clone())
        .with_label_column("is_fraud")
        .with_score_column("fraud_score")
        .with_labels(vec!["legitimate", "fraud"]))
    .await?
}

Appends Utf8 label and Float32 score columns.
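Two details documented later in this chapter shape these columns: the logit width must equal the number of labels (see Tokenizer Integration), and scores are softmax probabilities (see Recent Bug Fixes). A minimal per-row sketch of that logic, assuming one logits row per record; classify_row is a hypothetical name, not Volley's API.

```rust
/// Turn one row of classifier logits into (label, score).
fn classify_row<'a>(logits: &[f32], labels: &[&'a str]) -> Result<(&'a str, f32), String> {
    // An encoder embedding (e.g. 384 values) fed to a 2-label classifier fails here.
    if logits.len() != labels.len() {
        return Err(format!(
            "model returned {} values but {} labels are configured",
            logits.len(),
            labels.len()
        ));
    }
    if logits.is_empty() {
        return Err("no labels configured".to_string());
    }
    // Subtract the max logit before exponentiating, for numerical stability.
    let max = logits.iter().cloned().fold(f32::NEG_INFINITY, f32::max);
    let exps: Vec<f32> = logits.iter().map(|&x| (x - max).exp()).collect();
    let sum: f32 = exps.iter().sum();
    // Pick the class with the largest probability (argmax).
    let (best, best_exp) = exps
        .iter()
        .enumerate()
        .fold((0, f32::NEG_INFINITY), |acc, (i, &e)| if e > acc.1 { (i, e) } else { acc });
    Ok((labels[best], best_exp / sum))
}
```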

Generic Inference (.infer())

Run any ONNX model on selected columns:

#![allow(unused)]
fn main() {
stream
    .infer(InferenceConfig::new("models/custom.onnx", ml.clone())
        .with_input_columns(vec!["feature_1", "feature_2"])
        .with_output_columns(vec![
            ("prediction", DataType::Float32),
        ]))
    .await?
}

External Model Server (.infer_remote())

Call OpenAI-compatible APIs (vLLM, TGI, LiteLLM):

#![allow(unused)]
fn main() {
stream
    .infer_remote(RemoteInferenceConfig::new(
        "http://localhost:8000/v1/embeddings",
        ApiFormat::OpenAI,
    )
        .with_input_columns(vec!["text"])
        .with_output_columns(vec![("embedding", DataType::Float32)])
        .with_max_concurrent(32)
        .with_timeout(Duration::from_secs(10)))
    .await?
}

Candle Backend

The Candle backend provides pure-Rust inference using HuggingFace’s Candle library. No C++ toolchain required.

Loading from HuggingFace Hub

Pass a HuggingFace repo ID as the model path — the backend downloads and caches model weights, config, and tokenizer automatically:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cpu());

stream
    .embed(EmbeddingConfig::new(
        "sentence-transformers/all-MiniLM-L6-v2",  // HF repo ID
        "text",
        ml.clone(),
    ))
    .await?
}

Pin a specific revision with @:

#![allow(unused)]
fn main() {
EmbeddingConfig::new("user/model@v1.0", "text", ml.clone())
}

Or set it on the config:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cpu()
    .with_hf_revision("abc123"));
}

Loading from Local Files

Point to a local directory containing model.safetensors, config.json, and optionally tokenizer.json:

#![allow(unused)]
fn main() {
stream
    .embed(EmbeddingConfig::new("/models/my-bert", "text", ml.clone()))
    .await?
}

Supported Architectures

The backend auto-detects the architecture from the model’s config.json (model_type field):

Encoders

| model_type | Architecture | Enum Variant | Use Cases |
|---|---|---|---|
| bert, roberta | BERT | Bert | Embeddings, classification |
| distilbert | DistilBERT | DistilBert | Faster embeddings (6-layer distilled) |
| nomic_bert | NomicBERT | NomicBert | Nomic Embed text embeddings |
| jina_bert | JinaBERT | JinaBert | Jina AI embeddings (ALiBi attention) |
| modernbert | ModernBERT | ModernBert | Improved BERT with rotary embeddings |
| deberta-v2 | DeBERTa V2 | DebertaV2 | High-accuracy classification |
| xlm-roberta | XLM-RoBERTa | XlmRoberta | Multilingual embeddings |

Decoders

| model_type | Architecture | Enum Variant | Use Cases |
|---|---|---|---|
| llama | Llama | Llama | Classification, inference |
| mistral | Mistral | Mistral | Classification, inference |
| gemma2 | Gemma 2 | Gemma2 | Classification, inference |
| phi3 | Phi-3 | Phi3 | Small/efficient inference |
| deepseek_v2 | DeepSeek V2 | DeepSeekV2 | MoE inference |

Encoder models output [batch, hidden_size] (CLS token extraction). Decoder models output [batch, vocab_size] (last-token logits).

Override auto-detection with with_architecture():

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cpu()
    .with_architecture(CandleArchitecture::ModernBert));
}
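The encoder and decoder output shapes described above come from different pooling rules. The sketch below assumes a row-major [batch, seq_len, dim] buffer and right padding, and its helper names are hypothetical. Encoders take position 0 (the CLS token). Decoders take the last non-padding position, and the same slicing applies whether the per-position vectors are hidden states or vocabulary logits.

```rust
/// Encoder pooling: keep the vector at position 0 (CLS) of each sequence.
fn cls_pool(hidden: &[f32], batch: usize, seq_len: usize, dim: usize) -> Vec<Vec<f32>> {
    (0..batch)
        .map(|b| {
            let start = b * seq_len * dim; // offset of token 0 in sequence b
            hidden[start..start + dim].to_vec()
        })
        .collect()
}

/// Decoder pooling: keep the vector at the last position whose attention-mask
/// bit is 1 (assumes right padding and at least one real token per row).
fn last_token_pool(hidden: &[f32], mask: &[Vec<u8>], seq_len: usize, dim: usize) -> Vec<Vec<f32>> {
    mask.iter()
        .enumerate()
        .map(|(b, m)| {
            let last = m.iter().rposition(|&x| x == 1).unwrap_or(0);
            let start = (b * seq_len + last) * dim;
            hidden[start..start + dim].to_vec()
        })
        .collect()
}
```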

Tokenizer Integration

When the tokenizers feature is enabled and the model has a tokenizer.json, text columns are automatically tokenized before the forward pass. For HuggingFace Hub models, Volley also checks onnx/tokenizer.json when the repo keeps shared tokenizer assets under the onnx/ subdirectory. No manual tokenization needed.

For .classify(), the backend must return per-class logits. Volley validates that the model output width matches the configured label count and raises an operator error when a backend returns encoder embeddings instead of classifier logits.

volley-ml = { version = "1.0.0", features = ["candle", "tokenizers"] }

GPU Acceleration

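GPU inference can be 10–100× faster than CPU for transformer models, depending on model size, batch size, and how much of the model actually runs on the GPU. Volley supports CUDA (NVIDIA) and Metal (macOS) GPUs, plus CoreML for ONNX models on macOS.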

CUDA (NVIDIA GPUs)

ONNX Runtime: CUDA is handled at runtime via execution providers — no compile-time feature flag needed. Use MlBackendConfig::onnx_cuda(device_id):

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cuda(0));  // GPU 0
}

If CUDA is unavailable, the ONNX Runtime silently falls back to CPU.

Candle: Requires the cuda compile-time feature (pulls in cudarc CUDA bindings):

volley-ml = { version = "1.0.0", features = ["candle", "tokenizers", "cuda"] }
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cuda(0));
}

Prerequisites: NVIDIA driver and CUDA toolkit must be installed. Verify with nvidia-smi or volley doctor.

Metal (macOS GPUs)

Requires the metal compile-time feature:

volley-ml = { version = "1.0.0", features = ["candle", "tokenizers", "metal"] }
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_metal());
}

CoreML (ONNX on macOS)

The CoreML execution provider accelerates ONNX models on macOS using Metal GPU and/or the Apple Neural Engine. Requires the coreml compile-time feature:

volley-ml = { version = "1.0.0", features = ["onnx", "coreml"] }
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_coreml());
}

If CoreML is unavailable (e.g., on Linux), the ONNX Runtime silently falls back to CPU.

Partial operator coverage: CoreML does not support every ONNX operation. Unsupported ops automatically fall back to CPU within the same session. For example, DistilBERT has 387 graph nodes but only 17 are CoreML-eligible, so most of the computation still runs on CPU, and the CoreML compilation overhead can make the “GPU” path slower than pure CPU for small models or small batch sizes. To compare both paths, run the ML inference example once as-is and once with GPU=cpu (see Running the Example). Models with convolution-heavy architectures (vision models) tend to have better CoreML coverage than attention-heavy transformers.

Compute units control which hardware CoreML dispatches to:

| Value | Hardware |
|---|---|
| All (default) | CPU + GPU + Neural Engine |
| CpuAndGpu | CPU + Metal GPU only |
| CpuAndNeuralEngine | CPU + Neural Engine only |
| CpuOnly | CPU only (debugging) |
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_coreml()
    .with_coreml_compute_units(CoreMLComputeUnits::CpuAndGpu));
}

Model format controls the internal CoreML representation:

| Format | Requirement | Notes |
|---|---|---|
| NeuralNetwork (default) | macOS 10.15+ | Broadest compatibility |
| MLProgram | macOS 12+ | More operators, potentially faster |
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_coreml()
    .with_coreml_model_format(CoreMLModelFormat::MLProgram));
}

GPU Detection

Check GPU availability at runtime without loading a model:

#![allow(unused)]
fn main() {
let info = volley_ml::gpu::detect_gpu();
println!("{}", info.summary);
// "CUDA: disabled (enable `cuda` feature); Metal: available; CoreML: available"

if info.cuda_available {
    println!("Using GPU with {} CUDA device(s)", info.cuda_device_count);
}
if info.coreml_available {
    println!("CoreML available for ONNX Metal acceleration");
}
}

The volley doctor CLI command also reports GPU status.

Feature Flags

Add volley-ml to your Cargo.toml with the features you need:

[dependencies]
# Pick ONE volley-ml line: Cargo rejects a dependency key that appears twice in the same table.
# To use several backends, combine their features in a single features list.

# ONNX Runtime backend (CPU; CUDA uses the same features, since its execution provider is registered at runtime)
volley-ml = { version = "1.0.0", features = ["onnx"] }

# Candle backend with tokenizers (pure Rust, CPU)
volley-ml = { version = "1.0.0", features = ["candle", "tokenizers"] }

# Candle on CUDA GPU
volley-ml = { version = "1.0.0", features = ["candle", "tokenizers", "cuda"] }

# Candle on macOS Metal GPU
volley-ml = { version = "1.0.0", features = ["candle", "tokenizers", "metal"] }

# ONNX on macOS CoreML (Metal GPU + Neural Engine)
volley-ml = { version = "1.0.0", features = ["onnx", "coreml"] }

# External model servers
volley-ml = { version = "1.0.0", features = ["remote-http"] }

Backend Comparison

| | ONNX Runtime (onnx) | Candle (candle) |
|---|---|---|
| Language | C++ via FFI | Pure Rust |
| Model format | ONNX (universal) | SafeTensors / HF Hub |
| Model loading | Local file or HuggingFace Hub | Local file or HuggingFace Hub |
| Tokenizer | External (user handles) | Built-in (tokenizers feature) |
| GPU | CUDA, TensorRT, CoreML (coreml feature) | CUDA, Metal |
| Best for | Any ONNX-exportable model | HF transformer models |
| Build | Needs C++ toolchain (auto-downloaded) | Pure cargo build |

Performance Tuning

Session Pooling (ONNX)

By default, a single ONNX session is shared across all inference calls. For concurrent pipelines, pool multiple sessions to reduce lock contention:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)
    .with_session_pool_size(4));  // 4 sessions, round-robin
}

Each session is an independent model copy. A pool of 2–4 is usually sufficient.
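Round-robin pooling can be pictured with std types alone. In this hypothetical sketch, each slot holds its own session behind its own Mutex and an atomic counter picks the next slot, so concurrent callers usually lock different sessions. Volley's pool may be built differently.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical round-robin pool over independent sessions.
struct SessionPool<S> {
    sessions: Vec<Mutex<S>>,
    next: AtomicUsize,
}

impl<S> SessionPool<S> {
    /// Build `size` sessions; `make` receives the slot index.
    fn new(make: impl Fn(usize) -> S, size: usize) -> Self {
        assert!(size > 0, "pool needs at least one session");
        SessionPool {
            sessions: (0..size).map(|i| Mutex::new(make(i))).collect(),
            next: AtomicUsize::new(0),
        }
    }

    /// Run `f` on the next session in round-robin order.
    fn with_session<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.sessions.len();
        let mut guard = self.sessions[i].lock().unwrap();
        f(&mut *guard)
    }
}
```

Because each slot is a full model copy, memory use grows roughly linearly with the pool size.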

Embedding Cache

Enable an LRU cache to skip inference for repeated text:

#![allow(unused)]
fn main() {
stream
    .embed(EmbeddingConfig::new("model.onnx", "text", ml.clone())
        .with_cache(10_000))  // Cache up to 10,000 unique embeddings
    .await?
}

Identical input strings are served from cache. This can eliminate 50–90% of model calls in pipelines with repeated product names, log messages, etc.
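A minimal LRU cache keyed by input text, built only from std collections, shows the eviction rule: when the cache is full, the least recently used entry is dropped. EmbeddingCache is a hypothetical name; the real with_cache() implementation is not shown in this guide.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical LRU cache from input text to embedding.
/// `order` maps a monotonically increasing "last used" tick to the key, so the
/// smallest tick is always the least recently used entry.
struct EmbeddingCache {
    capacity: usize,
    tick: u64,
    entries: HashMap<String, (Vec<f32>, u64)>,
    order: BTreeMap<u64, String>,
}

impl EmbeddingCache {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        EmbeddingCache { capacity, tick: 0, entries: HashMap::new(), order: BTreeMap::new() }
    }

    fn get(&mut self, text: &str) -> Option<Vec<f32>> {
        self.tick += 1;
        let tick = self.tick;
        let (embedding, last_used) = self.entries.get_mut(text)?;
        // Move this key to the most-recently-used end of the order.
        self.order.remove(&*last_used);
        *last_used = tick;
        self.order.insert(tick, text.to_string());
        Some(embedding.clone())
    }

    fn put(&mut self, text: &str, embedding: Vec<f32>) {
        self.tick += 1;
        if let Some((_, old)) = self.entries.remove(text) {
            self.order.remove(&old);
        } else if self.entries.len() == self.capacity {
            // Full: evict the least recently used entry.
            if let Some((_, lru)) = self.order.pop_first() {
                self.entries.remove(&lru);
            }
        }
        self.order.insert(self.tick, text.to_string());
        self.entries.insert(text.to_string(), (embedding, self.tick));
    }
}
```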

Half-Precision Inference (FP16 / BF16)

Load Candle model weights in half precision to halve weight memory relative to F32 and to speed up GPU inference on hardware with tensor cores:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cuda(0)
    .with_dtype(ModelDType::F16));
}

On CPU, F16/BF16 are automatically promoted to F32 since most CPUs lack native half-precision arithmetic.

Remote Batch Accumulation

When upstream batches are small, buffer them before sending to the model server:

#![allow(unused)]
fn main() {
RemoteInferenceConfig::new("http://model-server:8000/v1/embeddings", ApiFormat::OpenAI)
    .with_batch_accumulation(64, Duration::from_millis(50))
    .with_max_concurrent(16)
}

This sends a single HTTP request as soon as 64 rows have accumulated or 50 ms have elapsed, whichever comes first, reducing round trips.
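The size-or-deadline rule can be sketched as a small buffer that is flushed either by a push that reaches the row threshold or by a periodic poll once the oldest buffered row has waited long enough. Accumulator is hypothetical, and the clock is passed in explicitly to keep the logic deterministic.

```rust
use std::time::{Duration, Instant};

/// Hypothetical size-or-deadline batcher.
struct Accumulator<T> {
    max_rows: usize,
    max_wait: Duration,
    buf: Vec<T>,
    first_at: Option<Instant>, // arrival time of the oldest buffered row
}

impl<T> Accumulator<T> {
    fn new(max_rows: usize, max_wait: Duration) -> Self {
        Accumulator { max_rows, max_wait, buf: Vec::new(), first_at: None }
    }

    /// Add rows; returns a batch when the size threshold is reached.
    /// A single large upstream batch can push the result past `max_rows`.
    fn push(&mut self, rows: impl IntoIterator<Item = T>, now: Instant) -> Option<Vec<T>> {
        for row in rows {
            if self.buf.is_empty() {
                self.first_at = Some(now);
            }
            self.buf.push(row);
        }
        if self.buf.len() >= self.max_rows { self.take() } else { None }
    }

    /// Called periodically; returns a batch once the oldest row has waited `max_wait`.
    fn poll(&mut self, now: Instant) -> Option<Vec<T>> {
        match self.first_at {
            Some(t) if now.duration_since(t) >= self.max_wait => self.take(),
            _ => None,
        }
    }

    fn take(&mut self) -> Option<Vec<T>> {
        self.first_at = None;
        if self.buf.is_empty() { None } else { Some(std::mem::take(&mut self.buf)) }
    }
}
```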

Running the Example

The ML inference example works out of the box with zero configuration. It auto-downloads a sentiment classification model from HuggingFace Hub and auto-detects Metal (CoreML) on macOS:

# Just works — downloads model from Hub, auto-detects Metal on macOS
cargo run --example ml_inference_pipeline -p volley-examples --features ml

Override defaults with environment variables:

  • MODEL_PATH=./my-model.onnx — use a local ONNX model instead of downloading from Hub
  • GPU=cpu — force CPU backend (disables Metal/CoreML auto-detection)

Recent Bug Fixes

  • Hub tokenizer resolution — tokenizer download now checks sibling paths (e.g., onnx/tokenizer.json) when tokenizer.json is not at the repo root.
  • Softmax normalization — classification scores now use softmax normalization, producing proper [0, 1] probabilities instead of raw logits.

CLI Template

Scaffold an ML pipeline project with one command:

volley new --template ml-pipeline my-classifier
cd my-classifier

This generates a Kafka-to-Kafka pipeline with ONNX classification:

  • src/main.rs — Reads text from Kafka, classifies with .classify(), writes enriched records to output topic
  • Cargo.toml — Includes volley-ml with onnx and tokenizers features
  • .env.example — Kafka connection and MODEL_PATH configuration

Edit src/main.rs to customize:

  • Change the model: Update MODEL_PATH and the label list in .classify()
  • Swap operators: Replace .classify() with .embed() for embeddings or .infer() for generic inference
  • Enable GPU: Change MlBackendConfig::onnx_cpu() to MlBackendConfig::onnx_cuda(0)
  • Use Candle: Switch to MlBackendConfig::candle_cpu() for pure-Rust inference
  • Add remote inference: Chain .infer_remote(RemoteInferenceConfig::new(...))

Integration Testing

Volley provides the volley-test-harness crate — a shared test framework inspired by Apache Flink’s testing patterns. It gives you containers, assertion harnesses, and a standardized connector test suite so you can validate pipelines end-to-end with real infrastructure.

Architecture

The test harness is layered — each layer builds on the one below it:

┌────────────────────────────────────────────────────┐
│  End-to-End Tests (tests/)                         │
│  Full pipelines with real connectors               │
├────────────────────────────────────────────────────┤
│  Connector Framework (connector/)                  │
│  ConnectorTestContext + source/sink suites         │
├────────────────────────────────────────────────────┤
│  Assertion Harnesses (assertions/)                 │
│  MetricsSnapshot, TraceCapture, ThroughputRecorder │
├────────────────────────────────────────────────────┤
│  TestCluster (cluster.rs)                          │
│  In-process pipeline runner + observability        │
├────────────────────────────────────────────────────┤
│  Container Management (containers/)                │
│  Kafka, MinIO with retry logic + ARM64 images      │
└────────────────────────────────────────────────────┘

Quick Start

Add volley-test-harness as a dev-dependency in your connector crate:

[dev-dependencies]
volley-test-harness = { path = "../volley-test-harness" }
async-trait = { workspace = true }
arrow = { workspace = true }
anyhow = { workspace = true }

Container Management

Shared testcontainers with retry logic and one-container-per-binary lifecycle via OnceCell. Containers start lazily on first use and are cleaned up when the test process exits.
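The one-container-per-binary pattern boils down to a process-wide value that is initialized on first use. This sync sketch uses std::sync::OnceLock in place of the async OnceCell the harness uses, and a fake connection string in place of a real container. bootstrap_servers here is a hypothetical stand-in, not the harness function.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Counts how often the expensive start-up ran (demo only).
static STARTS: AtomicUsize = AtomicUsize::new(0);

/// Stand-in for a started container: just its connection string.
static BOOTSTRAP: OnceLock<String> = OnceLock::new();

/// The first caller in this test binary runs the initializer; every later
/// caller, on any thread, gets the same value without starting anything.
fn bootstrap_servers() -> &'static str {
    BOOTSTRAP.get_or_init(|| {
        STARTS.fetch_add(1, Ordering::SeqCst);
        // A real harness would start the container here, retrying on failure.
        "127.0.0.1:9092".to_string()
    })
}
```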

Kafka

#![allow(unused)]
fn main() {
use volley_test_harness::containers::KafkaContainer;

// Starts a native ARM64 Kafka container on first call.
let servers = KafkaContainer::bootstrap_servers().await;

// Each test gets an isolated topic.
let topic = KafkaContainer::unique_topic("my-test");
KafkaContainer::create_topic(&topic, 1).await;

// Seed test data.
KafkaContainer::produce_json(&topic, &[
    serde_json::json!({"name": "alice", "score": 42}),
]).await;

// Verify output.
let messages = KafkaContainer::consume_json(&topic, 1, 10).await;
assert_eq!(messages[0]["name"], "alice");
}
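Isolated topic names only need to be unique per test and valid as Kafka topic names, which allow ASCII letters, digits, '.', '_' and '-'. One way to build them, assuming a process id plus a per-process counter is enough (unique_name is hypothetical, and the harness's unique_topic may use a different scheme):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static COUNTER: AtomicU64 = AtomicU64::new(0);

/// Hypothetical unique-name helper: the process id separates test binaries
/// sharing one broker, and the counter separates tests within one binary.
fn unique_name(prefix: &str) -> String {
    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{prefix}-{}-{n}", std::process::id())
}
```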

MinIO (S3-compatible)

#![allow(unused)]
fn main() {
use volley_test_harness::containers::{MinioContainer, MINIO_ACCESS_KEY, MINIO_SECRET_KEY};

let endpoint = MinioContainer::endpoint().await;
let bucket = MinioContainer::unique_bucket("test");
MinioContainer::create_bucket(&bucket).await;
}

Polaris (Apache Iceberg REST catalog)

#![allow(unused)]
fn main() {
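use volley_test_harness::containers::{MinioContainer, PolarisContainer};

// Polaris stores its data in MinIO, so start MinIO first (containers start lazily on first use).
let _minio_endpoint = MinioContainer::endpoint().await;

// Starts a Polaris container backed by MinIO.
let catalog_url = PolarisContainer::catalog_url().await;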

// Each test gets an isolated catalog name.
let catalog = PolarisContainer::unique_catalog("my-test");
PolarisContainer::create_catalog(&catalog).await;
}

PolarisContainer depends on MinioContainer for the underlying storage — start MinioContainer before calling any PolarisContainer method.

Connector Test Framework

Like Flink’s Connector Testing Framework — implement ConnectorTestContext for your connector and get a standardized source/sink test suite for free.

Implementing ConnectorTestContext

#![allow(unused)]
fn main() {
use volley_test_harness::connector::ConnectorTestContext;

struct MyConnectorTestContext { /* ... */ }

#[async_trait]
impl ConnectorTestContext for MyConnectorTestContext {
    fn name(&self) -> &str { "MyConnector" }
    fn test_schema(&self) -> Arc<Schema> { /* ... */ }

    async fn seed_test_data(&self, records: &[RecordBatch]) -> anyhow::Result<()> {
        // Write records into the external system.
    }

    async fn create_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Create a source pointing at the seeded data.
    }

    async fn create_sink(&self) -> anyhow::Result<Option<Box<dyn DynSink>>> {
        // Create a sink writing to a test destination.
    }

    async fn read_sink_results(&self) -> anyhow::Result<Vec<RecordBatch>> {
        // Read back what the sink wrote for verification.
    }

    async fn create_empty_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Optional: source pointing at an empty data set.
        Ok(None) // Return None to skip the empty-source test.
    }
}
}

Running the Suites

#![allow(unused)]
fn main() {
#[tokio::test]
async fn my_connector_source_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_source_suite(&ctx).await;
}

#[tokio::test]
async fn my_connector_sink_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_sink_suite(&ctx).await;
}
}

The source suite runs three tests:

  • test_basic_read — seeds 5 records, verifies the source returns data
  • test_read_all_records — seeds 10 records, reads until all consumed
  • test_empty_source — verifies graceful handling of empty data sets

The sink suite runs three tests:

  • test_basic_write — writes records, flushes, verifies they’re readable
  • test_write_and_flush — multiple writes followed by flush
  • test_checkpoint_triggers_flush — verifies on_checkpoint() commits data

See volley-connector-kafka/tests/connector_suite.rs for the reference implementation.

TestCluster

An in-process pipeline execution wrapper with built-in observability capture. It installs an OpenTelemetry tracer provider backed by an InMemorySpanExporter as the global provider, so spans created during pipeline execution are captured for assertions.

#![allow(unused)]
fn main() {
use volley_test_harness::cluster::TestCluster;

let cluster = TestCluster::new("my-test");

let pipeline = StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(cluster.tracing_config())  // 100% sampling, in-memory export
    .with_operator(my_operator)
    .with_operator_id("my-op")
    .to_sink(sink);

pipeline.execute("my-pipeline").await?;

// Inspect captured spans.
let traces = cluster.traces();
println!("Captured {} spans", traces.span_count());
traces.print_timing_summary();
}

Assertion Harnesses

TraceCapture

Assert on OpenTelemetry spans captured by the TestCluster:

#![allow(unused)]
fn main() {
let traces = cluster.traces();

// Verify span counts.
traces.assert_span_count("volley.operator", 10);

// Verify trace structure.
traces.assert_valid_tree();  // No orphan spans.

// Print per-operator timing breakdown.
traces.print_timing_summary();
}

The timing summary shows per-span-name breakdown with operator identity:

  Span                                      Count   Total ms     Avg ms     Max ms
  --------------------------------------------------------------------------------
  volley.record                                10      559.2       55.9      100.3
  volley.sink                                   8       72.0        9.0       11.0
  volley.operator (ml-embed)                    8       56.8        7.1       25.9
  volley.operator (status-filter)              10        1.1        0.1        0.8
  volley.operator (proto-decode)               10        0.8        0.1        0.5
  volley.source                                10        0.4        0.0        0.3

Use .with_operator_id("name") on your pipeline to assign human-readable names.
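Each row of the summary is a plain aggregate over the captured spans, grouped by label (span name plus operator id). A sketch of that aggregation, assuming durations are already in milliseconds; summarize is hypothetical, and print_timing_summary() may format or sort differently.

```rust
use std::collections::BTreeMap;

/// Aggregate (label, duration_ms) pairs into rows of
/// (label, count, total_ms, avg_ms, max_ms), largest total first.
fn summarize(spans: &[(&str, f64)]) -> Vec<(String, usize, f64, f64, f64)> {
    let mut by_label: BTreeMap<&str, (usize, f64, f64)> = BTreeMap::new();
    for &(label, ms) in spans {
        let e = by_label.entry(label).or_insert((0, 0.0, 0.0));
        e.0 += 1; // count
        e.1 += ms; // total
        e.2 = e.2.max(ms); // max
    }
    let mut rows: Vec<_> = by_label
        .into_iter()
        .map(|(label, (count, total, max))| (label.to_string(), count, total, total / count as f64, max))
        .collect();
    rows.sort_by(|a, b| b.2.total_cmp(&a.2));
    rows
}
```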

MetricsSnapshot

Scrape and assert on Prometheus metrics:

#![allow(unused)]
fn main() {
let metrics = MetricsSnapshot::scrape(prometheus_port).await?;

metrics.assert_counter_exists("volley_source_records_polled");
metrics.assert_counter_gte("volley_records_processed_total", 100.0);
metrics.assert_gauge_in_range("volley_pipeline_health", 1.0, 1.0);
}

ThroughputRecorder

Measure and assert on pipeline throughput:

#![allow(unused)]
fn main() {
let mut recorder = ThroughputRecorder::new();
recorder.start();
pipeline.execute("bench").await?;
recorder.stop(record_count);

recorder.assert_throughput_gte(100.0);        // min records/sec
recorder.assert_elapsed_under_secs(30.0);     // max wall time
println!("{:.1} records/sec", recorder.records_per_second().unwrap());
}

Test Data Generators

The data module provides test data factories:

#![allow(unused)]
fn main() {
use volley_test_harness::data::*;

// Protobuf messages (serialized bytes).
let protos = generate_crawl_results(50);
// Every 5th record is a failure (HTTP 503), rest are HTTP 200.

// Arrow RecordBatch (for non-protobuf tests).
let batch = generate_crawl_batch(50);

// The schema used by both generators.
let schema = crawl_result_schema();
}

The CrawlResult type gets its Arrow conversion from impl_to_arrow!, so it works with auto_decode_operator::<CrawlResult>("payload").

Running Tests

All integration tests require Docker for testcontainers.

# Kafka connector suite (6 tests, ~45s)
cargo test -p volley-connector-kafka --test connector_suite -- --nocapture

# Iceberg connector suite (Polaris + MinIO, ~60s)
cargo test -p volley-connector-iceberg --test connector_suite -- --nocapture

# E2E golden path — full pipeline with windowing, metrics, traces, and ML (1 test, ~30s)
cargo test -p volley-test-harness --test e2e_golden_path -- --nocapture

# With Metal GPU disabled (CPU fallback)
cargo test -p volley-test-harness --test e2e_golden_path --no-default-features -- --nocapture

Where Tests Live

Connector-specific integration tests belong in their connector crate, importing shared infrastructure from volley-test-harness:

| Crate | Tests | Purpose |
|---|---|---|
| volley-connector-kafka | tests/connector_suite.rs | Kafka source/sink through ConnectorTestContext |
| volley-connector-iceberg | tests/connector_suite.rs | Iceberg source/sink through ConnectorTestContext (Polaris + MinIO) |
| volley-test-harness | tests/e2e_golden_path.rs | Cross-connector end-to-end pipeline test |

The pattern: volley-test-harness provides the framework, each connector crate owns its own integration tests.

Golden Path Test Coverage

The E2E golden path test (e2e_golden_path.rs) exercises the full pipeline:

  • Correctness: Kafka protobuf → decode → filter → ML embed → Kafka sink, asserting output records match expectations
  • Windowed aggregation: tumbling window over the event stream with watermark advancement
  • Throughput: ThroughputRecorder asserts sustained records/sec
  • Metrics: MetricsSnapshot scrapes the embedded Prometheus endpoint and asserts counters (volley_records_processed_total, volley_source_records_polled, pipeline health gauge)
  • Traces: TraceCapture from TestCluster asserts span count, valid parent–child tree, and per-operator timing via print_timing_summary()

Horizontal Scaling Guide

Run a single Volley pipeline across several Kubernetes pods when one machine can’t keep up — too many Kafka partitions, too much keyed state, or you need a pod failure to leave processing intact. You don’t change your pipeline code; you add a ClusterConfig, a shared ReadWriteMany checkpoint volume, and a replicas count in the operator CR. Volley handles key distribution, shuffling between pods, and live rescaling.

This page is the deployment checklist. For the design behind it (key groups, Arrow Flight shuffle, leader election), see Horizontal Scaling in Architecture.

Prerequisites

  • Kubernetes cluster (v1.26+)
  • A ReadWriteMany storage class (AWS EFS, GCP Filestore, Azure Files)
  • The Volley K8s operator deployed (see Kubernetes Operator)
  • Your pipeline built with the distributed feature enabled

When to Use Horizontal Scaling

Single-node mode handles most workloads. Consider horizontal scaling when:

  • Your key space is too large for one node’s memory (state doesn’t fit in RAM + RocksDB)
  • You need fault tolerance across nodes (pod failure shouldn’t stop the pipeline)
  • Your source has more partitions than one node can consume (e.g., 100+ Kafka partitions)

If your bottleneck is CPU on stateless operators (filter, map), scale vertically first. Horizontal scaling adds network overhead for the key shuffle.

Step 1: Enable the Distributed Feature

Add the distributed feature to your pipeline’s Cargo.toml:

[dependencies]
volley-core = { version = "0.8", features = ["distributed"] }

Step 2: Configure ClusterConfig

In your pipeline code, add with_cluster() to the environment:

#![allow(unused)]
fn main() {
use volley_core::prelude::*;

// In production, use ClusterConfig::from_env() to auto-detect from K8s env vars
let cluster = ClusterConfig::from_env();

let mut env = StreamExecutionEnvironment::new();
if let Some(config) = cluster {
    env = env.with_cluster(config);
}

env.from_source(source)
    .key_by(col("user_id"))
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    .to_sink(sink)
    .execute("my-pipeline")
    .await?;
}

The pipeline API is identical — with_cluster() is the only addition. When ClusterConfig is absent (e.g., running locally), the pipeline runs in single-node mode with zero overhead.
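For orientation, here is roughly what a from_env()-style constructor has to do with the VOLLEY_* variables the operator injects (listed in Step 4). ClusterEnv and cluster_env are hypothetical, and deriving the worker ordinal from the StatefulSet pod name in HOSTNAME is an assumption. Taking a lookup closure instead of reading std::env directly keeps the sketch testable; in a pod you would pass |k| std::env::var(k).ok().

```rust
/// Hypothetical parsed cluster settings.
#[derive(Debug, PartialEq)]
struct ClusterEnv {
    worker_id: u32,
    num_workers: u32,
    headless_service: String,
    max_key_groups: u32,
    flight_port: u16,
}

/// Returns None (single-node mode) when the cluster variables are absent.
fn cluster_env(get: impl Fn(&str) -> Option<String>) -> Option<ClusterEnv> {
    let num_workers = get("VOLLEY_NUM_WORKERS")?.parse().ok()?;
    // Assumption: StatefulSet pods are named "<app>-<ordinal>", e.g. "my-pipeline-2".
    let hostname = get("HOSTNAME")?;
    let worker_id = hostname.rsplit('-').next()?.parse().ok()?;
    Some(ClusterEnv {
        worker_id,
        num_workers,
        headless_service: get("VOLLEY_HEADLESS_SERVICE")?,
        // Fallbacks match the defaults shown in the VolleyApplication example.
        max_key_groups: get("VOLLEY_MAX_KEY_GROUPS").and_then(|v| v.parse().ok()).unwrap_or(256),
        flight_port: get("VOLLEY_FLIGHT_PORT").and_then(|v| v.parse().ok()).unwrap_or(50051),
    })
}
```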

Step 3: Create a Shared Checkpoint PVC

All workers need access to the same checkpoint directory. Create a ReadWriteMany PVC:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
  namespace: my-pipeline
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs    # AWS EFS, or your cloud's RWX storage class
  resources:
    requests:
      storage: 50Gi
kubectl apply -f shared-pvc.yaml

Step 4: Deploy with the K8s Operator

Create a VolleyApplication with the cluster section:

apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: my-pipeline
spec:
  image: registry.example.com/my-pipeline:v1.0
  replicas: 3

  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"

  cluster:
    maxKeyGroups: 256              # default, power of 2
    flightPort: 50051              # default
    sharedCheckpoints:
      claimName: volley-shared-ckpt  # the PVC from step 3

  state:
    size: 10Gi                     # per-worker local RocksDB

  checkpoints:
    size: 5Gi                      # per-worker local checkpoint staging

The operator automatically:

  • Injects VOLLEY_* environment variables (VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME)
  • Adds the Arrow Flight port (50051) to the container and headless service
  • Mounts the shared checkpoint PVC at /mnt/volley/checkpoints

Step 5: Verify

# Check pods are running
kubectl get pods -n my-pipeline
# NAME             READY   STATUS    RESTARTS   AGE
# my-pipeline-0    1/1     Running   0          2m
# my-pipeline-1    1/1     Running   0          1m
# my-pipeline-2    1/1     Running   0          30s

# Check env vars
kubectl exec my-pipeline-0 -n my-pipeline -- env | grep VOLLEY
# VOLLEY_NUM_WORKERS=3
# VOLLEY_HEADLESS_SERVICE=my-pipeline-headless.my-pipeline.svc
# VOLLEY_MAX_KEY_GROUPS=256
# VOLLEY_FLIGHT_PORT=50051
# VOLLEY_APP_NAME=my-pipeline

# Check Flight port on headless service
kubectl get svc my-pipeline-headless -n my-pipeline -o jsonpath='{.spec.ports[*].name}'
# health flight

# Check shared checkpoint mount
kubectl exec my-pipeline-0 -n my-pipeline -- ls /mnt/volley/checkpoints

Scaling

Change the replica count to scale:

kubectl patch vapp my-pipeline -n my-pipeline --type merge -p '{"spec":{"replicas":5}}'

The coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
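Key group assignment can be pictured with the contiguous-range formula Flink uses: hash a key into one of maxKeyGroups groups, then give each worker a contiguous slice of groups. This is a sketch of that scheme, not Volley's code, and the function names are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::ops::Range;

/// Key -> key group. DefaultHasher is for illustration only: its output is not
/// guaranteed to be stable across Rust releases, so a real system needs a fixed hash.
fn key_group<K: Hash>(key: &K, max_key_groups: u32) -> u32 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % u64::from(max_key_groups)) as u32
}

/// Key group -> worker using contiguous ranges: with 256 groups and 3 workers,
/// worker 0 owns 0..=85, worker 1 owns 86..=170, worker 2 owns 171..=255.
fn worker_for(key_group: u32, max_key_groups: u32, num_workers: u32) -> u32 {
    (u64::from(key_group) * u64::from(num_workers) / u64::from(max_key_groups)) as u32
}

/// The contiguous range of key groups that `worker_for` sends to `worker`.
fn owned_range(worker: u32, max_key_groups: u32, num_workers: u32) -> Range<u32> {
    let (m, n) = (u64::from(max_key_groups), u64::from(num_workers));
    let ceil_div = |a: u64| (a + n - 1) / n;
    let start = ceil_div(u64::from(worker) * m) as u32;
    let end = ceil_div((u64::from(worker) + 1) * m) as u32;
    start..end
}
```

Going from 3 to 5 workers with 256 key groups shrinks each slice from 85 or 86 groups to 51 or 52. Keys never change group; only groups change owner, which fits workers picking up whole key groups from the shared filesystem.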

Monitoring

Key metrics to watch in distributed mode:

| Metric | What it tells you |
|---|---|
| volley_shuffle_local_records_total | Records staying on this worker |
| volley_shuffle_remote_records_total{target_worker="N"} | Records sent to each remote worker |
| volley_flight_send_errors_total | Transport errors (indicates worker connectivity issues) |
| volley_flight_reconnects_total{target_worker} | Successful reconnects after a HalfOpen probe. A steady trickle points to a peer that keeps flapping; a single spike lines up with one pod restart |
| volley_circuit_breaker_state{target_worker} | 0=Closed, 1=Open, 2=HalfOpen per remote worker |
| volley_circuit_breaker_trips_total{target_worker} | Closed → Open transitions |
| volley_circuit_breaker_probes_succeeded_total{target_worker} | HalfOpen → Closed transitions; its ratio to the trips counter is the reconnect success rate |
| volley_cluster_worker_id | This worker’s ordinal |
| volley_cluster_num_workers | Total cluster size |
| volley_cluster_key_groups_assigned | Key groups owned by this worker |
| volley_barrier_alignment_wait_ms | Time to align barriers across all inputs |
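The three breaker states and the two breaker counters fit a standard circuit-breaker state machine. This generic sketch uses a hypothetical trip threshold and cooldown (Volley's actual policy is not documented here). It numbers the states like volley_circuit_breaker_state and counts the same transitions as the trips and probes-succeeded metrics.

```rust
use std::time::{Duration, Instant};

/// States numbered like the volley_circuit_breaker_state gauge.
#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    Closed = 0,
    Open = 1,
    HalfOpen = 2,
}

/// Generic per-peer circuit breaker.
struct Breaker {
    state: State,
    failures: u32,
    trip_after: u32,
    cooldown: Duration,
    opened_at: Option<Instant>,
    trips: u64,            // Closed -> Open transitions
    probes_succeeded: u64, // HalfOpen -> Closed transitions
}

impl Breaker {
    fn new(trip_after: u32, cooldown: Duration) -> Self {
        Breaker { state: State::Closed, failures: 0, trip_after, cooldown, opened_at: None, trips: 0, probes_succeeded: 0 }
    }

    /// Whether a send may be attempted now. After the cooldown an Open breaker
    /// moves to HalfOpen, so the next send acts as a probe.
    fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed | State::HalfOpen => true,
            State::Open => {
                let cooled = self.opened_at.map_or(true, |t| now.duration_since(t) >= self.cooldown);
                if cooled {
                    self.state = State::HalfOpen;
                }
                cooled
            }
        }
    }

    fn on_success(&mut self) {
        if self.state == State::HalfOpen {
            self.probes_succeeded += 1;
        }
        self.state = State::Closed;
        self.failures = 0;
    }

    fn on_failure(&mut self, now: Instant) {
        self.failures += 1;
        let trip = match self.state {
            State::Closed => self.failures >= self.trip_after,
            State::HalfOpen => true, // failed probe: straight back to Open
            State::Open => false,
        };
        if trip {
            if self.state == State::Closed {
                self.trips += 1;
            }
            self.state = State::Open;
            self.opened_at = Some(now);
        }
    }
}
```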

Troubleshooting

Pipeline stalls after a pod restart: Check whether the BarrierAligner is waiting for a dead input. The Flight server should clean up stale connections via TCP keepalive (10s). If the stall persists, check volley_flight_send_errors_total for rising error counts. After a peer pod restarts, the HalfOpen probe rebuilds the outbound tonic::Channel, re-registers the shuffle channel, and replays every unacknowledged frame from the client’s 8 MiB per-peer outbox onto the fresh stream; the server deduplicates against its last_delivered_seq, so frames that had already arrived before the crash are delivered exactly once. Confirm that volley_flight_reconnects_total{target_worker} increments and that volley_circuit_breaker_state returns to 0 (Closed). Because replay is at-least-once and the server deduplicates, the epoch in flight at the moment of the crash often recovers without aborting; check that volley_dag_sink_aborts_total stays flat across the restart window. Longer outages (a reconnect that runs past the 2PC deadline) still fall back to a cluster-wide abort, after which the next epoch commits cleanly.

Outbox backpressure (sender appears to stall per-peer): The client’s outbox is byte-bounded (8 MiB / peer by default; see DEFAULT_OUTBOX_MAX_BYTES). When a remote consumer goes silent — network partition, GC pause, OOM — the encoder parks on the byte budget until ACKs trim the backlog. Symptoms: volley_shuffle_remote_records_total{target_worker="N"} flatlines while other targets keep climbing. Root-cause by looking at the peer’s volley_flight_recv_messages_total and its drain-task logs; the outbox itself is doing the right thing by pushing back instead of running the worker out of memory.
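The outbox and dedup behaviour described in the two items above can be modelled with a byte-bounded queue of sequence-numbered frames on the sender and a last_delivered_seq check on the receiver. This is a std-only sketch with hypothetical types; the real implementation runs over Arrow Flight streams and may acknowledge frames differently.

```rust
use std::collections::VecDeque;

/// Sender side: unacknowledged frames for one peer, bounded by total bytes.
struct Outbox {
    max_bytes: usize,
    bytes: usize,
    next_seq: u64,
    frames: VecDeque<(u64, Vec<u8>)>,
}

impl Outbox {
    fn new(max_bytes: usize) -> Self {
        Outbox { max_bytes, bytes: 0, next_seq: 1, frames: VecDeque::new() }
    }

    /// Queue a frame and return its sequence number, or hand the frame back if
    /// it would exceed the byte budget. Waiting for ACKs before retrying is the
    /// backpressure.
    fn try_push(&mut self, frame: Vec<u8>) -> Result<u64, Vec<u8>> {
        if self.bytes + frame.len() > self.max_bytes {
            return Err(frame);
        }
        let seq = self.next_seq;
        self.next_seq += 1;
        self.bytes += frame.len();
        self.frames.push_back((seq, frame));
        Ok(seq)
    }

    /// Cumulative ACK: release every frame with a sequence number <= `acked`.
    fn ack(&mut self, acked: u64) {
        while self.frames.front().map_or(false, |(seq, _)| *seq <= acked) {
            if let Some((_, frame)) = self.frames.pop_front() {
                self.bytes -= frame.len();
            }
        }
    }

    /// Frames to resend, in order, on a fresh stream after a reconnect.
    fn replay(&self) -> impl Iterator<Item = &(u64, Vec<u8>)> + '_ {
        self.frames.iter()
    }
}

/// Receiver side: skip anything at or below the last delivered sequence number,
/// so replayed frames that already arrived are not delivered twice.
/// Assumes frames arrive in order on a stream.
struct Receiver {
    last_delivered_seq: u64,
    delivered: Vec<u64>,
}

impl Receiver {
    fn on_frame(&mut self, seq: u64) -> bool {
        if seq <= self.last_delivered_seq {
            return false; // duplicate from a replay
        }
        self.last_delivered_seq = seq;
        self.delivered.push(seq);
        true
    }
}
```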

Uneven record distribution: Check volley_shuffle_remote_records_total per target worker. The key group assignment is contiguous — if your key distribution is skewed, some workers will handle more traffic. Consider increasing maxKeyGroups for finer granularity.

Checkpoint failures: Verify the shared PVC is mounted and writable on all pods:

kubectl exec my-pipeline-0 -- touch /mnt/volley/checkpoints/test && echo OK

DNS resolution failures at startup: Workers discover each other via StatefulSet DNS. If pods start before DNS propagates, the runtime retries with exponential backoff (100ms to 5s). Check logs for “Waiting for remote worker” messages.

Kafka

Volley provides a Kafka source and sink via the volley-connector-kafka crate. The sink defaults to non-transactional idempotent mode for maximum throughput, with opt-in exactly-once transactional semantics.

Setup

Add the Kafka connector crate to your Cargo.toml:

[dependencies]
volley-connector-kafka = { git = "https://github.com/volley-streams/volley" }

System dependencies: cmake is required for building rdkafka. OpenSSL and libcurl are vendored and statically linked — no additional system packages needed. See Installation.

Authentication

Use KafkaSecurityConfig to configure authentication for both sources and sinks. The same config struct works with .with_security() on either KafkaSourceConfig or KafkaSinkConfig.

Confluent Cloud / SASL_SSL

The most common setup for managed Kafka (Confluent Cloud, AWS MSK Serverless, Aiven):

#![allow(unused)]
fn main() {
use volley_connector_kafka::{KafkaSourceConfig, KafkaSinkConfig, KafkaSecurityConfig};

let security = KafkaSecurityConfig::sasl_ssl("API_KEY", "API_SECRET");

let source_config = KafkaSourceConfig::new(
    "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "events", "my-group", schema,
).with_security(security.clone());

let sink_config = KafkaSinkConfig::new(
    "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "output",
).with_security(security);
}

This sets security.protocol=SASL_SSL and sasl.mechanism=PLAIN, which is the correct combination for Confluent Cloud and most SASL-based providers.

SCRAM Authentication

For brokers that use SCRAM (e.g., self-hosted clusters, or AWS MSK with SASL/SCRAM authentication, which supports only SCRAM-SHA-512):

#![allow(unused)]
fn main() {
let security = KafkaSecurityConfig::sasl_ssl("username", "password")
    .with_sasl_mechanism("SCRAM-SHA-256");
}

Supported SASL mechanisms: PLAIN (default), SCRAM-SHA-256, SCRAM-SHA-512.

mTLS (Mutual TLS)

For clusters that authenticate clients via X.509 certificates:

#![allow(unused)]
fn main() {
let security = KafkaSecurityConfig::ssl()
    .with_ssl_ca("/etc/kafka/ca.pem")
    .with_ssl_cert("/etc/kafka/client.pem")
    .with_ssl_key("/etc/kafka/client.key");

let source_config = KafkaSourceConfig::new("broker:9093", "events", "my-group", schema)
    .with_security(security);
}
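Under the hood, these constructors end up as librdkafka client properties. The sketch below shows one plausible mapping using the standard librdkafka property names. SecuritySketch and to_rdkafka_properties are hypothetical, and the exact keys KafkaSecurityConfig sets internally are an assumption beyond the SASL_SSL/PLAIN pair stated above; the same keys can be handy when debugging a connection with plain librdkafka tooling.

```rust
/// Hypothetical stand-in for the two authentication modes above (not Volley's type).
#[derive(Debug, Clone)]
enum SecuritySketch {
    /// SASL over TLS with a username/password pair (PLAIN or SCRAM).
    SaslSsl { username: String, password: String, mechanism: String },
    /// TLS client-certificate authentication (mTLS).
    Ssl { ca: String, cert: String, key: String },
}

/// Returns the librdkafka properties each mode would need to set.
fn to_rdkafka_properties(sec: &SecuritySketch) -> Vec<(&'static str, String)> {
    match sec {
        SecuritySketch::SaslSsl { username, password, mechanism } => vec![
            ("security.protocol", "SASL_SSL".to_string()),
            ("sasl.mechanism", mechanism.clone()),
            ("sasl.username", username.clone()),
            ("sasl.password", password.clone()),
        ],
        SecuritySketch::Ssl { ca, cert, key } => vec![
            ("security.protocol", "SSL".to_string()),
            ("ssl.ca.location", ca.clone()),
            ("ssl.certificate.location", cert.clone()),
            ("ssl.key.location", key.clone()),
        ],
    }
}
```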

Key Types

| Type | Description |
|---|---|
| KafkaSourceConfig | Configuration for the Kafka consumer (brokers, topic, group, format, batch size) |
| KafkaSinkConfig | Configuration for the Kafka producer (brokers, topic, format, delivery mode) |
| KafkaSecurityConfig | Shared authentication/TLS config for source and sink |
| KafkaSourceFormat | Message format enum: Json (default), Protobuf, Avro |
| KafkaSinkFormat | Serialization format enum: Json (default), Protobuf, Avro |
| KafkaSource | Source that polls Kafka for records |
| KafkaSink | Sink that writes records to Kafka |
| KafkaEnvExt | Extension trait for StreamExecutionEnvironment |
| KafkaStreamExt | Extension trait for DataStream |
| ConfluentSchemaRegistry | Confluent Schema Registry HTTP client (from volley-connectors) |
| env.from_kafka_protobuf::<M>() | Trait method: Kafka source + ProtobufDecodeOperator |
| env.from_kafka_avro() | Trait method: Kafka source + AvroDecodeOperator + Schema Registry |

Example

use std::sync::Arc;
use std::time::Duration;
use arrow::datatypes::{DataType, Field, Schema};
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Utf8, false),
        Field::new("amount", DataType::Float64, false),
    ]));

    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema);
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");

    let tmp = tempfile::tempdir().unwrap();

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter_expr(col("amount").gt(lit(0.0)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate_expr(
            vec![sum(col("amount"))],
            RocksDbBackend::open(tmp.path().join("state")).unwrap(),
        )
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;

    Ok(())
}

For end-to-end exactly-once Kafka (source offsets committed inside the sink’s Kafka transaction), swap in the _exactly_once pair:

let (stream, cgm) = env.from_kafka_exactly_once(source_config).await?;
stream
    // ... operators ...
    .to_kafka_exactly_once(
        KafkaSinkConfig::new_transactional("localhost:9092", "output", "agg-txn"),
        cgm,
    )
    .await?
    .execute("kafka-eos")
    .await?;

Protobuf Format

For Protobuf-encoded Kafka messages, use KafkaSourceConfig::new_protobuf() and the from_kafka_protobuf trait method on KafkaEnvExt:

#![allow(unused)]
fn main() {
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};

mod proto {
    include!(concat!(env!("OUT_DIR"), "/events.rs"));
}

let config = KafkaSourceConfig::new_protobuf("localhost:9092", "events", "my-group")
    .with_batch_size(100);

let stream = StreamExecutionEnvironment::new()
    .from_kafka_protobuf::<proto::Event>(config).await?;
}

The source emits raw binary payloads in a Binary “payload” column. The ProtobufDecodeOperator (wired automatically by from_kafka_protobuf) decodes them into typed Arrow columns using the compiled prost type. The type M must implement ToArrow, typically via the impl_to_arrow! proc-macro from volley-derive.

Scaffolding: volley new with Kafka source and protobuf format generates build.rs, proto/events.proto, and prost dependencies.

Avro Format (Schema Registry)

For Avro-encoded Kafka messages with Confluent Schema Registry, use KafkaSourceConfig::new_avro() and the from_kafka_avro trait method:

#![allow(unused)]
fn main() {
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};

let config = KafkaSourceConfig::new_avro(
    "localhost:9092",
    "events",
    "my-group",
    "http://localhost:8081",  // Schema Registry URL
)
.with_schema_registry_auth("user", "password")  // optional basic auth
.with_batch_size(100);

let stream = StreamExecutionEnvironment::new()
    .from_kafka_avro(config).await?;
}

The source emits raw binary payloads. The AvroDecodeOperator (wired automatically by from_kafka_avro):

  1. Strips the Confluent 5-byte wire-format header (the magic byte 0x00 followed by a 4-byte big-endian schema ID) that precedes the Avro payload
  2. Fetches the Avro schema from the Schema Registry by ID (cached after first fetch)
  3. Decodes the Avro binary into apache_avro::types::Value
  4. Converts directly to Arrow arrays using typed ArrayBuilders (no JSON intermediary)
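Steps 1 and 2 can be sketched with the standard library alone. split_confluent_header and SchemaCache are hypothetical names, not Volley APIs; the fetch closure stands in for the HTTP lookup against the Schema Registry, and the cached value is a plain String where a real decoder would hold a parsed schema.

```rust
use std::collections::HashMap;

/// Splits a Confluent-framed message into (schema_id, avro_payload).
/// Byte 0 is the magic byte 0x00, bytes 1..5 are the schema ID as a
/// big-endian u32, and everything after that is the Avro-encoded body.
fn split_confluent_header(msg: &[u8]) -> Result<(u32, &[u8]), String> {
    if msg.len() < 5 {
        return Err(format!("message too short: {} bytes", msg.len()));
    }
    if msg[0] != 0x00 {
        return Err(format!("unexpected magic byte 0x{:02x}", msg[0]));
    }
    let id = u32::from_be_bytes([msg[1], msg[2], msg[3], msg[4]]);
    Ok((id, &msg[5..]))
}

/// Caches schemas by ID so the registry is queried once per distinct ID.
struct SchemaCache<F: FnMut(u32) -> String> {
    fetch: F,
    by_id: HashMap<u32, String>,
}

impl<F: FnMut(u32) -> String> SchemaCache<F> {
    fn new(fetch: F) -> Self {
        Self { fetch, by_id: HashMap::new() }
    }

    /// Returns the schema for `id`, calling the registry only on a cache miss.
    fn get(&mut self, id: u32) -> &str {
        if !self.by_id.contains_key(&id) {
            let schema = (self.fetch)(id);
            self.by_id.insert(id, schema);
        }
        &self.by_id[&id]
    }
}
```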

The Arrow schema is inferred from the Avro schema. Supported type mappings:

| Avro Type | Arrow Type |
|---|---|
| boolean | Boolean |
| int | Int32 |
| long | Int64 |
| float | Float32 |
| double | Float64 |
| string | Utf8 |
| bytes | Binary |
| fixed | Binary |
| enum | Utf8 |
| union(null, T) | T (nullable) |

Plain Avro (No Schema Registry)

For Avro messages without the Confluent wire format, use AvroDecodeOperator directly with a static schema:

#![allow(unused)]
fn main() {
use volley_connectors::decode::avro::AvroDecodeOperator;

let avro_schema = apache_avro::Schema::parse_str(r#"{ ... }"#).unwrap();
let stream = env.from_kafka(config).await?
    .with_operator(AvroDecodeOperator::with_static_schema("payload", avro_schema));
}

Schema Evolution

For registry-backed Kafka sources with automatic schema evolution, use KafkaSourceConfig::new_dynamic() and from_kafka_with_schema_evolution(). This automatically chains the appropriate decode operator (Avro, Protobuf, or JSON Schema) based on the configured format and resolves schemas from the registry at runtime.

#![allow(unused)]
fn main() {
use std::sync::Arc;
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig, KafkaSourceFormat};
use volley_connectors::decode::{SchemaEvolutionConfig, SchemaResolutionConfig};
use volley_connectors::schema::{ConfluentSchemaRegistry, SubjectNameStrategy};
use volley_connectors::decode::wire::ConfluentWireFormat;
use volley_core::schema::SchemaEvolutionMode;
use volley_core::error_policy::ErrorPolicy;

// Registry-backed Avro source with schema evolution
let config = KafkaSourceConfig::new_dynamic("localhost:9092", "orders", "my-group")
    .with_format(KafkaSourceFormat::Avro)
    .with_schema_evolution(SchemaEvolutionConfig {
        resolution: SchemaResolutionConfig {
            registry: Arc::new(ConfluentSchemaRegistry::new("http://localhost:8081")),
            wire_format: Arc::new(ConfluentWireFormat),
            subject_strategy: SubjectNameStrategy::TopicName {
                topic: "orders".into(),
                is_key: false,
            },
        },
        evolution_mode: SchemaEvolutionMode::Adaptive,
        error_policy: ErrorPolicy::Skip,
    });

let stream = env.from_kafka_with_schema_evolution(config).await?;
}

The new_dynamic() constructor does not require an upfront Arrow schema. The schema is resolved from the registry at runtime using the schema ID carried in each message (for ConfluentWireFormat, the 5-byte prefix of the payload). The with_schema_evolution() builder method configures:

  • Registry: Which schema registry to query (e.g., ConfluentSchemaRegistry).
  • Wire format: How to extract the schema ID from message bytes (e.g., ConfluentWireFormat for the standard 5-byte header).
  • Subject strategy: How to resolve topic names to registry subjects (e.g., TopicName appends -value or -key).
  • Evolution mode: Strict (default, reject changes) or Adaptive (accept backward-compatible changes).
  • Error policy: Fail (abort on decode error), Skip (drop failed records), or DeadLetterQueue (route failures to a DLQ sink).

A keyed variant is also available: env.from_kafka_with_schema_evolution_keyed(config).
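The subject strategy only decides which registry subject a schema is looked up under. As a sketch, Confluent's three standard strategies compute subjects as follows. SubjectStrategySketch and subject_for are hypothetical, and only the TopicName variant is confirmed on this page for Volley's SubjectNameStrategy.

```rust
/// Hypothetical mirror of Confluent's standard subject-name strategies.
enum SubjectStrategySketch<'a> {
    /// "<topic>-key" or "<topic>-value" (Confluent's default).
    TopicName { topic: &'a str, is_key: bool },
    /// The fully-qualified record name, independent of the topic.
    RecordName { record_fqn: &'a str },
    /// "<topic>-<fully-qualified record name>".
    TopicRecordName { topic: &'a str, record_fqn: &'a str },
}

/// Computes the registry subject for a given strategy.
fn subject_for(strategy: &SubjectStrategySketch<'_>) -> String {
    match strategy {
        SubjectStrategySketch::TopicName { topic, is_key } => {
            format!("{topic}-{}", if *is_key { "key" } else { "value" })
        }
        SubjectStrategySketch::RecordName { record_fqn } => record_fqn.to_string(),
        SubjectStrategySketch::TopicRecordName { topic, record_fqn } => {
            format!("{topic}-{record_fqn}")
        }
    }
}
```

With the configuration above (topic "orders", is_key: false), the subject would be orders-value.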

Batch Reads

By default, KafkaSource reads one message per poll_next() call (single-row RecordBatch). For high-throughput pipelines, enable batch reads:

#![allow(unused)]
fn main() {
use std::time::Duration;

let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema)
    .with_batch_size(100)       // accumulate up to 100 messages
    .with_batch_timeout(Duration::from_millis(50));  // wait 50ms for next message
}

The source accumulates up to batch_size messages and concatenates them into a single multi-row RecordBatch. All downstream operators handle multi-row batches correctly. If batch_timeout elapses before the batch is full, the partial batch is returned rather than blocking until batch_size is reached.

Batch metadata: event_time_ms is set to the max timestamp in the batch (correct for watermark advancement). For per-row event times in windowed operations, configure timestamp_column on your WindowConfig.
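A minimal sketch of the accumulation loop, assuming (as the inline comment in the example suggests) that batch_timeout bounds the wait for each message after the first. It uses a std::sync::mpsc channel in place of the rdkafka consumer; Msg, accumulate, and batch_event_time_ms are hypothetical names, not Volley internals.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// A consumed message reduced to what this sketch needs.
struct Msg {
    event_time_ms: i64,
    payload: Vec<u8>,
}

/// Accumulates up to `batch_size` messages. Blocks for the first one, then
/// waits at most `batch_timeout` for each following message and returns a
/// partial batch as soon as that wait times out or the channel closes.
fn accumulate(rx: &Receiver<Msg>, batch_size: usize, batch_timeout: Duration) -> Vec<Msg> {
    let mut batch = Vec::with_capacity(batch_size);
    match rx.recv() {
        Ok(first) => batch.push(first),
        Err(_) => return batch, // channel closed and empty: nothing to emit
    }
    while batch.len() < batch_size {
        match rx.recv_timeout(batch_timeout) {
            Ok(msg) => batch.push(msg),
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    batch
}

/// Batch-level event time: the newest timestamp in the batch, which is what
/// watermark advancement needs.
fn batch_event_time_ms(batch: &[Msg]) -> Option<i64> {
    batch.iter().map(|m| m.event_time_ms).max()
}
```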

Consumer Tuning

Fine-tune the Kafka consumer’s fetch behavior for throughput vs. latency:

#![allow(unused)]
fn main() {
let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema)
    .with_batch_size(500)
    .with_fetch_min_bytes(32_768)         // wait for 32KB of data (default: 1)
    .with_fetch_max_wait_ms(100)          // broker waits up to 100ms to fill (default: 500)
    .with_max_partition_fetch_bytes(2_097_152); // 2MB per partition (default: 1MB)
}

| Method | Default | Effect |
|---|---|---|
| with_fetch_min_bytes(bytes) | 1 | Minimum bytes the broker accumulates before responding. Higher values reduce fetch requests at the cost of latency. |
| with_fetch_max_wait_ms(ms) | 500 | Max time the broker waits to fill fetch_min_bytes. Acts as an upper bound on added latency. |
| with_max_partition_fetch_bytes(bytes) | 1,048,576 (1MB) | Max data returned per partition per fetch. Increase for partitions with large messages. |
| with_property(key, value) | | Escape hatch for any rdkafka consumer setting not covered above. |

Tip: For high-throughput pipelines, combine with_batch_size() with with_fetch_min_bytes() so the broker pre-aggregates data and the source fills batches faster.

Retry on Transient Errors

Both the source and sink support optional retry with exponential backoff for transient broker errors (e.g., broker unavailable, request timeouts, leader changes, network exceptions). Configure via with_retry():

#![allow(unused)]
fn main() {
use std::time::Duration;
use volley_core::resilience::RetryConfig;

// Source with retry
let source_config = KafkaSourceConfig::new("localhost:9092", "topic", "group", schema)
    .with_retry(
        RetryConfig::new()
            .with_max_retries(5)
            .with_initial_backoff(Duration::from_millis(200))
            .with_max_backoff(Duration::from_secs(30)),
    );

// Sink with retry
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "txn-1")
    .with_retry(
        RetryConfig::new()
            .with_max_retries(3)
            .with_initial_backoff(Duration::from_millis(100)),
    );
}

When retry is not configured (the default), errors propagate immediately.

Source behavior: Both single-message and batch accumulation paths retry recv() calls. Subsequent messages within a batch (after the first) are not retried — a partial batch is returned instead.

Sink behavior: Individual send() calls are retried; the surrounding transaction is not. If retries are exhausted, the transaction is aborted.

Metrics: volley_retry_attempts_total counter with component label (kafka_source or kafka_sink).
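To reason about worst-case stall time, here is a sketch of a capped exponential schedule. The doubling factor and the absence of jitter are assumptions; this page does not state RetryConfig's multiplier, jitter, or default max_backoff, and backoff_schedule is a hypothetical helper.

```rust
use std::time::Duration;

/// Delay before each retry under capped exponential backoff: retry n
/// (0-based) waits initial * 2^n, but never longer than `max`.
/// Doubling and no jitter are assumptions made for this sketch.
fn backoff_schedule(max_retries: u32, initial: Duration, max: Duration) -> Vec<Duration> {
    (0..max_retries)
        .map(|n| {
            let factor = 2u32.saturating_pow(n);
            initial.saturating_mul(factor).min(max)
        })
        .collect()
}
```

Under those assumptions, the source configuration above would sleep 200, 400, 800, 1600, and 3200 ms across its five retries (6.2 s in total) before the error propagates, so the 30 s cap never comes into play.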

Sink Delivery Modes

The Kafka sink supports three delivery modes, from lowest overhead to strongest guarantees:

Fire-and-Forget (acks=0)

No acknowledgment from the broker. Highest throughput, but messages can be lost:

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
    .with_acks("0");
}

When acks is "0", idempotence is automatically disabled since the broker never confirms delivery.

Idempotent (default)

The default mode. The producer uses Kafka’s idempotent delivery (enable.idempotence=true) to guarantee no duplicates from producer retries, without the overhead of transactions:

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new("localhost:9092", "output");
}

This is the right choice for most pipelines. It guarantees at-least-once delivery: producer retries never create duplicates, although records replayed after a pipeline restart can be written again. It has no per-batch transaction commit overhead.

Transactional (exactly-once)

Opt-in via new_transactional(). Wraps writes in Kafka transactions for exactly-once semantics:

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");
}

See Exactly-Once Semantics below for details.

Producer Tuning

Fine-tune the Kafka producer for throughput vs. latency:

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
    .with_linger_ms(20)                // batch for 20ms before sending (default: 5)
    .with_compression("lz4")           // compress batches (default: "none")
    .with_acks("1");                   // leader-only ack (default: "all")
}

| Method | Default | Effect |
|---|---|---|
| with_acks(acks) | "all" | "all" = full ISR ack, "1" = leader only, "0" = fire-and-forget. |
| with_linger_ms(ms) | 5 | How long rdkafka waits to batch messages before sending. Higher values increase batching and throughput at the cost of latency. |
| with_compression(codec) | "none" | Compression codec: "none", "gzip", "snappy", "lz4", "zstd". LZ4 and Zstd offer the best throughput-to-ratio tradeoff. |
| with_property(key, value) | | Escape hatch for any rdkafka producer setting not covered above. |

Tip: For high-throughput sinks, with_linger_ms(20) + with_compression("lz4") is a good starting point. This lets rdkafka batch more messages per broker request and compress them in a single pass.

Exactly-Once Semantics

The Kafka sink supports exactly-once delivery when configured with new_transactional():

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");
}

How it works:

  1. Records are buffered during processing
  2. On checkpoint barrier, the sink commits the Kafka transaction
  3. On failure recovery, uncommitted transactions are aborted
  4. Processing resumes from the last checkpoint, replaying records

When the source is created with from_kafka_exactly_once and the sink with to_kafka_exactly_once, the consumer group offsets are committed as part of the same transaction, keeping source and sink in sync.

Note: Transactional mode adds a commit round-trip per checkpoint. For pipelines where at-least-once is acceptable, the default idempotent mode (KafkaSinkConfig::new()) avoids this overhead entirely.
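The four steps above can be modeled with a toy in-memory topic. MockTopic is hypothetical and deliberately ignores producer fencing, the consumer-offset commit, and cluster-wide coordination; it only shows why replaying from the last checkpoint does not duplicate output once uncommitted writes are aborted.

```rust
/// Hypothetical in-memory stand-in for a transactional topic: records only
/// become visible to readers once their transaction commits.
#[derive(Default)]
struct MockTopic {
    committed: Vec<String>,
    open_txn: Vec<String>,
}

impl MockTopic {
    /// Records written between barriers stay inside the open transaction.
    fn send(&mut self, record: &str) {
        self.open_txn.push(record.to_string());
    }

    /// Checkpoint barrier: publish everything written since the last barrier.
    fn on_barrier(&mut self) {
        self.committed.append(&mut self.open_txn);
    }

    /// Failure recovery: discard the uncommitted transaction.
    fn abort(&mut self) {
        self.open_txn.clear();
    }
}
```

A typical run: send "a" and "b", hit a barrier (epoch 1 commits), send "c", crash and abort, then replay "c" from the epoch-1 checkpoint and commit at the next barrier. Readers see "a", "b", "c" exactly once.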

Trace Context Propagation

The Kafka source and sink automatically propagate W3C Trace Context (traceparent header) for distributed tracing:

  • Source: If a consumed message has a traceparent header, the record inherits that trace context. The runtime’s sampler respects upstream sampling decisions (parent-based sampling).
  • Sink: If a record being written has trace context, a traceparent header is injected into the produced Kafka message.

This works automatically when per-record tracing is enabled via with_tracing(). No additional configuration is needed for Kafka trace propagation.
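For reference, a W3C traceparent header has four dash-separated lowercase-hex fields: version (00), a 16-byte trace ID, an 8-byte parent span ID, and trace flags, where bit 0 is the sampled flag that parent-based sampling honors. The parser below is a hypothetical sketch of the check a source might perform, not Volley's implementation, and it accepts only version 00.

```rust
/// Parsed W3C `traceparent` header: version-traceid-parentid-flags.
#[derive(Debug, PartialEq)]
struct TraceParent {
    trace_id: String,
    parent_id: String,
    sampled: bool,
}

/// True if `s` is exactly `len` lowercase hex characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| b.is_ascii_digit() || (b'a'..=b'f').contains(&b))
}

/// Parses a version-00 header; returns None for anything malformed,
/// in which case a consumer would start a fresh trace instead.
fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let parts: Vec<&str> = header.trim().split('-').collect();
    if parts.len() != 4 || parts[0] != "00" {
        return None;
    }
    let (trace_id, parent_id, flags) = (parts[1], parts[2], parts[3]);
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(parent_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    // All-zero trace and parent IDs are invalid per the spec.
    if trace_id.bytes().all(|b| b == b'0') || parent_id.bytes().all(|b| b == b'0') {
        return None;
    }
    let flags = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent {
        trace_id: trace_id.to_string(),
        parent_id: parent_id.to_string(),
        sampled: (flags & 0x01) != 0,
    })
}
```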

Consumer Lag Monitoring

The volley.kafka.consumer_lag metric (exported to Prometheus as volley_kafka_consumer_lag) tracks consumer group lag. Use it for KEDA autoscaling:

sum(volley_kafka_consumer_lag) by (job_name)

See Capacity Planning for KEDA trigger configuration.
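Consumer lag is conventionally the gap between each partition's high watermark and the group's committed offset, summed across partitions. This page does not spell out how Volley computes the metric, so the sketch below assumes that convention; PartitionOffsets and consumer_lag are hypothetical.

```rust
/// Offsets for one partition of the consumed topic.
struct PartitionOffsets {
    /// High watermark: one past the last offset a consumer can currently read.
    high_watermark: i64,
    /// Committed offset: the next offset the consumer group will read.
    committed: i64,
}

/// Group lag: per-partition gap between high watermark and committed
/// offset, clamped at zero and summed across partitions.
fn consumer_lag(partitions: &[PartitionOffsets]) -> i64 {
    partitions
        .iter()
        .map(|p| (p.high_watermark - p.committed).max(0))
        .sum()
}
```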

Delta Lake

Write your stream to a Delta Lake table — queryable by Spark, DataFusion, Databricks, Trino, and anything else that speaks Delta. Pick this sink when downstream consumers want ACID tables with time travel and schema evolution rather than raw files. Volley commits Parquet files on every checkpoint and tags each commit with an epoch so replays after a crash never duplicate rows.

Key Types

| Type | Description |
|---|---|
| DeltaSink | Sink that writes Parquet data to a Delta table |
| DeltaSinkConfig | Configuration (table URI, partitioning, buffer size, file size) |
| PartitionSpec | Specification for Hive-style partitioned writes |

Usage

#![allow(unused)]
fn main() {
use volley_connector_delta::{DeltaSink, DeltaSinkConfig};

let config = DeltaSinkConfig::new("s3://my-bucket/output-table");
let sink = DeltaSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("delta-writer")
    .await?;
}

Configuration Reference

#![allow(unused)]
fn main() {
let config = DeltaSinkConfig::new("s3://my-bucket/output-table")
    .with_pipeline_id("my-pipeline")        // idempotent commit tag (default: "volley-default")
    .with_max_records(50_000)               // records buffered before flush (default: 10,000)
    .with_target_file_size(256 * 1024 * 1024) // target Parquet file size (default: 128 MB)
    .with_partition_spec(partition_spec);    // Hive-style partitioning (default: none)
}

| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Identifies the pipeline in commit metadata for idempotent writes. Use a unique ID per pipeline writing to the same table |
| with_max_records() | 10,000 | Number of records buffered in memory before writing a Parquet file. Higher values produce fewer, larger files |
| with_target_file_size() | 128 MB | Target size for individual Parquet files. The sink flushes when buffered data approaches this size |
| with_partition_spec() | None | Enables Hive-style partitioned writes (see Partitioned Writes) |

Partitioned Writes

DeltaSink supports Hive-style partitioned writes for efficient query pruning:

#![allow(unused)]
fn main() {
use volley_connector_delta::PartitionSpec;

let config = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_partition_spec(PartitionSpec::columns(vec![
        "date".to_string(),
        "region".to_string(),
    ]));
}

This produces a directory layout like:

s3://my-bucket/events/
  date=2026-03-31/region=us-east-1/part-0001.parquet
  date=2026-03-31/region=eu-west-1/part-0002.parquet

Partition column guidelines:

  • Use low-cardinality columns (date, region, status) to avoid excessive small files
  • Partition columns must exist in the record schema
  • Order matters: put the most selective column first for query engines
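The first two guidelines follow from how the directory path is built. This hypothetical helper (not part of volley-connector-delta) joins col=value segments in PartitionSpec order and fails when a partition column is missing from the record. Real writers also percent-escape special characters in values, which is omitted here.

```rust
/// Builds a Hive-style partition directory ("col=value/col=value") from
/// the partition columns, in the order they were configured.
fn partition_path(columns: &[&str], row: &[(&str, &str)]) -> Result<String, String> {
    columns
        .iter()
        .map(|col| {
            row.iter()
                .find(|(name, _)| name == col)
                .map(|(name, value)| format!("{name}={value}"))
                .ok_or_else(|| format!("partition column `{col}` missing from record"))
        })
        .collect::<Result<Vec<_>, _>>()
        .map(|segments| segments.join("/"))
}
```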

Exactly-Once Semantics

Delta commits include an epoch tag in the commit metadata. On recovery:

  1. The sink reads the Delta log to find the latest committed epoch for this pipeline_id
  2. If the current epoch was already committed, the write is skipped
  3. Replayed records after a failure never produce duplicate data

The pipeline_id scopes idempotency — multiple pipelines can write to the same table without conflicts as long as each uses a unique ID.
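The recovery check in steps 1 and 2 amounts to a per-pipeline high-water mark. EpochLog is a hypothetical in-memory stand-in for what the sink reads back from the Delta log; it shows why epochs at or below the last committed one are skipped and why different pipeline_id values never interfere.

```rust
use std::collections::HashMap;

/// Highest committed epoch per pipeline_id, as recovered from the table log.
struct EpochLog {
    last_committed: HashMap<String, u64>,
}

impl EpochLog {
    /// True if this (pipeline_id, epoch) still needs to be written.
    fn should_commit(&self, pipeline_id: &str, epoch: u64) -> bool {
        match self.last_committed.get(pipeline_id) {
            Some(&last) => epoch > last,
            None => true,
        }
    }

    /// Records a successful commit, keeping the highest epoch seen.
    fn record_commit(&mut self, pipeline_id: &str, epoch: u64) {
        let entry = self.last_committed.entry(pipeline_id.to_string()).or_insert(epoch);
        *entry = (*entry).max(epoch);
    }
}
```

The same bookkeeping explains the troubleshooting advice below: two pipelines sharing one ID skip each other's epochs, while a pipeline whose ID changes between restarts has no committed epochs under the new ID and writes replayed data again.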

Storage Backends

Delta tables can be stored on any Delta-compatible storage:

| Storage | URI Format | Credentials |
|---|---|---|
| AWS S3 | s3://bucket/path | Default credential chain (env, ~/.aws/credentials, instance profile) |
| Azure Blob | az://container/path | DefaultAzureCredential chain |
| Local filesystem | /path/to/table | File permissions |

Performance Tuning

| Scenario | Recommendation |
|---|---|
| High throughput (>100K rec/s) | Increase with_max_records() (e.g. to 50_000) to write fewer, larger files |
| Large records (>1 KB each) | Decrease with_max_records() or increase with_target_file_size() |
| Many partitions | Monitor file count; consider periodic compaction |
| Low latency requirements | Decrease with_max_records() for more frequent flushes |

Table Compaction

With many partitions and frequent checkpoints, Delta tables accumulate small files that degrade read performance. Run compaction periodically:

-- Spark SQL with Delta Lake (delta-rs offers an equivalent optimize/compact operation)
OPTIMIZE 's3://my-bucket/events';

Automated compaction within the Volley sink is planned for a future release.

Troubleshooting

“Table not found” or empty table

  • Delta tables are created automatically on first write if they don’t exist
  • Verify the storage URI is correct and credentials have write access

Duplicate data after recovery

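  • Keep each pipeline_id stable across restarts: a new ID has no committed epochs in the Delta log, so replayed epochs are written again
  • Give each pipeline its own pipeline_id; if two pipelines share one, epoch deduplication may skip valid writes instead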

Small files accumulating

  • Increase with_max_records() to buffer more records per file
  • Run OPTIMIZE periodically from a query engine
  • Consider adjusting checkpoint interval to reduce commit frequency

Slow writes to S3

  • S3 has higher latency per PUT than local disk; increase with_target_file_size() to reduce PUT count
  • Ensure the application runs in the same AWS region as the S3 bucket

Apache Iceberg

Write your stream to an Apache Iceberg table registered in a REST catalog (Polaris, Tabular, Gravitino). Pick this sink when your organization runs on Iceberg — snapshot isolation, hidden partitioning, and engine-agnostic reads from Trino, Spark, DuckDB, and more. Volley buffers records into Parquet data files and commits a new Iceberg snapshot on every checkpoint; the epoch is written into the snapshot metadata so a replay after a crash skips already-committed epochs.

Key Types

| Type | Description |
|---|---|
| IcebergSink | Sink that writes Parquet data files to an Iceberg table |
| IcebergSinkConfig | Configuration (catalog URI, warehouse, namespace, table, storage) |
| IcebergStorage | Storage backend enum (S3, Azure, GCS) |

Usage

#![allow(unused)]
fn main() {
use volley_connector_iceberg::{IcebergSink, IcebergSinkConfig, IcebergStorage};

let config = IcebergSinkConfig::new(
    "http://polaris:8181",           // REST catalog URI
    "my-warehouse",                  // catalog warehouse
    vec!["db".into()],               // namespace
    "events",                        // table name
    IcebergStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,
    },
);

let sink = IcebergSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("iceberg-writer")
    .await?;
}

Configuration Reference

Required Parameters

| Parameter | Description |
|---|---|
| catalog_uri | REST catalog endpoint (e.g., http://polaris:8181) |
| catalog_warehouse | Warehouse identifier in the catalog |
| namespace | Iceberg namespace as a list of strings (e.g., ["db", "schema"]) |
| table_name | Target table name |
| storage | Storage backend (see Storage Backends) |

Optional Parameters

#![allow(unused)]
fn main() {
use std::time::Duration;

let config = IcebergSinkConfig::new(catalog_uri, warehouse, namespace, table, storage)
    .with_credential("client-id:client-secret")  // OAuth2 credential for Polaris
    .with_max_records(10_000)                     // buffer size before flush (default: 10,000)
    .with_flush_interval(Duration::from_secs(30)); // time-based flush trigger
}

| Method | Default | Description |
|---|---|---|
| with_credential() | None | OAuth2 credential for authenticated catalogs (Polaris, Tabular) |
| with_max_records() | 10,000 | Number of records buffered before writing a Parquet data file |
| with_flush_interval() | None | Time-based flush trigger; writes a file even if max_records hasn’t been reached |

Storage Backends

S3

#![allow(unused)]
fn main() {
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "my-iceberg-data".into(),
    endpoint: None,  // use None for AWS, Some("http://localhost:9000") for MinIO
}
}

AWS credentials are resolved via the default credential chain (environment variables, ~/.aws/credentials, instance profile).

For local development with MinIO:

#![allow(unused)]
fn main() {
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "test-bucket".into(),
    endpoint: Some("http://localhost:9000".into()),
}
}

Azure Blob Storage

#![allow(unused)]
fn main() {
IcebergStorage::Azure {
    storage_account: "mystorageaccount".into(),
    container: "iceberg-data".into(),
    access_key: None,  // uses DefaultAzureCredential when None
}
}

When access_key is None, authentication uses Azure’s DefaultAzureCredential chain (environment variables, managed identity, Azure CLI).

Google Cloud Storage

#![allow(unused)]
fn main() {
IcebergStorage::Gcs {
    project_id: "my-gcp-project".into(),
    bucket: "iceberg-data".into(),
}
}

GCS credentials are resolved via Application Default Credentials (GOOGLE_APPLICATION_CREDENTIALS or metadata server).

Exactly-Once Semantics

The Iceberg sink achieves exactly-once via snapshot commits with epoch tracking:

  1. Records are buffered in memory and written as Parquet data files
  2. On checkpoint barrier, a new Iceberg snapshot is committed via the REST catalog
  3. The checkpoint epoch is stored in the snapshot’s summary metadata
  4. On recovery, the sink reads the latest snapshot’s epoch and skips already-committed epochs

This means replayed records after a failure never produce duplicate data in the table.

REST Catalog Compatibility

Volley uses the Iceberg REST catalog spec for table management. Compatible catalogs:

| Catalog | Notes |
|---|---|
| Apache Polaris | Recommended. Use with_credential() for OAuth2 auth |
| Tabular | Hosted REST catalog. Use with_credential() |
| AWS Glue (via REST adapter) | Requires a REST adapter in front of Glue |
| Gravitino | REST-compatible catalog |

Troubleshooting

“Connection refused” to catalog URI

  • Verify the catalog is running and reachable from your application
  • Check firewall rules and network policies (especially in Kubernetes)

“Unauthorized” or “Forbidden” from catalog

  • Ensure with_credential() is set with a valid OAuth2 client credential
  • For Polaris: verify the principal has TABLE_WRITE_DATA privilege on the target table

“Table not found”

  • Iceberg tables must be created in the catalog before the sink writes to them
  • Verify namespace and table_name match the catalog’s table identifier exactly

Small files accumulating

  • Increase with_max_records() to buffer more records per file
  • Set with_flush_interval() to a longer duration
  • Consider running Iceberg’s rewrite_data_files procedure periodically

Lance

Write your stream into a Lance dataset — the columnar format behind LanceDB, built in Rust and optimized for ML, vector search, and random-access reads. Pick this sink when downstream consumers want fast point lookups or vector indexes rather than scan-style analytics. Volley stages each batch of records, tags it with the checkpoint epoch, and commits atomically through Lance’s transaction API so replays after a crash never duplicate data.

Key Types

| Type | Description |
|---|---|
| LanceSink | Sink that writes Arrow records to a Lance dataset |
| LanceSinkConfig | Builder-style configuration (URI, pipeline id, file sizing, storage, recovery limit) |
| LanceStorage | Enum configuring the object store backend (Local, S3, Gcs, Azure) |
| LanceStreamExt | Umbrella extension trait adding DataStream::to_lance(config) |

Quick Start

Attach the sink to a pipeline via the LanceStreamExt trait re-exported from volley_connectors:

#![allow(unused)]
fn main() {
use volley_connectors::blob_store::lance::{LanceStreamExt, LanceSinkConfig, LanceStorage};
use volley_core::prelude::*;

let config = LanceSinkConfig::new("s3://my-bucket/datasets/events")
    .with_pipeline_id("events-v1")
    .with_storage(LanceStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,
        access_key_id: None,
        secret_access_key: None,
    });

StreamExecutionEnvironment::new()
    .from_kafka(kafka_cfg).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_lance(config).await?
    .execute("kafka-to-lance")
    .await?;
}

Enable the umbrella feature in your Cargo.toml:

volley-connectors = { version = "0.8", features = ["lance-sink"] }

Or depend on the standalone crate directly:

volley-connector-lance = "0.8"

Configuration Reference

#![allow(unused)]
fn main() {
let config = LanceSinkConfig::new("s3://bucket/path")
    .with_pipeline_id("my-pipeline")             // default: "volley-default"
    .with_storage(LanceStorage::Local)           // default: Local
    .with_max_rows_per_file(1_048_576)            // default: 1_048_576
    .with_max_rows_per_group(1024)                // default: 1024
    .with_max_bytes_per_file(90 * 1024 * 1024 * 1024) // default: 90 GB
    .with_max_records_buffer(100_000)             // default: 100_000
    .with_recovery_history_limit(256);            // default: 256
}

| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Stable identifier embedded in commit metadata. Used for idempotent recovery; use a unique value per pipeline writing to the same dataset |
| with_storage() | LanceStorage::Local | Object store backend; see Storage Backends |
| with_max_rows_per_file() | 1,048,576 | Maximum rows per Lance data file. Larger values produce fewer, bigger files |
| with_max_rows_per_group() | 1,024 | Maximum rows per row group (columnar encoding unit) |
| with_max_bytes_per_file() | 90 GB | Soft per-file size ceiling. Lance has a hard 100 GB limit on S3 |
| with_max_records_buffer() | 100,000 | Soft in-memory row budget between checkpoints. Exceeding it logs a warning but does not force an untagged flush: all commits wait for the next checkpoint barrier so the epoch tag is always present |
| with_recovery_history_limit() | 256 | Maximum number of historical Lance transactions scanned on startup to recover last_committed_epoch. Increase if you retain a long uncompacted history |

Exactly-Once Semantics

LanceSink uses a two-phase commit built on Lance’s native transaction API:

  1. Stage. Buffered batches are materialized through InsertBuilder::execute_uncommitted_stream, producing an uncommitted Transaction that represents the staged data files without making them visible.
  2. Tag. The sink sets the transaction’s transaction_properties field to {"volley.pipeline_id": id, "volley.epoch": N} before committing. These key-value pairs are persisted to the Lance transaction file on disk.
  3. Commit. CommitBuilder::execute(transaction) atomically publishes the new dataset version.

On startup, LanceSink::new calls Dataset::get_transactions to fetch up to recovery_history_limit recent transactions, scans each one for a matching volley.pipeline_id, and takes the maximum volley.epoch as last_committed_epoch. Any checkpoint barrier at or below that epoch is then treated as a duplicate and its commit is skipped, so epochs replayed after a restart are not written twice.

Note: Lance 4.0’s CommitBuilder::with_transaction_properties stores the map on the builder but does not forward it to the staged Transaction during execute. To work around this, the sink writes the properties directly to Transaction.transaction_properties — which is a public field on the struct — before calling execute. This is a tracked upstream issue; when Lance fixes it, the workaround can be replaced with the documented builder method.

Storage Backends

Lance delegates I/O to object_store under the hood, and LanceStorage maps cleanly onto its configuration keys.

| Variant | URI | Credentials |
|---|---|---|
| LanceStorage::Local | filesystem path or file://… | none |
| LanceStorage::S3 { region, bucket, endpoint, access_key_id, secret_access_key } | s3://bucket/path | inline fields, or AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars, or IAM instance role |
| LanceStorage::Gcs { project_id, bucket, service_account_json } | gs://bucket/path | inline JSON, or GOOGLE_APPLICATION_CREDENTIALS |
| LanceStorage::Azure { storage_account, container, access_key } | az://container/path | inline key, or AZURE_STORAGE_ACCOUNT_KEY / DefaultAzureCredential |

Setting endpoint = Some("http://…") on S3 also emits allow_http=true so MinIO / localstack work without extra plumbing.
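The endpoint rule can be illustrated by flattening S3 settings into string options. In this sketch, S3StorageSketch is a hypothetical stand-in for a subset of LanceStorage::S3. The option keys (aws_region, aws_endpoint, allow_http) follow object_store's string configuration keys, but the exact keys the sink emits are an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical mirror of a subset of the `LanceStorage::S3` fields.
pub struct S3StorageSketch {
    pub region: String,
    pub endpoint: Option<String>,
}

/// Flatten S3 settings into string options. Plain HTTP is enabled only
/// when a custom `http://` endpoint (MinIO, LocalStack) is configured.
pub fn s3_storage_options(s3: &S3StorageSketch) -> HashMap<String, String> {
    let mut opts = HashMap::new();
    opts.insert("aws_region".to_string(), s3.region.clone());
    if let Some(endpoint) = &s3.endpoint {
        opts.insert("aws_endpoint".to_string(), endpoint.clone());
        if endpoint.starts_with("http://") {
            opts.insert("allow_http".to_string(), "true".to_string());
        }
    }
    opts
}
```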

Limitations

  • No Hive-style partitioning in v1. Lance uses fragments internally rather than filesystem-visible partitions, and the partition story is deferred.
  • No schema evolution within a single pipeline_id — the first batch’s schema is assumed to hold for every subsequent batch.
  • Single writer per (dataset_uri, pipeline_id) — concurrent writers conflict on commit and return an error rather than silently serializing.
  • Recovery horizon is bounded by recovery_history_limit. If a pipeline accumulates more uncompacted versions than the limit without a successful recovery, older commits may be invisible to the scan and epochs can be re-executed. Compact regularly or raise the limit.

Troubleshooting

Recovery does not pick up the last committed epoch — Ensure the new sink uses the same pipeline_id as the writer, and that recovery_history_limit covers the relevant version range. A warn! log is emitted if a transaction’s volley.epoch property is malformed.

Build fails with “Could not find protoc”: Lance’s build scripts compile protobuf definitions at build time. Install the protoc binary (apt-get install protobuf-compiler on Debian/Ubuntu, brew install protobuf on macOS) before building.

Example

The standalone memory_to_lance example was retired in the 1.0 example cleanup (connector-specific examples moved out of volley-examples). The shape is identical to any other sink:

use volley_connector_lance::LanceStreamExt;
use volley_connectors::memory::MemorySource;
use volley_core::prelude::*;

StreamExecutionEnvironment::new()
    .from_source(MemorySource::new(records))
    .to_lance(config)           // `LanceStreamExt` installs the sink
    .execute("trades-to-lance")
    .await?;

to_lance(config) commits on every checkpoint barrier and on the final shutdown barrier — bounded sources are covered even if a periodic checkpoint hasn’t fired. For partitioning, vector-index creation, and compaction, see the tracked roadmap in TODOS.md.

AWS S3

Ingest files from an S3 bucket as a streaming source. Drop new objects in the bucket, and S3’s event notifications push their keys to an SQS queue; the volley-connector-aws-s3 crate polls that queue, reads each object, decodes it (Parquet, JSON, CSV, or Avro), and emits Arrow record batches. Use this when your upstream system writes files to S3 rather than publishing to Kafka.

Architecture

SQS Queue                    S3 Bucket
(object notifications)       (data files)
      |                          |
      v                          v
SqsNotificationSource        S3BlobReader
      |                          |
      +----------+    +----------+
                 |    |
                 v    v
            BlobSource
                 |
                 v
            Decoder (Parquet / JSON / CSV / Avro)
                 |
                 v
            RecordBatch --> Pipeline
  1. S3 sends object-created notifications to an SQS queue
  2. SqsNotificationSource polls the queue for new notifications
  3. S3BlobReader reads the object data from S3
  4. The shared decoder layer converts the data to Arrow RecordBatch

Key Types

| Type | Description |
|---|---|
| S3Source | Configured S3 source (combines SQS + S3 reader) |
| S3BlobReader | Reads object data from S3 |
| SqsNotificationSource | Polls SQS for S3 event notifications |

Supported Formats

| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |

Format detection is automatic based on file extension, or can be configured explicitly.
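Extension-based detection with an explicit override could look roughly like this. BlobFormat, detect_format, and resolve_format are hypothetical names. Case-insensitive matching and returning None for unknown or compressed extensions are assumptions of this sketch, not documented behavior.

```rust
use std::path::Path;

/// Hypothetical stand-in for the connector's format enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlobFormat {
    Parquet,
    Json,
    Csv,
    Avro,
}

/// Infer the decoder from the object key's extension (case-insensitive here).
/// Unknown extensions return `None`.
pub fn detect_format(object_key: &str) -> Option<BlobFormat> {
    let ext = Path::new(object_key).extension()?.to_str()?.to_ascii_lowercase();
    match ext.as_str() {
        "parquet" => Some(BlobFormat::Parquet),
        "json" | "jsonl" => Some(BlobFormat::Json),
        "csv" => Some(BlobFormat::Csv),
        "avro" => Some(BlobFormat::Avro),
        _ => None,
    }
}

/// An explicitly configured format wins; otherwise fall back to detection.
pub fn resolve_format(object_key: &str, explicit: Option<BlobFormat>) -> Option<BlobFormat> {
    explicit.or_else(|| detect_format(object_key))
}
```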

Usage

#![allow(unused)]
fn main() {
use volley_connector_aws_s3::S3Source;
use volley_core::prelude::*;

let source = S3Source::builder()
    .sqs_queue_url("https://sqs.us-east-1.amazonaws.com/123456789/my-queue")
    .build()
    .await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("s3-ingest")
    .await?;
}

AWS credentials are resolved via the standard AWS credential chain (environment variables, IAM role, credential file).

Blob Store Connectors

The volley-connector-blob-store crate provides a cloud-agnostic abstraction for reading from object storage. The S3 and Azure Blob Storage sources are built on this shared foundation; Google Cloud Storage is currently supported as a sink only.

Shared Architecture

All cloud blob store sources follow the same pattern:

Cloud Queue/Topic          Cloud Storage
(SQS / Azure Queue)       (S3 / Azure Blob)
      |                        |
      v                        v
NotificationSource         BlobReader
      |                        |
      +--------+   +-----------+
               |   |
               v   v
          BlobSource
               |
               v
          Decoder (Parquet / JSON / CSV / Avro)
               |
               v
          RecordBatch --> Pipeline

To add a new cloud backend, implement NotificationSource and BlobReader. The decoder layer is shared.
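The split between backend-specific and shared code can be sketched with simplified traits. NotificationSource and BlobReader below are hypothetical, synchronous stand-ins so the example runs on the standard library alone. The real Volley traits are likely async and carry more context. The decoder is a plain closure standing in for the shared decoder layer.

```rust
use std::collections::{HashMap, VecDeque};

/// Simplified, hypothetical shape of the notification trait.
pub trait NotificationSource {
    /// Next object key announced by the queue, if any.
    fn next_key(&mut self) -> Option<String>;
}

/// Simplified, hypothetical shape of the reader trait.
pub trait BlobReader {
    /// Fetch the raw bytes of an object.
    fn read(&self, key: &str) -> Result<Vec<u8>, String>;
}

/// In-memory backend standing in for a queue + bucket pair.
pub struct MemoryQueue(pub VecDeque<String>);
pub struct MemoryBucket(pub HashMap<String, Vec<u8>>);

impl NotificationSource for MemoryQueue {
    fn next_key(&mut self) -> Option<String> {
        self.0.pop_front()
    }
}

impl BlobReader for MemoryBucket {
    fn read(&self, key: &str) -> Result<Vec<u8>, String> {
        self.0.get(key).cloned().ok_or_else(|| format!("object not found: {key}"))
    }
}

/// Backend-independent loop: drain notifications, read each object,
/// and hand the bytes to a shared decoder.
pub fn drain<N, R, D, T>(notifications: &mut N, reader: &R, mut decode: D) -> Result<Vec<T>, String>
where
    N: NotificationSource,
    R: BlobReader,
    D: FnMut(&str, &[u8]) -> T,
{
    let mut out = Vec::new();
    while let Some(key) = notifications.next_key() {
        let bytes = reader.read(&key)?;
        out.push(decode(&key, &bytes));
    }
    Ok(out)
}
```

Only the two trait impls change per cloud. The drain loop and the decoder stay the same, which is the point of the shared layer.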

Azure Blob Storage

From volley-connector-azure-blob:

| Type | Description |
|---|---|
| AzureBlobSourceBuilder | Builder with automatic credential discovery |
| AzureBlobSource | Configured Azure Blob source |
| AzureBlobReader | Reads blobs from Azure Blob Storage |
| AzureQueueNotificationSource | Polls Azure Queue Storage for blob notifications |

Enable with feature flag blob-store-azure on volley-connectors.

Authentication

By default, Azure connectors use DefaultAzureCredential which discovers credentials automatically. The credential chain tries, in order:

  1. Environment variables: AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET
  2. Workload Identity: Kubernetes pods with Azure Workload Identity federation
  3. Managed Identity: Azure VM, App Service, or Container Instance identity
  4. Azure CLI: cached az login credentials (local development)

To override with an explicit storage account key, use with_access_key():

#![allow(unused)]
fn main() {
use volley_connector_azure_blob::AzureBlobSourceBuilder;

// DefaultAzureCredential (recommended for production)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .build(config).await?;

// Explicit access key (override)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .with_access_key("MYACCESSKEY...")
    .build(config).await?;
}

Google Cloud Storage

GCS is supported as a sink only. The previous Pub/Sub-driven source was removed in v1.1.0 because the google-cloud-pubsub streaming pull API was not stable enough for production use.

From volley-connector-gcp-gcs:

| Type | Description |
|---|---|
| GcsBlobWriter | Writes objects to GCS (for use with BufferedBlobSink) |

Enable with feature flag blob-store-gcs on volley-connectors. GCS credentials are discovered through Application Default Credentials (GOOGLE_APPLICATION_CREDENTIALS, GKE Workload Identity, GCE metadata, or gcloud auth application-default login).

Blob Store Sinks

The volley-connectors crate also provides buffered blob store sinks for writing output files to S3, Azure Blob Storage, or GCS. These sinks batch records and flush them as files based on record count or time interval.
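The count-or-interval rule can be sketched as a small policy object. FlushPolicy is hypothetical and is not the sink's type. It takes `now` as a parameter so it can be tested deterministically. The choice to never flush an empty buffer is an assumption of this sketch.

```rust
use std::time::{Duration, Instant};

/// Hypothetical flush policy: flush when either threshold is reached.
pub struct FlushPolicy {
    max_records: usize,
    flush_interval: Duration,
    buffered: usize,
    last_flush: Instant,
}

impl FlushPolicy {
    pub fn new(max_records: usize, flush_interval: Duration, now: Instant) -> Self {
        Self { max_records, flush_interval, buffered: 0, last_flush: now }
    }

    /// Record `n` buffered rows and report whether a file should be written now.
    pub fn on_records(&mut self, n: usize, now: Instant) -> bool {
        self.buffered += n;
        self.should_flush(now)
    }

    /// Either threshold triggers a flush; this sketch never writes empty files.
    pub fn should_flush(&self, now: Instant) -> bool {
        self.buffered > 0
            && (self.buffered >= self.max_records
                || now.duration_since(self.last_flush) >= self.flush_interval)
    }

    /// Reset counters after the encoder has written a file.
    pub fn flushed(&mut self, now: Instant) {
        self.buffered = 0;
        self.last_flush = now;
    }
}
```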

Encoders

| Encoder | Output Format | File Extension |
|---|---|---|
| ParquetEncoder | Apache Parquet | .parquet |
| JsonLinesEncoder | JSON Lines (newline-delimited JSON) | .jsonl |
| CsvEncoder | CSV with optional header | .csv |

Usage

#![allow(unused)]
fn main() {
use std::time::Duration;
use volley_connectors::blob_store::{BufferedBlobSink, BufferedBlobSinkConfig};
use volley_connectors::blob_store::encoders::ParquetEncoder;
use volley_connectors::blob_store::aws::S3BlobWriter;

let encoder = Box::new(ParquetEncoder::new());
let config = BufferedBlobSinkConfig::new("my-bucket", "output/")
    .with_max_records(100_000)
    .with_flush_interval(Duration::from_secs(60));
let sink = BufferedBlobSink::new(
    Box::new(S3BlobWriter::new(s3_client)),
    encoder,
    config,
);
}

For Azure Blob Storage, use AzureBlobWriter instead of S3BlobWriter:

#![allow(unused)]
fn main() {
use volley_connectors::blob_store::azure::AzureBlobWriter;

let sink = BufferedBlobSink::new(
    Box::new(AzureBlobWriter::new(blob_service_client, "my-container")),
    encoder,
    config,
);
}

For Google Cloud Storage, use GcsBlobWriter:

#![allow(unused)]
fn main() {
use volley_connectors::blob_store::gcs::GcsBlobWriter;

let sink = BufferedBlobSink::new(
    Box::new(GcsBlobWriter::new(gcs_client)),
    encoder,
    config,
);
}

Enable with feature flags blob-store-aws, blob-store-azure, or blob-store-gcs on volley-connectors.

Supported Source Formats

All blob store sources share the same decoder layer:

| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |

Feature Flags

[dependencies]
volley-connectors = { git = "https://github.com/volley-streams/volley", features = ["blob-store-azure", "blob-store-gcs"] }

See also: AWS S3 for the S3-specific connector.

ADBC (Arrow Database Connectivity)

Volley provides ADBC-based database connectivity via two crates:

  • volley-connector-adbc – A sink that writes Arrow RecordBatches to any database with an ADBC driver (PostgreSQL, BigQuery, Snowflake, ClickHouse, Databricks, MySQL, Redshift, Exasol, and more).
  • volley-enrichment-adbc – A real-time enrichment operator that queries external databases to enrich streaming records with lookup data, backed by an LRU+TTL cache.

ADBC uses a C-API driver model (dlopen/dlsym), so adding support for a new database only requires installing the corresponding ADBC driver shared library – no connector-specific code needed.

Setup

Add crate dependencies

[dependencies]
# Sink (write to a database)
volley-connector-adbc = { git = "https://github.com/volley-streams/volley" }

# Enrichment operator (query a database to enrich records)
volley-enrichment-adbc = { git = "https://github.com/volley-streams/volley" }

Install the ADBC driver shared library

ADBC drivers are platform-specific shared libraries. Install the driver for your target database:

PostgreSQL:

# macOS (Homebrew)
brew install apache-arrow-adbc-driver-postgresql

# Ubuntu/Debian
apt-get install libadbc-driver-postgresql

# Or download from https://github.com/apache/arrow-adbc/releases

BigQuery:

# Download the adbc_driver_bigquery shared library from
# https://github.com/apache/arrow-adbc/releases

The driver name (e.g., "adbc_driver_postgresql") is resolved via platform library search paths (LD_LIBRARY_PATH on Linux, DYLD_LIBRARY_PATH on macOS). Alternatively, pass a full file path to the shared library.

ADBC Sink

The sink writes Arrow RecordBatches to a database table. It supports insert (bulk ingest) and upsert (ON CONFLICT) modes with write-behind buffering.

Basic insert

#![allow(unused)]
fn main() {
use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig, AdbcStreamExt};

let adbc = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb")
    .with_pool_size(4);

let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_batch_size(5000)
    .with_create_table(true);

env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_adbc(sink_config).await?
    .execute("kafka-to-postgres").await?;
}

Upsert mode

Set with_upsert_key() to enable upsert semantics. The sink auto-generates INSERT ... ON CONFLICT (key) DO UPDATE SET SQL:

#![allow(unused)]
fn main() {
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_key("event_id")
    .with_batch_size(5000);
}

For composite (multi-column) keys, use with_upsert_keys():

#![allow(unused)]
fn main() {
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_keys(vec!["tenant_id", "event_id"])
    .with_batch_size(5000);
}
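The statement shape described above can be reproduced with a small generator. upsert_sql is a hypothetical illustration of PostgreSQL-style syntax, not the sink's own code generator. It uses $1..$n placeholders, does not quote identifiers, and falls back to DO NOTHING when every column is part of the key. That fallback is an assumption about edge-case handling.

```rust
/// Hypothetical generator for PostgreSQL-style upsert SQL.
pub fn upsert_sql(table: &str, columns: &[&str], keys: &[&str]) -> String {
    let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("${i}")).collect();
    // Non-key columns are overwritten from the incoming (EXCLUDED) row.
    let updates: Vec<String> = columns
        .iter()
        .filter(|c| !keys.contains(*c))
        .map(|c| format!("{c} = EXCLUDED.{c}"))
        .collect();
    let action = if updates.is_empty() {
        // Every column is part of the key: there is nothing to update.
        "DO NOTHING".to_string()
    } else {
        format!("DO UPDATE SET {}", updates.join(", "))
    };
    format!(
        "INSERT INTO {table} ({}) VALUES ({}) ON CONFLICT ({}) {action}",
        columns.join(", "),
        placeholders.join(", "),
        keys.join(", "),
    )
}
```

For upsert_sql("events", &["id", "name", "value"], &["id"]), this yields the same statement as the custom SQL example below.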

For databases with non-standard upsert syntax, provide custom SQL:

#![allow(unused)]
fn main() {
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_query(
        "INSERT INTO events (id, name, value) VALUES ($1, $2, $3) \
         ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, value = EXCLUDED.value"
    );
}

Sink configuration reference

| Method | Default | Description |
|---|---|---|
| AdbcSinkConfig::new(adbc, table) | | Required: ADBC config and target table name |
| .with_upsert_key(column) | None | Single column for upsert (ON CONFLICT) semantics. None = insert-only |
| .with_upsert_keys(vec) | None | Multiple columns for composite upsert (ON CONFLICT). Overrides with_upsert_key |
| .with_upsert_query(sql) | None | Custom SQL for upsert. Overrides auto-generated SQL |
| .with_batch_size(n) | 1000 | Number of buffered rows before auto-flush |
| .with_create_table(bool) | false | Auto-create the table from the first batch’s Arrow schema |

Delivery guarantees

  • At-least-once (default): on_checkpoint() flushes buffered batches. If the pipeline crashes after flushing but before the checkpoint completes, some records may be written twice on recovery.
  • Effectively exactly-once via upsert: when upsert keys are set (via with_upsert_key() or with_upsert_keys()), duplicate writes are idempotent. Choose a natural key that makes re-delivery safe.
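The difference between the two guarantees shows up when the same batch is replayed after a crash. In this toy model, the append-only Vec stands in for insert mode and the HashMap keyed by event_id stands in for an upsert table. Row, insert_all, and upsert_all are hypothetical names.

```rust
use std::collections::HashMap;

/// A row as (event_id, value), for illustration only.
pub type Row = (u64, String);

/// Insert-only target: every write appends, so a replay duplicates rows.
pub fn insert_all(table: &mut Vec<Row>, batch: &[Row]) {
    table.extend_from_slice(batch);
}

/// Upsert target keyed by event_id: a replay rewrites the same rows with
/// the same values, so the end state is unchanged.
pub fn upsert_all(table: &mut HashMap<u64, String>, batch: &[Row]) {
    for (id, value) in batch {
        table.insert(*id, value.clone());
    }
}
```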

ADBC Enrichment Operator

The enrichment operator queries an external database to enrich streaming records. It extracts lookup keys from each batch, issues a single micro-batch query, and appends enrichment columns to the output. An LRU+TTL cache avoids redundant queries.

Key-column lookup

The simplest mode: specify a stream column and a remote table. The operator generates SELECT [columns] FROM table WHERE target_key IN (k1, k2, ...).

#![allow(unused)]
fn main() {
use volley_enrichment_adbc::{AdbcEnrichConfig, AdbcEnrichStreamExt, EnrichMissStrategy};
use volley_connector_adbc::AdbcConfig;
use std::time::Duration;

let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new(pg, "customers", "customer_id")
            .with_select_columns(vec!["tier", "region", "lifetime_value"])
            .with_prefix("customer_")
            .with_cache_ttl(Duration::from_secs(300))
            .with_on_miss(EnrichMissStrategy::NullFill)
    )
    .filter_expr(col("customer_tier").eq(lit("premium")))
    .to_kafka(sink_config).await?
    .execute("enrich-orders").await?;
}

Parameterized query

For complex lookups (joins, filters, computed columns), provide a SQL template with IN $1:

#![allow(unused)]
fn main() {
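use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig, AdbcStreamExt};
use volley_enrichment_adbc::{AdbcEnrichConfig, AdbcEnrichStreamExt};

let bq = AdbcConfig::new("adbc_driver_bigquery", "bigquery://project-id");
let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");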

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new_with_query(
            bq,
            "SELECT category, risk_score FROM products WHERE sku IN $1 AND active = true",
            "product_sku"
        )
    )
    .to_adbc(AdbcSinkConfig::new(pg, "enriched_events")).await?
    .execute("enrich-and-store").await?;
}

Miss strategies

When a lookup key has no match in the database:

| Strategy | Behavior |
|---|---|
| NullFill (default) | Add enrichment columns filled with nulls |
| DropRecord | Drop the record entirely |
| PassThrough | Keep the original record without enrichment columns |

Enrichment configuration reference

| Method | Default | Description |
|---|---|---|
| AdbcEnrichConfig::new(adbc, table, key) | | Key-column lookup mode |
| AdbcEnrichConfig::new_with_query(adbc, sql, key) | | Parameterized query mode |
| .with_target_key(column) | Same as lookup_key | Remote column to match against (if different from the stream column) |
| .with_select_columns(vec) | All columns | Which columns to fetch from the remote table |
| .with_prefix(prefix) | None | Prefix for enriched columns (e.g., "customer_") |
| .with_cache_ttl(duration) | 5 minutes | Time-to-live for cached entries |
| .with_cache_max_entries(n) | 10,000 | Maximum entries in the LRU cache |
| .with_on_miss(strategy) | NullFill | Strategy for unmatched records |
| .with_prefetch(bool) | false | Overlap DB queries with batch processing for reduced latency |

Supported key types

The lookup key column can be Utf8 (string), Int32, or Int64. Integer keys are interpolated into SQL without quoting and converted to strings internally for cache lookups.
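A micro-batch key lookup along these lines deduplicates the batch's keys, renders each as a SQL literal, and builds a single IN query. This is a sketch, not the operator's code. LookupKey and lookup_query are hypothetical. Doubling embedded single quotes in string keys is standard SQL escaping but an assumption about what the operator does. Adding the key column to the projection, so results can be joined back, is also this sketch's choice.

```rust
use std::collections::BTreeSet;

/// Hypothetical key representation for the supported column types.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum LookupKey {
    Utf8(String),
    Int32(i32),
    Int64(i64),
}

impl LookupKey {
    /// Strings are single-quoted (embedded quotes doubled); integers are bare.
    fn to_sql_literal(&self) -> String {
        match self {
            LookupKey::Utf8(s) => format!("'{}'", s.replace('\'', "''")),
            LookupKey::Int32(v) => v.to_string(),
            LookupKey::Int64(v) => v.to_string(),
        }
    }

    /// Cache keys are strings regardless of the column type.
    pub fn cache_key(&self) -> String {
        match self {
            LookupKey::Utf8(s) => s.clone(),
            LookupKey::Int32(v) => v.to_string(),
            LookupKey::Int64(v) => v.to_string(),
        }
    }
}

/// Deduplicate the batch's keys and build one micro-batch query.
pub fn lookup_query(table: &str, target_key: &str, select: &[&str], keys: &[LookupKey]) -> Option<String> {
    let unique: BTreeSet<&LookupKey> = keys.iter().collect();
    if unique.is_empty() {
        return None; // nothing to look up for this batch
    }
    let list: Vec<String> = unique.iter().map(|k| k.to_sql_literal()).collect();
    let columns = if select.is_empty() {
        "*".to_string()
    } else {
        // Also fetch the key column so results can be joined back to the batch.
        format!("{target_key}, {}", select.join(", "))
    };
    Some(format!("SELECT {columns} FROM {table} WHERE {target_key} IN ({})", list.join(", ")))
}
```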

Cache behavior

The enrichment operator uses an LRU cache with per-entry TTL:

  • Cache hits return the stored enrichment row without querying the database.
  • Cache misses (key not found) are cached as None to avoid repeated lookups for non-existent keys.
  • TTL expiry evicts stale entries so the operator picks up changes in the remote database.
  • On recovery, the cache starts cold and repopulates organically. Enrichment data is external and authoritative, so there is no state to recover.
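The cache behavior in the list above can be sketched as a small LRU map with per-entry TTL and negative caching. LruTtlCache and Lookup are hypothetical, not the operator's types. For brevity, eviction uses a linear scan for the least recently used entry. A production cache would use an O(1) structure.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Result of a cache probe. `Hit(None)` is a cached miss: the key is known
/// to be absent remotely, which saves a query just like a positive hit.
#[derive(Debug, PartialEq)]
pub enum Lookup<V> {
    Hit(Option<V>),
    Miss,
}

struct Entry<V> {
    value: Option<V>,
    inserted_at: Instant,
    last_used: u64,
}

/// Hypothetical LRU cache with per-entry TTL.
pub struct LruTtlCache<V> {
    ttl: Duration,
    max_entries: usize,
    tick: u64,
    entries: HashMap<String, Entry<V>>,
}

impl<V: Clone> LruTtlCache<V> {
    pub fn new(ttl: Duration, max_entries: usize) -> Self {
        Self { ttl, max_entries, tick: 0, entries: HashMap::new() }
    }

    pub fn get(&mut self, key: &str, now: Instant) -> Lookup<V> {
        self.tick += 1;
        let fresh = match self.entries.get(key) {
            None => return Lookup::Miss,
            Some(e) => now.duration_since(e.inserted_at) < self.ttl,
        };
        if !fresh {
            // TTL expired: evict so the next lookup re-queries the database.
            self.entries.remove(key);
            return Lookup::Miss;
        }
        let entry = self.entries.get_mut(key).expect("entry checked above");
        entry.last_used = self.tick;
        Lookup::Hit(entry.value.clone())
    }

    /// Cache a lookup result; `None` records a key with no remote row.
    pub fn insert(&mut self, key: String, value: Option<V>, now: Instant) {
        if self.entries.len() >= self.max_entries && !self.entries.contains_key(&key) {
            // Evict the least recently used entry (linear scan for brevity).
            let lru = self.entries.iter().min_by_key(|(_, e)| e.last_used).map(|(k, _)| k.clone());
            if let Some(k) = lru {
                self.entries.remove(&k);
            }
        }
        self.tick += 1;
        self.entries.insert(key, Entry { value, inserted_at: now, last_used: self.tick });
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```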

Connection Configuration Reference

AdbcConfig is shared between the sink and enrichment operator:

| Method | Default | Description |
|---|---|---|
| AdbcConfig::new(driver, uri) | | Required: driver name/path and connection URI |
| AdbcConfig::sqlite(path) | | Convenience: SQLite driver with database path |
| AdbcConfig::flight_sql(uri) | | Convenience: Flight SQL driver with gRPC endpoint |
| .with_option(key, value) | {} (empty) | Add a driver-specific key-value option |
| .with_pool_size(n) | 4 | Connection pool size (round-robin) |
| .with_health_check(config) | enabled, 30s, SELECT 1 | Connection health check configuration |

Driver name resolution: If the driver name contains / or \, it is treated as a file path. Otherwise, it is resolved via platform library search paths (e.g., LD_LIBRARY_PATH, DYLD_LIBRARY_PATH).
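The resolution rule can be sketched as a classification step followed by a platform file name. classify_driver and platform_library_name are hypothetical helpers. The lib*.so / lib*.dylib / *.dll naming follows common dynamic-loader conventions, and it is an assumption that the driver manager builds names this way.

```rust
use std::path::PathBuf;

/// Hypothetical classification of the `driver` argument.
#[derive(Debug, PartialEq, Eq)]
pub enum DriverLocation {
    /// Contains a path separator: load this exact file.
    Path(PathBuf),
    /// Bare name: let the dynamic loader search its library paths.
    SearchName(String),
}

pub fn classify_driver(driver: &str) -> DriverLocation {
    if driver.contains('/') || driver.contains('\\') {
        DriverLocation::Path(PathBuf::from(driver))
    } else {
        DriverLocation::SearchName(driver.to_string())
    }
}

/// Platform file name a loader would look for, e.g. `libadbc_driver_postgresql.so`.
pub fn platform_library_name(name: &str) -> String {
    if cfg!(target_os = "windows") {
        format!("{name}.dll")
    } else if cfg!(target_os = "macos") {
        format!("lib{name}.dylib")
    } else {
        format!("lib{name}.so")
    }
}
```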

Common driver names:

| Database | Driver Name |
|---|---|
| PostgreSQL | adbc_driver_postgresql |
| BigQuery | adbc_driver_bigquery |
| Snowflake | adbc_driver_snowflake |
| SQLite | adbc_driver_sqlite |
| Flight SQL | adbc_driver_flightsql |

Observability

| Metric | Type | Description |
|---|---|---|
| adbc_sink_rows_written_total | counter | Rows written, by table |
| adbc_sink_flush_duration_seconds | histogram | Flush latency |
| adbc_connection_reconnects_total | counter | Pool connections replaced by health check |
| adbc_enrich_cache_hit_total | counter | Cache hits |
| adbc_enrich_cache_miss_total | counter | Cache misses |
| adbc_enrich_cache_size | gauge | Current cache entry count |
| adbc_enrich_query_duration_seconds | histogram | Micro-batch query latency |
| adbc_enrich_query_errors_total | counter | Failed enrichment queries |

Integration Testing

The ADBC integration tests require Docker (for PostgreSQL via testcontainers) and the adbc_driver_postgresql shared library.

Set the ADBC_POSTGRESQL_DRIVER environment variable if the driver is not on the default library search path:

# Point to the driver if not on LD_LIBRARY_PATH
export ADBC_POSTGRESQL_DRIVER=/usr/local/lib/libadbc_driver_postgresql.so

# Run the sink integration tests
cargo test -p volley-connector-adbc --test sink_integration -- --ignored

# Run the enrichment integration tests
cargo test -p volley-enrichment-adbc --test enrich_integration -- --ignored

Tests use volley_test_harness::containers::PostgresContainer (from the volley-test-harness crate) to spin up a PostgreSQL instance automatically.

Troubleshooting

“Failed to load ADBC driver”

  • Verify the driver shared library is installed and on the library search path.
  • On macOS: check DYLD_LIBRARY_PATH. On Linux: check LD_LIBRARY_PATH.
  • Try passing the full file path instead of just the driver name.

“Failed to create ADBC connection”

  • Verify the connection URI is correct and the database is reachable.
  • Check that driver-specific options (credentials, project ID, etc.) are set via with_option().

Slow enrichment queries

  • Increase cache_ttl and cache_max_entries to reduce query frequency.
  • Add an index on the lookup column in the remote database.
  • Use with_select_columns() to fetch only the columns you need.

Duplicate rows after recovery

  • Use with_upsert_key() to make writes idempotent on a natural key.
  • Without upsert, the sink provides at-least-once delivery.

ScyllaDB / Cassandra

Volley provides ScyllaDB (and Cassandra-compatible) integration via two crates:

  • volley-connector-scylla — A sink that writes Arrow RecordBatches to a ScyllaDB table using micro-batch CQL writes, with full 2PC support and first-class vector column handling.
  • volley-enrichment-scylla — A real-time enrichment operator that queries ScyllaDB to enrich streaming records, with LRU+TTL caching, compound key support, and ANN vector similarity search.

Both crates use the scylla Rust driver, which provides native token-aware routing and efficient CQL batch execution.

Targets: ScyllaDB 5.x / 6.x and Apache Cassandra 4.x / 5.x.

Setup

Add crate dependencies

[dependencies]
# Sink (write to ScyllaDB)
volley-connector-scylla = { git = "https://github.com/volley-streams/volley" }

# Enrichment operator (query ScyllaDB to enrich records)
volley-enrichment-scylla = { git = "https://github.com/volley-streams/volley" }

Prerequisites

A running ScyllaDB (or Cassandra) cluster is required. For local development, start a single node with Docker:

docker run --rm -d -p 9042:9042 --name scylla scylladb/scylla:6.1

ScyllaDB Sink

The sink buffers incoming rows and flushes them as CQL batch statements. It supports append mode (INSERT), upsert mode (INSERT IF NOT EXISTS), and checkpoint-aligned 2PC.

Basic append

#![allow(unused)]
fn main() {
use volley_connector_scylla::{ScyllaConfig, ScyllaSinkConfig, ScyllaStreamExt};

let scylla = ScyllaConfig::new(vec!["127.0.0.1:9042"])
    .with_keyspace("my_keyspace");

let sink_config = ScyllaSinkConfig::new(scylla, "events")
    .with_batch_size(500)
    .with_create_table(true);

env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_scylla(sink_config).await?
    .execute("kafka-to-scylla").await?;
}

Upsert mode

Use ScyllaSinkConfig::new_upsert() for idempotent writes. Duplicate records with the same primary key are silently discarded:

#![allow(unused)]
fn main() {
let sink_config = ScyllaSinkConfig::new_upsert(scylla, "events", vec!["event_id"])
    .with_batch_size(500);
}

For tables with a composite primary key, pass all key columns:

#![allow(unused)]
fn main() {
let sink_config = ScyllaSinkConfig::new_upsert(scylla, "events", vec!["tenant_id", "event_id"])
    .with_batch_size(500);
}

Auto-create table

When with_create_table(true) is set, the sink issues a CREATE TABLE IF NOT EXISTS DDL statement before its first write, deriving the table schema from the Arrow schema of the first batch. Arrow-to-CQL type mappings:

| Arrow type | CQL type |
|---|---|
| Int32 | int |
| Int64 | bigint |
| Float32 | float |
| Float64 | double |
| Utf8 / LargeUtf8 | text |
| Boolean | boolean |
| Timestamp(ms, ...) | timestamp |
| FixedSizeList<Float16/32/64, N> | vector<float, N> |
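The mapping table can be expressed as a function, followed by a DDL builder. ArrowType is a hypothetical mirror of a subset of Arrow's DataType, used so the sketch needs only the standard library. The function names are also hypothetical. The text above does not say how the sink chooses primary key columns, so this sketch takes them as a parameter. CQL requires at least one.

```rust
/// Hypothetical subset of Arrow types (the real sink works on arrow's DataType).
#[derive(Debug, Clone, PartialEq)]
pub enum ArrowType {
    Int32,
    Int64,
    Float32,
    Float64,
    Utf8,
    LargeUtf8,
    Boolean,
    TimestampMs,
    /// FixedSizeList of Float16/32/64 with `dim` elements.
    FloatVector { dim: usize },
}

/// Map an Arrow type to CQL following the table above.
pub fn cql_type(t: &ArrowType) -> String {
    match t {
        ArrowType::Int32 => "int".into(),
        ArrowType::Int64 => "bigint".into(),
        ArrowType::Float32 => "float".into(),
        ArrowType::Float64 => "double".into(),
        ArrowType::Utf8 | ArrowType::LargeUtf8 => "text".into(),
        ArrowType::Boolean => "boolean".into(),
        ArrowType::TimestampMs => "timestamp".into(),
        ArrowType::FloatVector { dim } => format!("vector<float, {dim}>"),
    }
}

/// Build `CREATE TABLE IF NOT EXISTS` DDL with caller-chosen primary key columns.
pub fn create_table_ddl(table: &str, fields: &[(&str, ArrowType)], primary_key: &[&str]) -> String {
    let cols: Vec<String> = fields.iter().map(|(name, t)| format!("{name} {}", cql_type(t))).collect();
    format!(
        "CREATE TABLE IF NOT EXISTS {table} ({}, PRIMARY KEY ({}))",
        cols.join(", "),
        primary_key.join(", ")
    )
}
```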

Vector columns

ScyllaDB’s native vector<float> type is fully supported. Arrow FixedSizeList columns with Float16, Float32, or Float64 children are automatically serialised as vector values. This is the recommended way to store embedding vectors for ANN search:

#![allow(unused)]
fn main() {
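// Assumes the `arrow` crate is a direct dependency.
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

// Arrow schema with a vector embedding column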
let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Utf8, false),
    Field::new(
        "embedding",
        DataType::FixedSizeList(
            Arc::new(Field::new("item", DataType::Float32, false)),
            384,
        ),
        false,
    ),
]));

let sink_config = ScyllaSinkConfig::new(scylla, "embeddings")
    .with_create_table(true)
    .with_vector_index("embedding"); // CREATE CUSTOM INDEX for ANN
}

Sink configuration reference

| Method | Default | Description |
|---|---|---|
| ScyllaSinkConfig::new(config, table) | | Append mode. Required: ScyllaDB config and target table name |
| ScyllaSinkConfig::new_upsert(config, table, keys) | | Upsert mode. keys lists partition + clustering key columns |
| .with_batch_size(n) | 500 | Number of buffered rows before auto-flush |
| .with_create_table(bool) | false | Auto-create the table from the first batch’s Arrow schema |
| .with_vector_index(column) | None | Create a ScyllaDB custom ANN index on this vector column |
| .with_ttl(seconds) | None | Per-row TTL in seconds (USING TTL n on every INSERT) |

Delivery guarantees

  • Append mode (default): at-least-once. On checkpoint, buffered rows are flushed. If the pipeline crashes after the flush but before the checkpoint completes, the rows may be written again on recovery.
  • Upsert mode: effectively exactly-once for tables with a natural primary key. Duplicate INSERTs on recovery are idempotent because ScyllaDB discards rows whose primary key already exists.
  • 2PC (transactional pipelines): prepare_commit stages the CQL batch; finalize_commit executes it; abort_commit discards it. Sinks in a multi-sink DAG all commit atomically per epoch.
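The three 2PC hooks can be modeled in memory to show when rows become visible. TwoPhaseSink is a hypothetical model. The hook names follow the text, but the signatures are assumptions, and the `written` vector stands in for the ScyllaDB table.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory model of a 2PC sink.
pub struct TwoPhaseSink {
    buffer: Vec<String>,
    staged: BTreeMap<u64, Vec<String>>, // epoch -> rows prepared but not yet written
    pub written: Vec<String>,           // stands in for the ScyllaDB table
}

impl TwoPhaseSink {
    pub fn new() -> Self {
        Self { buffer: Vec::new(), staged: BTreeMap::new(), written: Vec::new() }
    }

    pub fn write(&mut self, row: &str) {
        self.buffer.push(row.to_string());
    }

    /// Phase 1 (checkpoint barrier): move buffered rows into a staged batch.
    pub fn prepare_commit(&mut self, epoch: u64) {
        let rows = std::mem::take(&mut self.buffer);
        self.staged.insert(epoch, rows);
    }

    /// Phase 2 (checkpoint complete): execute the staged batch.
    pub fn finalize_commit(&mut self, epoch: u64) {
        if let Some(rows) = self.staged.remove(&epoch) {
            self.written.extend(rows);
        }
    }

    /// Checkpoint failed: discard the staged batch; the source will replay it.
    pub fn abort_commit(&mut self, epoch: u64) {
        self.staged.remove(&epoch);
    }
}
```

Nothing reaches the table until finalize_commit. An aborted epoch leaves no trace, so replaying it after recovery cannot double-write.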

ScyllaDB Enrichment Operator

The enrichment operator queries ScyllaDB to join lookup data into the stream. For each incoming batch it extracts unique keys, issues per-key CQL SELECT queries (batched via async concurrency), and left-joins the results back into the batch. An LRU+TTL cache avoids redundant queries.

Simple key lookup

#![allow(unused)]
fn main() {
use volley_enrichment_scylla::{ScyllaEnrichConfig, ScyllaEnrichStreamExt, EnrichMissStrategy};
use volley_connector_scylla::ScyllaConfig;
use std::time::Duration;

let scylla = ScyllaConfig::new(vec!["127.0.0.1:9042"])
    .with_keyspace("my_keyspace");

env.from_kafka(source_config).await?
    .enrich_with_scylla(
        ScyllaEnrichConfig::new(scylla, "customers", "customer_id")
            .with_select_columns(vec!["tier", "region", "lifetime_value"])
            .with_prefix("customer_")
            .with_cache_ttl(Duration::from_secs(300))
            .with_on_miss(EnrichMissStrategy::NullFill)
    )
    .filter_expr(col("customer_tier").eq(lit("premium")))
    .to_kafka(sink_config).await?
    .execute("enrich-orders").await?;
}

This issues SELECT tier, region, lifetime_value FROM customers WHERE customer_id = ? for each unique key in the batch.

Compound key lookup

For tables with a composite primary key, specify all key columns and the corresponding stream columns:

#![allow(unused)]
fn main() {
ScyllaEnrichConfig::new_compound(
    scylla,
    "order_items",
    vec![
        ("order_id", "order_id"),    // (stream_column, table_column)
        ("item_sku", "sku"),
    ],
)
.with_select_columns(vec!["price", "weight_kg"])
.with_prefix("item_")
}

This issues SELECT price, weight_kg FROM order_items WHERE order_id = ? AND sku = ?.
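The query text and bind order for a compound-key lookup can be derived from the (stream_column, table_column) pairs. The helpers below are a hypothetical sketch, not the operator's code. They reproduce the statement shown above and show that bind values come from the stream columns in predicate order.

```rust
/// Hypothetical builder for the compound-key lookup CQL.
/// `key_pairs` are (stream_column, table_column), as in `new_compound`.
pub fn compound_lookup_cql(table: &str, select: &[&str], key_pairs: &[(&str, &str)]) -> String {
    let predicate: Vec<String> = key_pairs
        .iter()
        .map(|(_, table_col)| format!("{table_col} = ?"))
        .collect();
    format!("SELECT {} FROM {table} WHERE {}", select.join(", "), predicate.join(" AND "))
}

/// Stream columns whose values are bound to the `?` markers, in order.
pub fn bind_order<'a>(key_pairs: &[(&'a str, &str)]) -> Vec<&'a str> {
    key_pairs.iter().map(|(stream_col, _)| *stream_col).collect()
}
```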

ANN vector search

For vector similarity search, use ScyllaEnrichConfig::new_ann(). The query vector is read from a designated column in the stream batch:

#![allow(unused)]
fn main() {
ScyllaEnrichConfig::new_ann(
    scylla,
    "product_embeddings",   // table
    "embedding",             // vector column in the table
    "query_embedding",       // vector column in the stream batch
)
.with_select_columns(vec!["product_id", "title", "category"])
.with_ann_limit(10)          // LIMIT n in the ANN query
.with_prefix("nearest_")
}

This issues SELECT product_id, title, category FROM product_embeddings ORDER BY embedding ANN OF ? LIMIT 10 for each row in the batch, enabling real-time semantic search enrichment.

Miss strategies

When a lookup key has no match in ScyllaDB:

| Strategy | Behavior |
|---|---|
| NullFill (default) | Add enrichment columns filled with nulls |
| DropRecord | Drop the record entirely |
| PassThrough | Keep the original record without enrichment columns |

Enrichment configuration reference

| Method | Default | Description |
|---|---|---|
| ScyllaEnrichConfig::new(config, table, key) | | Simple-key lookup mode |
| ScyllaEnrichConfig::new_compound(config, table, key_pairs) | | Compound-key lookup mode |
| ScyllaEnrichConfig::new_ann(config, table, table_vec_col, stream_vec_col) | | ANN vector search mode |
| .with_select_columns(vec) | All columns | Columns to fetch from the remote table |
| .with_prefix(prefix) | None | Prefix for enriched columns (e.g., "customer_") |
| .with_cache_ttl(duration) | 5 minutes | Time-to-live for cached lookup entries |
| .with_cache_max_entries(n) | 10,000 | Maximum entries in the LRU cache |
| .with_on_miss(strategy) | NullFill | Strategy for unmatched records |
| .with_prefetch(bool) | false | Overlap ScyllaDB queries with batch processing for reduced latency |
| .with_ann_limit(n) | 5 | Number of ANN results to return per query (ANN mode only) |

Connection configuration reference

ScyllaConfig is shared between the sink and enrichment operator:

| Method | Default | Description |
|---|---|---|
| ScyllaConfig::new(contact_points) | | Required: list of "host:port" seed nodes |
| .with_keyspace(keyspace) | None | Default keyspace for all queries |
| .with_username(user) | None | CQL authentication username |
| .with_password(pass) | None | CQL authentication password |
| .with_connect_timeout(duration) | 5 s | Connection establishment timeout |
| .with_request_timeout(duration) | 30 s | Per-query timeout |
| .with_consistency(level) | LocalQuorum | CQL consistency level for writes |

Full pipeline example

A Kafka-to-ScyllaDB pipeline that enriches incoming order events with product data before writing to a destination table:

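use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig, KafkaStreamExt};
use volley_connector_scylla::{ScyllaConfig, ScyllaSinkConfig, ScyllaStreamExt};
use volley_enrichment_scylla::{ScyllaEnrichConfig, ScyllaEnrichStreamExt};
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let scylla = ScyllaConfig::new(vec!["scylla-node1:9042", "scylla-node2:9042"])
        .with_keyspace("ecommerce")
        .with_username("volley")
        .with_password("secret");

    // Arrow schema of the incoming order events. Assumption: KafkaSourceConfig
    // takes an Arrow SchemaRef (Arc<Schema>) as its fourth argument.
    let schema = Arc::new(Schema::new(vec![
        Field::new("order_id", DataType::Utf8, false),
        Field::new("customer_id", DataType::Utf8, false),
        Field::new("product_id", DataType::Utf8, false),
        Field::new("quantity", DataType::Int64, false),
    ]));

    let source_config = KafkaSourceConfig::new(
        "broker:9092",
        "orders",
        "volley-order-processor",
        schema,
    );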

    let sink_config = ScyllaSinkConfig::new_upsert(
        scylla.clone(),
        "enriched_orders",
        vec!["order_id"],
    )
    .with_batch_size(1000)
    .with_create_table(true);

    StreamExecutionEnvironment::new()
        .with_checkpoints(CheckpointConfig::new(Duration::from_secs(30)))
        .from_kafka(source_config).await?
        .enrich_with_scylla(
            ScyllaEnrichConfig::new(scylla, "products", "product_id")
                .with_select_columns(vec!["name", "category", "unit_price"])
                .with_prefix("product_")
                .with_cache_ttl(Duration::from_secs(600))
        )
        .filter_expr(col("product_category").eq(lit("electronics")))
        .select_expr(vec![
            col("order_id"),
            col("customer_id"),
            col("product_name"),
            col("product_unit_price"),
            col("quantity"),
            (col("product_unit_price") * col("quantity")).alias("total"),
        ])
        .to_scylla(sink_config).await?
        .execute("orders-enrich-to-scylla")
        .await?;

    Ok(())
}

Observability

| Metric | Type | Description |
|---|---|---|
| scylla_sink_rows_written_total | counter | Rows written, labelled by table |
| scylla_sink_batch_duration_seconds | histogram | CQL batch execution latency |
| scylla_sink_retries_total | counter | Batch retries due to transient errors |
| scylla_enrich_cache_hit_total | counter | Cache hits |
| scylla_enrich_cache_miss_total | counter | Cache misses |
| scylla_enrich_cache_size | gauge | Current cache entry count |
| scylla_enrich_query_duration_seconds | histogram | Per-key query latency |
| scylla_enrich_query_errors_total | counter | Failed enrichment queries |

Integration testing

The integration tests require Docker and a ScyllaDB image. They run via volley-test-harness which starts a scylladb/scylla container automatically via testcontainers:

# Run sink integration tests
cargo test -p volley-connector-scylla --test sink_integration -- --ignored

# Run enrichment integration tests
cargo test -p volley-enrichment-scylla --test enrich_integration -- --ignored

Troubleshooting

“Unable to connect to any node”

  • Verify the contact points include the correct host and port (default: 9042).
  • Check network reachability from the pipeline process to the ScyllaDB nodes.
  • For Docker, ensure the container exposes port 9042 and the pipeline connects to the host IP.

“Keyspace does not exist”

  • Create the keyspace before starting the pipeline:
    CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
    

Slow enrichment queries

  • Increase cache_ttl and cache_max_entries to reduce query frequency.
  • Ensure the lookup column is the partition key (queries that filter on non-key columns require ALLOW FILTERING, which scans data and is slow).
  • Use with_prefetch(true) to overlap ScyllaDB I/O with downstream processing.
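To see why raising the TTL and entry limit cuts query frequency, here is a minimal sketch of a TTL-bounded lookup cache. It is not Volley's enrichment cache: `EnrichCache`, its oldest-fetch eviction policy, and the method names are assumptions for illustration. A lookup counts as a hit only if the row is present and younger than the TTL; anything else falls through to a ScyllaDB query.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical lookup cache: rows expire after `ttl`, and at most
/// `max_entries` rows are kept (the oldest fetch is evicted first).
pub struct EnrichCache<V> {
    ttl: Duration,
    max_entries: usize,
    entries: HashMap<String, (V, Instant)>, // key -> (row, time fetched)
}

impl<V: Clone> EnrichCache<V> {
    pub fn new(ttl: Duration, max_entries: usize) -> Self {
        Self { ttl, max_entries, entries: HashMap::new() }
    }

    /// Some(row) on a fresh hit; None means the caller must query ScyllaDB.
    pub fn get(&self, key: &str, now: Instant) -> Option<V> {
        let (row, fetched) = self.entries.get(key)?;
        if now.duration_since(*fetched) < self.ttl {
            Some(row.clone())
        } else {
            None // expired: counts as a miss
        }
    }

    /// Caches a freshly queried row, evicting the oldest entry if full.
    pub fn insert(&mut self, key: String, row: V, now: Instant) {
        if self.max_entries == 0 {
            return;
        }
        if !self.entries.contains_key(&key) && self.entries.len() >= self.max_entries {
            // O(n) scan keeps the sketch short; a real cache would keep an ordered index.
            let oldest = self
                .entries
                .iter()
                .min_by_key(|entry| (entry.1).1)
                .map(|(k, _)| k.clone());
            if let Some(k) = oldest {
                self.entries.remove(&k);
            }
        }
        self.entries.insert(key, (row, now));
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

A longer TTL keeps hot keys answerable from memory for longer, and a larger entry limit stops hot keys from being pushed out by a long tail of cold ones; both reduce how often `get` returns None.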

ANN search returns no results

  • Verify a custom index exists on the vector column:
    CREATE CUSTOM INDEX ON table_name (vector_col) USING 'StorageAttachedIndex';
    
  • ANN search requires ScyllaDB 6.0+, or Cassandra 5.0+, where Storage-Attached Indexing (SAI) is built in.

Kubernetes Operator

Volley includes a Kubernetes operator that automates deployment and management of Volley stream processing applications. Define a VolleyApplication custom resource and the operator handles StatefulSets, persistent storage, health checks, and Prometheus integration.

Prerequisites

  • Kubernetes cluster (v1.26+)
  • prometheus-operator (optional, for automatic metrics scraping)
  • KEDA (optional, for autoscaling)

Installing the Operator

Build and deploy the operator:

# Build the operator binary
cargo build --release -p volley-k8s-operator

# Generate the CRD manifest
./target/release/volley-k8s-operator generate-crd > volley-crd.json

# Apply the CRD to your cluster
kubectl apply -f volley-crd.json

# Run the operator (in-cluster or locally for development)
./target/release/volley-k8s-operator

Deploying a Volley Application

Create a VolleyApplication resource:

apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  # Container image (required)
  image: registry.example.com/my-volley-app:v1.0

  # Resource requirements (required)
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"

  # RocksDB state storage
  state:
    storageClassName: fast-ssd
    size: 10Gi

  # Checkpoint storage
  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi

  # Environment variables
  env:
    - name: RUST_LOG
      value: "info"
    - name: KAFKA_BOOTSTRAP_SERVERS
      value: "kafka.default.svc:9092"

Apply the manifest:

kubectl apply -f my-pipeline.yaml

The operator creates the following resources automatically:

| Resource | Purpose |
|---|---|
| StatefulSet | Runs the app with stable pod identity and persistent volumes |
| PVC (state) | Persistent storage for RocksDB state (/var/lib/volley/state) |
| PVC (checkpoints) | Persistent storage for checkpoints (/var/lib/volley/checkpoints) |
| Service (headless) | StatefulSet DNS for stable network identity |
| Service (metrics) | Exposes the Prometheus metrics port |
| ServiceMonitor | Prometheus-operator auto-discovery for metrics scraping |
| PodDisruptionBudget | Limits voluntary evictions (e.g. node drains) that could interrupt a checkpoint; involuntary disruptions such as node failures are not covered |

Health Checks

The operator automatically configures liveness and readiness probes backed by Volley’s built-in HealthReporter:

| Probe | Path | Behaviour |
|---|---|---|
| Liveness | /health (port 8080) | Returns healthy when pipeline is Running or Starting |
| Readiness | /ready (port 8080) | Returns ready only when pipeline is Running |

Customize the health check configuration:

spec:
  health:
    port: 8080
    livenessPath: /health
    readinessPath: /ready
    initialDelaySeconds: 30
    periodSeconds: 10

Observability

Metrics are exposed on port 9090 by default. When prometheus-operator is installed, the operator creates a ServiceMonitor for automatic scraping:

spec:
  observability:
    metricsPort: 9090
    metricsPath: /metrics
    serviceMonitor:
      enabled: true
      interval: 15s
      labels:
        release: prometheus   # match your Prometheus selector
    otlpEndpoint: http://otel-collector.monitoring:4317  # optional tracing

Key Volley metrics available for dashboards and alerting:

| Metric | Type | Description |
|---|---|---|
| volley.source.records_polled | Counter | Records read from source |
| volley.records.processed | Counter | Records processed per operator |
| volley.stage.latency_ms | Histogram | Per-operator processing latency |
| volley.checkpoint.duration_ms | Histogram | Checkpoint completion time |
| volley.channel.utilization | Gauge | Backpressure indicator (0.0-1.0) |
| volley.kafka.consumer_lag | Gauge | Kafka consumer group lag |
| volley.pipeline.uptime_seconds | Gauge | Pipeline uptime |

Node Isolation

Following Flink best practice, each Volley application should run on dedicated nodes. The operator supports node selectors, tolerations, and pod anti-affinity:

spec:
  isolation:
    # Schedule only on nodes labelled for stream processing
    nodeSelector:
      volley.io/pool: stream-processing
    # Tolerate the dedicated taint
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule
    # One pod per node (enabled by default)
    podAntiAffinity: true

Distributed Execution (Horizontal Scaling)

To run a pipeline across multiple pods, add a cluster section to your spec. Distributed mode needs shared checkpoint storage, so first create a ReadWriteMany PVC backed by your cloud provider's managed NFS (AWS EFS, GCP Filestore, Azure Files):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs  # your RWX storage class
  resources:
    requests:
      storage: 50Gi

Then reference it in your VolleyApplication:

spec:
  replicas: 3
  cluster:
    maxKeyGroups: 256                    # virtual partitions (default, power of 2)
    flightPort: 50051                    # Arrow Flight port (default)
    sharedCheckpoints:
      claimName: volley-shared-ckpt      # the PVC you created above

When cluster is present, the operator automatically:

| Action | Detail |
|---|---|
| Injects VOLLEY_* env vars | VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME, VOLLEY_SHARED_CHECKPOINT_DIR |
| Adds Flight port | Container port 50051 + headless service port for inter-pod communication |
| Mounts shared PVC | The existing ReadWriteMany PVC is mounted into all pods at /mnt/volley/checkpoints |

The Volley runtime reads these env vars via ClusterConfig::from_env() and automatically enters distributed mode.
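As a rough illustration of what reading the injected variables involves, here is a hypothetical sketch. `ClusterEnv` and `from_lookup` are illustrative names, not Volley's ClusterConfig. The lookup closure stands in for `std::env::var` so the logic can be exercised without touching the process environment. Treating a missing VOLLEY_NUM_WORKERS as single-node mode, falling back to the CRD defaults for key groups and Flight port, and rejecting non-power-of-two key group counts are all assumptions.

```rust
/// Hypothetical mirror of the variables the operator injects.
#[derive(Debug, PartialEq)]
pub struct ClusterEnv {
    pub num_workers: u32,
    pub headless_service: String,
    pub max_key_groups: u32,
    pub flight_port: u16,
    pub app_name: String,
    pub shared_checkpoint_dir: String,
}

impl ClusterEnv {
    /// Ok(None) means "not distributed" (VOLLEY_NUM_WORKERS is absent).
    /// In a real binary `get` would be `|k| std::env::var(k).ok()`.
    pub fn from_lookup(get: impl Fn(&str) -> Option<String>) -> Result<Option<Self>, String> {
        let Some(workers) = get("VOLLEY_NUM_WORKERS") else {
            return Ok(None);
        };
        let parse_u32 =
            |name: &str, raw: String| raw.parse::<u32>().map_err(|e| format!("{name}: {e}"));
        let num_workers = parse_u32("VOLLEY_NUM_WORKERS", workers)?;
        let max_key_groups = match get("VOLLEY_MAX_KEY_GROUPS") {
            Some(raw) => parse_u32("VOLLEY_MAX_KEY_GROUPS", raw)?,
            None => 256, // CRD default
        };
        if !max_key_groups.is_power_of_two() {
            return Err(format!("VOLLEY_MAX_KEY_GROUPS must be a power of two, got {max_key_groups}"));
        }
        let flight_port = match get("VOLLEY_FLIGHT_PORT") {
            Some(raw) => raw.parse::<u16>().map_err(|e| format!("VOLLEY_FLIGHT_PORT: {e}"))?,
            None => 50051, // CRD default
        };
        let required = |name: &str| get(name).ok_or_else(|| format!("missing {name}"));
        Ok(Some(Self {
            num_workers,
            headless_service: required("VOLLEY_HEADLESS_SERVICE")?,
            max_key_groups,
            flight_port,
            app_name: required("VOLLEY_APP_NAME")?,
            shared_checkpoint_dir: required("VOLLEY_SHARED_CHECKPOINT_DIR")?,
        }))
    }
}
```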

Shared checkpoint storage uses a pre-provisioned ReadWriteMany PVC (managed NFS from your cloud provider). All workers write checkpoints to the same shared mount, so recovery and rescaling do not require copying state between pods: each worker reads the checkpoint data for its key groups directly from the shared filesystem.

Rescaling is live: change spec.replicas and the coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
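For intuition about what "recomputes key group assignments" means, here is a sketch of the Flink-style scheme that the maxKeyGroups setting suggests. Each key hashes to one of maxKeyGroups virtual partitions, and contiguous ranges of key groups are assigned to workers. Volley's actual hash function and assignment formula are not documented here, so `key_group_for` (using std's DefaultHasher), `worker_for_key_group`, and `key_groups_for_worker` are assumptions. Because a key's group never changes, rescaling only moves whole key groups between workers.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::ops::Range;

/// Maps a key to one of `max_key_groups` virtual partitions.
/// DefaultHasher is a stand-in; production code needs a hash that is stable across builds.
pub fn key_group_for<K: Hash>(key: &K, max_key_groups: u32) -> u32 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % u64::from(max_key_groups)) as u32
}

/// Flink-style range assignment: which worker owns a key group.
pub fn worker_for_key_group(key_group: u32, max_key_groups: u32, num_workers: u32) -> u32 {
    (u64::from(key_group) * u64::from(num_workers) / u64::from(max_key_groups)) as u32
}

/// The contiguous, half-open key-group range owned by `worker`.
pub fn key_groups_for_worker(worker: u32, max_key_groups: u32, num_workers: u32) -> Range<u32> {
    let (w, max, n) = (u64::from(worker), u64::from(max_key_groups), u64::from(num_workers));
    let start = (w * max + n - 1) / n; // ceil(w * max / n)
    let end = ((w + 1) * max + n - 1) / n; // ceil((w + 1) * max / n)
    start as u32..end as u32
}
```

With 256 key groups and 3 workers, worker 0 owns groups 0..86; scaling to 4 workers shrinks that to 0..64, so only groups 64..86 move away from it.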

Autoscaling with KEDA (Future)

The operator has built-in support for KEDA autoscaling. When enabled, it creates a ScaledObject that can scale based on Kafka consumer lag, Prometheus metrics, or CPU/memory:

spec:
  autoscaling:
    enabled: true
    minReplicas: 1
    maxReplicas: 10
    triggers:
      # Scale on Kafka consumer lag
      - type: kafka
        metadata:
          bootstrapServers: kafka.default.svc:9092
          consumerGroup: my-pipeline-group
          topic: input-events
          lagThreshold: "1000"
      # Scale on backpressure (channel utilization)
      - type: prometheus
        metadata:
          serverAddress: http://prometheus.monitoring:9090
          query: avg(volley_channel_utilization{job_name="my-pipeline"})
          threshold: "0.8"
      # Scale on CPU
      - type: cpu
        metadata:
          value: "70"

Checking Application Status

# List all Volley applications
kubectl get volleyapplications
# or use the short name:
kubectl get vapp

# Check status
kubectl describe vapp my-pipeline

# View operator logs
kubectl logs deployment/volley-k8s-operator

The status subresource reports the current phase (Pending, Running, Degraded, Failed), replica counts, and a human-readable message.

Full Spec Reference

apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  image: registry.example.com/my-app:v1.0   # required
  imagePullPolicy: IfNotPresent              # default: IfNotPresent
  imagePullSecrets:
    - name: registry-secret

  resources:                                  # required
    requests: { cpu: "2", memory: "4Gi" }
    limits:   { cpu: "4", memory: "8Gi" }

  replicas: 1                                 # default: 1
  args: ["--config", "/etc/volley/config.yaml"]

  env:
    - name: RUST_LOG
      value: "info"
    - name: DB_PASSWORD
      valueFrom:
        secretKeyRef: { name: db-creds, key: password }

  state:
    storageClassName: fast-ssd
    size: 10Gi
    mountPath: /var/lib/volley/state          # default

  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi
    mountPath: /var/lib/volley/checkpoints    # default

  observability:
    metricsPort: 9090                         # default: 9090
    metricsPath: /metrics                     # default: /metrics
    serviceMonitor:
      enabled: true                           # default: true
      interval: 15s                           # default: 15s
      labels: { release: prometheus }
    otlpEndpoint: http://otel-collector:4317

  health:
    port: 8080                                # default: 8080
    livenessPath: /health                     # default: /health
    readinessPath: /ready                     # default: /ready
    initialDelaySeconds: 30                   # default: 30
    periodSeconds: 10                         # default: 10

  isolation:
    nodeSelector: { volley.io/pool: stream-processing }
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule
    podAntiAffinity: true                     # default: true

  kafka:
    bootstrapServers: kafka:9092
    consumerGroup: my-group
    topics: [input-events]

  autoscaling:
    enabled: false                            # default: false
    minReplicas: 1
    maxReplicas: 10
    triggers:
      - type: kafka
        metadata:
          bootstrapServers: kafka:9092
          consumerGroup: my-group
          topic: input-events
          lagThreshold: "1000"

  cluster:                                      # distributed execution (optional)
    maxKeyGroups: 256                           # default: 256 (power of 2)
    flightPort: 50051                           # default: 50051
    sharedCheckpoints:
      claimName: volley-shared-ckpt             # existing ReadWriteMany PVC
      mountPath: /mnt/volley/checkpoints        # default

  labels: {}                                  # additional labels on all resources
  annotations: {}                             # additional annotations on all resources

Volley Capacity Planning Guide

This guide helps pipeline authors size CPU, memory, and storage for Volley deployments, and provides PromQL queries for ongoing capacity monitoring.


Throughput Reference

Run the NEXMark benchmarks in volley-benchmark/ to measure throughput on your own hardware.

Real-world throughput will vary depending on:

  • Operator complexity (aggregations, joins, windowing)
  • State backend I/O (RocksDB reads/writes)
  • Connector latency (Kafka, blob storage)
  • Checkpoint frequency

CPU Sizing

| Throughput target | CPU requests | CPU limits | Notes |
|---|---|---|---|
| < 100K rec/s | 500m | 1 | Light pipelines, simple transforms |
| 100K–1M rec/s | 1 | 2 | Moderate load, stateful operators |
| 1M–5M rec/s | 2 | 4 | High throughput, batched processing |
| > 5M rec/s | 4 | 8 | Maximum throughput, parallelism=4+ |

Volley uses the tokio async runtime. CPU is primarily consumed by:

  • Record deserialization/serialization
  • Operator logic (map, filter, aggregate, window)
  • RocksDB compaction (background threads)
  • Kafka consumer polling and producer flushing

Recommendation: Start with 1 CPU request and monitor container_cpu_usage_seconds_total. Scale up if sustained utilization exceeds 70%.
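In Kubernetes, container CPU utilization is measured against the CPU request, not the limit. A sketch of the arithmetic from two samples of the cumulative container_cpu_usage_seconds_total counter; `cpu_utilization` is a hypothetical helper, and PromQL's rate() performs the same division over a window.

```rust
/// Fraction of the CPU request in use between two samples of the
/// cumulative container_cpu_usage_seconds_total counter.
pub fn cpu_utilization(
    usage_start_s: f64,
    usage_end_s: f64,
    wall_elapsed_s: f64,
    cpu_request_cores: f64,
) -> f64 {
    // CPU-seconds consumed per wall-clock second = cores in use.
    let cores_used = (usage_end_s - usage_start_s) / wall_elapsed_s;
    cores_used / cpu_request_cores
}
```

For example, 45 CPU-seconds over a 60-second window is 0.75 cores: 75% of a 1-core request (above the 70% guideline) but 150% of a 500m request.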


Memory Sizing

| Workload | Memory requests | Memory limits | Notes |
|---|---|---|---|
| Stateless transforms | 256Mi | 512Mi | No state backend |
| Light stateful (< 100K keys) | 512Mi | 1Gi | Small RocksDB footprint |
| Medium stateful (100K–1M keys) | 1Gi | 2Gi | RocksDB block cache |
| Heavy stateful (> 1M keys) | 2Gi | 4Gi | Large working set |

Memory is consumed by:

  • Channel buffers: In-flight records between pipeline stages
  • RocksDB block cache: Cached SST blocks for read performance (defaults to ~50% of available memory)
  • Arrow RecordBatch buffers: Columnar data in transit
  • Kafka consumer buffers: Pre-fetched messages
  • Window pending index: Active (key, window) pairs tracked by window operators (~100 bytes each)
  • Aggregate LRU cache: In-memory accumulator cache for keyed aggregations

Recommendation: Set memory limits to 2x requests. Monitor container_memory_working_set_bytes and watch for OOM kills.

Bounding Stateful Operator Memory

For high-cardinality keyed workloads, use the built-in LRU mechanisms to cap in-memory state:

  • Aggregate operators: keyed_stream.with_max_cache_entries(n) — bounds the in-memory accumulator cache. Evicted entries are flushed to RocksDB.
  • Window operators: windowed_stream.with_max_pending_windows(n) — bounds the in-memory window index. Evicted entries are persisted to the state backend and scanned during watermark advance.
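To make the eviction behavior concrete, here is a minimal sketch of a bounded accumulator cache that writes least-recently-used entries to a backing store on eviction. `BoundedAccumulatorCache` is a hypothetical type, not Volley's implementation, and the `backing` HashMap stands in for RocksDB. A miss reloads from the backing store, which is the extra RocksDB I/O that a high eviction rate signals.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical bounded LRU of per-key sums with write-behind on eviction.
pub struct BoundedAccumulatorCache {
    max_entries: usize,
    tick: u64,
    cache: HashMap<String, (i64, u64)>, // key -> (running sum, last-use tick)
    recency: BTreeMap<u64, String>,     // last-use tick -> key, oldest first
    pub backing: HashMap<String, i64>,  // stand-in for the RocksDB state backend
    pub evictions: u64,                 // what an eviction counter would report
}

impl BoundedAccumulatorCache {
    pub fn new(max_entries: usize) -> Self {
        assert!(max_entries > 0, "cache must hold at least one entry");
        Self {
            max_entries,
            tick: 0,
            cache: HashMap::new(),
            recency: BTreeMap::new(),
            backing: HashMap::new(),
            evictions: 0,
        }
    }

    /// Adds `delta` to `key`'s accumulator and returns the new sum.
    pub fn add(&mut self, key: &str, delta: i64) -> i64 {
        self.tick += 1;
        let current = match self.cache.remove(key) {
            Some((sum, old_tick)) => {
                self.recency.remove(&old_tick); // hit: recency is refreshed below
                sum
            }
            None => {
                if self.cache.len() >= self.max_entries {
                    self.evict_lru();
                }
                // Miss: reload from the backing store (the extra RocksDB read).
                self.backing.get(key).copied().unwrap_or(0)
            }
        };
        let sum = current + delta;
        self.cache.insert(key.to_string(), (sum, self.tick));
        self.recency.insert(self.tick, key.to_string());
        sum
    }

    /// Flushes the least recently used entry to the backing store.
    fn evict_lru(&mut self) {
        if let Some((_, key)) = self.recency.pop_first() {
            if let Some((sum, _)) = self.cache.remove(&key) {
                self.backing.insert(key, sum);
                self.evictions += 1;
            }
        }
    }
}
```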

As a rule of thumb, each aggregate cache entry is ~200-500 bytes (depends on accumulator type), and each pending window entry is ~100 bytes. Size the limits based on your available memory budget:

aggregate_memory ≈ max_cache_entries × 300 bytes
window_memory    ≈ max_pending_windows × 100 bytes
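Inverting these formulas gives a starting value for each limit from a memory budget. A small sketch using the per-entry estimates above (~300 bytes per aggregate entry, ~100 bytes per pending window entry); the helper names are hypothetical, and the results are only as accurate as those estimates.

```rust
/// Per-entry estimates from the rule of thumb above.
pub const AGG_ENTRY_BYTES: u64 = 300;
pub const WINDOW_ENTRY_BYTES: u64 = 100;

/// aggregate_memory ≈ max_cache_entries × 300 bytes (likewise for windows).
pub fn memory_for_entries(entries: u64, entry_bytes: u64) -> u64 {
    entries * entry_bytes
}

/// Inverse: the largest limit that fits inside a memory budget.
pub fn max_entries_for_budget(budget_bytes: u64, entry_bytes: u64) -> u64 {
    budget_bytes / entry_bytes
}
```

For example, a 256 MiB budget for aggregate state allows roughly 894,784 cache entries, and 64 MiB for pending windows allows roughly 671,088 entries; rounding down to values such as 850,000 and 650,000 leaves some headroom.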

Monitor volley_state_cache_evictions_total and volley_window_pending_evictions_total — a sustained high eviction rate means the LRU is undersized and incurring excessive RocksDB I/O.


RocksDB Storage Sizing

Growth Formula

state_size = key_count × avg_value_size × amplification_factor

Where:

  • key_count: Number of unique keys in your keyed state
  • avg_value_size: Average serialized value size (JSON encoding adds ~30% overhead)
  • amplification_factor: RocksDB space amplification, typically 1.1–1.5x depending on compaction

Checkpoint Storage

checkpoint_storage = state_size × checkpoint_retention_count

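RocksDB checkpoints use hardlinks to SST files, so this formula is a worst-case upper bound: checkpoints taken while data is unchanged share the same SST files and add little storage. As data mutates and compaction rewrites SST files, each retained checkpoint keeps its own older files alive, and usage climbs toward the full state_size × checkpoint_retention_count.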
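A worked example of both formulas, assuming 10 million keys with 200-byte values and a 1.3x amplification factor (all illustrative numbers). The checkpoint figure is the worst case, where no SST file is shared between retained checkpoints.

```rust
/// state_size = key_count × avg_value_size × amplification_factor
pub fn state_size_bytes(key_count: u64, avg_value_size: u64, amplification: f64) -> f64 {
    key_count as f64 * avg_value_size as f64 * amplification
}

/// Worst case: every retained checkpoint holds a full, unshared copy of the state.
pub fn checkpoint_storage_bytes(state_size: f64, retention_count: u32) -> f64 {
    state_size * f64::from(retention_count)
}
```

Those inputs give about 2.6 GB of state, which falls in the 1–10 GB row of the sizing table below; three retained checkpoints could take up to 7.8 GB in the worst case.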

PVC Sizing Recommendations

| State size | State PVC | Checkpoint PVC | Retention |
|---|---|---|---|
| < 1 GB | 5Gi | 5Gi | 3 checkpoints |
| 1–10 GB | 20Gi | 20Gi | 3 checkpoints |
| 10–50 GB | 100Gi | 100Gi | 2 checkpoints |
| > 50 GB | 2× state size | 2× state size | 2 checkpoints |

Recommendation: Provision at least 2× your expected state size for headroom. Monitor actual usage with the PromQL queries below and adjust.

Helm Values Example

state:
  enabled: true
  size: "20Gi"
  storageClassName: gp3   # Use provisioned IOPS for write-heavy workloads

checkpoints:
  enabled: true
  size: "20Gi"

Capacity Monitoring PromQL Queries

Throughput

# Records ingested per second by pipeline
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

# Records processed per second by pipeline
sum(rate(volley_records_processed_total[5m])) by (job_name)

# Processing ratio (should be close to 1.0)
sum(rate(volley_records_processed_total[5m])) by (job_name)
/
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

Backpressure

# Channel utilization by stage (>0.8 = backpressure)
avg(volley_channel_utilization) by (job_name, stage)

# Stages with sustained backpressure
avg_over_time(volley_channel_utilization[15m]) > 0.8

Kafka Consumer Lag

# Total consumer lag by pipeline
sum(volley_kafka_consumer_lag) by (job_name)

# Lag growth rate (positive = falling behind)
deriv(sum(volley_kafka_consumer_lag) by (job_name)[15m:1m])

Latency

# p99 stage latency
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# p50 stage latency
histogram_quantile(0.5, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# Checkpoint duration p99
histogram_quantile(0.99, sum(rate(volley_checkpoint_duration_ms_bucket[5m])) by (le, job_name))

Error Rate

# Error rate by type
sum(rate(volley_errors_total[5m])) by (job_name, error_type)

# Source errors by connector
sum(rate(volley_source_errors_total[5m])) by (job_name, source_id)

# Sink errors by connector
sum(rate(volley_sink_errors_total[5m])) by (job_name, sink_id)

# Error ratio (for SLO tracking)
sum(rate(volley_errors_total[5m])) by (job_name)
/
clamp_min(sum(rate(volley_records_processed_total[5m])) by (job_name), 1)

Resource Utilization

# Watermark lag (event-time vs. wall-clock)
max(volley_watermark_lag_ms) by (job_name)

# Sink flush duration p99
histogram_quantile(0.99, sum(rate(volley_sink_flush_duration_ms_bucket[5m])) by (le, job_name))

KEDA Autoscaling Reference

Example KEDA trigger configurations for the Helm chart:

Scale on Kafka Consumer Lag

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 8
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: "kafka:9092"
        consumerGroup: "my-pipeline"
        topic: "input-topic"
        lagThreshold: "10000"    # Target average lag per replica (10K messages)
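KEDA's Kafka scaler treats lagThreshold as a target average lag per replica rather than a one-off trigger. A rough sketch of the resulting replica count, assuming the standard HPA average-value calculation (total lag divided by the target, rounded up) and KEDA's default of not scaling past the topic's partition count. `desired_replicas` is a hypothetical helper, and HPA tolerance and stabilization windows are ignored.

```rust
/// Rough KEDA Kafka scaling estimate: one replica per `lag_threshold`
/// messages of total lag, capped by maxReplicas and the partition count.
pub fn desired_replicas(
    total_lag: u64,
    lag_threshold: u64,
    min_replicas: u32,
    max_replicas: u32,
    partitions: u32,
) -> u32 {
    assert!(lag_threshold > 0);
    let by_lag = (total_lag + lag_threshold - 1) / lag_threshold; // ceiling division
    // Consumers beyond the partition count would sit idle, so cap there too.
    let upper = u64::from(max_replicas.min(partitions)).max(u64::from(min_replicas));
    by_lag.clamp(u64::from(min_replicas), upper) as u32
}
```

With the values above and a 12-partition topic, 35,000 messages of lag asks for 4 replicas, while 200,000 messages hits the maxReplicas cap of 8.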

Scale on CPU

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 4
  triggers:
    - type: cpu
      metadata:
        type: Utilization
        value: "70"              # Scale up at 70% CPU utilization

Scale on Custom Prometheus Metric

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 8
  triggers:
    - type: prometheus
      metadata:
        serverAddress: "http://prometheus:9090"
        query: "sum(volley_channel_utilization{job_name='my-pipeline'}) / count(volley_channel_utilization{job_name='my-pipeline'})"
        threshold: "0.8"         # Scale up when average saturation > 80%

Capacity Planning Checklist

  1. Estimate throughput: What is your expected input rate (records/sec)?
  2. Size CPU/memory: Use the tables above as a starting point
  3. Estimate state size: key_count × avg_value_size × 1.5
  4. Size PVCs: 2× expected state size for both state and checkpoint volumes
  5. Configure autoscaling: Set KEDA triggers based on lag or saturation
  6. Deploy and monitor: Use the PromQL queries above to validate sizing
  7. Iterate: Adjust resources based on actual utilization patterns

Observability

Find out what your pipeline is doing without reaching for logs. Every Volley pipeline emits Prometheus metrics (throughput, latency, backpressure, consumer lag, checkpoint health), OpenTelemetry traces (per-stage spans, optionally per-record with sampling), structured JSON logs with trace_id for cross-referencing, and HTTP health endpoints for Kubernetes probes. All of it boots with zero config and Just Works™ when you scrape the metrics port and point an OTLP collector at the tracing endpoint.

This page is the reference for what you get. For the “which knobs do I turn?” guide on capacity and sizing, see Capacity Planning.

Metrics

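Volley exposes the metrics below in Prometheus exposition format, and all of them carry node_id and job_name labels. The table uses dotted names; as the PromQL queries on this page show, Prometheus sees them with dots replaced by underscores and with a _total suffix on counters (volley.source.records_polled is queried as volley_source_records_polled_total).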

| Metric | Type | Labels | Description |
|---|---|---|---|
| volley.source.records_polled | Counter | source_id | Records read from source |
| volley.records.processed | Counter | operator_id | Records processed per operator |
| volley.stage.latency_ms | Histogram | stage | Per-operator processing latency |
| volley.checkpoint.duration_ms | Histogram | | Checkpoint completion time |
| volley.checkpoint.epoch | Gauge | | Current checkpoint epoch |
| volley.checkpoint.failures | Counter | | Checkpoint failure count |
| volley.channel.utilization | Gauge | stage | Backpressure indicator (0.0-1.0) |
| volley.watermark.lag_ms | Gauge | | Event-time lag |
| volley.sink.records_written | Counter | sink_id | Records written per sink |
| volley.sink.flush.duration_ms | Histogram | sink_id | Sink flush latency |
| volley.pipeline.uptime_seconds | Gauge | | Pipeline uptime |
| volley.pipeline.health | Gauge | | Health state (0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown) |
| volley.errors | Counter | error_type | Errors by type |
| volley.source.errors | Counter | source_id | Per-connector source errors |
| volley.sink.errors | Counter | sink_id | Per-connector sink errors |
| volley.kafka.consumer_lag | Gauge | | Kafka consumer group lag |
| volley.kafka.commit.duration_ms | Histogram | | Kafka commit latency |
| volley.kafka.commit.failures | Counter | | Kafka commit failures |
| volley.blob.objects_read | Counter | | Blob source throughput |
| volley.blob.bytes_read | Counter | | Blob I/O volume |
| volley.state.cache_evictions | Counter | operator_id | Aggregate LRU cache evictions |
| volley.window.pending_evictions | Counter | | Window pending index LRU evictions |
| volley.window.pending_size | Gauge | | In-memory pending window entry count |
| volley.checkpoint.gc_cleaned | Counter | | Old checkpoints cleaned by GC |

ML Inference Metrics

When using volley-ml operators (.infer(), .embed(), .classify(), .infer_remote()), additional inference-specific metrics are emitted:

| Metric | Type | Labels | Description |
|---|---|---|---|
| volley.ml.inference.duration_ms | Histogram | backend, model_name | Per-batch inference latency (backend call only) |
| volley.ml.inference.batch_rows | Histogram | backend, model_name | Rows per inference batch (batch utilization) |
| volley.ml.model_load.duration_ms | Histogram | backend, model_name | Model load time (emitted once at startup) |
| volley.ml.inference.errors | Counter | backend, model_name, error_type | Inference errors by backend and type |
| volley.ml.http.request.duration_ms | Histogram | endpoint | HTTP round-trip latency for remote model servers |
| volley.ml.http.queue_depth | Gauge | endpoint | Available concurrency permits (inverse of queue depth) |
| volley.ml.http.errors | Counter | endpoint, status_code | HTTP errors from remote model servers |

The backend label is "onnx" or "candle". The model_name label is the model path or HuggingFace repo ID passed to the operator config.

Golden Signals PromQL

# Latency (p99)
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# Traffic (records/sec)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

# Errors (rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)

# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)

# Saturation (channel utilization)
avg(volley_channel_utilization) by (job_name, stage)

# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)

ML Inference PromQL

# Inference latency (p99) by backend
histogram_quantile(0.99, sum(rate(volley_ml_inference_duration_ms_bucket[5m])) by (le, backend))

# Inference throughput (rows/sec)
sum(rate(volley_ml_inference_batch_rows_sum[5m])) by (backend)

# Inference error rate
sum(rate(volley_ml_inference_errors_total[5m])) by (backend, error_type)

# HTTP model server latency (p95)
histogram_quantile(0.95, sum(rate(volley_ml_http_request_duration_ms_bucket[5m])) by (le, endpoint))

State & Window PromQL

# Aggregate cache eviction rate (high = LRU undersized)
rate(volley_state_cache_evictions_total[5m])

# Window pending index eviction rate
rate(volley_window_pending_evictions_total[5m])

# In-memory pending window count
volley_window_pending_size

# Checkpoint GC cleanup rate
rate(volley_checkpoint_gc_cleaned_total[5m])

Tracing

OpenTelemetry integration via OTLP gRPC export with the tracing-opentelemetry bridge:

  • Resource attributes include service.name and node.id
  • Graceful fallback if the OTLP endpoint is unavailable
  • Trace-log correlation: trace_id and span_id injected into JSON log lines when a span is active

Configure the OTLP endpoint:

# In VolleyApplication CRD
spec:
  observability:
    otlpEndpoint: http://otel-collector.monitoring:4317

Per-Record Tracing

Volley supports per-record trace propagation with configurable sampling. When enabled, individual records flowing through the pipeline carry OpenTelemetry trace context. Each operator creates a child span showing processing time, input/output row counts, and optional payload previews.

Configuration

use volley_core::observability::{TracingConfig, SamplingStrategy};

// Runs inside an async fn that returns a Result; `source` and `sink` are defined elsewhere.
StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(TracingConfig {
        sampling: SamplingStrategy::Ratio(0.01), // trace 1% of records
        max_payload_bytes: 1024,                 // max JSON preview size
        capture_payload: true,                   // enable input/output previews
    })
    .filter_expr(col("status").eq(lit("active")))
    .select_expr(vec![col("user_id"), col("amount")])
    .to_sink(sink)
    .execute("my-job")
    .await?;

Sampling Strategies

| Strategy | Description | Use case |
|---|---|---|
| Always | Trace every record | Development, debugging |
| Never | Disable tracing | Production default |
| Ratio(f64) | Trace a random fraction | Production with sampling (e.g., 0.01 = 1%) |

Non-sampled records pay near-zero overhead (a single branch check).
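One plausible shape for that per-record decision is sketched below. `Sampling` is a local, hypothetical enum that mirrors the three strategies in the table; it is not volley_core's SamplingStrategy. The xorshift generator is an assumption that stands in for whatever randomness Volley actually uses, so the sketch has no dependencies. The decision is a single match per record, and a record that is not sampled carries no trace context downstream.

```rust
/// Local stand-in for the sampling strategies in the table above.
#[derive(Clone, Copy, Debug)]
pub enum Sampling {
    Always,
    Never,
    Ratio(f64),
}

/// Tiny xorshift64 PRNG (shift triple 13, 7, 17).
pub struct XorShift64(u64);

impl XorShift64 {
    pub fn new(seed: u64) -> Self {
        Self(seed.max(1)) // xorshift state must be non-zero
    }

    /// Roughly uniform f64 in [0, 1), built from the top 53 bits.
    pub fn next_f64(&mut self) -> f64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        (x >> 11) as f64 / (1u64 << 53) as f64
    }
}

impl Sampling {
    /// Decides once per record whether to attach trace context.
    pub fn should_sample(self, rng: &mut XorShift64) -> bool {
        match self {
            Sampling::Always => true,
            Sampling::Never => false,
            Sampling::Ratio(r) => rng.next_f64() < r,
        }
    }
}
```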

Span Hierarchy

For a pipeline source -> map -> filter -> sink, a sampled record produces:

volley.record (root span)
├── volley.source        — source output metadata
├── volley.operator [0]  — map operator (input/output rows, timing, preview)
├── volley.operator [1]  — filter operator
└── volley.sink          — sink write (timing, errors)

Each span includes attributes:

| Attribute | Description |
|---|---|
| volley.operator.name | Auto-generated operator name (e.g., operator-0) |
| volley.operator.index | Position in the operator chain |
| volley.record.event_time_ms | Record's event timestamp |
| volley.record.input_rows | Input batch row count |
| volley.record.output_rows | Output batch row count |

When capture_payload is enabled, input and output JSON previews are attached as span events.

Window and Aggregate Outputs

Window and aggregate operators accumulate multiple input records and emit new result records. These outputs get a new trace with span links back to the input traces:

volley.window.output (new root, links to input traces)
├── volley.operator [next]
└── volley.sink

The volley.window.input_trace_count attribute shows how many sampled input records contributed to the aggregation.

Kafka Trace Context Propagation

When using the Kafka source, Volley automatically extracts W3C traceparent headers from consumed messages. If an upstream service traced a request that produced a Kafka message, the trace continues through Volley.

Similarly, the Kafka sink injects traceparent headers into produced messages, propagating trace context to downstream consumers.

This enables full end-to-end distributed tracing across service boundaries:

upstream service → Kafka → Volley pipeline → Kafka → downstream service
     trace-id=abc          trace-id=abc              trace-id=abc
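The traceparent header follows the W3C Trace Context format: version, trace-id, parent-id, and trace-flags, as lowercase hex fields of 2, 32, 16, and 2 characters separated by dashes. A minimal std-only parser for version 00 headers shows what is carried in each Kafka message. This is not Volley's code; `TraceParent` and `parse_traceparent` are illustrative names.

```rust
/// Fields of a W3C `traceparent` header (version 00).
#[derive(Debug, PartialEq)]
pub struct TraceParent {
    pub trace_id: String,  // 32 lowercase hex chars, not all zero
    pub parent_id: String, // 16 lowercase hex chars, not all zero
    pub sampled: bool,     // bit 0 of trace-flags
}

fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| b.is_ascii_digit() || (b'a'..=b'f').contains(&b))
}

/// Parses e.g. "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".
pub fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let parts: Vec<&str> = header.trim().split('-').collect();
    let [version, trace_id, parent_id, flags] = parts.as_slice() else {
        return None; // version 00 has exactly four fields
    };
    if *version != "00"
        || !is_lower_hex(trace_id, 32)
        || !is_lower_hex(parent_id, 16)
        || !is_lower_hex(flags, 2)
        || trace_id.bytes().all(|b| b == b'0')
        || parent_id.bytes().all(|b| b == b'0')
    {
        return None;
    }
    let flag_bits = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent {
        trace_id: trace_id.to_string(),
        parent_id: parent_id.to_string(),
        sampled: (flag_bits & 0x01) == 1,
    })
}
```

The sink side does the reverse: it formats the current trace-id and span-id into the same four-field string and attaches it as a message header.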

Structured Logging

ObservabilityConfig supports a log_format field with Text (default) and Json modes:

ObservabilityConfig::new()
    .with_json_logging()
    .init();

JSON mode produces structured output compatible with Loki, Elasticsearch, and CloudWatch Logs. When OpenTelemetry is active, each log line includes trace_id and span_id fields for cross-referencing with distributed traces.

Useful RUST_LOG filters

# Default (info): lifecycle events, checkpoint completions, source/sink creation
RUST_LOG=info

# Debug state backend, window, and watermark behavior
RUST_LOG=info,volley_core::state=debug,volley_core::operators::window=debug

# Debug file decoding (Parquet, JSON Lines, CSV)
RUST_LOG=info,volley_connectors::blob_store::decoders=debug

# Deep debugging: per-record RocksDB get/put/delete
RUST_LOG=info,volley_core::state::rocks=trace

# Debug protobuf decode with batch context
RUST_LOG=info,volley_core::operators::protobuf=debug

Health Reporting

The HealthReporter tracks pipeline state with atomic transitions:

| State | Liveness | Readiness | Description |
|---|---|---|---|
| Starting | healthy | not ready | Pipeline initializing |
| Running | healthy | ready | Normal operation |
| Degraded | healthy | not ready | Partial failure (with reason) |
| Failed | unhealthy | not ready | Complete failure (with reason) |
| ShuttingDown | unhealthy | not ready | Graceful shutdown in progress |

The volley.pipeline.health gauge emits the state as a numeric value on every transition, enabling Prometheus alerting.
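A sketch of how the states, probe results, and gauge values fit together, using an AtomicU8 so probe handlers can read the state without locking. `Health` and `HealthCell` are hypothetical names, not Volley's HealthReporter. Only the numeric encoding (0=Starting through 4=ShuttingDown) and the liveness/readiness columns come from this page; the Degraded/Failed reason strings are omitted.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

/// Numeric values match the volley.pipeline.health gauge encoding.
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(u8)]
pub enum Health {
    Starting = 0,
    Running = 1,
    Degraded = 2,
    Failed = 3,
    ShuttingDown = 4,
}

impl Health {
    fn from_u8(v: u8) -> Health {
        match v {
            0 => Health::Starting,
            1 => Health::Running,
            2 => Health::Degraded,
            3 => Health::Failed,
            _ => Health::ShuttingDown,
        }
    }

    /// Liveness: healthy unless Failed or ShuttingDown.
    pub fn is_live(self) -> bool {
        matches!(self, Health::Starting | Health::Running | Health::Degraded)
    }

    /// Readiness: only Running receives traffic.
    pub fn is_ready(self) -> bool {
        self == Health::Running
    }
}

/// Shared cell that the pipeline writes and probe handlers read.
pub struct HealthCell(AtomicU8);

impl HealthCell {
    pub fn new() -> Self {
        Self(AtomicU8::new(Health::Starting as u8))
    }

    /// Stores the new state and returns the value a gauge would record.
    pub fn set(&self, h: Health) -> u8 {
        self.0.store(h as u8, Ordering::Release);
        h as u8
    }

    pub fn get(&self) -> Health {
        Health::from_u8(self.0.load(Ordering::Acquire))
    }
}
```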

SLO Recording Rules

Prometheus recording rules are provided at deploy/prometheus/recording-rules.yaml:

  • volley:availability:ratio_rate5m — availability SLI ratio
  • volley:latency_sli:ratio_rate5m — latency SLI (fraction under 100ms)
  • volley:error_budget:remaining — remaining error budget against 99.9% target
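The arithmetic behind an error budget against a 99.9% target, as a sketch. The exact expression in recording-rules.yaml is not reproduced here. This sketch assumes the common definition remaining = 1 − (observed error ratio ÷ allowed error ratio); `error_budget_remaining` is a hypothetical helper.

```rust
/// Fraction of the error budget left (1.0 = untouched, <= 0.0 = exhausted).
pub fn error_budget_remaining(errors: u64, total: u64, slo_target: f64) -> f64 {
    let allowed_error_ratio = 1.0 - slo_target; // 0.001 for a 99.9% target
    if total == 0 {
        return 1.0; // no traffic, nothing spent
    }
    let observed = errors as f64 / total as f64;
    1.0 - observed / allowed_error_ratio
}
```

For example, 2,500 errors out of 10 million records is an error ratio of 0.025%, a quarter of the 0.1% allowance, leaving 75% of the budget.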

Alerting Rules

Burn-rate alerting rules at deploy/prometheus/alerting-rules.yaml:

  • VolleyHighErrorBurnRate (critical) — 14.4x burn over 1h
  • VolleySlowErrorBurnRate (warning) — 1x burn over 3d
  • VolleyPipelineFailed (critical) — health==3 for 1m
  • VolleyPipelineDegraded (warning) — health==2 for 5m
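Burn rate is the observed error ratio divided by the allowed one, so a 1x burn spends the budget exactly over the SLO window. The sketch below shows why 14.4x over 1h is a common fast-burn threshold. It assumes a 30-day (720-hour) SLO window, which the rules above do not state. At that rate, one hour consumes 2% of the budget. `burn_rate` and `budget_spent` are hypothetical helpers.

```rust
/// Observed error ratio divided by the ratio the SLO allows.
pub fn burn_rate(error_ratio: f64, slo_target: f64) -> f64 {
    error_ratio / (1.0 - slo_target)
}

/// Fraction of the whole budget spent by burning at `rate` for `hours`,
/// given an SLO window of `window_hours` (720 for 30 days).
pub fn budget_spent(rate: f64, hours: f64, window_hours: f64) -> f64 {
    rate * hours / window_hours
}
```

Under the same assumption, the 1x-over-3d rule catches slower leaks: 72 hours at exactly the budgeted rate spends 10% of the budget.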

Kubernetes Integration

The K8s operator automatically creates a ServiceMonitor for Prometheus scraping. See Kubernetes Operator for ServiceMonitor configuration.

Volley SRE Gap Analysis

Date: 2026-03-30
Reference framework: msitarzewski/agency-agents engineering-sre.md


Important Context: Framework vs. Service

Volley is a stream processing framework (library + Kubernetes operator), not a deployed service. This analysis evaluates:

  1. Framework enablement — How well does Volley equip pipeline authors to follow SRE practices?
  2. Framework development practices — How well does Volley’s own CI/testing/release process follow SRE principles?

Org-level concerns (on-call rotations, blameless culture documentation, MTTR tracking) are out of scope — those belong to teams operating Volley-based pipelines, not the framework itself.


Executive Summary

| Pillar | Score | Verdict |
|---|---|---|
| SLOs & Error Budgets | C+ (55%) | SLI-ready metrics + Prometheus recording/alerting rule templates + SLO definition template; no runtime error budget gauge yet |
| Observability | A- (88%) | Strong metrics + tracing + structured JSON logging + alerting rule templates + trace-log correlation + per-connector error counters; missing dashboards |
| Toil Reduction | B- (65%) | CI + K8s operator + Dockerfile + Helm chart; no CD pipeline or benchmark regression tracking |
| Chaos Engineering | C- (40%) | Checkpoint recovery unit-tested + fault injection integration tests for checkpoint/RocksDB failures; no Kafka fault injection |
| Capacity Planning | B- (65%) | KEDA autoscaling scaffolded, benchmarks exist, capacity planning guide with sizing tables and PromQL queries |
| Progressive Rollouts | F (5%) | Operator uses default StatefulSet RollingUpdate; no canary or automated rollback |
| Incident Response Primitives | C+ (55%) | PipelineHealth enum + K8s probes + volley.pipeline.health gauge + alerting rules; missing severity-to-SLO mapping docs |

Overall: Volley has a strong observability foundation and solid Kubernetes integration, backed by SLO recording rules, alerting templates, structured logging with trace-log correlation, a Dockerfile, a Helm chart, an SLO definition template, fault injection tests, and a capacity planning guide. The remaining gaps are dashboard templates, a CD pipeline, progressive rollout support, and Kafka fault injection.


Scoring Methodology

| Grade | Range | Meaning |
|---|---|---|
| A | 90-100% | Production-ready, matches SRE framework recommendations |
| B | 70-89% | Strong foundation, minor gaps |
| C | 50-69% | Partial coverage, significant gaps |
| D | 25-49% | Minimal coverage, major gaps |
| F | 0-24% | Absent or negligible |

Criteria per pillar: API completeness, built-in tooling, documentation for pipeline authors, integration with ecosystem tooling (Prometheus, Grafana, K8s).


Pillar 1: SLOs & Error Budgets — C (50%)

Current State

Volley exposes metrics that serve as natural SLI candidates (volley-observability/src/metrics.rs):

| Metric | Type | SLI Use |
|---|---|---|
| volley.records.processed | Counter | Availability: total processed records |
| volley.errors | Counter | Availability: error count for SLI ratio |
| volley.stage.latency_ms | Histogram | Latency: processing duration distribution |
| volley.pipeline.uptime_seconds | Gauge | Availability: pipeline uptime |
| volley.checkpoint.failures | Counter | Reliability: checkpoint failure rate |

The PipelineHealth enum (volley-observability/src/health.rs) provides five discrete health states (Starting, Running, Degraded, Failed, ShuttingDown), but these are probe-level signals, not SLO-aligned measurements.

What Exists

Prometheus recording rules are provided at deploy/prometheus/recording-rules.yaml computing:

  • volley:availability:ratio_rate5m and volley:availability:ratio_rate30d — availability SLI ratios
  • volley:latency_sli:ratio_rate5m — latency SLI (fraction under 100ms)
  • volley:error_budget:remaining — remaining error budget against 99.9% target
  • volley:throughput:rate5m and volley:saturation:avg — capacity signals

Multi-window burn rate alerting rules are provided at deploy/prometheus/alerting-rules.yaml:

  • VolleyHighErrorBurnRate (critical) — 14.4x burn over 1h confirmed by 5m window
  • VolleySlowErrorBurnRate (warning) — 1x burn over 3d confirmed by 6h window
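The 14.4x and 1x factors are burn rates: the observed error ratio divided by the error budget, which is 1 minus the SLO target. Assume a 30-day SLO period, since the recording rules compute a 30d ratio. Under that assumption, a 14.4x burn sustained for 1 hour spends 2% of the period's budget, and a 1x burn over 3 days spends 10%. The sketch below works through that arithmetic. The function names are illustrative.

```rust
/// Burn rate: observed error ratio relative to the ratio the SLO allows.
/// A value of 1.0 spends the budget exactly over the SLO period.
fn burn_rate(error_ratio: f64, slo_target: f64) -> f64 {
    error_ratio / (1.0 - slo_target)
}

/// Fraction of the period's error budget spent if `rate` holds for `window_hours`.
fn budget_spent(rate: f64, window_hours: f64, period_hours: f64) -> f64 {
    rate * window_hours / period_hours
}

/// Hours in the assumed 30-day SLO period.
const PERIOD_HOURS: f64 = 30.0 * 24.0;
```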

Remaining Gaps

  • No SLO definition template or configuration format for pipeline authors to declare per-pipeline targets
  • No runtime error budget gauge metric in volley-observability (the recording rule computes it in Prometheus, but no native Rust gauge)
  • No documentation connecting error budget exhaustion to operational decisions (e.g., deployment freeze policy)

Recommendations

(P1) Add an error_budget_remaining gauge to volley-observability that pipeline authors can enable to expose real-time budget consumption natively from the runtime, complementing the Prometheus recording rule.

(P1) Provide an SLO definition template format (YAML) that pipeline authors can use to declare per-pipeline availability and latency targets, which can be consumed by the Prometheus rules.

(P2) Document an error budget policy template: when budget < 25%, freeze non-critical deployments; when budget < 10%, enter incident response mode.
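The policy thresholds can be expressed against the remaining budget fraction. This sketch defines remaining budget as 1 minus (observed error ratio / allowed error ratio). That is one common definition, and it may not match the volley:error_budget:remaining rule exactly. `BudgetPolicy` is a hypothetical type.

```rust
#[derive(Debug, PartialEq)]
enum BudgetPolicy {
    Normal,
    FreezeNonCritical,
    IncidentResponse,
}

/// Remaining error budget as a fraction of the total; negative once overspent.
fn budget_remaining(availability: f64, slo_target: f64) -> f64 {
    let allowed = 1.0 - slo_target;
    let spent = 1.0 - availability;
    1.0 - spent / allowed
}

/// Apply the policy: < 10% remaining => incident mode, < 25% => freeze.
fn policy_for(remaining: f64) -> BudgetPolicy {
    if remaining < 0.10 {
        BudgetPolicy::IncidentResponse
    } else if remaining < 0.25 {
        BudgetPolicy::FreezeNonCritical
    } else {
        BudgetPolicy::Normal
    }
}
```

For example, at a 99.9% target an observed availability of 99.95% leaves half the budget, while 99.8% overspends it (remaining = -1.0).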


Pillar 2: Observability — B+ (80%)

Current State

Metrics (volley-observability/src/metrics.rs): 15+ named metric constants covering sources, operators, checkpoints, channels, watermarks, sinks, pipeline health, Kafka, and blob storage. All use node_id/job_name labels. Three helper functions (increment_counter, set_gauge, record_histogram) enforce consistent labeling.

Tracing (volley-observability/src/tracing_init.rs): OpenTelemetry integration via OTLP gRPC export with a tracing-opentelemetry bridge. Resource attributes include service.name and node.id. Tracing falls back gracefully if the OTLP endpoint is unavailable.

Health (volley-observability/src/health.rs): HealthReporter with atomic state tracking, liveness/readiness semantics. K8s probes configured in operator’s StatefulSet builder.

ServiceMonitor (volley-operator/src/crd.rs): Auto-created by operator with configurable scrape interval and labels.

What Exists

  • Structured JSON logging: ObservabilityConfig supports a log_format field with Text (default) and Json modes. When set to Json, tracing_subscriber::fmt::json() is used for structured output compatible with log aggregation systems (Loki, Elasticsearch, CloudWatch Logs). Pipeline authors enable it via .with_json_logging().
  • Alerting rule templates: Prometheus alerting rules are provided at deploy/prometheus/alerting-rules.yaml covering error burn rate (fast + slow), pipeline health state, checkpoint failures, channel saturation, Kafka consumer lag, and p99 latency.

What Was Added

  • Trace-log correlation: TraceContextFormat in volley-core/src/observability/trace_context.rs safely injects trace_id and span_id fields into JSON log output via serde_json round-trip when an OpenTelemetry span is active, enabling cross-referencing between logs and distributed traces.
  • Per-connector error counters: volley.source.errors and volley.sink.errors metrics defined in volley-observability/src/metrics.rs with source_id/sink_id labels, and incremented in volley-core/src/builder/runtime.rs on poll/write/flush failures. connector_id() method added to Source, Sink, DynSource, and DynSink traits, implemented across Kafka, blob store, and Iceberg connectors.

Remaining Gaps

  • No dashboard templates: No Grafana dashboard JSON mapping the four golden signals to Volley metrics.

Recommendations

(P0) Provide a golden signals Grafana dashboard template using Volley’s actual metrics:

| Golden Signal | Volley Metric | PromQL |
|---|---|---|
| Latency | volley.stage.latency_ms | histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name)) |
| Traffic | volley.source.records_polled | sum(rate(volley_source_records_polled_total[5m])) by (job_name) |
| Errors | volley.errors | sum(rate(volley_errors_total[5m])) by (job_name, error_type) |
| Saturation | volley.channel.utilization | avg(volley_channel_utilization) by (job_name, stage) |
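The PromQL column uses Prometheus-style names for the dotted metric names. Dots become underscores, and counters carry a _total suffix, so volley.errors is queried as volley_errors_total. The sketch below shows that mapping. It assumes Volley's exporter follows this common Prometheus convention, which the names in the table suggest. `prometheus_name` and `Kind` are illustrative.

```rust
#[derive(Clone, Copy)]
enum Kind {
    Counter,
    Gauge,
    Histogram,
}

/// Prometheus-style name for a dotted Volley metric name: dots become
/// underscores and counters gain a `_total` suffix. Histograms are queried
/// through the `_bucket`, `_sum`, and `_count` series derived from this base.
fn prometheus_name(dotted: &str, kind: Kind) -> String {
    let base = dotted.replace('.', "_");
    match kind {
        Kind::Counter if !base.ends_with("_total") => format!("{base}_total"),
        _ => base,
    }
}
```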

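(P1, completed) Enable trace-log correlation by configuring the fmt layer with OpenTelemetry trace context propagation, so JSON log lines include trace_id and span_id fields. This is delivered as TraceContextFormat (see What Was Added).

(P2, completed) Add per-connector error counters to metrics.rs (volley.source.errors, volley.sink.errors) with source_id/sink_id labels for finer-grained fault attribution. This is delivered in volley-observability/src/metrics.rs and volley-core/src/builder/runtime.rs (see What Was Added).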


Pillar 3: Toil Reduction — B- (65%)

Current State

CI pipeline (.github/workflows/ci.yml): Format checking (cargo fmt), linting (cargo clippy with default + kafka features), testing (default + kafka variants), benchmark compilation check. Uses Rust cache for faster builds.

K8s operator (volley-operator/): Automates StatefulSet, headless Service, metrics Service, ServiceMonitor, PodDisruptionBudget, and KEDA ScaledObject creation from a single VolleyApplication CRD. Server-side apply with idempotent reconciliation.

Checkpoint cleanup: RecoveryCoordinator handles automated old checkpoint cleanup.

What Exists

  • Dockerfile: A multi-stage Dockerfile is provided at deploy/docker/Dockerfile with a Rust builder stage and minimal distroless runtime stage. Pipeline authors can extend or customize it for their specific binaries.
  • Helm chart: A Helm chart exists at deploy/helm/volley/ that templates the VolleyApplication CRD with values.yaml covering all CRD fields (resources, storage, observability, health, autoscaling, Kafka, isolation).

Remaining Gaps

  • Dockerfile not referenced in documentation: The README and operator docs do not mention the Dockerfile or describe how pipeline authors should customize it for their applications.
  • Helm chart has no production-hardening overlays: No example values files for common deployment topologies (small/medium/large, multi-AZ).
  • No CD pipeline: CI exists but no automated release workflow (crate publishing, image building).
  • No benchmark regression tracking: Benchmarks compile-checked but results not tracked over time.

Recommendations

(P1) Add a CD workflow (.github/workflows/release.yml) triggered on git tags that builds container images and publishes to a registry.

(P1) Add production-hardened example values files to the Helm chart (e.g., values-production.yaml with resource limits, anti-affinity, KEDA autoscaling).

(P2) Add benchmark regression detection in CI using criterion with stored baselines.


Pillar 4: Chaos Engineering — D (30%)

Current State

Checkpoint recovery is tested via unit tests — the HealthReporter transitions through Starting -> Running -> Degraded -> Failed states and these are verified. The operator handles reconciliation errors with retry backoff (15s requeue on failure).

Gaps

  • No fault injection tests simulating Kafka broker disconnect, disk full, or OOM conditions.
  • No integration tests exercising the Degraded -> Running recovery path under realistic failure.
  • No Chaos Mesh or LitmusChaos experiment definitions for K8s-deployed pipelines.

Recommendations

(P1) Add integration tests for framework resilience:

  • Checkpoint directory becomes read-only mid-pipeline
  • Kafka broker disconnect during exactly-once transaction commit
  • RocksDB write failure during state flush
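Failures like these are easiest to inject through a seam that the checkpoint path writes to. The minimal sketch below shows the pattern: a test double fails after N writes, and the caller must report Degraded rather than panic. `CheckpointStore`, `FaultyStore`, and `run_checkpoints` are hypothetical. Volley's actual tests live in volley-core/tests/fault_injection_test.rs and may use different hooks.

```rust
use std::io;

/// Hypothetical storage seam the checkpoint path writes through.
trait CheckpointStore {
    fn write(&mut self, epoch: u64, bytes: &[u8]) -> io::Result<()>;
}

/// Test double that fails once `fail_after` writes have succeeded,
/// simulating e.g. a checkpoint directory that turns read-only mid-run.
struct FaultyStore {
    writes: usize,
    fail_after: usize,
}

impl CheckpointStore for FaultyStore {
    fn write(&mut self, _epoch: u64, _bytes: &[u8]) -> io::Result<()> {
        if self.writes >= self.fail_after {
            return Err(io::Error::new(io::ErrorKind::PermissionDenied, "injected: read-only"));
        }
        self.writes += 1;
        Ok(())
    }
}

#[derive(Debug, PartialEq)]
enum Health {
    Running,
    Degraded(String),
}

/// Checkpoint epochs 1..=epochs; on the first failure, stop and report
/// Degraded with the last committed epoch instead of panicking.
fn run_checkpoints(store: &mut dyn CheckpointStore, epochs: u64) -> (u64, Health) {
    let mut committed = 0;
    for epoch in 1..=epochs {
        if let Err(e) = store.write(epoch, b"state") {
            return (committed, Health::Degraded(format!("checkpoint {epoch} failed: {e}")));
        }
        committed = epoch;
    }
    (committed, Health::Running)
}
```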

(P1) Provide Chaos Mesh experiment templates for K8s-deployed Volley applications:

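# Chaos Mesh 2.x declares recurring experiments with a Schedule resource
apiVersion: chaos-mesh.org/v1alpha1
kind: Schedule
metadata:
  name: volley-pod-kill
spec:
  schedule: "@every 30m"
  type: PodChaos
  concurrencyPolicy: Forbid
  historyLimit: 5
  podChaos:
    action: pod-kill
    mode: one
    selector:
      labelSelectors:
        app.kubernetes.io/managed-by: volley-operator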

(P2) Add toxiproxy integration to Kafka integration tests for network degradation simulation.


Pillar 5: Capacity Planning — C (45%)

Current State

Autoscaling: KEDA support fully scaffolded in operator (AutoscalingSpec in CRD) with Kafka lag, Prometheus metric, and CPU-based triggers. ScaledObject is created during reconciliation.

Benchmarks: NEXMark benchmark suite (volley-benchmark/) provides industry-standard throughput, latency, and memory measurements.

Resource management: CRD requires explicit resources.requests and resources.limits for CPU/memory.

Gaps

  • No sizing guide mapping resource allocation to expected throughput.
  • No documentation on RocksDB storage growth patterns.
  • No tested KEDA trigger threshold examples.
  • No resource utilization dashboards.

Recommendations

(P1) Create a capacity planning guide covering:

  • CPU/memory requirements per records/sec (based on benchmark data)
  • RocksDB storage growth formula: key_count * avg_value_size * checkpoint_retention_count
  • PVC sizing guidance based on state size and checkpoint interval
  • Key PromQL queries for capacity monitoring:
    • Throughput: rate(volley_source_records_polled_total[5m])
    • Backpressure: volley_channel_utilization > 0.8
    • Input backlog: volley_kafka_consumer_lag
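The storage growth formula can be turned into a quick PVC estimate. This sketch rests on stated assumptions. The formula counts value bytes only, with no key bytes or RocksDB compaction overhead. It also treats every retained checkpoint as a full copy, which is likely conservative with hardlink checkpoints that share unchanged SST files. The headroom multiplier is an illustrative assumption, not a Volley default.

```rust
/// State bytes per the guide's formula:
/// key_count * avg_value_size * checkpoint_retention_count.
fn estimate_state_bytes(key_count: u64, avg_value_size: u64, retention: u64) -> u64 {
    key_count * avg_value_size * retention
}

/// PVC size in GiB after applying a headroom multiplier (e.g. 1.5 for 50% slack).
fn pvc_gib(state_bytes: u64, headroom: f64) -> f64 {
    state_bytes as f64 * headroom / (1u64 << 30) as f64
}
```

For example, 10 million keys at 256 bytes with 3 retained checkpoints gives 7,680,000,000 bytes, about 7.15 GiB. With 1.5x headroom that rounds up to roughly 10.7 GiB.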

(P2) Document tested KEDA trigger configurations with proven thresholds (e.g., scale at consumer lag > 10000).
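For a Kafka lag trigger, KEDA feeds lag to the Horizontal Pod Autoscaler as an average-value target. The replica count is therefore roughly total lag divided by lagThreshold, rounded up and clamped to the min/max replica counts. By default, KEDA's Kafka scaler also does not scale past the topic's partition count. This sketch ignores the HPA's tolerance and stabilization windows. `desired_replicas` is illustrative.

```rust
/// Approximate replicas for a lag trigger: ceil(total_lag / lag_threshold),
/// clamped to [min, max] and to the partition count.
fn desired_replicas(total_lag: u64, lag_threshold: u64, min: u32, max: u32, partitions: u32) -> u32 {
    let raw = u32::try_from(total_lag.div_ceil(lag_threshold.max(1))).unwrap_or(u32::MAX);
    // Never let the upper bound fall below `min` (clamp panics if min > max).
    let upper = max.min(partitions).max(min);
    raw.clamp(min, upper)
}
```

With a threshold of 10000, a lag of 45000 asks for 5 replicas. A lag of 200000 on an 8-partition topic stops at 8.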


Pillar 6: Progressive Rollouts — F (5%)

Current State

The operator creates StatefulSets with a configurable replicas count but uses the default RollingUpdate strategy. No canary, blue-green, or partition-based rollout mechanism exists. imagePullPolicy is configurable but no image tag pinning is enforced.

Gaps

  • No updateStrategy field in the CRD spec.
  • No partition-based canary rollout support.
  • No automated rollback on health check failure post-update.
  • No Argo Rollouts or Flagger integration.

Recommendations

(P1) Add an updateStrategy field to VolleyApplicationSpec CRD with partition-based canary support:

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct UpdateStrategySpec {
    /// Partition value for canary rollouts. Pods with ordinal >= partition
    /// get the new version. Set to replicas - 1 to canary a single pod
    /// (the highest ordinal).
    #[serde(default)]
    pub partition: Option<i32>,
}
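Kubernetes applies a RollingUpdate partition by updating only the pods whose ordinal is greater than or equal to the partition; the rest stay on the old revision. The sketch below computes which pods a given partition value touches. `updated_ordinals` is illustrative.

```rust
/// Pod ordinals moved to the new revision for a StatefulSet with `replicas`
/// pods and an optional RollingUpdate partition (unset behaves like 0).
fn updated_ordinals(replicas: i32, partition: Option<i32>) -> Vec<i32> {
    let p = partition.unwrap_or(0).max(0);
    (p..replicas).collect()
}
```

A canary promotion then means lowering the partition step by step until it reaches 0.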

(P2) Provide an Argo Rollouts AnalysisTemplate example that checks volley_errors_total rate and volley_stage_latency_ms p99 after each rollout step.


Pillar 7: Incident Response Primitives — C+ (55%)

Current State

Health states: PipelineHealth enum provides 5 states — Starting, Running, Degraded (with reason string), Failed (with reason string), ShuttingDown. These map to K8s liveness (healthy = Starting|Running) and readiness (ready = Running only).

Health gauge: volley.pipeline.health gauge is emitted on every state transition via HealthReporter, enabling Prometheus alerting on health state changes. Values: 0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown.
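The health states, gauge values, and probe semantics described here fit in one small type. This sketch mirrors the description. It is not the actual definition in volley-observability/src/health.rs, whose method names may differ.

```rust
/// Sketch of the PipelineHealth states described above.
#[derive(Debug, Clone, PartialEq)]
enum PipelineHealth {
    Starting,
    Running,
    Degraded(String),
    Failed(String),
    ShuttingDown,
}

impl PipelineHealth {
    /// Value exported on the volley.pipeline.health gauge.
    fn gauge_value(&self) -> f64 {
        match self {
            PipelineHealth::Starting => 0.0,
            PipelineHealth::Running => 1.0,
            PipelineHealth::Degraded(_) => 2.0,
            PipelineHealth::Failed(_) => 3.0,
            PipelineHealth::ShuttingDown => 4.0,
        }
    }

    /// Liveness: healthy only while Starting or Running.
    fn is_live(&self) -> bool {
        matches!(self, PipelineHealth::Starting | PipelineHealth::Running)
    }

    /// Readiness: ready only while Running.
    fn is_ready(&self) -> bool {
        matches!(self, PipelineHealth::Running)
    }
}
```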

Operator status: CRD status tracks phases (Pending, Running, Degraded, Failed) with readyReplicas count and human-readable message.

Probes: Liveness at /health (port 8080), readiness at /ready, with configurable initialDelaySeconds and periodSeconds.

Alerting rules: deploy/prometheus/alerting-rules.yaml includes VolleyPipelineFailed (critical, health==3 for 1m) and VolleyPipelineDegraded (warning, health==2 for 5m).

Remaining Gaps

  • No documentation mapping severity levels to PipelineHealth states and SLO impact.
  • Alert annotations carry only placeholder runbook URLs; none link to real operational documentation yet.

Recommendations

(P1) Document severity-to-health mapping for pipeline authors:

| Severity | PipelineHealth | SLO Impact | Response |
|---|---|---|---|
| SEV1 | Failed | SLO breached, pipeline stopped | Immediate: investigate root cause, restore from checkpoint |
| SEV2 | Degraded | Error budget burning above normal | Urgent: identify degradation source (e.g., checkpoint failures) |
| SEV3 | Running with elevated latency | Within budget but trending | Investigate: check channel.utilization, watermark.lag_ms |
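The table can be encoded as a classifier over the health gauge value (0=Starting through 4=ShuttingDown) plus a latency check. In this sketch the p99 budget is a hypothetical per-pipeline input, because the document does not define what counts as elevated latency.

```rust
#[derive(Debug, PartialEq)]
enum Severity {
    Sev1,
    Sev2,
    Sev3,
}

/// Classify from the volley.pipeline.health gauge (0-4) plus a p99 check.
/// Returns None when nothing warrants a page.
fn classify(health_gauge: u8, p99_ms: f64, p99_budget_ms: f64) -> Option<Severity> {
    match health_gauge {
        3 => Some(Severity::Sev1),                           // Failed
        2 => Some(Severity::Sev2),                           // Degraded
        1 if p99_ms > p99_budget_ms => Some(Severity::Sev3), // Running, elevated latency
        _ => None, // Starting, ShuttingDown, or Running within budget
    }
}
```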

(P1) Add runbook URLs to the existing alerting rules in deploy/prometheus/alerting-rules.yaml, linking to operational documentation for each failure mode.


Prioritized Implementation Roadmap

P0 — Completed

  1. SLO recording rule templates: deploy/prometheus/recording-rules.yaml
  2. Burn rate alerting rule templates: deploy/prometheus/alerting-rules.yaml
  3. Structured JSON logging: ObservabilityConfig.log_format with LogFormat::Json
  4. Multi-stage Dockerfile: deploy/docker/Dockerfile
  5. Helm chart: deploy/helm/volley/
  6. Pipeline health gauge: volley.pipeline.health emitted by HealthReporter
  7. Health state alerting rules: VolleyPipelineFailed, VolleyPipelineDegraded in alerting rules

P0 — Remaining

  1. Golden signals dashboard template (Grafana JSON)
  2. Severity-to-PipelineHealth mapping documentation

P1 — Framework maturity

  1. Runtime error budget gauge metric in volley-observability
  2. SLO definition template format for per-pipeline targets: deploy/prometheus/slo-template.yaml
  3. Trace-log correlation (trace_id in structured log output): volley-core/src/observability/trace_context.rs with TraceContextFormat
  4. Fault injection integration tests (checkpoint, RocksDB failures): volley-core/tests/fault_injection_test.rs (9 tests)
  5. Chaos Mesh experiment templates
  6. Capacity planning guide (sizing, storage growth, key PromQL): docs/src/operations/capacity-planning.md
  7. updateStrategy CRD field for partition-based canary rollouts
  8. Runbook URLs in alerting rule annotations
  9. CD workflow for framework releases
  10. Production-hardened Helm values examples

P2 — Excellence

  1. Per-connector error counters (volley.source.errors, volley.sink.errors) — constants in volley-observability/src/metrics.rs, wired up in volley-core/src/builder/runtime.rs with connector_id() on Source/Sink traits
  2. Benchmark regression detection in CI
  3. Toxiproxy integration for Kafka fault injection
  4. Argo Rollouts AnalysisTemplate example
  5. Tested KEDA trigger threshold documentation
  6. Error budget policy template documentation

Appendix A: Full Metric Inventory

All metrics defined in volley-observability/src/metrics.rs:

| Constant | Metric Name | Type | Labels | SRE Use |
|---|---|---|---|---|
| SOURCE_RECORDS_POLLED | volley.source.records_polled | Counter | node_id, job_name, source_id | Traffic golden signal |
| RECORDS_PROCESSED | volley.records.processed | Counter | node_id, job_name, operator_id | Throughput, SLI denominator |
| STAGE_LATENCY_MS | volley.stage.latency_ms | Histogram | node_id, job_name, stage | Latency golden signal, SLI |
| CHECKPOINT_DURATION_MS | volley.checkpoint.duration_ms | Histogram | node_id, job_name | Checkpoint health |
| CHECKPOINT_EPOCH | volley.checkpoint.epoch | Gauge | node_id, job_name | Progress tracking |
| CHECKPOINT_FAILURES | volley.checkpoint.failures | Counter | node_id, job_name | Reliability, alerting |
| CHANNEL_UTILIZATION | volley.channel.utilization | Gauge | node_id, job_name, stage | Saturation golden signal |
| WATERMARK_LAG_MS | volley.watermark.lag_ms | Gauge | node_id, job_name | Event-time lag |
| SINK_RECORDS_WRITTEN | volley.sink.records_written | Counter | node_id, job_name, sink_id | Output throughput |
| SINK_FLUSH_DURATION_MS | volley.sink.flush.duration_ms | Histogram | node_id, job_name, sink_id | Sink latency |
| PIPELINE_UPTIME_SECONDS | volley.pipeline.uptime_seconds | Gauge | node_id, job_name | Availability |
| PIPELINE_HEALTH | volley.pipeline.health | Gauge | node_id, job_name | Health state (0-4), alerting on Failed/Degraded |
| ERRORS_TOTAL | volley.errors | Counter | node_id, job_name, error_type | Error golden signal, SLI numerator |
| KAFKA_CONSUMER_LAG | volley.kafka.consumer_lag | Gauge | node_id, job_name | Backlog, autoscaling trigger |
| KAFKA_COMMIT_DURATION_MS | volley.kafka.commit.duration_ms | Histogram | node_id, job_name | Kafka commit latency |
| KAFKA_COMMIT_FAILURES | volley.kafka.commit.failures | Counter | node_id, job_name | Exactly-once reliability |
| BLOB_OBJECTS_READ | volley.blob.objects_read | Counter | node_id, job_name | Blob source throughput |
| BLOB_BYTES_READ | volley.blob.bytes_read | Counter | node_id, job_name | Blob I/O volume |

Label constants: LABEL_NODE_ID, LABEL_JOB_NAME, LABEL_STAGE, LABEL_OPERATOR_ID, LABEL_SOURCE_ID, LABEL_SINK_ID, LABEL_ERROR_TYPE

Appendix B: Reference Prometheus Rules

See Pillar 1 for SLO recording rules and burn rate alerts, and Pillar 7 for health state alerting rules.

Appendix C: Golden Signals PromQL Reference

# Latency (p50, p99)
histogram_quantile(0.5, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# Traffic (records/sec by pipeline)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

# Errors (error rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)

# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)

# Saturation (channel utilization, higher = more backpressure)
avg(volley_channel_utilization) by (job_name, stage)

# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)

NEXMark Benchmarks

NEXMark is the standard workload used to compare stream processing systems. It simulates an online auction — people, bids, auctions, a steady event rate — and defines a handful of queries (projection, windowed aggregation, stream-to-stream join) that together exercise the throughput, latency, and stateful-operator paths that matter in real pipelines.

Volley ships five of those queries as runnable examples so you can measure what your hardware does with your data shape, and compare against Flink or any other system that also publishes NEXMark numbers.

Queries

| Query | Description | What it tests |
|---|---|---|
| Q1 | Currency Conversion | Raw throughput — stateless map/projection |
| Q4 | Average Selling Price per Category | Windowed join between bids and auctions; post-join filtering; min-watermark across dual sources |
| Q5 | Hot Items | Sliding window aggregation under sustained load |
| Q7 | Highest Bid per Session | Session window merging; high-cardinality keyed state; RocksDB under sustained load |
| Q8 | Monitor New Users | Windowed stream-to-stream join |
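Q7's session windows close after a gap of inactivity, and a session is extended when a new event arrives before it ends. The sketch below assigns sessions for one key. It assumes event times are already sorted and that each session spans [first event, last event + gap), the convention Flink uses. Volley's merging logic may differ.

```rust
/// Split sorted event times (ms) for one key into sessions.
/// Returns (start, end) pairs where end = last event in the session + gap_ms.
fn sessions(sorted_ts: &[u64], gap_ms: u64) -> Vec<(u64, u64)> {
    let mut out: Vec<(u64, u64)> = Vec::new();
    for &ts in sorted_ts {
        // An event extends the open session if it arrives before that session ends.
        let extends = matches!(out.last(), Some(&(_, end)) if ts < end);
        if extends {
            let last = out.last_mut().expect("checked above");
            last.1 = last.1.max(ts + gap_ms);
        } else {
            out.push((ts, ts + gap_ms));
        }
    }
    out
}
```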

Running

# Q8 — Windowed join (multi-source pipeline)
cargo run --example nexmark_q8 -p volley-benchmark --release

# Q5 — Sliding window aggregation (recommended starting point)
cargo run --example nexmark_q5 -p volley-benchmark --release

# Q4 — Windowed join (average selling price per category)
cargo run --example nexmark_q4 -p volley-benchmark --release

# Q7 — Session windows (highest bid per session per bidder)
cargo run --example nexmark_q7 -p volley-benchmark --release

# Q1 — Stateless projection baseline
cargo run --example nexmark_q1 -p volley-benchmark --release

Configuration

| Variable | Default | Description |
|---|---|---|
| NEXMARK_DURATION | 10 | Benchmark duration in seconds |
| NEXMARK_BATCH_SIZE | 4000 | Records per Arrow RecordBatch |
| NEXMARK_PARALLELISM | 4 | Parallel operator instances (Q5) |
| NEXMARK_AUCTIONS | 100 | Number of active auctions (Q5) |
| NEXMARK_WINDOW_SIZE | 10 | Window size in seconds (Q5) |
| NEXMARK_WINDOW_SLIDE | 2 | Window slide in seconds (Q5) |
| NEXMARK_INTER_EVENT_MS | 1 | Inter-event time in milliseconds |
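Reading these knobs usually means parsing the variable if it is set and using the default otherwise. The sketch below assumes that behavior; the benchmark's actual handling of invalid values may differ. `NexmarkConfig` is a hypothetical type. It takes a lookup closure instead of calling std::env::var directly so that it stays testable.

```rust
/// Benchmark settings with the defaults from the table above.
#[derive(Debug, PartialEq)]
struct NexmarkConfig {
    duration_secs: u64,
    batch_size: usize,
    parallelism: usize,
    auctions: usize,
    window_size_secs: u64,
    window_slide_secs: u64,
    inter_event_ms: u64,
}

/// Parse `key` via `get`, falling back to `default` if unset or unparsable.
fn parse_or<T: std::str::FromStr>(get: &impl Fn(&str) -> Option<String>, key: &str, default: T) -> T {
    get(key).and_then(|v| v.parse().ok()).unwrap_or(default)
}

impl NexmarkConfig {
    /// Pass `|k| std::env::var(k).ok()` to read the real environment.
    fn from_lookup(get: impl Fn(&str) -> Option<String>) -> Self {
        NexmarkConfig {
            duration_secs: parse_or(&get, "NEXMARK_DURATION", 10),
            batch_size: parse_or(&get, "NEXMARK_BATCH_SIZE", 4000),
            parallelism: parse_or(&get, "NEXMARK_PARALLELISM", 4),
            auctions: parse_or(&get, "NEXMARK_AUCTIONS", 100),
            window_size_secs: parse_or(&get, "NEXMARK_WINDOW_SIZE", 10),
            window_slide_secs: parse_or(&get, "NEXMARK_WINDOW_SLIDE", 2),
            inter_event_ms: parse_or(&get, "NEXMARK_INTER_EVENT_MS", 1),
        }
    }

    /// Sliding windows each event belongs to (size / slide): 5 with the defaults.
    /// `max(1)` guards against a zero slide.
    fn windows_per_event(&self) -> u64 {
        self.window_size_secs / self.window_slide_secs.max(1)
    }
}
```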

Metrics

  • Sustained throughput (records/sec) — measured over the full run, not peak
  • p50/p99/p999 latency — per-batch processing time from source to sink
  • Memory footprint — RSS delta under load
  • Recovery time — restore from checkpoint after failure (planned)

Comparison Baseline

When comparing against Apache Flink:

  1. Use tuned Flink — RocksDB state backend, off-heap memory, equivalent parallelism
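  2. Match batch semantics — enable Flink's mini-batch mode (table.exec.mini-batch.enabled with a non-zero table.exec.mini-batch.allow-latency) and set table.exec.mini-batch.size to match Volley's batch size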
  3. Same hardware, same event rate, same total events
  4. Discard first 10% of measurements for JIT warmup (Flink) and cache warming (both)

See volley-benchmark/README.md for full methodology.

Stress testing

In addition to the short benchmark runs above, Volley ships a stress testing suite for long-running pass/fail validation. Stress scenarios run for hours against Kafka-backed pipelines, auto-calibrate the throughput ceiling, and assert hard SLO thresholds.

Scenarios

| Scenario | What it tests |
|---|---|
| stress_sustained | Sliding window aggregation at 80% of calibrated ceiling for 2 h — verifies no throughput drift, latency regression, or memory growth |
| stress_backpressure | Three-phase overload test: baseline → 120% ceiling → recovery — asserts catch-up within 60 s |
| stress_checkpoint_recovery | Crash mid-run and restart from checkpoint — asserts state restoration within 30 s |
| stress_nexmark_q4 | Kafka-backed Q4 (windowed join) under sustained load |
| stress_nexmark_q7 | Kafka-backed Q7 (session windows, high cardinality) under sustained load |

Running

# All scenarios — default 2 h each (requires Docker for Kafka)
./scripts/stress-test.sh

# CI mode — 10 min per scenario
./scripts/stress-test.sh --ci

# Single scenario with custom settings
STRESS_DURATION=30m STRESS_TARGET_RATE=5000 ./scripts/stress-test.sh sustained

Pass/fail thresholds

| Threshold | Default limit |
|---|---|
| Throughput drift | ≤ 5% drop from calibrated operating point |
| p99 latency | ≤ 2× warmup baseline |
| p999 latency | ≤ 5× warmup baseline |
| Memory growth | ≤ 20% RSS increase from warmup to end |
| Checkpoint duration | ≤ 2× warmup baseline |
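Each threshold compares an end-of-run measurement with a baseline. The sketch below implements the pass/fail check. The `StressRun` field names are illustrative and are not the schema of the JSON reports.

```rust
/// Baseline and end-of-run measurements for one stress scenario.
#[derive(Clone, Copy)]
struct StressRun {
    calibrated_rps: f64,
    end_rps: f64,
    warmup_p99_ms: f64,
    p99_ms: f64,
    warmup_p999_ms: f64,
    p999_ms: f64,
    warmup_rss_mb: f64,
    end_rss_mb: f64,
    warmup_checkpoint_ms: f64,
    checkpoint_ms: f64,
}

/// Names of the thresholds a run violates; an empty Vec means pass.
fn violations(r: &StressRun) -> Vec<&'static str> {
    let mut v = Vec::new();
    if r.end_rps < r.calibrated_rps * 0.95 {
        v.push("throughput drift");
    }
    if r.p99_ms > r.warmup_p99_ms * 2.0 {
        v.push("p99 latency");
    }
    if r.p999_ms > r.warmup_p999_ms * 5.0 {
        v.push("p999 latency");
    }
    if r.end_rss_mb > r.warmup_rss_mb * 1.20 {
        v.push("memory growth");
    }
    if r.checkpoint_ms > r.warmup_checkpoint_ms * 2.0 {
        v.push("checkpoint duration");
    }
    v
}
```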

Reports are written as JSON to target/stress-reports/ and uploaded as a CI artifact when run via the stress.yml GitHub Actions workflow.

Examples

Runnable examples live in the volley-examples crate: six examples, each demonstrating one DAG shape.

Running

# Memory-only examples (no external deps)
cargo run --example in_memory_pipeline  -p volley-examples
cargo run --example windowed_pipeline   -p volley-examples
cargo run --example fanout_multisink    -p volley-examples
cargo run --example observable_pipeline -p volley-examples

# Kafka (requires a live broker + two topics)
cargo run --example kafka_pipeline       -p volley-examples --features kafka

# ML (first run downloads a HuggingFace model; cached thereafter)
cargo run --example ml_inference_pipeline -p volley-examples --features ml --release

What each example shows

| Example | What it teaches | Features |
|---|---|---|
| in_memory_pipeline | Minimum DAG shape: from_source → filter_expr → to_sink, plus the sink.handle() pattern for reading output after execute(). | (none) |
| windowed_pipeline | Tumbling / sliding / session windows over key_by → aggregate_expr, event-time semantics, RocksDB state backend. | (none) |
| fanout_multisink | DAG 1.0 marquee: tee() broadcasts to two branches, each filters differently, both sinks commit atomically in one 2PC epoch. | (none) |
| observable_pipeline | with_observability(...) + HealthReporter + spawn_health_server → /healthz, /readyz, /startupz; Prometheus / OTLP hooks are one uncomment away. | (none) |
| kafka_pipeline | Kafka-to-Kafka exactly-once: from_kafka_exactly_once returns (stream, ConsumerGroupMetadata); to_kafka_exactly_once threads the CGM so source offsets commit inside the sink’s Kafka transaction. | kafka |
| ml_inference_pipeline | Streaming ONNX classification, backend auto-select (CoreML / CUDA / CPU), HuggingFace Hub auto-download. | ml |

NEXMark benchmarks

Throughput and windowed-join benchmarks live in a sibling crate:

cargo run --example nexmark_q1 -p volley-benchmark --release
cargo run --example nexmark_q5 -p volley-benchmark --release
cargo run --example nexmark_q8 -p volley-benchmark --release

See volley-benchmark/README.md for methodology and baselines.

Walkthrough

See Your first pipeline for a step-by-step walkthrough of in_memory_pipeline.

Your first pipeline

This is the shortest useful Volley pipeline: read records from memory, drop the cheap ones, and collect the survivors. It mirrors the in_memory_pipeline example in the repo, so you can run it end-to-end without setting up Kafka or cloud storage.

What it does

  1. Builds five sample trades as Arrow RecordBatches.
  2. Filters to price > 200 using a vectorised DataFusion expression.
  3. Collects the output via a MemorySink.

Key concepts

Create the environment

let env = StreamExecutionEnvironment::new();

All pipeline construction starts from an environment. The same environment can own multiple branches and sinks — see the fanout_multisink example for multi-sink DAGs.

Attach a source

env.from_source(MemorySource::new(records))

from_source registers the source node and returns a DataStream<NoSink>. For production, use from_kafka, from_kafka_exactly_once, or one of the blob-store source builders.

Apply operators

Expression operators run through DataFusion’s vectorised kernels and are the default choice:

.filter_expr(col("price").gt(lit(200.0)))
.select_expr(vec![col("symbol"), col("price")])

Closure operators (filter, map, flat_map) remain available for logic that can’t be expressed with columnar expressions — for example, inspecting StreamRecord metadata like event_time_ms or attaching custom per-record state. Prefer the expression form when you can.

Key, window, aggregate

.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(300)))
.aggregate_expr(
    vec![sum(col("amount"))],
    RocksDbBackend::open(tmp.path().join("state")).unwrap(),
)

key_by returns a KeyedStream (hash-partitioned by key), and window turns it into a WindowedKeyedStream. aggregate_expr consumes the windowed stream and emits one record per window per key. The second argument is a BatchStateBackend; RocksDbBackend implements it directly, so you don’t need the old KeyedStateBackend wrapper.
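Window assignment is plain arithmetic on event time. The sketch below covers tumbling and sliding assignment. It assumes windows aligned to epoch 0 with no offset, the common Flink-style convention; Volley's assigners may differ. With TumblingWindows::of(Duration::from_secs(300)), an event at 305 s lands in the window starting at 300 s.

```rust
/// Start of the tumbling window of `size_ms` that contains `ts_ms`.
fn tumbling_start(ts_ms: u64, size_ms: u64) -> u64 {
    ts_ms - ts_ms % size_ms
}

/// Starts of every sliding window (length `size_ms`, advancing by `slide_ms`)
/// that contains `ts_ms`, latest first. Away from time 0 an event lands in
/// size/slide windows; windows that would start before 0 are skipped.
fn sliding_starts(ts_ms: u64, size_ms: u64, slide_ms: u64) -> Vec<u64> {
    let mut starts = Vec::new();
    // Latest window start at or before the event.
    let mut start = ts_ms - ts_ms % slide_ms;
    loop {
        if start + size_ms <= ts_ms {
            break; // this window, and every earlier one, ends before the event
        }
        starts.push(start);
        if start < slide_ms {
            break; // the next start would be negative
        }
        start -= slide_ms;
    }
    starts
}
```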

Attach a sink, execute, read the output

let sink = MemorySink::new();
let output = sink.handle();               // shared handle survives the move

let report = stream.to_sink(sink).execute("my-job").await?;

println!("committed epochs: {}", report.epochs_committed);
for r in output.lock().unwrap().iter() { /* ... */ }

execute returns ExecutionReport { records_written, epochs_committed }, not the output records. The sink’s handle() returns an Arc<Mutex<Vec<StreamRecord>>> that points at the sink’s internal buffer — lock it after execution to inspect the rows.
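The handle pattern works because the buffer is shared through an Arc. Cloning the Arc before the sink is moved into the pipeline keeps a second owner alive. The std-only sketch below illustrates the idea. `CollectingSink` and `run_to_completion` are hypothetical stand-ins for MemorySink and execute.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for MemorySink: the buffer sits behind an Arc.
struct CollectingSink<T> {
    buf: Arc<Mutex<Vec<T>>>,
}

impl<T> CollectingSink<T> {
    fn new() -> Self {
        Self { buf: Arc::new(Mutex::new(Vec::new())) }
    }

    /// A second owner of the same buffer; it outlives any move of the sink.
    fn handle(&self) -> Arc<Mutex<Vec<T>>> {
        Arc::clone(&self.buf)
    }

    fn write(&self, item: T) {
        self.buf.lock().unwrap().push(item);
    }
}

/// Plays the role of `execute`: consumes the sink, writes, returns a count.
fn run_to_completion<T>(sink: CollectingSink<T>, items: Vec<T>) -> usize {
    let n = items.len();
    for item in items {
        sink.write(item);
    }
    n
}
```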

Run it

cargo run --example in_memory_pipeline -p volley-examples

Expected output (abridged):

=== Volley in-memory DAG pipeline ===

Input: 5 trades
Execution report: 0 records written, 1 epochs committed

Filtered output (3 records):
  1. GOOG @ $280 x 50
  2. MSFT @ $310 x 75
  3. GOOG @ $290 x 150

Next Steps

Functions Reference

Volley provides built-in vectorized functions that work with the expression API (select_expr, filter_expr). These functions operate on Apache Arrow arrays and are exposed as DataFusion UDFs.

Functions are imported from volley_core::functions or via the prelude:

use volley_core::prelude::*; // imports simhash, hamming_similarity
// or
use volley_core::functions::hash::{simhash, hamming_similarity};

Hash Functions

Functions for computing and comparing hash fingerprints.

| Function | Signature | Description |
|---|---|---|
| simhash | simhash(Utf8 / LargeUtf8) → UInt64 | 64-bit SimHash text fingerprint |
| hamming_similarity | hamming_similarity(UInt64, UInt64) → Float64 | Bitwise similarity between two hashes |

simhash

Computes a 64-bit SimHash fingerprint for text similarity. Similar texts produce fingerprints with a small Hamming distance, approximating the cosine similarity of the original term-frequency vectors.

Signature: simhash(text: Utf8 | LargeUtf8) → UInt64

Algorithm: Tokenizes text into character trigrams, hashes each trigram with a 64-bit hash, accumulates a weighted bit vector, and thresholds to produce the final fingerprint.
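The std-only sketch below follows the steps above under several assumptions. It uses FNV-1a as the 64-bit trigram hash; the document does not name the hash Volley uses, so these fingerprints will not match simhash_kernel. It counts each trigram occurrence as one vote. It also hashes strings shorter than three characters as a single shingle, which is an assumption as well.

```rust
/// 64-bit FNV-1a, standing in for the unspecified per-trigram hash.
fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        h ^= b as u64;
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

/// SimHash over character trigrams: every trigram votes +1 on the bits set
/// in its hash and -1 on the others; positive totals become 1 bits.
fn simhash(text: &str) -> u64 {
    let chars: Vec<char> = text.chars().collect();
    let mut votes = [0i64; 64];
    let mut vote = |shingle: &str| {
        let h = fnv1a64(shingle.as_bytes());
        for (bit, v) in votes.iter_mut().enumerate() {
            *v += if (h >> bit) & 1 == 1 { 1 } else { -1 };
        }
    };
    if chars.len() < 3 {
        vote(text); // assumption: short strings form one shingle
    } else {
        for w in chars.windows(3) {
            let trigram: String = w.iter().collect();
            vote(&trigram);
        }
    }
    // Threshold: ties (0) map to a 0 bit.
    votes
        .iter()
        .enumerate()
        .fold(0u64, |acc, (bit, &v)| if v > 0 { acc | (1u64 << bit) } else { acc })
}
```

With a single trigram every vote is ±1, so the fingerprint equals that trigram's hash. With many trigrams each bit reflects the majority, which is why a small edit flips only a few bits.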

Null handling: Null input produces null output.

Example:

use volley_core::prelude::*;
use volley_core::functions::hash::simhash_kernel;
use datafusion_expr::col;
use arrow::array::StringArray;

// As an expression in a pipeline
stream
    .select_expr(vec![
        col("id"),
        simhash(col("text")).alias("text_hash"),
    ]);

// As a direct Arrow kernel
let input = StringArray::from(vec!["hello world"]);
let hashes = simhash_kernel(&input).unwrap();

hamming_similarity

Computes the bitwise similarity between two UInt64 values, returning a Float64 between 0.0 (all bits differ) and 1.0 (identical).

Signature: hamming_similarity(a: UInt64, b: UInt64) → Float64

Formula: 1.0 - popcount(a XOR b) / 64.0

For a 64-bit hash, a similarity threshold of 0.95 means at most 3 bits differ.
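The formula and the 0.95 rule of thumb can be checked directly. This sketch uses u64::count_ones for popcount. `max_differing_bits` is an illustrative helper.

```rust
/// 1.0 - popcount(a XOR b) / 64.0
fn hamming_similarity(a: u64, b: u64) -> f64 {
    1.0 - f64::from((a ^ b).count_ones()) / 64.0
}

/// Largest number of differing bits that still meets `threshold`.
fn max_differing_bits(threshold: f64) -> u32 {
    ((1.0 - threshold) * 64.0).floor() as u32
}
```

0xDEADBEEF and 0xDEADBEFF differ in one bit (0xEF vs 0xFF), so their similarity is 1 - 1/64 = 0.984375.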

Null handling: If either input is null, the output is null.

Example:

use volley_core::prelude::*;
use volley_core::functions::hash::hamming_similarity_kernel;
use datafusion_expr::{col, lit};
use arrow::array::UInt64Array;

// Keep only records whose text changed enough: similarity below 0.95,
// i.e. at least 4 of the 64 bits differ
stream
    .filter_expr(
        hamming_similarity(
            simhash(col("current_text")),
            simhash(col("previous_text")),
        ).lt(lit(0.95))
    );

// Direct Arrow kernel
let a = UInt64Array::from(vec![0xDEADBEEFu64]);
let b = UInt64Array::from(vec![0xDEADBEFFu64]);
// 0xEF and 0xFF differ in one bit, so the similarity is 1 - 1/64 = 0.984375
let similarity = hamming_similarity_kernel(&a, &b).unwrap();

Proto Functions

Functions for decoding binary protobuf data into Arrow columnar format.

| Function | Signature | Description |
|---|---|---|
| decode_proto | decode_proto(Binary / LargeBinary, &ProtoDescriptor) → Struct | Decode serialized protobuf messages into a struct column |

decode_proto

Decodes a binary column containing serialized protobuf messages into an Arrow Struct column with all fields from the message definition.

Signature: decode_proto(data: Binary | LargeBinary, descriptor: &ProtoDescriptor) → Struct

The ProtoDescriptor is constructed once from a serialized FileDescriptorSet and a fully-qualified message name. It caches the Arrow schema derived from the protobuf definition.

Null handling: Null input rows and decode failures produce null struct rows. Decode failures emit a tracing::warn! but do not stop the pipeline.

Requires: protobuf feature flag on volley-core.

Example:

use volley_core::functions::proto::{decode_proto, ProtoDescriptor};
use datafusion_expr::col;

// DESCRIPTOR_BYTES: a serialized FileDescriptorSet, e.g. one produced by
// protoc's --descriptor_set_out option
let descr = ProtoDescriptor::new(DESCRIPTOR_BYTES, "crawl.CrawlRecord").unwrap();

stream
    .select_expr(vec![
        decode_proto(col("data"), &descr).alias("record"),
    ])
    .select_expr(vec![
        col("record.url"),
        col("record.content"),
    ]);

URL Functions

Functions for URL normalization and extraction.

| Function | Signature | Description |
|---|---|---|
| normalize_url | normalize_url(Utf8 / LargeUtf8) → Utf8 | Normalize a URL |
| normalize_url_with_params | normalize_url_with_params(Utf8 / LargeUtf8, Vec<String>) → Utf8 | Normalize with custom tracking params |
| extract_domain | extract_domain(Utf8 / LargeUtf8) → Utf8 | Extract host/domain from URL |

normalize_url

Normalizes a URL by applying: scheme/host lowercasing, default port removal (:80 for http, :443 for https), path normalization (/a/../b becomes /b), percent-encoding normalization (decoding unreserved characters), www. prefix removal, fragment removal, query parameter sorting, tracking parameter removal, and trailing slash removal.

Signature: normalize_url(url: Utf8 | LargeUtf8) → Utf8

Default tracking params removed: utm_source, utm_medium, utm_campaign, utm_term, utm_content, fbclid, gclid, mc_eid, msclkid.
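The query-string steps, tracking-parameter removal and then sorting, are the easiest part to reason about in isolation. The sketch below applies just those two steps to a raw query string. It does not percent-decode keys and is not Volley's implementation. Passing a custom list mirrors normalize_url_with_params, which replaces the defaults rather than adding to them.

```rust
/// Default tracking parameters listed above.
const DEFAULT_TRACKING: &[&str] = &[
    "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
    "fbclid", "gclid", "mc_eid", "msclkid",
];

/// Drop `key=value` pairs whose key is in `remove`, then sort what is left.
fn clean_query(query: &str, remove: &[&str]) -> String {
    let mut kept: Vec<&str> = query
        .split('&')
        .filter(|pair| !pair.is_empty())
        .filter(|pair| {
            let key = pair.split('=').next().unwrap_or("");
            !remove.contains(&key)
        })
        .collect();
    kept.sort_unstable();
    kept.join("&")
}
```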

Null handling: Null, empty, and invalid URL inputs produce null output.

Requires: url-functions feature flag on volley-core.

Example:

use volley_core::functions::url::normalize_url;
use datafusion_expr::col;

stream.select_expr(vec![normalize_url(col("url")).alias("url")]);

normalize_url_with_params

Same as normalize_url but with a custom tracking params blocklist that fully replaces the defaults.

Signature: normalize_url_with_params(url: Utf8 | LargeUtf8, remove_params: Vec<String>) → Utf8

Example:

#![allow(unused)]
fn main() {
use volley_core::functions::url::normalize_url_with_params;
use datafusion_expr::col;

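// Replaces the default blocklist: utm_*, fbclid, etc. are no longer removed
let custom_params = vec!["ref".to_string(), "src".to_string()];
// `stream` is an existing DataStream with a "url" column
let stream = stream.select_expr(vec![
    normalize_url_with_params(col("url"), custom_params).alias("url"),
]);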
}
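Because the custom list replaces the defaults instead of extending them, a blocklist that should still strip utm_* parameters and click IDs has to include the default names itself. A minimal sketch of building such a list in plain Rust; defaults_plus is a hypothetical helper, not part of volley_core, and the default names are copied from the normalize_url section.

```rust
/// Defaults removed by normalize_url, copied from the list documented above.
const DEFAULT_TRACKING_PARAMS: [&str; 9] = [
    "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
    "fbclid", "gclid", "mc_eid", "msclkid",
];

/// Hypothetical helper: the defaults plus `extra`, without duplicates,
/// in the Vec<String> shape that normalize_url_with_params accepts.
fn defaults_plus(extra: &[&str]) -> Vec<String> {
    let mut params: Vec<String> = DEFAULT_TRACKING_PARAMS
        .iter()
        .map(|p| p.to_string())
        .collect();
    for p in extra {
        // Skip names that are already on the list.
        if !params.iter().any(|existing| existing.as_str() == *p) {
            params.push(p.to_string());
        }
    }
    params
}

fn main() {
    // "gclid" is already a default, so only "ref" and "src" are appended.
    let params = defaults_plus(&["ref", "src", "gclid"]);
    println!("{} params: {:?}", params.len(), params);
}
```

The result could then be passed as the second argument, for example normalize_url_with_params(col("url"), defaults_plus(&["ref", "src"])).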

extract_domain

Extracts the full host/domain from a URL string, including subdomains.

Signature: extract_domain(url: Utf8 | LargeUtf8) → Utf8

Null handling: Null, empty, and invalid URL inputs produce null output.

Requires: url-functions feature flag on volley-core.

Example:

#![allow(unused)]
fn main() {
use volley_core::functions::url::extract_domain;
use datafusion_expr::col;

// "https://api.v2.example.com/endpoint" → "api.v2.example.com"
// (`stream` is an existing DataStream with a "url" column)
let stream = stream.select_expr(vec![extract_domain(col("url")).alias("domain")]);

// Direct Arrow kernel, usable without a stream
use volley_core::functions::url::extract_domain_kernel;
use arrow::array::StringArray;

let input = StringArray::from(vec!["http://sub.example.com/path"]);
// One domain per input row; here "sub.example.com"
let domains = extract_domain_kernel(&input).unwrap();
}
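For intuition about what counts as the "full host", here is a simplified, std-only host extractor that mirrors the documented behavior: subdomains are kept, and empty or invalid input yields None. It is not Volley's kernel. The name extract_host is hypothetical, and stripping userinfo and the port, as well as rejecting input without a "://" separator, are assumptions about edge cases this page does not specify. IPv6 literals are not handled.

```rust
/// Hypothetical, simplified host extractor (not volley_core's kernel).
fn extract_host(url: &str) -> Option<&str> {
    // Require an explicit scheme separator such as "https://".
    let (_scheme, rest) = url.split_once("://")?;
    // The authority ends at the first '/', '?', or '#'.
    let end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..end];
    // Drop any "user:pass@" prefix.
    let host_port = authority.rsplit_once('@').map_or(authority, |(_, h)| h);
    // Drop a ":port" suffix.
    let host = host_port.split_once(':').map_or(host_port, |(h, _)| h);
    if host.is_empty() {
        None
    } else {
        Some(host)
    }
}

fn main() {
    for url in [
        "https://api.v2.example.com/endpoint",
        "http://user:pw@sub.example.com:8080/path",
        "",
    ] {
        println!("{url:?} -> {:?}", extract_host(url));
    }
}
```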

Contributing

Contributions to Volley are welcome! This page summarizes the development workflow. See the full Contributing Guide for complete details.

Development Setup

git clone https://github.com/volley-streams/volley.git
cd volley
cargo build --workspace
cargo test --workspace --exclude volley-python

System Dependencies

| Dependency | Required by | macOS | Ubuntu |
|---|---|---|---|
| cmake | volley-connector-kafka | brew install cmake | apt install cmake |
| libcurl4-openssl-dev | volley-connector-kafka | (included) | apt install libcurl4-openssl-dev |
| protobuf-compiler | volley-scheduler | brew install protobuf | apt install protobuf-compiler |

Run volley doctor to check your environment.

Code Style

  • Formatting: cargo fmt --all
  • Linting: cargo clippy --workspace -- -D warnings
  • Tests: cargo test --workspace --exclude volley-python

All warnings are errors in CI (RUSTFLAGS: -Dwarnings).

Submitting Changes

  1. Fork the repository and create a feature branch
  2. Make your changes with clear, focused commits
  3. Ensure cargo fmt, cargo clippy, and cargo test all pass
  4. Open a pull request against main

Project Structure

See Crate Map for a per-crate breakdown of the 15-crate workspace.

Release Process

Volley follows semantic versioning (SemVer) with a staged release process.

Version Format

All crates in the workspace share a single version defined in the root Cargo.toml under [workspace.package]. Versions use 3-segment semver with optional pre-release identifiers:

  • X.Y.Z-alpha.N — Alpha: feature-complete for the release scope. API may change. Not for production.
  • X.Y.Z-beta.N — Beta: API frozen. Bug fixes only. Community testing encouraged.
  • X.Y.Z — Stable: production-ready. Semver guarantees apply.

How to Release

  1. Bump the version in Cargo.toml [workspace.package] section.
  2. Update CHANGELOG.md: move items from [Unreleased] to a new version section with today’s date.
  3. Create a PR and merge to main.
  4. The auto-tag.yml workflow detects the version change and creates a git tag (v{VERSION}).
  5. The tag triggers release.yml, which creates a GitHub Release with an auto-generated changelog.

Releases with -alpha, -beta, or -rc in the version are automatically marked as pre-releases.
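Steps 4 and 5 and the pre-release rule come down to three small decisions: read the version from [workspace.package], derive the v{VERSION} tag name, and decide whether the release is a pre-release. The Rust sketch below restates that logic for illustration only. It is not the contents of auto-tag.yml or release.yml, the helper names are hypothetical, the Cargo.toml values in main are made up, and the line-based scan is a naive stand-in for a real TOML parser.

```rust
/// Hypothetical helper: naive scan for `version = "..."` inside [workspace.package].
fn workspace_version(cargo_toml: &str) -> Option<String> {
    let mut in_section = false;
    for line in cargo_toml.lines() {
        let line = line.trim();
        if line.starts_with('[') {
            // Entering a new table; only [workspace.package] is of interest.
            in_section = line == "[workspace.package]";
            continue;
        }
        if in_section {
            if let Some(rest) = line.strip_prefix("version") {
                if let Some(value) = rest.trim_start().strip_prefix('=') {
                    return Some(value.trim().trim_matches('"').to_string());
                }
            }
        }
    }
    None
}

/// Tag name for a version, following the v{VERSION} convention.
fn tag_name(version: &str) -> String {
    format!("v{version}")
}

/// Pre-release if the version carries an -alpha, -beta, or -rc identifier.
fn is_prerelease(version: &str) -> bool {
    ["-alpha", "-beta", "-rc"]
        .iter()
        .any(|marker| version.contains(*marker))
}

fn main() {
    let cargo_toml = r#"
[workspace]
members = ["volley-core", "volley-cli"]

[workspace.package]
version = "1.0.0-beta.1"
"#;
    let version = workspace_version(cargo_toml).expect("no [workspace.package] version");
    println!("tag {} (pre-release: {})", tag_name(&version), is_prerelease(&version));
}
```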

Progression

1.0.0-alpha.1 → 1.0.0-alpha.2 → ... → 1.0.0-beta.1 → 1.0.0-beta.2 → ... → 1.0.0
  • Alpha → Alpha: bump the alpha number for each iteration.
  • Alpha → Beta: when the API is frozen and only bug fixes remain.
  • Beta → Stable: when all known issues are resolved and testing is complete.
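The ordering in the progression above is ordinary SemVer 2.0.0 precedence: pre-release identifiers are compared field by field (numerically when both are numbers, in ASCII order otherwise), and a plain X.Y.Z outranks all of its pre-releases. A minimal sketch of that comparison follows; the helpers are hypothetical rather than Volley tooling, and build metadata is ignored.

```rust
use std::cmp::Ordering;

/// Splits "1.0.0-beta.2" into ([1, 0, 0], Some("beta.2")).
fn split_version(v: &str) -> (Vec<u64>, Option<&str>) {
    let (core, pre) = match v.split_once('-') {
        Some((c, p)) => (c, Some(p)),
        None => (v, None),
    };
    let nums: Vec<u64> = core.split('.').map(|n| n.parse::<u64>().unwrap_or(0)).collect();
    (nums, pre)
}

/// Compares dot-separated pre-release identifiers (SemVer rule 11).
fn cmp_pre(x: &str, y: &str) -> Ordering {
    let mut xs = x.split('.');
    let mut ys = y.split('.');
    loop {
        match (xs.next(), ys.next()) {
            (None, None) => return Ordering::Equal,
            // With all earlier identifiers equal, more identifiers ranks higher.
            (None, Some(_)) => return Ordering::Less,
            (Some(_), None) => return Ordering::Greater,
            (Some(p), Some(q)) => {
                let ord = match (p.parse::<u64>(), q.parse::<u64>()) {
                    (Ok(m), Ok(n)) => m.cmp(&n),         // both numeric
                    (Ok(_), Err(_)) => Ordering::Less,    // numeric < alphanumeric
                    (Err(_), Ok(_)) => Ordering::Greater,
                    (Err(_), Err(_)) => p.cmp(q),         // ASCII order
                };
                if ord != Ordering::Equal {
                    return ord;
                }
            }
        }
    }
}

/// Core version first; then a release outranks any pre-release of it.
fn cmp_versions(a: &str, b: &str) -> Ordering {
    let (a_core, a_pre) = split_version(a);
    let (b_core, b_pre) = split_version(b);
    a_core.cmp(&b_core).then_with(|| match (a_pre, b_pre) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => Ordering::Greater,
        (Some(_), None) => Ordering::Less,
        (Some(x), Some(y)) => cmp_pre(x, y),
    })
}

fn main() {
    let mut versions = vec![
        "1.0.0", "1.0.0-beta.2", "1.0.0-alpha.10", "1.0.0-alpha.2", "1.0.0-beta.1", "1.0.0-alpha.1",
    ];
    versions.sort_by(|a, b| cmp_versions(a, b));
    // 1.0.0-alpha.1 → 1.0.0-alpha.2 → 1.0.0-alpha.10 → 1.0.0-beta.1 → 1.0.0-beta.2 → 1.0.0
    println!("{}", versions.join(" → "));
}
```

Note that alpha.10 sorts after alpha.2 because numeric identifiers compare as numbers; a plain string sort would put it first.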

Installing

Install the CLI from source:

cargo install --git https://github.com/volley-streams/volley volley-cli --tag v1.0.0-alpha.1

Or from a local checkout:

cargo install --path volley-cli

Crate Publishing

Cargo crate publishing to crates.io is not yet automated. This is planned for a future release.

Crate Map

Volley is a 15-crate Cargo workspace. See the full Crate Map for detailed per-crate documentation.

Status: stable = production-ready | experimental = API may change | placeholder = stub/future work

Core

| Crate | Description | Key Types | Status |
|---|---|---|---|
| volley-core | Stream processing engine with built-in observability and optional RocksDB state | StreamExecutionEnvironment, DataStream, KeyedStream, Operator, StreamRecord, ObservabilityConfig, RocksDbBackend | stable |
| volley-derive | Procedural macros | impl_to_arrow! | stable |

Connectors

| Crate | Description | Key Types | Status |
|---|---|---|---|
| volley-connectors | Umbrella crate (memory + feature-gated re-exports) | MemorySource, MemorySink | stable |
| volley-connector-kafka | Kafka source + sink (exactly-once) | KafkaSource, KafkaSink, KafkaSourceConfig | stable |
| volley-connector-blob-store | Cloud blob store abstraction + decoders | NotificationSource, BlobReader, BlobWriter | stable |
| volley-connector-aws-s3 | AWS S3 + SQS source | S3Source, S3BlobReader, SqsNotificationSource | stable |
| volley-connector-azure-blob | Azure Blob + Queue source | AzureBlobSource, AzureBlobReader | stable |
| volley-connector-gcp-gcs | GCS blob sink (Pub/Sub source removed in v1.1.0) | GcsBlobWriter | stable |
| volley-connector-delta | Delta Lake sink (exactly-once) | DeltaSink, DeltaSinkConfig | stable |
| volley-connector-iceberg | Apache Iceberg sink (REST catalog) | IcebergSink, IcebergSinkConfig | stable |

Infrastructure

| Crate | Description | Status |
|---|---|---|
| volley-k8s-operator | Kubernetes operator for VolleyApplication CRD | stable |
| volley-cli | CLI tool (volley new, volley doctor) | stable |
| volley-scheduler | Distributed execution (Ballista integration) | experimental |
| volley-python | PyO3 Python bindings | placeholder |

Other

| Crate | Description | Status |
|---|---|---|
| volley-examples | Runnable examples and benchmarks | stable |