
Introduction

Volley is a high-performance stream processing framework built in Rust on Apache Arrow. It provides a Flink-inspired DataStream API with exactly-once semantics, achieving 9M+ records/sec sustained on a single node.

Volley is designed for both human developers and AI coding agents. The API is type-safe at compile time, the documentation is structured for machine readability, and the examples are runnable out of the box.

Key Features

  • High Performance — 9M+ rec/sec sustained with multi-row Arrow batching and write-behind state caching
  • Exactly-Once Semantics — Checkpoint-based fault tolerance with aligned barrier snapshotting
  • Flink-like API — Fluent DataStream API with typestate-enforced compile-time safety
  • Expression Engine — Vectorized filter_expr(), select_expr(), aggregate_expr() powered by DataFusion (without SQL). Multi-type aggregation on native Arrow types (i32, i64, f64, etc.)
  • Event-Time Windowing — Tumbling, sliding, and session windows with watermark tracking
  • State Management — RocksDB backend with write-behind cache and hardlink checkpoints
  • Intra-Node Parallelism — Hash-partitioned parallel operator instances
  • Kafka Integration — Source and sink with exactly-once transactional semantics
  • Cloud Blob Sources — S3+SQS, Azure Blob+Queue, GCS+Pub/Sub with Parquet/JSON/CSV/Avro
  • Table Format Sinks — Delta Lake and Apache Iceberg with exactly-once commits
  • Kubernetes Operator — VolleyApplication CRD with autoscaling, health checks, and Prometheus

Built On

Component     Purpose
Apache Arrow  In-memory columnar format
DataFusion    Query engine and expression evaluation
RocksDB       State management and checkpointing
Tokio         Async runtime for task-per-stage execution

Architecture

+-----------------------------------------------------+
|  Stream Processing Layer (Rust)                     |
|  +- Fluent Builder API (DataStream, KeyedStream)    |
|  +- Multi-Row RecordBatch Operators                 |
|  +- Watermarks & Event Time                         |
|  +- Windowing (Tumbling, Sliding, Session)          |
|  +- Intra-Node Parallelism (hash-partitioned)       |
|  +- Checkpoint Coordinator                          |
+-----------------------+-----------------------------+
                        |
+-----------------------v-----------------------------+
|  Async Runtime (Tokio)                              |
|  +- Task-per-Stage Execution                        |
|  +- Bounded Channels (Backpressure)                 |
|  +- Graceful Shutdown                               |
+-----------------------+-----------------------------+
                        |
+-----------------------v-----------------------------+
|  State Management (RocksDB)                         |
|  +- Write-Behind Accumulator Cache                  |
|  +- BatchStateBackend (multi_get / WriteBatch)      |
|  +- Per-Key State Isolation                         |
|  +- Hardlink Checkpoints                            |
+-----------------------+-----------------------------+
                        |
+-----------------------v-----------------------------+
|  Connectors                                         |
|  +- Kafka (exactly-once, feature-gated)             |
|  +- Cloud Blob Stores (S3, Azure, GCS)              |
|  +- Delta Lake, Iceberg (exactly-once sinks)        |
|  +- Memory (batched source, counting sink)          |
+-----------------------------------------------------+

Performance

Benchmarked on a single node (12 CPUs, Apple Silicon):

Configuration                              Throughput
Single-row, parallelism=1                  850K rec/s
Batch=1000, parallelism=4                  8.7M rec/s
Sustained 10s (batch=1000, parallelism=4)  9.4M rec/s

Connectors

Sources

Connector           Formats                   Status
Kafka               JSON                      Available (feature-gated)
AWS S3 + SQS        Parquet, JSON, CSV, Avro  Available
Azure Blob + Queue  Parquet, JSON, CSV, Avro  Available
GCS + Pub/Sub       Parquet, JSON, CSV, Avro  Available
Memory/Iterator     Arrow RecordBatch         Available

Sinks

Connector         Formats            Status
Kafka             JSON               Available (feature-gated, transactional)
Delta Lake        Parquet            Available (exactly-once)
Apache Iceberg    Parquet            Available (REST catalog)
Memory (collect)  Arrow RecordBatch  Available


Installation

Rust Toolchain

Volley requires a stable Rust toolchain. Install via rustup:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
rustup default stable

System Dependencies

Most crates build with a standard Rust toolchain. Some connectors need additional system packages:

Dependency         Required by                       macOS                  Ubuntu
cmake              volley-connector-kafka (rdkafka)  brew install cmake     apt install cmake
protobuf-compiler  volley-scheduler                  brew install protobuf  apt install protobuf-compiler

Note: libcurl and OpenSSL are vendored and statically linked by rdkafka — no system packages needed.

Run volley doctor to check your environment for missing dependencies.

Adding Volley to a Project

Scaffold a new pipeline

volley new --template kafka-to-delta my-pipeline
cd my-pipeline
volley doctor
cargo build && cargo run

Available templates: kafka-to-delta, in-memory, s3-to-delta.

Add to an existing project

Add Volley crates to your Cargo.toml:

[dependencies]
volley-core = { git = "https://github.com/volley-streams/volley", features = ["state-rocksdb"] }
volley-connector-kafka = { git = "https://github.com/volley-streams/volley" }

Note: Volley currently uses git dependencies. crates.io publishing is planned — see the roadmap.

Feature Flags

Volley crates use Cargo feature flags to control which state backends and connectors are compiled:

Feature           Connectors Included
state-rocksdb     RocksDB state backend (default, ~1.5 GB C++ build)
blob-store        Cloud blob store abstraction
blob-store-aws    AWS S3 + SQS source
blob-store-azure  Azure Blob + Queue source
blob-store-gcs    GCS + Pub/Sub source
file-formats      Parquet, JSON, CSV, Avro decoders
table-formats     Delta Lake + Iceberg sinks

Tip: If you don’t need the RocksDB state backend (e.g. you’re only developing a Kafka connector), use default-features = false and enable only the features you need to save ~1.5 GB in build artifacts.

Troubleshooting

cmake not found

error: failed to run custom build command for `rdkafka-sys`
--- stderr
CMake not found

Install cmake: brew install cmake (macOS) or apt install cmake (Ubuntu).

RocksDB compilation slow or failing

RocksDB is a C++ dependency that takes ~3–5 minutes to compile on first build. If you don’t need state management (e.g., stateless transformations only), disable it:

volley-core = { git = "...", default-features = false }

protobuf-compiler not found

Only needed if building volley-scheduler. Install: brew install protobuf (macOS) or apt install protobuf-compiler (Ubuntu).

volley doctor reports issues

volley doctor checks for system dependencies, Docker availability, and toolchain versions. Address each reported issue individually. Common fixes:

  • Docker not running: Start Docker Desktop or the Docker daemon
  • Rust toolchain outdated: rustup update stable
  • Missing system packages: See the System Dependencies table above

Verify Your Setup

# Build the workspace
cargo build --workspace

# Run tests
cargo test --workspace --exclude volley-python

# Run a quick example
cargo run --example in_memory_pipeline -p volley-examples

Quick Start

Build your first Volley pipeline in 5 minutes.

Basic In-Memory Pipeline

This pipeline reads events from an iterator, filters them, groups by key, windows into 5-minute tumbling windows, and computes a sum aggregation:

use volley_core::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let results = StreamExecutionEnvironment::new()
        .from_iter(events)
        .filter_expr(col("amount").gt(lit(100)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(300)))
        .aggregate(AggregationType::Sum, "amount")
        .collect()
        .execute("aggregation-job")
        .await?;

    Ok(())
}

Run it:

cargo run --example in_memory_pipeline -p volley-examples

Expected output (records grouped by key and windowed):

Pipeline 'aggregation-job' started
  Source: MemorySource (4 records)
  Operators: filter_expr → key_by → window → aggregate
  Sink: MemorySink
Processing complete: 4 records in, 2 aggregated results out

Kafka Source to Kafka Sink

A real-world pipeline reading from Kafka, aggregating, and writing back to Kafka with exactly-once semantics:

use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group");
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter_expr(col("amount").gt(lit(0)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate(AggregationType::Sum, "amount")
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;

    Ok(())
}

Note: Kafka connectors require cmake installed on your system. See Installation for details.

What’s Happening

  1. StreamExecutionEnvironment::new() — Creates the pipeline builder
  2. .from_kafka() / .from_iter() — Attaches a source, producing a DataStream
  3. .filter_expr() / .map() — Applies operators to the stream
  4. .key_by() — Partitions the stream by key, producing a KeyedStream
  5. .window() — Assigns records to time windows, producing a WindowedKeyedStream
  6. .aggregate() — Computes an aggregation per window per key
  7. .to_kafka() / .collect() — Attaches a sink
  8. .execute() — Starts the pipeline as concurrent Tokio tasks

The type system enforces valid construction at compile time. You can only call .aggregate() on a WindowedKeyedStream, not on a raw DataStream.
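The .window() step above comes down to one line of arithmetic: a record's tumbling window is found by truncating its event time down to a multiple of the window size. A minimal sketch (the helper name is ours, not Volley API):

```rust
/// Start of the tumbling window containing `event_time_ms`, for a
/// window of `size_ms` milliseconds. (Hypothetical helper, shown to
/// illustrate the assignment rule — not Volley API.)
fn tumbling_window_start(event_time_ms: i64, size_ms: i64) -> i64 {
    // rem_euclid keeps negative timestamps in the correct window
    event_time_ms - event_time_ms.rem_euclid(size_ms)
}

fn main() {
    // 5-minute (300_000 ms) windows, as in the example pipeline:
    // t = 1_234_567 ms falls in the window starting at 1_200_000 ms.
    assert_eq!(tumbling_window_start(1_234_567, 300_000), 1_200_000);
}
```

Every record whose event time lands in the same bucket is aggregated together, per key.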


Architecture Overview

Volley is a dataflow engine. You declare a pipeline as a graph of stages (source -> operators -> sink), and Volley executes it as concurrent Tokio tasks connected by bounded channels.

Pipeline Builder (Compile-Time)

StreamExecutionEnvironment::new()
    .from_source(...)
    .filter(...)
    .key_by(...)
    .window(...)
    .aggregate(...)
    .to_sink(...)
    .execute("job-name")

Runtime Execution (Tokio Tasks)

At runtime, each stage runs as an independent Tokio task. Stages communicate via bounded tokio::sync::mpsc channels. Backpressure propagates naturally: when a downstream channel is full, the upstream task blocks on send.

+--------+    channel    +----------+    channel    +------+
| Source |  --------->   | Operator |  --------->   | Sink |
| Task   |  RecordBatch  | Task     |  RecordBatch  | Task |
+--------+               +----------+               +------+
     |                        |                        |
     | CheckpointBarrier      | on_checkpoint()        | on_checkpoint()
     | (in-band)              | flush state            | commit writes
     v                        v                        v
+---------------------------------------------------------+
|                 Checkpoint Coordinator                   |
|  Injects barriers -> collects acks -> snapshots state    |
+---------------------------------------------------------+

Checkpoint barriers and watermarks flow in-band alongside data records through the same channels. This is how Volley achieves aligned checkpoints without pausing the pipeline.
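The backpressure behavior can be demonstrated with the standard library's bounded sync_channel standing in for tokio's bounded mpsc (a sketch, not Volley code): when the channel is full, the producer's send blocks until the consumer catches up, and nothing is dropped.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Send `n` batch ids through a bounded channel of `capacity` and
/// collect them on the consuming side. When the channel is full the
/// producer's `send` blocks — that is the backpressure.
fn run_bounded(n: u64, capacity: usize) -> Vec<u64> {
    let (tx, rx) = sync_channel::<u64>(capacity);
    let producer = thread::spawn(move || {
        for batch_id in 0..n {
            tx.send(batch_id).unwrap(); // blocks when the channel is full
        }
        // tx is dropped here: the channel closes, the consumer loop ends
    });
    let received: Vec<u64> = rx.into_iter().collect();
    producer.join().unwrap();
    received
}

fn main() {
    // Even with capacity 2, all 10 batches arrive — the producer just waits.
    assert_eq!(run_bounded(10, 2), (0..10).collect::<Vec<u64>>());
}
```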

Intra-Node Parallelism

After .key_by(), records are hash-partitioned across N parallel operator instances. Each instance has its own state namespace. Checkpoint barriers are aligned across all partitions before snapshot.

                            +-- Partition 0 --> [Operator Instance 0] --+
Source --> HashPartitioner -+-- Partition 1 --> [Operator Instance 1] --+--> Merge --> Sink
                            +-- Partition 2 --> [Operator Instance 2] --+

Parallelism is set via .with_parallelism(N) on the stream builder.

Distributed Execution (Horizontal Scaling)

For workloads that exceed a single node, Volley supports distributed execution across multiple K8s pods. Enable it by passing a ClusterConfig:

#![allow(unused)]
fn main() {
StreamExecutionEnvironment::new()
    .with_cluster(ClusterConfig::new(worker_id, num_workers, headless_service, app_name))
    .from_source(source)
    // ... same pipeline API ...
    .execute("job").await?;
}

In distributed mode, keyed operators use a two-level hashing scheme:

  1. Key group: hash(key) % max_key_groups (256 virtual partitions by default)
  2. Worker assignment: contiguous key group ranges assigned to each worker

Records destined for remote workers are shuffled via Arrow Flight DoExchange. Non-keyed operators (filter, map) run locally with no shuffle.

Worker 0                                    Worker 1
┌──────────────────────┐                    ┌──────────────────────┐
│ Source               │                    │ Source               │
│   ↓                  │                    │   ↓                  │
│ ShuffleRouter ◄──────┼── Arrow Flight ───►│ ShuffleRouter        │
│   ↓                  │                    │   ↓                  │
│ BarrierAligner       │                    │ BarrierAligner       │
│   ↓                  │                    │   ↓                  │
│ Sink                 │                    │ Sink                 │
└──────────────────────┘                    └──────────────────────┘

K8s primitives handle coordination:

  • Leader election: K8s Lease API — one worker becomes coordinator
  • Checkpoint metadata: stored in K8s ConfigMaps (no OwnerReference, survives redeployment)
  • Checkpoint data: shared filesystem (EFS/Filestore/Azure Files) via ReadWriteMany PVC
  • Worker discovery: StatefulSet headless service DNS
  • Live rescaling: checkpoint + reassignment, no state migration needed (shared FS)

Single-node mode remains the default with zero overhead when ClusterConfig is absent.

Graceful Shutdown

On shutdown, the source stops producing, in-flight data drains through the pipeline via the same bounded channels, and each stage then shuts down in order:

Source Task --> channel(capacity) --> Operator Task --> channel(capacity) --> Sink Task
                    ^                                        ^
                    |                                        |
              backpressure                            backpressure
              (bounded send)                          (bounded send)

Deep Dives

Data Model

All data in Volley flows as Apache Arrow RecordBatch — a columnar, zero-copy, language-interoperable format. This means operators process batches of rows rather than individual records, enabling vectorized computation.

StreamRecord

The StreamRecord wrapper adds metadata to each batch:

StreamRecord {
    batch: RecordBatch,                 // columnar Arrow data (1 or more rows)
    event_time_ms: Option<i64>,         // event timestamp (for windowing)
    trace: Option<RecordTraceContext>,  // per-record trace context (for distributed tracing)
}

  • batch — the actual payload, a columnar Arrow RecordBatch
  • event_time_ms — event timestamp used by window assignment and watermark tracking
  • trace — optional trace context for per-record distributed tracing. Set by the runtime’s sampler or extracted from source message headers (e.g., Kafka traceparent). When present, the runtime creates OpenTelemetry child spans at each pipeline stage.

StreamElement

Records share channels with control signals. The StreamElement enum wraps all possible messages:

StreamElement = Data(StreamRecord)
              | Barrier(CheckpointBarrier)
              | Watermark(Watermark)

  • Data — normal data records flowing through the pipeline
  • Barrier — checkpoint coordination signals injected by the Checkpoint Coordinator
  • Watermark — event-time progress markers for window triggering

Barriers and watermarks flow in-band alongside data records through the same bounded channels. This is what enables aligned checkpoints without pausing the pipeline.
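A simplified rendering of this flow, with Volley's types reduced to stand-in structs so the dispatch shape is visible (a sketch, not the real definitions):

```rust
/// Stand-ins for the real Volley types (fields reduced to essentials).
struct StreamRecord { event_time_ms: Option<i64> } // batch omitted
struct CheckpointBarrier { epoch: u64 }
struct Watermark { timestamp_ms: i64 }

/// Everything that flows through a pipeline channel.
enum StreamElement {
    Data(StreamRecord),
    Barrier(CheckpointBarrier),
    Watermark(Watermark),
}

/// An operator loop matches on the element kind: data is processed,
/// barriers trigger checkpointing, watermarks advance event time.
fn describe(elem: &StreamElement) -> &'static str {
    match elem {
        StreamElement::Data(_) => "process records",
        StreamElement::Barrier(_) => "flush state, forward barrier",
        StreamElement::Watermark(_) => "fire ready windows",
    }
}

fn main() {
    let b = StreamElement::Barrier(CheckpointBarrier { epoch: 7 });
    assert_eq!(describe(&b), "flush state, forward barrier");
    let _ = (StreamRecord { event_time_ms: None }, Watermark { timestamp_ms: 0 });
}
```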

Why Arrow?

  • Columnar format — efficient for analytical operations (aggregations, filters)
  • Zero-copy — data passes between pipeline stages without serialization
  • Multi-row batching — operators process many rows at once, amortizing per-record overhead
  • Ecosystem interop — Arrow is the standard for DataFusion, Parquet, Delta Lake, Iceberg, and most analytics tooling

Pipeline Builder

Volley uses Rust’s type system to enforce valid pipeline construction at compile time. Invalid pipelines (like calling .aggregate() on an unkeyed stream) are caught by the compiler, not at runtime.

Type Progression

StreamExecutionEnvironment  -->  DataStream  -->  KeyedStream  -->  WindowedKeyedStream
        |                           |                |                      |
    from_source()              filter()          key_by()             window()
    from_iter()                map()                                  aggregate_expr()
    from_kafka()               flat_map()
                               filter_expr()
                               select_expr()
                               apply_operator()

Each builder method returns a different type:

Type                 Meaning             Available Operations
DataStream           Unkeyed stream      filter, filter_expr, map, flat_map, select_expr, apply_operator, key_by, to_sink, collect, with_tracing, with_observability
KeyedStream          Partitioned by key  window, aggregate_expr, to_sink, collect, with_tracing, with_observability
WindowedKeyedStream  Keyed + windowed    aggregate_expr

You can call .aggregate_expr() on both KeyedStream (global aggregation) and WindowedKeyedStream (windowed aggregation). Closure-based operators (filter, map, flat_map) remain alongside expression-based operators.

Example

#![allow(unused)]
fn main() {
StreamExecutionEnvironment::new()
    // Returns DataStream
    .from_iter(events)
    // Still DataStream — vectorized filter
    .filter_expr(col("amount").gt(lit(100)))
    // Returns KeyedStream
    .key_by(col("user_id"))
    // Returns WindowedKeyedStream
    .window(TumblingWindows::of(Duration::from_secs(60)))
    // Consumes WindowedKeyedStream, returns DataStream
    .aggregate_expr(sum(col("amount")), state_backend)
    // Attach sink
    .collect()
    // Execute the pipeline
    .execute("my-job")
    .await?;
}

Parallelism

Set the parallelism level on the environment. After .key_by(), records are hash-partitioned across N parallel operator instances:

#![allow(unused)]
fn main() {
StreamExecutionEnvironment::new()
    .set_parallelism(4)
    .from_source(...)
    // ...
}

Each parallel instance has its own state namespace. Checkpoint barriers are aligned across all partitions.

Exactly-Once Semantics

Volley uses checkpoint barriers for exactly-once processing. The protocol ensures that every record is processed exactly once, even in the presence of failures.

Checkpoint Barrier Protocol

1. Coordinator injects CheckpointBarrier(epoch=N) into source
2. Barrier flows through pipeline in-band with data
3. Each operator receives barrier:
   a. Calls on_checkpoint() hook (flush caches, prepare state)
   b. Forwards barrier downstream
   c. State backend snapshots via RocksDB hardlink
4. Sink receives barrier:
   a. Commits buffered writes
   b. Acknowledges barrier to coordinator
5. Coordinator marks epoch N complete when all sinks ack

The key insight: barriers flow in-band alongside data records. No global pause is needed. Each operator processes the barrier when it arrives, then continues processing data.
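The coordinator's bookkeeping for step 5 — an epoch completes only when every sink has acknowledged it — can be sketched like this (illustrative types, not Volley's implementation):

```rust
use std::collections::HashMap;

/// Toy checkpoint coordinator: epoch N is complete once every sink
/// instance has acked it. (A sketch of the protocol's bookkeeping.)
struct Coordinator {
    num_sinks: usize,
    acks: HashMap<u64, usize>, // epoch -> acks received so far
}

impl Coordinator {
    fn new(num_sinks: usize) -> Self {
        Self { num_sinks, acks: HashMap::new() }
    }

    /// A sink acknowledges `epoch`; returns true when the epoch completes.
    fn ack(&mut self, epoch: u64) -> bool {
        let n = self.acks.entry(epoch).or_insert(0);
        *n += 1;
        *n == self.num_sinks
    }
}

fn main() {
    let mut coord = Coordinator::new(2); // two parallel sink instances
    assert!(!coord.ack(1)); // first sink acks epoch 1: not complete yet
    assert!(coord.ack(1));  // second sink acks: epoch 1 is now complete
}
```

Only a completed epoch is eligible as a recovery point; a partially acked epoch is discarded on failure.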

Per-Sink Commit Strategies

Each sink type uses a different mechanism to achieve exactly-once:

Sink        Commit Strategy                                                                     Recovery
Kafka       Transaction commit on barrier (+ send_offsets_to_transaction for Kafka↔Kafka EOS)   Abort + replay on recovery
Delta Lake  Epoch tag in commit metadata                                                        Skip already-committed epochs
Iceberg     Snapshot commit via REST catalog                                                    Epoch tracking in catalog

Kafka↔Kafka End-to-End Exactly-Once

When both source and sink are Kafka, Volley supports the full Flink-style EOS protocol via librdkafka’s send_offsets_to_transaction:

#![allow(unused)]
fn main() {
let (stream, cgm) = StreamExecutionEnvironment::new()
    .from_kafka_exactly_once(source_config).await?;

stream
    .filter_expr(col("status").eq(lit("active")))
    .to_kafka_exactly_once(sink_config, cgm).await?
    .execute("eos-pipeline").await?;
}

How It Works

  1. Setup: from_kafka_exactly_once() returns a (DataStream, ConsumerGroupMetadata) tuple. The ConsumerGroupMetadata (CGM) is a handle to the Kafka consumer group that the source belongs to. Pass it to to_kafka_exactly_once() so the sink can commit consumer offsets atomically with produced records.

  2. Barrier injection: When the checkpoint coordinator injects a barrier, the source records its current Kafka partition offsets in barrier.source_offsets.

  3. Barrier alignment: If the pipeline has parallel branches, the BarrierAligner merges source offsets from all branches before forwarding the barrier to the sink. (The BarrierAligner is an internal component that ensures all parallel inputs to an operator have delivered the same barrier epoch before proceeding.)

  4. Atomic commit: When the sink receives the barrier, it:

    • Calls producer.send_offsets_to_transaction(offsets, cgm) — this stages the consumer group offset advance inside the producer’s open transaction
    • Calls producer.commit_transaction() — this atomically commits both the produced output records and the consumer offset advance
  5. No auto-commit: In EOS mode, the sink disables batch-size and deadline-driven auto-commits. The only commit trigger is a checkpoint barrier. This means CheckpointConfig::interval must be shorter than the broker’s transaction.timeout.ms (default 60s), or the broker will fence the producer.

  6. Recovery: On restart, the consumer group’s last committed offset (written atomically with the transaction) determines where to resume. The runtime skips file-based offset restoration for transactional sources.

Recovery

When a failure occurs, records may be in various stages of processing:

Checkpoint N          In-flight records           Failure
     |                                                |
     v                                                v
  [committed] ---[r1]---[r2]---[r3]---[r4]---[r5]--- X
              ^                                       ^
         known good                            these are lost

The RecoveryCoordinator restores to the last known-good state:

  1. Find last complete checkpoint — the most recent epoch where all sinks acknowledged their commit
  2. Restore source offsets — rewind sources to the offsets recorded at that checkpoint (e.g., Kafka partition offsets, S3 file positions)
  3. Restore operator state — load RocksDB hardlink snapshots from the checkpoint directory, restoring window state, aggregation accumulators, and any custom operator state
  4. Resume processing — records between the checkpoint and the failure (r1–r5 above) are replayed from the source

Replayed records don’t produce duplicates because each sink type has an idempotent commit strategy:

  • Kafka: The incomplete transaction from the failed run is aborted. Replayed records are written in a new transaction. Consumers with isolation.level=read_committed never see the aborted records.
  • Delta Lake: The sink checks the Delta log for the current epoch’s pipeline_id. If an epoch was already committed, the write is skipped entirely.
  • Iceberg: The sink reads the latest snapshot’s epoch from catalog metadata. Already-committed epochs are skipped.

Operator Lifecycle

The on_checkpoint() hook gives operators a chance to prepare for the snapshot:

#![allow(unused)]
fn main() {
fn on_checkpoint(&mut self, epoch: u64) {
    // Flush any buffered state to RocksDB
    // The state backend will snapshot after this returns
}
}

For stateful operators using the write-behind cache, on_checkpoint() flushes dirty keys from the in-memory HashMap to RocksDB via BatchStateBackend::write_batch().
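A minimal sketch of this pattern, with plain HashMaps standing in for the accumulator cache and the RocksDB backend (illustrative, not Volley's types):

```rust
use std::collections::{HashMap, HashSet};

/// Write-behind accumulator cache (sketch): updates hit an in-memory
/// map; only keys dirtied since the last checkpoint are flushed, as
/// one batch, when a barrier arrives.
struct WriteBehindCache {
    cache: HashMap<String, i64>,   // key -> accumulator (hot path)
    dirty: HashSet<String>,        // keys modified since the last flush
    backend: HashMap<String, i64>, // stand-in for RocksDB
}

impl WriteBehindCache {
    fn new() -> Self {
        Self { cache: HashMap::new(), dirty: HashSet::new(), backend: HashMap::new() }
    }

    /// Hot path: no backend I/O at all.
    fn add(&mut self, key: &str, amount: i64) {
        *self.cache.entry(key.to_string()).or_insert(0) += amount;
        self.dirty.insert(key.to_string());
    }

    /// Called from on_checkpoint(): flush dirty keys as one batch
    /// (the real backend uses a RocksDB WriteBatch for atomicity).
    /// Returns the number of keys flushed.
    fn on_checkpoint(&mut self, _epoch: u64) -> usize {
        let flushed = self.dirty.len();
        for key in self.dirty.drain() {
            let value = self.cache[&key];
            self.backend.insert(key, value);
        }
        flushed
    }
}

fn main() {
    let mut state = WriteBehindCache::new();
    state.add("user-1", 100);
    state.add("user-1", 50);
    state.add("user-2", 25);
    // Two distinct dirty keys are flushed, regardless of update count.
    assert_eq!(state.on_checkpoint(1), 2);
    assert_eq!(state.backend["user-1"], 150);
    // Nothing is dirty immediately after a flush.
    assert_eq!(state.on_checkpoint(2), 0);
}
```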

State Management

Volley’s state management is designed for high throughput: most state operations hit an in-memory cache, with RocksDB used only during checkpoints.

Architecture

+-----------------------------------+
|         AggregateOperator         |
|  +-----------------------------+  |
|  | Write-Behind Cache (HashMap)|  |
|  | key -> Accumulator          |  |
|  +-----------------------------+  |
|         | on_checkpoint()         |
|         v                         |
|  +-----------------------------+  |
|  | BatchStateBackend (RocksDB) |  |
|  | WriteBatch for dirty keys   |  |
|  | multi_get for bulk reads    |  |
|  +-----------------------------+  |
|         | hardlink snapshot       |
|         v                         |
|  +-----------------------------+  |
|  | Checkpoint Directory        |  |
|  | /checkpoints/epoch-N/       |  |
|  +-----------------------------+  |
+-----------------------------------+

Write-Behind Cache

During normal processing, state reads and writes go to an in-memory HashMap cache. The cache holds accumulator values keyed by partition key. This eliminates all RocksDB I/O during processing, which is why Volley achieves 9M+ records/sec.

BatchStateBackend

When a checkpoint barrier arrives:

  1. on_checkpoint() is called on the operator
  2. Dirty keys from the cache are flushed to RocksDB via WriteBatch (a single atomic write)
  3. RocksDB creates a hardlink snapshot to the checkpoint directory
  4. The snapshot is nearly instant because hardlinks don’t copy data

Key operations:

  • write_batch() — Atomic batch write of dirty keys
  • multi_get() — Bulk read for cache misses (rare during steady-state)
  • Hardlink checkpoints — O(1) snapshot via filesystem hardlinks to SST files

Per-Key State Isolation

After .key_by(), each parallel operator instance has its own state namespace. Keys are hash-partitioned across instances, so state never conflicts between parallel operators.

Checkpoint Storage

/checkpoints/
  epoch-1/    # hardlink snapshot
  epoch-2/    # hardlink snapshot
  epoch-3/    # latest checkpoint

RocksDB checkpoints use hardlinks to SST files. Incremental growth is small when data doesn’t change between checkpoints. As data mutates, old SST files are retained for older checkpoints.

See the Capacity Planning guide for storage sizing recommendations.

Window State LRU

Window operators (WindowOperator, ExprWindowOperator) maintain an in-memory index called pending_windows that tracks active (key, window) pairs. Unlike the aggregate cache (which is a hot copy of data also in RocksDB), this index is the only record of which windows exist and need to fire on watermark advance.

To bound memory without data loss, the window LRU uses an evict-to-backend strategy:

+-----------------------------------+
|          WindowOperator           |
|  +-----------------------------+  |
|  | pending_windows (LRU)       |  |
|  | key:window_start ->         |  |
|  |   (key, window)             |  |
|  +-----------------------------+  |
|         | on eviction             |
|         v                         |
|  +-----------------------------+  |
|  | State Backend (RocksDB)     |  |
|  | pending_index/{pending_key} |  |
|  +-----------------------------+  |
|         | on watermark advance    |
|         v                         |
|  Scan persisted entries for       |
|  windows ready to fire            |
+-----------------------------------+

  1. When the LRU is full, evicted entries are persisted to a pending_index/ namespace in the state backend
  2. On watermark advance, persisted entries are scanned (only when evictions have occurred, tracked by a has_evicted flag)
  3. Fired windows are deleted from both the LRU and the state backend

This ensures no window is orphaned — every pending window eventually fires or is flushed at end-of-stream, regardless of whether it was evicted from the in-memory cache.

Configure via with_max_pending_windows(n) on WindowedKeyedStream. When unset, the index is unbounded (default, no behavior change from prior releases).
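The evict-to-backend strategy can be sketched as follows, with in-memory maps standing in for the LRU and the RocksDB pending_index/ namespace (illustrative names, not Volley's types):

```rust
use std::collections::{BTreeMap, HashMap};

/// Bounded in-memory index of pending windows (sketch). When full,
/// the oldest entry is persisted to the backend instead of dropped,
/// so no window is ever lost.
struct PendingWindows {
    max: usize,
    // A BTreeMap keyed by insertion sequence stands in for a real LRU.
    lru: BTreeMap<u64, (String, i64)>, // seq -> (key, window_start)
    backend: HashMap<String, i64>,     // stand-in for pending_index/ in RocksDB
    seq: u64,
    has_evicted: bool,
}

impl PendingWindows {
    fn new(max: usize) -> Self {
        Self { max, lru: BTreeMap::new(), backend: HashMap::new(), seq: 0, has_evicted: false }
    }

    fn track(&mut self, key: &str, window_start: i64) {
        if self.lru.len() == self.max {
            // Evict the oldest entry to the backend instead of dropping it.
            let (_, (k, w)) = self.lru.pop_first().unwrap();
            self.backend.insert(k, w);
            self.has_evicted = true;
        }
        self.lru.insert(self.seq, (key.to_string(), window_start));
        self.seq += 1;
    }

    /// Windows whose start is below the watermark are ready to fire.
    /// The backend is scanned only if something was ever evicted.
    fn fireable(&self, watermark: i64) -> usize {
        let in_mem = self.lru.values().filter(|(_, w)| *w < watermark).count();
        let persisted = if self.has_evicted {
            self.backend.values().filter(|w| **w < watermark).count()
        } else {
            0 // no eviction has happened: skip the backend scan entirely
        };
        in_mem + persisted
    }
}

fn main() {
    let mut pw = PendingWindows::new(2);
    pw.track("a", 0);
    pw.track("b", 60);
    pw.track("c", 120); // LRU full: ("a", 0) is evicted to the backend
    // Watermark at 100: "a" (persisted) and "b" (in memory) both fire.
    assert_eq!(pw.fireable(100), 2);
}
```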

See the design document for full details on trade-offs and the evict-to-backend strategy.

Recovery

On pipeline restart from a checkpoint, the RecoveryCoordinator restores source offsets and opens the RocksDB backend from the checkpoint directory. Stateful operators then lazily recover their in-memory structures on the first process() call:

  • Aggregate operators (ExprAggregateOperator, AggregateOperator) use StateBackend::scan_prefix() (backed by RocksDB’s prefix_iterator) to enumerate all keys with persisted accumulators. The cache and pending_keys maps are rebuilt from the scan results.
  • Window operators (WindowOperator, ExprWindowOperator) persist a WindowOperatorMeta struct (containing pending_windows and current_watermark) to a well-known state key during on_checkpoint(). On recovery, this metadata is loaded to restore window tracking state and watermark progress.

Recovery is lazy rather than eager: fresh pipelines (no state in RocksDB) pay no cost, and recovery happens only once before the first record is processed.

Horizontal Scaling

Volley supports distributed execution across multiple K8s pods. Single-node mode remains the default with zero overhead. Horizontal scaling is opt-in via ClusterConfig.

Design Principles

  • Symmetric workers — every pod runs the same binary. No dedicated driver node.
  • K8s-native — uses Lease API for leader election, ConfigMaps for checkpoint metadata, StatefulSet DNS for discovery. No ZooKeeper.
  • Single-node is free — when ClusterConfig is absent, no distributed code paths execute.
  • Shared filesystem — checkpoint data on EFS/Filestore/Azure Files. No state migration needed for rescaling.

Two-Level Key Hashing

Records are partitioned across workers using a two-level scheme:

hash(key) → key_group → worker
  1. Key group: hash(key) % max_key_groups (default 256). This is a virtual partition that never changes.
  2. Worker assignment: contiguous ranges of key groups are assigned to workers. Rescaling reassigns ranges without rehashing.

With 256 key groups and 3 workers:

Worker 0: key groups [0, 86)    — 86 groups
Worker 1: key groups [86, 171)  — 85 groups
Worker 2: key groups [171, 256) — 85 groups

This design, borrowed from Apache Flink’s KeyGroupRangeAssignment, minimizes data movement during rescaling. Scaling from 2 to 3 workers moves roughly 1/3 of key groups.
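The two-level scheme fits in a few lines. This is an illustration that reproduces the 3-worker split shown above (remainder groups absorbed by the earlier workers); the real assignment lives in Volley's cluster code:

```rust
/// Level 2: contiguous half-open key-group ranges per worker
/// (sketch matching the document's example split).
fn key_group_ranges(max_key_groups: u32, num_workers: u32) -> Vec<(u32, u32)> {
    let base = max_key_groups / num_workers;
    let rem = max_key_groups % num_workers;
    let mut ranges = Vec::new();
    let mut start = 0;
    for w in 0..num_workers {
        // Earlier workers each take one extra group from the remainder.
        let len = base + if w < rem { 1 } else { 0 };
        ranges.push((start, start + len)); // [start, end)
        start += len;
    }
    ranges
}

/// Level 1: key hash -> key group (a stable virtual partition that
/// never changes, no matter how many workers there are).
fn key_group(key_hash: u64, max_key_groups: u32) -> u32 {
    (key_hash % max_key_groups as u64) as u32
}

fn main() {
    // Matches the 3-worker example: [0,86), [86,171), [171,256).
    assert_eq!(key_group_ranges(256, 3), vec![(0, 86), (86, 171), (171, 256)]);
    // Rescaling recomputes only the ranges; key groups are untouched.
    assert_eq!(key_group(513, 256), 1);
}
```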

Arrow Flight Shuffle

When a keyed operator runs in distributed mode, records are shuffled between workers via Apache Arrow Flight DoExchange:

Worker 0                                    Worker 1
┌──────────────────────┐                    ┌──────────────────────┐
│ Source               │                    │ Source               │
│   ↓                  │                    │   ↓                  │
│ ShuffleRouter        │                    │ ShuffleRouter        │
│   ├─ local partition │    Arrow Flight    │   ├─ local partition │
│   └─ remote ◄────────┼───────────────────►│   └─ remote          │
│   ↓                  │                    │   ↓                  │
│ BarrierAligner       │                    │ BarrierAligner       │
│   (local + remote)   │                    │   (local + remote)   │
│   ↓                  │                    │   ↓                  │
│ Sink                 │                    │ Sink                 │
└──────────────────────┘                    └──────────────────────┘
  • ShuffleRouter replaces PartitionRouter for keyed operators. Routes via key_group → worker → local mpsc or Flight sender.
  • Non-keyed operators (filter, map, flat_map) run locally — no shuffle.
  • BarrierAligner merges both local partition outputs and remote Flight inputs.
  • Barriers and watermarks are broadcast to all local partitions AND all remote workers.

Wire Format

StreamElement variants are encoded as FlightData with a discriminator byte in app_metadata[0]:

| Byte | Variant | data_body | app_metadata[1..] |
|------|-----------|-----------------------|------------------------|
| 0x01 | Data | Arrow IPC RecordBatch | JSON record metadata |
| 0x02 | Barrier | empty | JSON CheckpointBarrier |
| 0x03 | Watermark | empty | JSON Watermark |
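A decoder for the discriminator byte can be sketched as follows (illustrative enum, not Volley's actual StreamElement type):

```rust
// Sketch: decode the StreamElement discriminator from app_metadata[0].
// Variant names mirror the wire-format table; the enum is illustrative.
#[derive(Debug, PartialEq)]
enum WireVariant {
    Data,
    Barrier,
    Watermark,
}

fn decode_variant(app_metadata: &[u8]) -> Option<WireVariant> {
    match app_metadata.first()? {
        0x01 => Some(WireVariant::Data),
        0x02 => Some(WireVariant::Barrier),
        0x03 => Some(WireVariant::Watermark),
        _ => None, // unknown discriminator
    }
}
```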

K8s-Native Coordination

Leader Election

One worker acquires a K8s Lease (coordination.k8s.io/v1) and becomes the coordinator. The coordinator handles checkpoint injection and key group assignment. It still processes data normally — coordination is lightweight.

If the leader dies, the lease expires and another worker takes over.

Checkpoint Metadata

Completed checkpoint metadata is stored in K8s ConfigMaps:

  • {app}-ckpt-epoch-{N} — per-epoch metadata (workers, state paths, source offsets)
  • {app}-ckpt-latest — pointer to the latest completed epoch

ConfigMaps have no OwnerReference, so they survive CRD deletion. This allows recovery after redeployment.
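The naming scheme can be sketched as (hypothetical helper names; the format strings follow the pattern above):

```rust
// Sketch: build the ConfigMap names described above from the
// VolleyApplication name. Function names are illustrative.
fn ckpt_epoch_name(app: &str, epoch: u64) -> String {
    format!("{app}-ckpt-epoch-{epoch}")
}

fn ckpt_latest_name(app: &str) -> String {
    format!("{app}-ckpt-latest")
}
```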

Shared Filesystem Checkpoints

Checkpoint data is written to a shared filesystem (AWS EFS, GCP Filestore, Azure Files) mounted as a ReadWriteMany PVC. All workers write to and read from the same mount.

/mnt/volley/checkpoints/
  epoch-42/
    worker-0/     ← RocksDB SST files
    worker-1/
    worker-2/

Recovery is near-instant: the pod reads the checkpoint directory directly. No download step.

Worker Discovery

Workers discover each other via StatefulSet headless service DNS:

{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}
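Assembling a peer's Flight address from that pattern can be sketched as (hypothetical function; parameter names are illustrative):

```rust
// Sketch: build a worker's Flight address from the StatefulSet headless
// service naming convention shown above.
fn flight_addr(app: &str, ordinal: u32, namespace: &str, flight_port: u16) -> String {
    format!("{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}")
}
```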

Live Rescaling

Since all workers share the same filesystem, rescaling doesn’t require state migration:

  1. Coordinator detects replica count change
  2. Triggers a checkpoint — all workers flush in-memory state to shared FS
  3. Computes new KeyGroupAssignment and diffs against the old one
  4. Broadcasts RescaleEvent via watch channel
  5. Workers stop processing lost key groups, load state for gained ones from the shared mount
  6. Resume — no restart, no data transfer
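Step 3's diff can be sketched as a set difference over one worker's owned key groups (Volley's RescaleCoordinator works on contiguous KeyGroupRanges; plain sets are used here for clarity):

```rust
use std::collections::BTreeSet;

// Sketch: compare one worker's old and new ownership to find the key
// groups it gains (load state from the shared mount) and loses (stop
// processing). Illustrative types; the real diff uses KeyGroupRanges.
fn diff_owned(old: &BTreeSet<u32>, new: &BTreeSet<u32>) -> (Vec<u32>, Vec<u32>) {
    let gained: Vec<u32> = new.difference(old).copied().collect();
    let lost: Vec<u32> = old.difference(new).copied().collect();
    (gained, lost)
}
```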

Key Types

| Type | Location | Purpose |
|------|----------|---------|
| ClusterConfig | cluster::config | Worker ID, num workers, headless service, app name, flight port |
| KeyGroupAssignment | cluster::assignment | Maps key groups to workers with contiguous range assignment |
| KeyGroupRange | cluster::assignment | A [start, end) range of key groups owned by one worker |
| ShuffleRouter | cluster::shuffle | Routes records to local or remote destinations |
| ShuffleFlightService | cluster::flight::server | Arrow Flight DoExchange server |
| ShuffleFlightClient | cluster::flight::client | Arrow Flight DoExchange client with connection pooling |
| LeaseLeaderElector | cluster::coordinator::leader | K8s Lease-based leader election |
| DistributedCheckpointCoordinator | cluster::coordinator::checkpoint_coordinator | ConfigMap-based checkpoint metadata |
| RescaleCoordinator | cluster::rescale | Checkpoint + key group reassignment |

Writing Operators

Operators transform data as it flows through the pipeline. Volley provides built-in operators and lets you write custom ones.

Built-In Operators

| Method | Description |
|--------|-------------|
| .filter(fn) | Keep records matching a predicate |
| .map(fn) | Transform each record |
| .flat_map(fn) | Transform each record into zero or more records |
| .apply_operator(op) | Apply a custom Operator implementation |
| .key_by(field) | Partition by a key field (transitions to KeyedStream) |
| .window(window_type) | Assign records to time windows (on KeyedStream) |
| .aggregate(agg, field) | Compute aggregation per window per key |

Custom Operators

Implement the Operator trait from volley-core:

#![allow(unused)]
fn main() {
use volley_core::prelude::*;

struct MyOperator;

impl Operator for MyOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Transform the record
        // Return zero or more output records
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Optional: flush any buffered state before checkpoint
    }
}
}

Apply it to a stream:

#![allow(unused)]
fn main() {
env.from_source(source)
    .apply_operator(MyOperator)
    .to_sink(sink)
    .execute("my-job")
    .await?;
}

Multi-Row RecordBatch Processing

Operators receive a StreamRecord, which wraps an Arrow RecordBatch. A batch may contain one or more rows. Your operator should handle multi-row batches correctly:

#![allow(unused)]
fn main() {
fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
    let batch = &record.data;
    let num_rows = batch.num_rows();
    // Process all `num_rows` rows, not just row 0
    // ...
    vec![record]
}
}

Stateful Operators

For operators that maintain state across records, use the on_checkpoint() hook to flush state before a checkpoint snapshot:

#![allow(unused)]
fn main() {
struct CountingOperator {
    counts: HashMap<String, u64>,
}

impl Operator for CountingOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Update in-memory state
        let key = record.key.clone().unwrap_or_default();
        *self.counts.entry(key).or_default() += 1;
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Flush state to durable storage here
        // The state backend will snapshot after this returns
    }
}
}

See State Management for details on the write-behind cache and RocksDB backend.

Type-State Safety

The Pipeline Builder enforces valid operator chains at compile time. You can only call .aggregate() on a WindowedKeyedStream, not on a plain DataStream.
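The mechanism can be sketched with simplified stand-in types (not Volley's actual builder): each call consumes the current stage and returns the next stage's type, and .aggregate() is defined only on the final stage, so calling it on a plain DataStream fails to compile.

```rust
// Sketch of the typestate pattern: each builder stage is a distinct
// type. These unit structs are illustrative, not Volley's real API.
struct DataStream;
struct KeyedStream;
struct WindowedKeyedStream;

impl DataStream {
    // key_by() transitions DataStream -> KeyedStream.
    fn key_by(self, _field: &str) -> KeyedStream {
        KeyedStream
    }
}

impl KeyedStream {
    // window() transitions KeyedStream -> WindowedKeyedStream.
    fn window(self, _size_secs: u64) -> WindowedKeyedStream {
        WindowedKeyedStream
    }
}

impl WindowedKeyedStream {
    // aggregate() exists ONLY here; DataStream.aggregate() is a
    // compile error, which is the whole point of the pattern.
    fn aggregate(self, _field: &str) -> &'static str {
        "aggregated"
    }
}
```

Because the invalid chains simply do not type-check, misuse is caught before the pipeline ever runs.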

Writing Connectors

Connectors are how Volley reads from and writes to external systems. Volley provides built-in connectors for Kafka, cloud blob stores, Delta Lake, and Iceberg. You can also write your own.

Source and Sink Traits

#![allow(unused)]
fn main() {
// Source: produces StreamRecords
trait Source: Send {
    async fn poll(&mut self) -> Option<StreamRecord>;
    fn connector_id(&self) -> &str;
}

// Sink: consumes StreamRecords
trait Sink: Send {
    async fn write(&mut self, record: StreamRecord) -> Result<()>;
    async fn flush(&mut self) -> Result<()>;
    fn on_checkpoint(&mut self, epoch: u64);
    fn connector_id(&self) -> &str;
}
}

Sources produce StreamRecord values via poll(). Sinks consume them via write() and flush(). Both support the checkpoint lifecycle via on_checkpoint().

Cloud Blob Store Pattern

All cloud blob store connectors follow a shared architecture:

Cloud Queue/Topic          Cloud Storage
(SQS / Azure Queue /      (S3 / Azure Blob /
 Pub/Sub)                   GCS)
      |                        |
      v                        v
NotificationSource         BlobReader
      |                        |
      +--------+   +-----------+
               |   |
               v   v
          BlobSource (volley-connector-blob-store)
               |
               v
          Decoder (Parquet / JSON / CSV / Avro)
               |
               v
          RecordBatch --> Pipeline

To add a new cloud backend, implement two traits:

  • NotificationSource — polls a queue for new object notifications
  • BlobReader — reads object data from cloud storage

The decoder layer (Parquet, JSON, CSV, Avro -> Arrow RecordBatch) is shared across all cloud backends.

Feature Flags

The volley-connectors umbrella crate uses feature flags:

| Feature | What it includes |
|---------|------------------|
| kafka | KafkaSource, KafkaSink |
| blob-store | BlobSource, decoders |
| blob-store-aws | S3Source, SqsNotificationSource |
| blob-store-azure | AzureBlobSource, AzureQueueNotificationSource |
| blob-store-gcs | GcsSource, PubSubNotificationSource |
| file-formats | Parquet, JSON, CSV, Avro decoders |
| table-formats | Delta Lake + Iceberg sinks |

Existing Connectors

  • Kafka — source and sink with exactly-once semantics
  • Delta Lake — sink with epoch-tagged commits
  • Iceberg — sink via REST catalog
  • AWS S3 — source via SQS notifications
  • Blob Store — Azure Blob and GCS sources

Windowing

Windowing groups records by time intervals for aggregation. Volley supports three window types with event-time semantics.

Window Types

Tumbling Windows

Fixed-size, non-overlapping windows:

#![allow(unused)]
fn main() {
.window(TumblingWindows::of(Duration::from_secs(60)))
}

Each record belongs to exactly one window.

Sliding Windows

Fixed-size, overlapping windows:

#![allow(unused)]
fn main() {
.window(SlidingWindows::of(
    Duration::from_secs(60),  // window size
    Duration::from_secs(10),  // slide interval
))
}

Each record may belong to multiple windows.

Session Windows

Dynamic windows that merge on activity:

#![allow(unused)]
fn main() {
.window(SessionWindows::with_gap(Duration::from_secs(30)))
}

A new session starts when the gap between records exceeds the threshold.

Event-Time Semantics

Event stream:  --[e1 t=1]--[e2 t=3]--[e3 t=7]--[watermark t=10]--[e4 t=5]-->

TumblingWindows(size=5):
  Window [0, 5):  e1, e2           -- fires at watermark t=10 (window end <= 10)
  Window [5, 10): e3               -- fires at watermark t=10
  e4 (t=5):       late data        -- routed to side output if allowed_lateness has passed

SessionWindows(gap=3):
  Session 1: e1(t=1), e2(t=3)     -- gap(3-1)=2 < 3, merged
  Session 2: e3(t=7)              -- gap(7-3)=4 >= 3, new session

Windows fire when a watermark advances past the window’s end time.
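The assignment and firing rules in the diagram can be sketched as follows (integer timestamps; illustrative helper functions):

```rust
// Sketch: assign an event to its tumbling window. Returns [start, end).
fn tumbling_window(event_time: i64, size: i64) -> (i64, i64) {
    let start = event_time - event_time.rem_euclid(size);
    (start, start + size)
}

// Sketch: a window fires once the watermark reaches its end time,
// matching "fires at watermark t=10 (window end <= 10)" above.
fn should_fire(window_end: i64, watermark: i64) -> bool {
    window_end <= watermark
}

// Sketch: session merging. Two events belong to the same session
// when the gap between them is below the configured threshold.
fn same_session(prev_ts: i64, ts: i64, gap: i64) -> bool {
    ts - prev_ts < gap
}
```

Running the diagram's values through these helpers: e3 at t=7 with size 5 lands in [5, 10), which fires at watermark t=10; e1 and e2 merge into one session (gap 2 < 3) while e3 opens a new one (gap 4 >= 3).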

Watermarks

Watermarks track event-time progress. The BoundedOutOfOrdernessGenerator allows configurable out-of-orderness before advancing the watermark:

#![allow(unused)]
fn main() {
.with_watermark_generator(
    BoundedOutOfOrdernessGenerator::new(Duration::from_secs(5))
)
}

This allows records to arrive up to 5 seconds late before the watermark advances past them.
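The generator's core rule is that the watermark trails the maximum observed event time by the configured bound. A minimal sketch (integer timestamps; illustrative type, not the real generator):

```rust
// Sketch of BoundedOutOfOrdernessGenerator: track the max event time
// seen and emit a watermark that lags it by `delay`.
struct BoundedOutOfOrderness {
    max_ts: i64,
    delay: i64,
}

impl BoundedOutOfOrderness {
    fn new(delay: i64) -> Self {
        Self { max_ts: i64::MIN, delay }
    }

    fn on_event(&mut self, ts: i64) {
        self.max_ts = self.max_ts.max(ts);
    }

    fn current_watermark(&self) -> i64 {
        self.max_ts.saturating_sub(self.delay)
    }
}
```

With a delay of 5, events at t=3, t=12, t=9 leave the watermark at 7: anything with a timestamp above 7 can still arrive without being late.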

Late Data

Records that arrive after the watermark has passed their window are considered late. Configure side_output_late_data() with a tag and attach a side output sink via with_late_data_sink() to route late events to a separate destination (e.g., a dead-letter queue):

#![allow(unused)]
fn main() {
let late_tag = OutputTag::new("late-events");

StreamExecutionEnvironment::new()
    .from_source(source)
    .with_watermarks(WatermarkConfig::new(Duration::from_secs(5)))
    .with_late_data_sink(late_events_sink)  // side output sink
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .allowed_lateness(Duration::from_secs(10))
    .side_output_late_data(late_tag)
    .aggregate_expr(sum(col("amount")), state_backend)
    .to_sink(main_sink)
    .execute("job")
    .await?;
}

Late events are written to the side output sink as StreamRecords with the schema from late_event_schema(): event_data (JSON string), event_time, watermark, window_start, window_end, lateness_ms. The side output sink participates in checkpoint barriers for exactly-once semantics.

Without with_late_data_sink(), late events are logged and discarded.

Memory Management

The window operator tracks active (key, window) pairs in an in-memory index called pending_windows. With high-cardinality keys or wide/sliding windows, this index can grow large.

Bounding Window Memory with LRU

Use with_max_pending_windows() to cap the in-memory index size. Evicted entries are persisted to the state backend and scanned during watermark advancement, so no data is lost:

#![allow(unused)]
fn main() {
stream
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .with_max_pending_windows(100_000)  // bound in-memory window index
    .aggregate_expr(sum(col("amount")), state_backend)
}

When unset (the default), the index is unbounded — identical to previous releases.

Sizing guidance:

| Key cardinality | Windows per key | Recommended limit | Approx. memory |
|-----------------|-----------------|---------------------|----------------|
| < 10K | Any | Unbounded (default) | < 10 MB |
| 10K–100K | 1 (tumbling) | 100,000 | ~10 MB |
| 100K–1M | 1 (tumbling) | 500,000 | ~50 MB |
| Any | 10+ (sliding) | Multiply by panes | Varies |

Each pending entry is ~100 bytes. Evicted entries add a small RocksDB I/O cost on watermark advance.

Note: with_max_pending_windows() is not recommended with session windows. Session window merging requires scanning the pending index for overlapping sessions, and evicted entries may cause additional state backend I/O during merges.

Monitoring

Monitor the volley_window_pending_size gauge and volley_window_pending_evictions_total counter to tune the limit. A high eviction rate indicates the LRU is too small for the workload. See Observability for the full metrics reference.

Per-Record Tracing

Volley can trace individual records as they flow through a pipeline, creating OpenTelemetry spans at each processing stage. This gives you a complete picture of what happened to a record: which operators processed it, how long each step took, what the input and output looked like, and whether any errors occurred.

Quick Start

#![allow(unused)]
fn main() {
use volley_core::prelude::*;
use volley_core::observability::{TracingConfig, SamplingStrategy};

let results = StreamExecutionEnvironment::new()
    .from_source(my_source)
    .with_tracing(TracingConfig {
        sampling: SamplingStrategy::Ratio(0.01),
        ..Default::default()
    })
    .map(|record| {
        // Your transformation
        Ok(record)
    })
    .to_sink(my_sink)
    .execute("traced-pipeline")
    .await?;
}

Open Jaeger or Grafana Tempo to see traces. Each sampled record appears as a trace with child spans for source, operators, and sink.

How It Works

  1. Sampling decision happens at the source. Based on your SamplingStrategy, the runtime decides whether to trace each record.
  2. Root span is created for sampled records (volley.record). The span’s context is attached to the StreamRecord.trace field.
  3. Child spans are created automatically at each pipeline stage (operators, sink). No code changes needed in your operators.
  4. Payload previews (optional) capture a JSON snapshot of the first row at each operator’s input and output, size-capped to max_payload_bytes.
  5. Cleanup happens at pipeline exit. Any root spans for records that were filtered or lost to errors are ended.

Sampling

At production throughput (thousands of records per second), tracing every record would overwhelm your tracing backend. Use Ratio sampling:

| Throughput | Suggested ratio | Traces/sec |
|------------|-----------------|------------|
| 1K rec/s | 0.10 (10%) | ~100 |
| 10K rec/s | 0.01 (1%) | ~100 |
| 100K rec/s | 0.001 (0.1%) | ~100 |

Use Always only during development or debugging.

Window and Aggregate Traces

Window and aggregate operators accumulate input records into new output records. The output gets its own trace, linked back to the input traces via OpenTelemetry span links. This preserves lineage without creating misleading parent-child relationships.

The volley.window.input_trace_count attribute tells you how many sampled records contributed to each aggregation result.

Kafka Trace Propagation

When using Kafka connectors, trace context propagates automatically via W3C traceparent headers:

  • Kafka source extracts traceparent from message headers
  • Kafka sink injects traceparent into produced messages

A traced request in an upstream service continues its trace through Volley and into downstream consumers. The runtime respects upstream sampling decisions (parent-based sampling).

Configuration Reference

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| sampling | SamplingStrategy | Never | Which records to trace |
| max_payload_bytes | usize | 1024 | Max JSON preview size per span event |
| capture_payload | bool | true | Whether to attach input/output previews |

Viewing Traces

Volley emits traces via the OTLP gRPC exporter configured in ObservabilityConfig. Point it at your collector:

#![allow(unused)]
fn main() {
.with_observability(ObservabilityConfig::new()
    .with_node_id("node-1")
    .with_otlp_endpoint("http://otel-collector:4317"))
.with_tracing(TracingConfig {
    sampling: SamplingStrategy::Ratio(0.01),
    ..Default::default()
})
}

Compatible backends: Jaeger, Grafana Tempo, Datadog, Honeycomb, or any OTLP-compatible collector.

Error Handling

When a record fails during processing (e.g., a transformation returns an error), the record’s trace span is:

  1. Marked with otel.status_code = ERROR
  2. Annotated with the error message via span.record_error()
  3. Ended at the point of failure

Failed records are visible in your tracing backend by filtering on otel.status_code = ERROR. This makes tracing a useful debugging tool even when records are dropped or filtered.

Performance Impact

Tracing adds minimal overhead when sampling is configured appropriately:

| Sampling | Overhead | Notes |
|----------|----------|-------|
| Never | ~0% | No spans created; trace context still propagated for Kafka |
| Ratio(0.001) | < 1% | Recommended for high-throughput production |
| Ratio(0.01) | ~1-2% | Good for staging or moderate-throughput production |
| Always | ~5-10% | Development and debugging only |

The overhead comes from span creation and OTLP export, not from sampling decisions (which are a single random number comparison).

Troubleshooting

No traces appearing in the backend

  • Verify with_observability() is configured with the correct otlp_endpoint
  • Check that the OTLP collector is reachable from your application
  • Confirm sampling is not set to Never (the default)
  • Check collector logs for rejected spans (often caused by resource limits)

Traces appear but are missing operators

  • Ensure with_tracing() is called before operators are added to the pipeline
  • Custom operators using apply_operator() create spans automatically; no manual instrumentation needed

Kafka traces not connecting to upstream services

  • The upstream producer must inject W3C traceparent headers into Kafka message headers
  • If the upstream doesn’t send traceparent, Volley creates a new root trace (no parent linkage)

ML Inference

The volley-ml crate adds machine learning inference operators to Volley pipelines. Run models embedded in-process or call external model servers — all as standard stream operators.

Backend Configuration

Configure the ML backend once and share it across operators:

#![allow(unused)]
fn main() {
use std::sync::Arc;
use volley_ml::prelude::*;

// CPU-only ONNX (simplest)
let ml = Arc::new(MlBackendConfig::onnx_cpu());

// ONNX on CUDA GPU with limits
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)
    .with_num_threads(4)
    .with_memory_limit(4 * 1024 * 1024 * 1024));

// Candle on CPU (pure Rust, no C++ toolchain)
let ml = Arc::new(MlBackendConfig::candle_cpu());

// Candle on Metal (macOS GPU)
let ml = Arc::new(MlBackendConfig::candle_metal());

// ONNX on macOS Metal / Neural Engine via CoreML
let ml = Arc::new(MlBackendConfig::onnx_coreml());

// CoreML with specific compute units and model format
let ml = Arc::new(MlBackendConfig::onnx_coreml()
    .with_coreml_compute_units(CoreMLComputeUnits::CpuAndGpu)
    .with_coreml_model_format(CoreMLModelFormat::MLProgram));

// Candle with explicit architecture and HF revision
let ml = Arc::new(MlBackendConfig::candle_cpu()
    .with_architecture(CandleArchitecture::Bert)
    .with_hf_revision("v1.0"));
}

Loading ONNX Models from HuggingFace Hub

Pass a HuggingFace repo ID as the model path — the backend downloads and caches the ONNX model and tokenizer automatically:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cpu());

stream
    .embed(EmbeddingConfig::new(
        "sentence-transformers/all-MiniLM-L6-v2",  // HF repo ID
        "text",
        ml.clone(),
    ))
    .await?
}

Models are cached in ~/.cache/huggingface/hub/ and reused across runs.

Revision Pinning

Pin a specific revision with @:

#![allow(unused)]
fn main() {
EmbeddingConfig::new("user/model@v1.0", "text", ml.clone())
}

Or set it on the config:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cpu()
    .with_hf_revision("abc123"));
}

ONNX Model Variants

Many repos include optimized variants. Select one with with_onnx_model_file():

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cpu()
    .with_onnx_model_file("onnx/model_O2.onnx"));  // O2 optimized
}

Common variants:

  • onnx/model.onnx — standard (default)
  • onnx/model_O2.onnx — O2 optimized
  • onnx/model_qint8_arm64.onnx — quantized for ARM64

Operators

Embedding (.embed())

Generate vector embeddings from a text column:

#![allow(unused)]
fn main() {
stream
    .embed(EmbeddingConfig::new("models/e5-small.onnx", "text", ml.clone())
        .with_output_column("embedding")
        .with_embedding_dim(384))
    .await?
}

Appends a FixedSizeList<Float32> column to the output.

Classification (.classify())

Classify records and get label + confidence score:

#![allow(unused)]
fn main() {
stream
    .classify(ClassifyConfig::new("models/fraud.onnx", "features", ml.clone())
        .with_label_column("is_fraud")
        .with_score_column("fraud_score")
        .with_labels(vec!["legitimate", "fraud"]))
    .await?
}

Appends Utf8 label and Float32 score columns.

Generic Inference (.infer())

Run any ONNX model on selected columns:

#![allow(unused)]
fn main() {
stream
    .infer(InferenceConfig::new("models/custom.onnx", ml.clone())
        .with_input_columns(vec!["feature_1", "feature_2"])
        .with_output_columns(vec![
            ("prediction", DataType::Float32),
        ]))
    .await?
}

External Model Server (.infer_remote())

Call OpenAI-compatible APIs (vLLM, TGI, LiteLLM):

#![allow(unused)]
fn main() {
stream
    .infer_remote(RemoteInferenceConfig::new(
        "http://localhost:8000/v1/embeddings",
        ApiFormat::OpenAI,
    )
        .with_input_columns(vec!["text"])
        .with_output_columns(vec![("embedding", DataType::Float32)])
        .with_max_concurrent(32)
        .with_timeout(Duration::from_secs(10)))
    .await?
}

Candle Backend

The Candle backend provides pure-Rust inference using HuggingFace’s Candle library. No C++ toolchain required.

Loading from HuggingFace Hub

Pass a HuggingFace repo ID as the model path — the backend downloads and caches model weights, config, and tokenizer automatically:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cpu());

stream
    .embed(EmbeddingConfig::new(
        "sentence-transformers/all-MiniLM-L6-v2",  // HF repo ID
        "text",
        ml.clone(),
    ))
    .await?
}

Pin a specific revision with @:

#![allow(unused)]
fn main() {
EmbeddingConfig::new("user/model@v1.0", "text", ml.clone())
}

Or set it on the config:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cpu()
    .with_hf_revision("abc123"));
}

Loading from Local Files

Point to a local directory containing model.safetensors, config.json, and optionally tokenizer.json:

#![allow(unused)]
fn main() {
stream
    .embed(EmbeddingConfig::new("/models/my-bert", "text", ml.clone()))
    .await?
}

Supported Architectures

The backend auto-detects the architecture from the model’s config.json (model_type field):

| model_type | Architecture | Use Cases |
|------------|--------------|-----------|
| bert, distilbert, roberta | BERT | Embeddings, classification |

Override auto-detection with with_architecture():

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cpu()
    .with_architecture(CandleArchitecture::Bert));
}

Tokenizer Integration

When the tokenizers feature is enabled and the model has a tokenizer.json, text columns are automatically tokenized before the forward pass. No manual tokenization needed.

volley-ml = { version = "0.8.0", features = ["candle", "tokenizers"] }

GPU Acceleration

GPU inference is 10–100× faster for transformer models. Volley supports CUDA (NVIDIA) and Metal (macOS) GPUs.

CUDA (NVIDIA GPUs)

ONNX Runtime: CUDA is handled at runtime via execution providers — no compile-time feature flag needed. Use MlBackendConfig::onnx_cuda(device_id):

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cuda(0));  // GPU 0
}

If CUDA is unavailable, the ONNX Runtime silently falls back to CPU.

Candle: Requires the cuda compile-time feature (pulls in cudarc CUDA bindings):

volley-ml = { version = "0.8.2", features = ["candle", "tokenizers", "cuda"] }
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cuda(0));
}

Prerequisites: NVIDIA driver and CUDA toolkit must be installed. Verify with nvidia-smi or volley doctor.

Metal (macOS GPUs)

Requires the metal compile-time feature:

volley-ml = { version = "0.8.2", features = ["candle", "tokenizers", "metal"] }
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_metal());
}

CoreML (ONNX on macOS)

The CoreML execution provider accelerates ONNX models on macOS using Metal GPU and/or the Apple Neural Engine. Requires the coreml compile-time feature:

volley-ml = { version = "0.8.3", features = ["onnx", "coreml"] }
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_coreml());
}

If CoreML is unavailable (e.g., on Linux), the ONNX Runtime silently falls back to CPU.

Partial operator coverage: CoreML does not support every ONNX operation. Unsupported ops automatically fall back to CPU within the same session. For example, DistilBERT has 387 graph nodes but only 17 are CoreML-eligible, so most computation still runs on CPU, and the CoreML compilation overhead can make the “GPU” path slower than pure CPU for small models or small batch sizes. Run the workload once with GPU=cpu and once without to benchmark both paths. Models with convolution-heavy architectures (vision models) tend to have better CoreML coverage than attention-heavy transformers.

Compute units control which hardware CoreML dispatches to:

| Value | Hardware |
|-------|----------|
| All (default) | CPU + GPU + Neural Engine |
| CpuAndGpu | CPU + Metal GPU only |
| CpuAndNeuralEngine | CPU + Neural Engine only |
| CpuOnly | CPU only (debugging) |

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_coreml()
    .with_coreml_compute_units(CoreMLComputeUnits::CpuAndGpu));
}

Model format controls the internal CoreML representation:

| Format | Requirement | Notes |
|--------|-------------|-------|
| NeuralNetwork (default) | macOS 10.15+ | Broadest compatibility |
| MLProgram | macOS 12+ | More operators, potentially faster |

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_coreml()
    .with_coreml_model_format(CoreMLModelFormat::MLProgram));
}

GPU Detection

Check GPU availability at runtime without loading a model:

#![allow(unused)]
fn main() {
let info = volley_ml::gpu::detect_gpu();
println!("{}", info.summary);
// "CUDA: disabled (enable `cuda` feature); Metal: available; CoreML: available"

if info.cuda_available {
    println!("Using GPU with {} CUDA device(s)", info.cuda_device_count);
}
if info.coreml_available {
    println!("CoreML available for ONNX Metal acceleration");
}
}

The volley doctor CLI command also reports GPU status.

Feature Flags

Add volley-ml to your Cargo.toml with the features you need:

[dependencies]
# ONNX Runtime backend (CPU)
volley-ml = { version = "0.8.2", features = ["onnx"] }

# ONNX on CUDA GPU (runtime EP registration, no compile-time flag)
volley-ml = { version = "0.8.2", features = ["onnx"] }

# Candle backend with tokenizers (pure Rust, CPU)
volley-ml = { version = "0.8.2", features = ["candle", "tokenizers"] }

# Candle on CUDA GPU
volley-ml = { version = "0.8.2", features = ["candle", "tokenizers", "cuda"] }

# Candle on macOS Metal GPU
volley-ml = { version = "0.8.2", features = ["candle", "tokenizers", "metal"] }

# ONNX on macOS CoreML (Metal GPU + Neural Engine)
volley-ml = { version = "0.8.3", features = ["onnx", "coreml"] }

# External model servers
volley-ml = { version = "0.8.2", features = ["remote-http"] }

Backend Comparison

| | ONNX Runtime (onnx) | Candle (candle) |
|---|---------------------|-----------------|
| Language | C++ via FFI | Pure Rust |
| Model format | ONNX (universal) | SafeTensors / HF Hub |
| Model loading | Local file or HuggingFace Hub | Local file or HuggingFace Hub |
| Tokenizer | External (user handles) | Built-in (tokenizers feature) |
| GPU | CUDA, TensorRT, CoreML (coreml feature) | CUDA, Metal |
| Best for | Any ONNX-exportable model | HF transformer models |
| Build | Needs C++ toolchain (auto-downloaded) | Pure cargo build |

Performance Tuning

Session Pooling (ONNX)

By default, a single ONNX session is shared across all inference calls. For concurrent pipelines, pool multiple sessions to eliminate lock contention:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)
    .with_session_pool_size(4));  // 4 sessions, round-robin
}

Each session is an independent model copy. A pool of 2–4 is usually sufficient.

Embedding Cache

Enable an LRU cache to skip inference for repeated text:

#![allow(unused)]
fn main() {
stream
    .embed(EmbeddingConfig::new("model.onnx", "text", ml.clone())
        .with_cache(10_000))  // Cache up to 10,000 unique embeddings
    .await?
}

Identical input strings are served from cache. This can eliminate 50–90% of model calls in pipelines with repeated product names, log messages, etc.
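The effect can be sketched with a toy bounded cache (Volley's cache is a true LRU; this sketch evicts in insertion order and fakes the model call, and all names are illustrative):

```rust
use std::collections::{HashMap, VecDeque};

// Sketch: a capacity-bounded embedding cache keyed by input text.
// `model_calls` counts how often we would have hit the model.
struct EmbeddingCache {
    map: HashMap<String, Vec<f32>>,
    order: VecDeque<String>, // insertion order, for simple eviction
    capacity: usize,
    model_calls: usize,
}

impl EmbeddingCache {
    fn new(capacity: usize) -> Self {
        Self { map: HashMap::new(), order: VecDeque::new(), capacity, model_calls: 0 }
    }

    fn embed(&mut self, text: &str) -> Vec<f32> {
        if let Some(v) = self.map.get(text) {
            return v.clone(); // cache hit: no inference
        }
        self.model_calls += 1;
        let v = vec![text.len() as f32]; // placeholder for real inference
        if self.map.len() == self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.map.remove(&oldest);
            }
        }
        self.map.insert(text.to_string(), v.clone());
        self.order.push_back(text.to_string());
        v
    }
}
```

Embedding "a", "b", "a" in sequence costs two model calls instead of three; with skewed real-world inputs the savings compound.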

Half-Precision Inference (FP16 / BF16)

Load Candle model weights in half precision for 2× memory savings and faster GPU inference on hardware with tensor cores:

#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cuda(0)
    .with_dtype(ModelDType::F16));
}

On CPU, F16/BF16 are automatically promoted to F32 since most CPUs lack native half-precision arithmetic.

Remote Batch Accumulation

When upstream batches are small, buffer them before sending to the model server:

#![allow(unused)]
fn main() {
RemoteInferenceConfig::new("http://model-server:8000/v1/embeddings", ApiFormat::OpenAI)
    .with_batch_accumulation(64, Duration::from_millis(50))
    .with_max_concurrent(16)
}

This sends a single HTTP request once 64 rows accumulate or 50ms elapse, reducing round trips.
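The flush rule behind with_batch_accumulation can be sketched as a single predicate (hypothetical helper; parameter names are illustrative):

```rust
use std::time::Duration;

// Sketch: flush the accumulated batch when it reaches max_rows, or when
// the oldest buffered row has waited at least max_wait. An empty buffer
// never flushes.
fn should_flush(
    buffered_rows: usize,
    max_rows: usize,
    oldest_age: Duration,
    max_wait: Duration,
) -> bool {
    buffered_rows >= max_rows || (buffered_rows > 0 && oldest_age >= max_wait)
}
```

With the settings above (64 rows, 50 ms), a trickle of small batches still flushes within 50 ms, while a burst flushes as soon as 64 rows accumulate.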

Running the Example

The ML inference example works out of the box with zero configuration. It auto-downloads a sentiment classification model from HuggingFace Hub and auto-detects Metal (CoreML) on macOS:

# Just works — downloads model from Hub, auto-detects Metal on macOS
cargo run --example ml_inference_pipeline -p volley-examples --features ml

Override defaults with environment variables:

  • MODEL_PATH=./my-model.onnx — use a local ONNX model instead of downloading from Hub
  • GPU=cpu — force CPU backend (disables Metal/CoreML auto-detection)

Recent Bug Fixes

  • Hub tokenizer resolution — tokenizer download now checks sibling paths (e.g., onnx/tokenizer.json) when tokenizer.json is not at the repo root.
  • Softmax normalization — classification scores now use softmax normalization, producing proper [0, 1] probabilities instead of raw logits.

CLI Template

Scaffold an ML pipeline project with one command:

volley new --template ml-pipeline my-classifier
cd my-classifier

This generates a Kafka-to-Kafka pipeline with ONNX classification:

  • src/main.rs — Reads text from Kafka, classifies with .classify(), writes enriched records to output topic
  • Cargo.toml — Includes volley-ml with onnx and tokenizers features
  • .env.example — Kafka connection and MODEL_PATH configuration

Edit src/main.rs to customize:

  • Change the model: Update MODEL_PATH and the label list in .classify()
  • Swap operators: Replace .classify() with .embed() for embeddings or .infer() for generic inference
  • Enable GPU: Change MlBackendConfig::onnx_cpu() to MlBackendConfig::onnx_cuda(0)
  • Use Candle: Switch to MlBackendConfig::candle_cpu() for pure-Rust inference
  • Add remote inference: Chain .infer_remote(RemoteInferenceConfig::new(...))

Integration Testing

Volley provides the volley-test-harness crate — a shared test framework inspired by Apache Flink’s testing patterns. It gives you containers, assertion harnesses, and a standardized connector test suite so you can validate pipelines end-to-end with real infrastructure.

Architecture

The test harness is layered — each layer builds on the one below it:

┌──────────────────────────────────────────────────┐
│  End-to-End Tests (tests/)                       │
│  Full pipelines with real connectors             │
├──────────────────────────────────────────────────┤
│  Connector Framework (connector/)                │
│  ConnectorTestContext + source/sink suites        │
├──────────────────────────────────────────────────┤
│  Assertion Harnesses (assertions/)               │
│  MetricsSnapshot, TraceCapture, ThroughputRecorder│
├──────────────────────────────────────────────────┤
│  TestCluster (cluster.rs)                        │
│  In-process pipeline runner + observability       │
├──────────────────────────────────────────────────┤
│  Container Management (containers/)              │
│  Kafka, MinIO with retry logic + ARM64 images     │
└──────────────────────────────────────────────────┘

Quick Start

Add volley-test-harness as a dev-dependency in your connector crate:

[dev-dependencies]
volley-test-harness = { path = "../volley-test-harness" }
async-trait = { workspace = true }
arrow = { workspace = true }
anyhow = { workspace = true }

Container Management

Shared testcontainers with retry logic and one-container-per-binary lifecycle via OnceCell. Containers start lazily on first use and are cleaned up when the test process exits.
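The one-container-per-binary pattern can be sketched with the standard library's `OnceLock` (the harness uses an async `OnceCell`, but the shape is the same; the container start below is faked for illustration):

```rust
use std::sync::OnceLock;

// One-per-binary lifecycle: the first caller pays the startup cost,
// every later caller gets the cached handle. (Illustrative sketch only;
// the literal value below stands in for a real container start.)
static BOOTSTRAP: OnceLock<String> = OnceLock::new();

fn bootstrap_servers() -> &'static str {
    BOOTSTRAP.get_or_init(|| {
        // Real harness: start the Kafka container, wait for readiness,
        // and return its advertised bootstrap address.
        "localhost:9092".to_string()
    })
}
```

Because the static lives for the whole test binary, every `#[tokio::test]` in the file shares one container, and cleanup happens when the process exits.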

Kafka

#![allow(unused)]
fn main() {
use volley_test_harness::containers::KafkaContainer;

// Starts a native ARM64 Kafka container on first call.
let servers = KafkaContainer::bootstrap_servers().await;

// Each test gets an isolated topic.
let topic = KafkaContainer::unique_topic("my-test");
KafkaContainer::create_topic(&topic, 1).await;

// Seed test data.
KafkaContainer::produce_json(&topic, &[
    serde_json::json!({"name": "alice", "score": 42}),
]).await;

// Verify output.
let messages = KafkaContainer::consume_json(&topic, 1, 10).await;
assert_eq!(messages[0]["name"], "alice");
}

MinIO (S3-compatible)

#![allow(unused)]
fn main() {
use volley_test_harness::containers::{MinioContainer, MINIO_ACCESS_KEY, MINIO_SECRET_KEY};

let endpoint = MinioContainer::endpoint().await;
let bucket = MinioContainer::unique_bucket("test");
MinioContainer::create_bucket(&bucket).await;
}

Polaris (Apache Iceberg REST catalog)

#![allow(unused)]
fn main() {
use volley_test_harness::containers::PolarisContainer;

// Starts a Polaris container backed by MinIO. MinIO must be started first.
let catalog_url = PolarisContainer::catalog_url().await;

// Each test gets an isolated catalog name.
let catalog = PolarisContainer::unique_catalog("my-test");
PolarisContainer::create_catalog(&catalog).await;
}

PolarisContainer depends on MinioContainer for the underlying storage — start MinioContainer before calling any PolarisContainer method.

Connector Test Framework

Like Flink’s Connector Testing Framework — implement ConnectorTestContext for your connector and get a standardized source/sink test suite for free.

Implementing ConnectorTestContext

#![allow(unused)]
fn main() {
use volley_test_harness::connector::ConnectorTestContext;

struct MyConnectorTestContext { /* ... */ }

#[async_trait]
impl ConnectorTestContext for MyConnectorTestContext {
    fn name(&self) -> &str { "MyConnector" }
    fn test_schema(&self) -> Arc<Schema> { /* ... */ }

    async fn seed_test_data(&self, records: &[RecordBatch]) -> anyhow::Result<()> {
        // Write records into the external system.
    }

    async fn create_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Create a source pointing at the seeded data.
    }

    async fn create_sink(&self) -> anyhow::Result<Option<Box<dyn DynSink>>> {
        // Create a sink writing to a test destination.
    }

    async fn read_sink_results(&self) -> anyhow::Result<Vec<RecordBatch>> {
        // Read back what the sink wrote for verification.
    }

    async fn create_empty_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Optional: source pointing at an empty data set.
        Ok(None) // Return None to skip the empty-source test.
    }
}
}

Running the Suites

#![allow(unused)]
fn main() {
#[tokio::test]
async fn my_connector_source_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_source_suite(&ctx).await;
}

#[tokio::test]
async fn my_connector_sink_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_sink_suite(&ctx).await;
}
}

The source suite runs three tests:

  • test_basic_read — seeds 5 records, verifies the source returns data
  • test_read_all_records — seeds 10 records, reads until all consumed
  • test_empty_source — verifies graceful handling of empty data sets

The sink suite runs three tests:

  • test_basic_write — writes records, flushes, verifies they’re readable
  • test_write_and_flush — multiple writes followed by flush
  • test_checkpoint_triggers_flush — verifies on_checkpoint() commits data

See volley-connector-kafka/tests/connector_suite.rs for the reference implementation.

TestCluster

An in-process pipeline execution wrapper with built-in observability capture. Registers an InMemorySpanExporter as the global OpenTelemetry tracer so spans created during pipeline execution are captured for assertions.

#![allow(unused)]
fn main() {
use volley_test_harness::cluster::TestCluster;

let cluster = TestCluster::new("my-test");

let pipeline = StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(cluster.tracing_config())  // 100% sampling, in-memory export
    .with_operator(my_operator)
    .with_operator_id("my-op")
    .to_sink(sink);

pipeline.execute("my-pipeline").await?;

// Inspect captured spans.
let traces = cluster.traces();
println!("Captured {} spans", traces.span_count());
traces.print_timing_summary();
}

Assertion Harnesses

TraceCapture

Assert on OpenTelemetry spans captured by the TestCluster:

#![allow(unused)]
fn main() {
let traces = cluster.traces();

// Verify span counts.
traces.assert_span_count("volley.operator", 10);

// Verify trace structure.
traces.assert_valid_tree();  // No orphan spans.

// Print per-operator timing breakdown.
traces.print_timing_summary();
}

The timing summary shows per-span-name breakdown with operator identity:

  Span                                      Count   Total ms     Avg ms     Max ms
  --------------------------------------------------------------------------------
  volley.record                                10      559.2       55.9      100.3
  volley.sink                                   8       72.0        9.0       11.0
  volley.operator (ml-embed)                    8       56.8        7.1       25.9
  volley.operator (status-filter)              10        1.1        0.1        0.8
  volley.operator (proto-decode)               10        0.8        0.1        0.5
  volley.source                                10        0.4        0.0        0.3

Use .with_operator_id("name") on your pipeline to assign human-readable names.

MetricsSnapshot

Scrape and assert on Prometheus metrics:

#![allow(unused)]
fn main() {
let metrics = MetricsSnapshot::scrape(prometheus_port).await?;

metrics.assert_counter_exists("volley_source_records_polled");
metrics.assert_counter_gte("volley_records_processed_total", 100.0);
metrics.assert_gauge_in_range("volley_pipeline_health", 1.0, 1.0);
}

ThroughputRecorder

Measure and assert on pipeline throughput:

#![allow(unused)]
fn main() {
let mut recorder = ThroughputRecorder::new();
recorder.start();
pipeline.execute("bench").await?;
recorder.stop(record_count);

recorder.assert_throughput_gte(100.0);        // min records/sec
recorder.assert_elapsed_under_secs(30.0);     // max wall time
println!("{:.1} records/sec", recorder.records_per_second().unwrap());
}

Test Data Generators

The data module provides test data factories:

#![allow(unused)]
fn main() {
use volley_test_harness::data::*;

// Protobuf messages (serialized bytes).
let protos = generate_crawl_results(50);
// Every 5th record is a failure (HTTP 503), rest are HTTP 200.

// Arrow RecordBatch (for non-protobuf tests).
let batch = generate_crawl_batch(50);

// The schema used by both generators.
let schema = crawl_result_schema();
}

The CrawlResult type has impl_to_arrow! so it works with auto_decode_operator::<CrawlResult>("payload").

Running Tests

All integration tests require Docker for testcontainers.

# Kafka connector suite (6 tests, ~45s)
cargo test -p volley-connector-kafka --test connector_suite -- --nocapture

# Iceberg connector suite (Polaris + MinIO, ~60s)
cargo test -p volley-connector-iceberg --test connector_suite -- --nocapture

# E2E golden path — full pipeline with windowing, metrics, traces, and ML (1 test, ~30s)
cargo test -p volley-test-harness --test e2e_golden_path -- --nocapture

# With Metal GPU disabled (CPU fallback)
cargo test -p volley-test-harness --test e2e_golden_path --no-default-features -- --nocapture

Where Tests Live

Connector-specific integration tests belong in their connector crate, importing shared infrastructure from volley-test-harness:

| Crate | Tests | Purpose |
|---|---|---|
| volley-connector-kafka | tests/connector_suite.rs | Kafka source/sink through ConnectorTestContext |
| volley-connector-iceberg | tests/connector_suite.rs | Iceberg source/sink through ConnectorTestContext (Polaris + MinIO) |
| volley-test-harness | tests/e2e_golden_path.rs | Cross-connector end-to-end pipeline test |

The pattern: volley-test-harness provides the framework, each connector crate owns its own integration tests.

Golden Path Test Coverage

The E2E golden path test (e2e_golden_path.rs) exercises the full pipeline:

  • Correctness — Kafka protobuf → decode → filter → ML embed → Kafka sink, asserting output records match expectations
  • Windowed aggregation — tumbling window over the event stream with watermark advancement
  • Throughput — ThroughputRecorder asserts sustained records/sec
  • Metrics — MetricsSnapshot scrapes the embedded Prometheus endpoint and asserts counters (volley_records_processed_total, volley_source_records_polled, pipeline health gauge)
  • Traces — TraceCapture from TestCluster asserts span count, valid parent–child tree, and per-operator timing via print_timing_summary()

Horizontal Scaling Guide

This guide covers how to run a Volley pipeline across multiple K8s pods for workloads that exceed a single node.

Prerequisites

  • Kubernetes cluster (v1.26+)
  • A ReadWriteMany storage class (AWS EFS, GCP Filestore, Azure Files)
  • The Volley K8s operator deployed (see Kubernetes Operator)
  • Your pipeline built with the distributed feature enabled

When to Use Horizontal Scaling

Single-node mode handles most workloads — Volley sustains 9M+ records/sec on one node. Consider horizontal scaling when:

  • Your key space is too large for one node’s memory (state doesn’t fit in RAM + RocksDB)
  • You need fault tolerance across nodes (pod failure shouldn’t stop the pipeline)
  • Your source has more partitions than one node can consume (e.g., 100+ Kafka partitions)

If your bottleneck is CPU on stateless operators (filter, map), scale vertically first. Horizontal scaling adds network overhead for the key shuffle.

Step 1: Enable the Distributed Feature

Add the distributed feature to your pipeline’s Cargo.toml:

[dependencies]
volley-core = { version = "0.8", features = ["distributed"] }

Step 2: Configure ClusterConfig

In your pipeline code, add with_cluster() to the environment:

#![allow(unused)]
fn main() {
use volley_core::prelude::*;

// In production, use ClusterConfig::from_env() to auto-detect from K8s env vars
let cluster = ClusterConfig::from_env();

let mut env = StreamExecutionEnvironment::new();
if let Some(config) = cluster {
    env = env.with_cluster(config);
}

env.from_source(source)
    .key_by(col("user_id"))
    .aggregate_expr(sum(col("amount")), state_backend)
    .to_stream()
    .to_sink(sink)
    .execute("my-pipeline")
    .await?;
}

The pipeline API is identical — with_cluster() is the only addition. When ClusterConfig is absent (e.g., running locally), the pipeline runs in single-node mode with zero overhead.

Step 3: Create a Shared Checkpoint PVC

All workers need access to the same checkpoint directory. Create a ReadWriteMany PVC:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
  namespace: my-pipeline
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs    # AWS EFS, or your cloud's RWX storage class
  resources:
    requests:
      storage: 50Gi

kubectl apply -f shared-pvc.yaml

Step 4: Deploy with the K8s Operator

Create a VolleyApplication with the cluster section:

apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: my-pipeline
spec:
  image: registry.example.com/my-pipeline:v1.0
  replicas: 3

  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"

  cluster:
    maxKeyGroups: 256              # default, power of 2
    flightPort: 50051              # default
    sharedCheckpoints:
      claimName: volley-shared-ckpt  # the PVC from step 3

  state:
    size: 10Gi                     # per-worker local RocksDB

  checkpoints:
    size: 5Gi                      # per-worker local checkpoint staging

The operator automatically:

  • Injects VOLLEY_* environment variables (VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME)
  • Adds the Arrow Flight port (50051) to the container and headless service
  • Mounts the shared checkpoint PVC at /mnt/volley/checkpoints

Step 5: Verify

# Check pods are running
kubectl get pods -n my-pipeline
# NAME             READY   STATUS    RESTARTS   AGE
# my-pipeline-0    1/1     Running   0          2m
# my-pipeline-1    1/1     Running   0          1m
# my-pipeline-2    1/1     Running   0          30s

# Check env vars
kubectl exec my-pipeline-0 -n my-pipeline -- env | grep VOLLEY
# VOLLEY_NUM_WORKERS=3
# VOLLEY_HEADLESS_SERVICE=my-pipeline-headless.my-pipeline.svc
# VOLLEY_MAX_KEY_GROUPS=256
# VOLLEY_FLIGHT_PORT=50051
# VOLLEY_APP_NAME=my-pipeline

# Check Flight port on headless service
kubectl get svc my-pipeline-headless -n my-pipeline -o jsonpath='{.spec.ports[*].name}'
# health flight

# Check shared checkpoint mount
kubectl exec my-pipeline-0 -n my-pipeline -- ls /mnt/volley/checkpoints

Scaling

Change the replica count to scale:

kubectl patch vapp my-pipeline -n my-pipeline --type merge -p '{"spec":{"replicas":5}}'

The coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
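The assignment scheme can be sketched as follows (an illustrative sketch of the contiguous key-group scheme described here, not Volley's actual internals):

```rust
// A record's key hashes to one of max_key_groups groups; each worker owns a
// contiguous slice of groups. Rescaling reassigns whole groups, so state
// moves via the shared checkpoint without re-hashing individual keys.
fn key_group_for(key_hash: u64, max_key_groups: u32) -> u32 {
    (key_hash % max_key_groups as u64) as u32
}

fn worker_for_group(group: u32, max_key_groups: u32, num_workers: u32) -> u32 {
    // Maps groups [0, max_key_groups) evenly and contiguously onto workers.
    group * num_workers / max_key_groups
}
```

With maxKeyGroups: 256 and three replicas, worker 0 owns groups 0–85, worker 1 owns 86–170, and worker 2 owns 171–255; scaling to five replicas reassigns groups, not individual keys.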

Monitoring

Key metrics to watch in distributed mode:

| Metric | What it tells you |
|---|---|
| volley_shuffle_local_records_total | Records staying on this worker |
| volley_shuffle_remote_records_total{target_worker="N"} | Records sent to each remote worker |
| volley_flight_send_errors_total | Transport errors (indicates worker connectivity issues) |
| volley_cluster_worker_id | This worker’s ordinal |
| volley_cluster_num_workers | Total cluster size |
| volley_cluster_key_groups_assigned | Key groups owned by this worker |
| volley_barrier_alignment_wait_ms | Time to align barriers across all inputs |

Troubleshooting

Pipeline stalls after a pod restart: Check if the BarrierAligner is waiting for a dead input. The Flight server should clean up stale connections via TCP keepalive (10s). If the stall persists, check volley_flight_send_errors_total for rising error counts.

Uneven record distribution: Check volley_shuffle_remote_records_total per target worker. The key group assignment is contiguous — if your key distribution is skewed, some workers will handle more traffic. Consider increasing maxKeyGroups for finer granularity.

Checkpoint failures: Verify the shared PVC is mounted and writable on all pods:

kubectl exec my-pipeline-0 -- touch /mnt/volley/checkpoints/test && echo OK

DNS resolution failures at startup: Workers discover each other via StatefulSet DNS. If pods start before DNS propagates, the runtime retries with exponential backoff (100ms to 5s). Check logs for “Waiting for remote worker” messages.

Kafka

Volley provides a Kafka source and sink via the volley-connector-kafka crate. The sink defaults to non-transactional idempotent mode for maximum throughput, with opt-in exactly-once transactional semantics.

Setup

Add the Kafka connector crate to your Cargo.toml:

[dependencies]
volley-connector-kafka = { git = "https://github.com/volley-streams/volley" }

System dependencies: cmake is required for building rdkafka. OpenSSL and libcurl are vendored and statically linked — no additional system packages needed. See Installation.

Authentication

Use KafkaSecurityConfig to configure authentication for both sources and sinks. The same config struct works with .with_security() on either KafkaSourceConfig or KafkaSinkConfig.

Confluent Cloud / SASL_SSL

The most common setup for managed Kafka (Confluent Cloud, AWS MSK Serverless, Aiven):

#![allow(unused)]
fn main() {
use volley_connector_kafka::{KafkaSourceConfig, KafkaSinkConfig, KafkaSecurityConfig};

let security = KafkaSecurityConfig::sasl_ssl("API_KEY", "API_SECRET");

let source_config = KafkaSourceConfig::new("pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "events", "my-group")
    .with_security(security.clone());

let sink_config = KafkaSinkConfig::new("pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "output")
    .with_security(security);
}

This sets security.protocol=SASL_SSL and sasl.mechanism=PLAIN, which is the correct combination for Confluent Cloud and most SASL-based providers.

SCRAM Authentication

For brokers that use SCRAM (e.g., AWS MSK with IAM disabled, self-hosted clusters):

#![allow(unused)]
fn main() {
let security = KafkaSecurityConfig::sasl_ssl("username", "password")
    .with_sasl_mechanism("SCRAM-SHA-256");
}

Supported SASL mechanisms: PLAIN (default), SCRAM-SHA-256, SCRAM-SHA-512.

mTLS (Mutual TLS)

For clusters that authenticate clients via X.509 certificates:

#![allow(unused)]
fn main() {
let security = KafkaSecurityConfig::ssl()
    .with_ssl_ca("/etc/kafka/ca.pem")
    .with_ssl_cert("/etc/kafka/client.pem")
    .with_ssl_key("/etc/kafka/client.key");

let source_config = KafkaSourceConfig::new("broker:9093", "events", "my-group")
    .with_security(security);
}

Key Types

| Type | Description |
|---|---|
| KafkaSourceConfig | Configuration for the Kafka consumer (brokers, topic, group, format, batch size) |
| KafkaSinkConfig | Configuration for the Kafka producer (brokers, topic, format, delivery mode) |
| KafkaSecurityConfig | Shared authentication/TLS config for source and sink |
| KafkaSourceFormat | Message format enum: Json (default), Protobuf, Avro |
| KafkaSinkFormat | Serialization format enum: Json (default), Protobuf, Avro |
| KafkaSource | Source that polls Kafka for records |
| KafkaSink | Sink that writes records to Kafka |
| KafkaEnvExt | Extension trait for StreamExecutionEnvironment |
| KafkaStreamExt | Extension trait for DataStream |
| SchemaRegistryClient | Confluent Schema Registry HTTP client with caching |
| env.from_kafka_protobuf::<M>() | Trait method: Kafka source + ProtobufDecodeOperator |
| env.from_kafka_avro() | Trait method: Kafka source + AvroDecodeOperator + Schema Registry |

Example

use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group");
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter(|record| true)
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate(AggregationType::Sum, "amount")
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;

    Ok(())
}

Protobuf Format

For Protobuf-encoded Kafka messages, use KafkaSourceConfig::new_protobuf() and the from_kafka_protobuf trait method on KafkaEnvExt:

#![allow(unused)]
fn main() {
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};

mod proto {
    include!(concat!(env!("OUT_DIR"), "/events.rs"));
}

let config = KafkaSourceConfig::new_protobuf("localhost:9092", "events", "my-group")
    .with_batch_size(100);

let stream = StreamExecutionEnvironment::new()
    .from_kafka_protobuf::<proto::Event>(config).await?;
}

The source emits raw binary payloads in a Binary “payload” column. The ProtobufDecodeOperator (wired automatically by from_kafka_protobuf) decodes them into typed Arrow columns using the compiled prost type. The type M must implement ToArrow, typically via the impl_to_arrow! proc-macro from volley-derive.

Scaffolding: volley new with Kafka source and protobuf format generates build.rs, proto/events.proto, and prost dependencies.

Avro Format (Schema Registry)

For Avro-encoded Kafka messages with Confluent Schema Registry, use KafkaSourceConfig::new_avro() and the from_kafka_avro trait method:

#![allow(unused)]
fn main() {
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};

let config = KafkaSourceConfig::new_avro(
    "localhost:9092",
    "events",
    "my-group",
    "http://localhost:8081",  // Schema Registry URL
)
.with_schema_registry_auth("user", "password")  // optional basic auth
.with_batch_size(100);

let stream = StreamExecutionEnvironment::new()
    .from_kafka_avro(config).await?;
}

The source emits raw binary payloads. The AvroDecodeOperator (wired automatically by from_kafka_avro):

  1. Strips the Confluent 5-byte wire format header ([0x00][4-byte schema ID][payload])
  2. Fetches the Avro schema from the Schema Registry by ID (cached after first fetch)
  3. Decodes the Avro binary into apache_avro::types::Value
  4. Converts directly to Arrow arrays using typed ArrayBuilders (no JSON intermediary)
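Step 1 can be sketched in a few lines (illustrative only; the operator's real code also surfaces decode errors):

```rust
// Confluent wire format: [0x00 magic][4-byte big-endian schema ID][Avro payload].
// Returns the schema ID and the raw Avro bytes, or None if the frame is invalid.
fn parse_confluent_header(msg: &[u8]) -> Option<(u32, &[u8])> {
    if msg.len() < 5 || msg[0] != 0x00 {
        return None; // not Confluent-framed
    }
    let schema_id = u32::from_be_bytes([msg[1], msg[2], msg[3], msg[4]]);
    Some((schema_id, &msg[5..]))
}
```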

The Arrow schema is inferred from the Avro schema. Supported type mappings:

| Avro Type | Arrow Type |
|---|---|
| boolean | Boolean |
| int | Int32 |
| long | Int64 |
| float | Float32 |
| double | Float64 |
| string | Utf8 |
| bytes | Binary |
| fixed | Binary |
| enum | Utf8 |
| union(null, T) | T (nullable) |

Plain Avro (No Schema Registry)

For Avro messages without the Confluent wire format, use AvroDecodeOperator directly with a static schema:

#![allow(unused)]
fn main() {
use volley_core::operators::avro::AvroDecodeOperator;

let avro_schema = apache_avro::Schema::parse_str(r#"{ ... }"#).unwrap();
let stream = env.from_kafka(config).await?
    .with_operator(AvroDecodeOperator::with_static_schema("payload", avro_schema));
}

Batch Reads

By default, KafkaSource reads one message per poll_next() call (single-row RecordBatch). For high-throughput pipelines, enable batch reads:

#![allow(unused)]
fn main() {
let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group")
    .with_batch_size(100)       // accumulate up to 100 messages
    .with_batch_timeout(Duration::from_millis(50));  // wait 50ms for next message
}

The source accumulates up to batch_size messages, concatenating them into a single multi-row RecordBatch. All downstream operators handle multi-row batches correctly. Partial batches are returned on timeout (no blocking).

Batch metadata: event_time_ms is set to the max timestamp in the batch (correct for watermark advancement). For per-row event times in windowed operations, configure timestamp_column on your WindowConfig.
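The max-timestamp rule amounts to the following (a sketch, not the source's actual code):

```rust
// The batch-level event time is the max of its messages' timestamps, so a
// watermark derived from it never runs ahead of any row in the batch.
fn batch_event_time_ms(message_timestamps_ms: &[i64]) -> Option<i64> {
    message_timestamps_ms.iter().copied().max()
}
```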

Consumer Tuning

Fine-tune the Kafka consumer’s fetch behavior for throughput vs. latency:

#![allow(unused)]
fn main() {
let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group")
    .with_batch_size(500)
    .with_fetch_min_bytes(32_768)         // wait for 32KB of data (default: 1)
    .with_fetch_max_wait_ms(100)          // broker waits up to 100ms to fill (default: 500)
    .with_max_partition_fetch_bytes(2_097_152); // 2MB per partition (default: 1MB)
}

| Method | Default | Effect |
|---|---|---|
| with_fetch_min_bytes(bytes) | 1 | Minimum bytes the broker accumulates before responding. Higher values reduce fetch requests at the cost of latency. |
| with_fetch_max_wait_ms(ms) | 500 | Max time the broker waits to fill fetch_min_bytes. Acts as an upper bound on added latency. |
| with_max_partition_fetch_bytes(bytes) | 1,048,576 (1 MB) | Max data returned per partition per fetch. Increase for partitions with large messages. |
| with_property(key, value) | (none) | Escape hatch for any rdkafka consumer setting not covered above. |

Tip: For high-throughput pipelines, combine with_batch_size() with with_fetch_min_bytes() so the broker pre-aggregates data and the source fills batches faster.

Retry on Transient Errors

Both the source and sink support optional retry with exponential backoff for transient broker errors (e.g., broker unavailable, request timeouts, leader changes, network exceptions). Configure via with_retry():

#![allow(unused)]
fn main() {
use std::time::Duration;
use volley_core::resilience::RetryConfig;

// Source with retry
let source_config = KafkaSourceConfig::new("localhost:9092", "topic", "group")
    .with_retry(
        RetryConfig::new()
            .with_max_retries(5)
            .with_initial_backoff(Duration::from_millis(200))
            .with_max_backoff(Duration::from_secs(30)),
    );

// Sink with retry
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "txn-1")
    .with_retry(
        RetryConfig::new()
            .with_max_retries(3)
            .with_initial_backoff(Duration::from_millis(100)),
    );
}

When retry is not configured (the default), errors propagate immediately — existing behavior is unchanged.

Source behavior: Both single-message and batch accumulation paths retry recv() calls. Subsequent messages within a batch (after the first) are not retried — a partial batch is returned instead.

Sink behavior: Individual send() calls are retried. Only the send is retried, not the surrounding transaction. If retries are exhausted, the transaction is aborted (existing behavior).

Metrics: volley_retry_attempts_total counter with component label (kafka_source or kafka_sink).
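Under the config above, the wait before each attempt follows an exponential schedule (a sketch assuming the conventional doubling scheme; the function name is illustrative, not Volley's internals):

```rust
use std::time::Duration;

// Attempt 0 waits `initial`, attempt 1 waits 2x initial, and so on,
// capped at `max`. saturating_mul guards against overflow for large attempts.
fn backoff_for_attempt(attempt: u32, initial: Duration, max: Duration) -> Duration {
    initial.saturating_mul(1u32 << attempt.min(31)).min(max)
}
```

With `initial_backoff` of 200ms and `max_backoff` of 30s, attempts wait 200ms, 400ms, 800ms, 1.6s, ... until the 30s cap.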

Sink Delivery Modes

The Kafka sink supports three delivery modes, from lowest overhead to strongest guarantees:

Fire-and-Forget (acks=0)

No acknowledgment from the broker. Highest throughput, but messages can be lost:

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
    .with_acks("0");
}

When acks is "0", idempotence is automatically disabled since the broker never confirms delivery.

Idempotent (default)

The default mode. The producer uses Kafka’s idempotent delivery (enable.idempotence=true) to guarantee no duplicates from producer retries, without the overhead of transactions:

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new("localhost:9092", "output");
}

This is the right choice for most pipelines. It guarantees at-least-once delivery with no duplicate writes from the producer side, and has no per-batch transaction commit overhead.

Transactional (exactly-once)

Opt-in via new_transactional(). Wraps writes in Kafka transactions for exactly-once semantics:

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");
}

See Exactly-Once Semantics below for details.

Breaking change: The old 3-argument KafkaSinkConfig::new(servers, topic, txn_id) has been renamed to new_transactional(). The 2-argument new(servers, topic) now creates a non-transactional idempotent sink.

Producer Tuning

Fine-tune the Kafka producer for throughput vs. latency:

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
    .with_linger_ms(20)                // batch for 20ms before sending (default: 5)
    .with_compression("lz4")           // compress batches (default: "none")
    .with_acks("1");                   // leader-only ack (default: "all")
}

| Method | Default | Effect |
|---|---|---|
| with_acks(acks) | "all" | "all" = full ISR ack, "1" = leader only, "0" = fire-and-forget. |
| with_linger_ms(ms) | 5 | How long rdkafka waits to batch messages before sending. Higher values increase batching and throughput at the cost of latency. |
| with_compression(codec) | "none" | Compression codec: "none", "gzip", "snappy", "lz4", "zstd". LZ4 and Zstd offer the best throughput-to-ratio tradeoff. |
| with_property(key, value) | (none) | Escape hatch for any rdkafka producer setting not covered above. |

Tip: For high-throughput sinks, with_linger_ms(20) + with_compression("lz4") is a good starting point. This lets rdkafka batch more messages per broker request and compress them in a single pass.

Exactly-Once Semantics

The Kafka sink supports exactly-once delivery when configured with new_transactional():

#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");
}

How it works:

  1. Records are buffered during processing
  2. On checkpoint barrier, the sink commits the Kafka transaction
  3. On failure recovery, uncommitted transactions are aborted
  4. Processing resumes from the last checkpoint, replaying records

The consumer group offset is committed as part of the transaction, ensuring source and sink are in sync.

Note: Transactional mode adds a commit round-trip per checkpoint. For pipelines where at-least-once is acceptable, the default idempotent mode (KafkaSinkConfig::new()) avoids this overhead entirely.

Trace Context Propagation

The Kafka source and sink automatically propagate W3C Trace Context (traceparent header) for distributed tracing:

  • Source: If a consumed message has a traceparent header, the record inherits that trace context. The runtime’s sampler respects upstream sampling decisions (parent-based sampling).
  • Sink: If a record being written has trace context, a traceparent header is injected into the produced Kafka message.

This works automatically when per-record tracing is enabled via with_tracing(). No additional configuration is needed for Kafka trace propagation.
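The traceparent header follows the W3C Trace Context format, "00-&lt;trace-id&gt;-&lt;parent-id&gt;-&lt;flags&gt;". A minimal parse, for illustration (not the connector's actual code):

```rust
// Splits "00-<32-hex trace-id>-<16-hex parent-id>-<2-hex flags>" into parts.
// Returns None for anything that is not a well-formed version-00 header.
fn parse_traceparent(header: &str) -> Option<(&str, &str, &str)> {
    let mut parts = header.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let parent_id = parts.next()?;
    let flags = parts.next()?;
    if version != "00" || trace_id.len() != 32 || parent_id.len() != 16 || flags.len() != 2 {
        return None;
    }
    Some((trace_id, parent_id, flags))
}
```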

Consumer Lag Monitoring

The volley.kafka.consumer_lag metric tracks consumer group lag. Use it for KEDA autoscaling:

sum(volley_kafka_consumer_lag) by (job_name)

See Capacity Planning for KEDA trigger configuration.

Delta Lake

The volley-connector-delta crate provides a Delta Lake sink with exactly-once semantics via epoch-tagged commits.

Key Types

| Type | Description |
|---|---|
| DeltaSink | Sink that writes Parquet data to a Delta table |
| DeltaSinkConfig | Configuration (table URI, partitioning, buffer size, file size) |
| PartitionSpec | Specification for Hive-style partitioned writes |

Usage

#![allow(unused)]
fn main() {
use volley_connector_delta::{DeltaSink, DeltaSinkConfig};

let config = DeltaSinkConfig::new("s3://my-bucket/output-table");
let sink = DeltaSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("delta-writer")
    .await?;
}

Configuration Reference

#![allow(unused)]
fn main() {
let config = DeltaSinkConfig::new("s3://my-bucket/output-table")
    .with_pipeline_id("my-pipeline")        // idempotent commit tag (default: "volley-default")
    .with_max_records(50_000)               // records buffered before flush (default: 10,000)
    .with_target_file_size(256 * 1024 * 1024) // target Parquet file size (default: 128 MB)
    .with_partition_spec(partition_spec);    // Hive-style partitioning (default: none)
}

| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Identifies the pipeline in commit metadata for idempotent writes. Use a unique ID per pipeline writing to the same table |
| with_max_records() | 10,000 | Number of records buffered in memory before writing a Parquet file. Higher values produce fewer, larger files |
| with_target_file_size() | 128 MB | Target size for individual Parquet files. The sink flushes when buffered data approaches this size |
| with_partition_spec() | None | Enables Hive-style partitioned writes — see Partitioned Writes |

Partitioned Writes

DeltaSink supports Hive-style partitioned writes for efficient query pruning:

#![allow(unused)]
fn main() {
use volley_connector_delta::PartitionSpec;

let config = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_partition_spec(PartitionSpec::columns(vec![
        "date".to_string(),
        "region".to_string(),
    ]));
}

This produces a directory layout like:

s3://my-bucket/events/
  date=2026-03-31/region=us-east-1/part-0001.parquet
  date=2026-03-31/region=eu-west-1/part-0002.parquet

Partition column guidelines:

  • Use low-cardinality columns (date, region, status) to avoid excessive small files
  • Partition columns must exist in the record schema
  • Order matters: put the most selective column first for query engines
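The directory layout above follows directly from the partition columns. As an illustrative sketch (partition_path is a hypothetical helper, not part of the connector's API; the sink derives these paths internally from the PartitionSpec):

```rust
/// Build a Hive-style partition path from (column, value) pairs.
/// Hypothetical helper for illustration only.
fn partition_path(columns: &[(&str, &str)]) -> String {
    columns
        .iter()
        .map(|(col, val)| format!("{col}={val}"))
        .collect::<Vec<_>>()
        .join("/")
}

fn main() {
    // Files for this partition land under <table-uri>/<path>/
    let path = partition_path(&[("date", "2026-03-31"), ("region", "us-east-1")]);
    println!("{path}"); // date=2026-03-31/region=us-east-1
}
```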

Exactly-Once Semantics

Delta commits include an epoch tag in the commit metadata. On recovery:

  1. The sink reads the Delta log to find the latest committed epoch for this pipeline_id
  2. If the current epoch was already committed, the write is skipped
  3. Replayed records after a failure never produce duplicate data

The pipeline_id scopes idempotency — multiple pipelines can write to the same table without conflicts as long as each uses a unique ID.
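The recovery check can be sketched with plain data structures. This is illustrative logic only, not the connector's internals; a HashMap stands in for the latest-epoch-per-pipeline view of the Delta log:

```rust
use std::collections::HashMap;

/// Illustrative epoch-deduplication check. The real sink reads the
/// latest committed epoch for each pipeline_id from the Delta log.
fn should_commit(log: &HashMap<String, u64>, pipeline_id: &str, epoch: u64) -> bool {
    match log.get(pipeline_id) {
        // Skip epochs at or below the last committed one (replay after failure).
        Some(&committed) => epoch > committed,
        None => true,
    }
}

fn main() {
    let mut log = HashMap::new();
    log.insert("my-pipeline".to_string(), 41);
    assert!(should_commit(&log, "my-pipeline", 42));    // new epoch: commit
    assert!(!should_commit(&log, "my-pipeline", 41));   // replayed epoch: skip
    assert!(should_commit(&log, "other-pipeline", 1));  // different pipeline_id: independent
}
```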

Storage Backends

Delta tables can be stored on any Delta-compatible storage:

| Storage | URI Format | Credentials |
|---|---|---|
| AWS S3 | s3://bucket/path | Default credential chain (env, ~/.aws/credentials, instance profile) |
| Azure Blob | az://container/path | DefaultAzureCredential chain |
| Local filesystem | /path/to/table | File permissions |

Performance Tuning

| Scenario | Recommendation |
|---|---|
| High throughput (>100K rec/s) | Increase with_max_records(50_000) to reduce commit frequency |
| Large records (>1 KB each) | Decrease with_max_records() or increase with_target_file_size() |
| Many partitions | Monitor file count; consider periodic compaction |
| Low latency requirements | Decrease with_max_records() for more frequent flushes |

Table Compaction

With many partitions and frequent checkpoints, Delta tables accumulate small files that degrade read performance. Run compaction periodically:

-- In a query engine (Spark, DataFusion, etc.)
OPTIMIZE 's3://my-bucket/events';

Automated compaction within the Volley sink is planned for a future release.

Troubleshooting

“Table not found” or empty table

  • Delta tables are created automatically on first write if they don’t exist
  • Verify the storage URI is correct and credentials have write access

Duplicate data after recovery

  • Ensure each pipeline uses a unique pipeline_id
  • If two pipelines share the same ID, epoch deduplication may skip valid writes

Small files accumulating

  • Increase with_max_records() to buffer more records per file
  • Run OPTIMIZE periodically from a query engine
  • Consider adjusting checkpoint interval to reduce commit frequency

Slow writes to S3

  • S3 has higher latency per PUT than local disk; increase with_target_file_size() to reduce PUT count
  • Ensure the application runs in the same AWS region as the S3 bucket

Apache Iceberg

The volley-connector-iceberg crate provides an Iceberg sink via REST catalog with exactly-once semantics.

Key Types

| Type | Description |
|---|---|
| IcebergSink | Sink that writes Parquet data files to an Iceberg table |
| IcebergSinkConfig | Configuration (catalog URI, warehouse, namespace, table, storage) |
| IcebergStorage | Storage backend enum (S3, Azure, GCS) |

Usage

#![allow(unused)]
fn main() {
use volley_connector_iceberg::{IcebergSink, IcebergSinkConfig, IcebergStorage};

let config = IcebergSinkConfig::new(
    "http://polaris:8181",           // REST catalog URI
    "my-warehouse",                  // catalog warehouse
    vec!["db".into()],               // namespace
    "events",                        // table name
    IcebergStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,
    },
);

let sink = IcebergSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("iceberg-writer")
    .await?;
}

Configuration Reference

Required Parameters

| Parameter | Description |
|---|---|
| catalog_uri | REST catalog endpoint (e.g., http://polaris:8181) |
| catalog_warehouse | Warehouse identifier in the catalog |
| namespace | Iceberg namespace as a list of strings (e.g., ["db", "schema"]) |
| table_name | Target table name |
| storage | Storage backend — see Storage Backends |

Optional Parameters

#![allow(unused)]
fn main() {
let config = IcebergSinkConfig::new(catalog_uri, warehouse, namespace, table, storage)
    .with_credential("client-id:client-secret")  // OAuth2 credential for Polaris
    .with_max_records(10_000)                     // buffer size before flush (default: 10,000)
    .with_flush_interval(Duration::from_secs(30)); // time-based flush trigger
}
| Method | Default | Description |
|---|---|---|
| with_credential() | None | OAuth2 credential for authenticated catalogs (Polaris, Tabular) |
| with_max_records() | 10,000 | Number of records buffered before writing a Parquet data file |
| with_flush_interval() | None | Time-based flush trigger; writes a file even if max_records hasn’t been reached |

Storage Backends

S3

#![allow(unused)]
fn main() {
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "my-iceberg-data".into(),
    endpoint: None,  // use None for AWS, Some("http://localhost:9000") for MinIO
}
}

AWS credentials are resolved via the default credential chain (environment variables, ~/.aws/credentials, instance profile).

For local development with MinIO:

#![allow(unused)]
fn main() {
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "test-bucket".into(),
    endpoint: Some("http://localhost:9000".into()),
}
}

Azure Blob Storage

#![allow(unused)]
fn main() {
IcebergStorage::Azure {
    storage_account: "mystorageaccount".into(),
    container: "iceberg-data".into(),
    access_key: None,  // uses DefaultAzureCredential when None
}
}

When access_key is None, authentication uses Azure’s DefaultAzureCredential chain (environment variables, managed identity, Azure CLI).

Google Cloud Storage

#![allow(unused)]
fn main() {
IcebergStorage::Gcs {
    project_id: "my-gcp-project".into(),
    bucket: "iceberg-data".into(),
}
}

GCS credentials are resolved via Application Default Credentials (GOOGLE_APPLICATION_CREDENTIALS or metadata server).

Exactly-Once Semantics

The Iceberg sink achieves exactly-once via snapshot commits with epoch tracking:

  1. Records are buffered in memory and written as Parquet data files
  2. On checkpoint barrier, a new Iceberg snapshot is committed via the REST catalog
  3. The checkpoint epoch is stored in the snapshot’s summary metadata
  4. On recovery, the sink reads the latest snapshot’s epoch and skips already-committed epochs

This means replayed records after a failure never produce duplicate data in the table.

REST Catalog Compatibility

Volley uses the Iceberg REST catalog spec for table management. Compatible catalogs:

| Catalog | Notes |
|---|---|
| Apache Polaris | Recommended. Use with_credential() for OAuth2 auth |
| Tabular | Hosted REST catalog. Use with_credential() |
| AWS Glue (via REST adapter) | Requires a REST adapter in front of Glue |
| Gravitino | REST-compatible catalog |

Troubleshooting

“Connection refused” to catalog URI

  • Verify the catalog is running and reachable from your application
  • Check firewall rules and network policies (especially in Kubernetes)

“Unauthorized” or “Forbidden” from catalog

  • Ensure with_credential() is set with a valid OAuth2 client credential
  • For Polaris: verify the principal has TABLE_WRITE_DATA privilege on the target table

“Table not found”

  • Iceberg tables must be created in the catalog before the sink writes to them
  • Verify namespace and table_name match the catalog’s table identifier exactly

Small files accumulating

  • Increase with_max_records() to buffer more records per file
  • Set with_flush_interval() to a longer duration
  • Consider running Iceberg’s rewrite_data_files procedure periodically

Lance

The volley-connector-lance crate provides a sink that writes Arrow RecordBatches to a Lance columnar dataset with exactly-once semantics. Lance is a modern columnar format optimized for ML, vector search, and random-access workloads; it is the on-disk format behind LanceDB and is written in Rust from the ground up.

Key Types

| Type | Description |
|---|---|
| LanceSink | Sink that writes Arrow records to a Lance dataset |
| LanceSinkConfig | Builder-style configuration (URI, pipeline id, file sizing, storage, recovery limit) |
| LanceStorage | Enum configuring the object store backend (Local, S3, Gcs, Azure) |
| LanceStreamExt | Umbrella extension trait adding DataStream::to_lance(config) |

Quick Start

Attach the sink to a pipeline via the LanceStreamExt trait re-exported from volley_connectors:

#![allow(unused)]
fn main() {
use volley_connectors::blob_store::lance::{LanceStreamExt, LanceSinkConfig, LanceStorage};
use volley_core::prelude::*;

let config = LanceSinkConfig::new("s3://my-bucket/datasets/events")
    .with_pipeline_id("events-v1")
    .with_storage(LanceStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,
        access_key_id: None,
        secret_access_key: None,
    });

StreamExecutionEnvironment::new()
    .from_kafka(kafka_cfg).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_lance(config).await?
    .execute("kafka-to-lance")
    .await?;
}

Enable the umbrella feature in your Cargo.toml:

volley-connectors = { version = "0.8", features = ["lance-sink"] }

Or depend on the standalone crate directly:

volley-connector-lance = "0.8"

Configuration Reference

#![allow(unused)]
fn main() {
let config = LanceSinkConfig::new("s3://bucket/path")
    .with_pipeline_id("my-pipeline")             // default: "volley-default"
    .with_storage(LanceStorage::Local)           // default: Local
    .with_max_rows_per_file(1_048_576)            // default: 1_048_576
    .with_max_rows_per_group(1024)                // default: 1024
    .with_max_bytes_per_file(90 * 1024 * 1024 * 1024) // default: 90 GB
    .with_max_records_buffer(100_000)             // default: 100_000
    .with_recovery_history_limit(256);            // default: 256
}
| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Stable identifier embedded in commit metadata. Used for idempotent recovery — use a unique value per pipeline writing to the same dataset |
| with_storage() | LanceStorage::Local | Object store backend; see Storage Backends |
| with_max_rows_per_file() | 1,048,576 | Maximum rows per Lance data file. Larger values produce fewer, bigger files |
| with_max_rows_per_group() | 1,024 | Maximum rows per row group (columnar encoding unit) |
| with_max_bytes_per_file() | 90 GB | Soft per-file size ceiling. Lance has a hard 100 GB limit on S3 |
| with_max_records_buffer() | 100,000 | Soft in-memory row budget between checkpoints. Exceeding it logs a warning but does not force an untagged flush — all commits wait for the next checkpoint barrier so the epoch tag is always present |
| with_recovery_history_limit() | 256 | Maximum number of historical Lance transactions scanned on startup to recover last_committed_epoch. Increase if you retain a long uncompacted history |

Exactly-Once Semantics

LanceSink uses a two-phase commit built on Lance’s native transaction API:

  1. Stage. Buffered batches are materialized through InsertBuilder::execute_uncommitted_stream, producing an uncommitted Transaction that represents the staged data files without making them visible.
  2. Tag. The sink sets the transaction’s transaction_properties field to {"volley.pipeline_id": id, "volley.epoch": N} before committing. These key-value pairs are persisted to the Lance transaction file on disk.
  3. Commit. CommitBuilder::execute(transaction) atomically publishes the new dataset version.

On startup, LanceSink::new calls Dataset::get_transactions to fetch up to recovery_history_limit recent transactions, scans each one for a matching volley.pipeline_id, and takes the maximum volley.epoch as last_committed_epoch. Any checkpoint barrier at or below that epoch is dropped as a duplicate on the next restart.

Note: Lance 4.0’s CommitBuilder::with_transaction_properties stores the map on the builder but does not forward it to the staged Transaction during execute. To work around this, the sink writes the properties directly to Transaction.transaction_properties — which is a public field on the struct — before calling execute. This is a tracked upstream issue; when Lance fixes it, the workaround can be replaced with the documented builder method.
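The startup scan described above amounts to a fold over recent transactions' property maps. A minimal sketch, with plain HashMaps standing in for the transaction metadata (the property keys match the ones documented above; everything else is illustrative):

```rust
use std::collections::HashMap;

/// Illustrative recovery scan: find the highest volley.epoch among
/// transactions tagged with this pipeline's volley.pipeline_id.
fn last_committed_epoch(
    transactions: &[HashMap<String, String>],
    pipeline_id: &str,
) -> Option<u64> {
    transactions
        .iter()
        .filter(|props| props.get("volley.pipeline_id").map(String::as_str) == Some(pipeline_id))
        .filter_map(|props| props.get("volley.epoch")?.parse::<u64>().ok())
        .max()
}

fn main() {
    let tx = |id: &str, epoch: &str| {
        HashMap::from([
            ("volley.pipeline_id".to_string(), id.to_string()),
            ("volley.epoch".to_string(), epoch.to_string()),
        ])
    };
    // Transactions from other pipelines are ignored during recovery.
    let history = vec![tx("events-v1", "7"), tx("other", "99"), tx("events-v1", "9")];
    assert_eq!(last_committed_epoch(&history, "events-v1"), Some(9));
}
```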

Storage Backends

Lance delegates I/O to object_store under the hood, and LanceStorage maps cleanly onto its configuration keys.

| Variant | URI | Credentials |
|---|---|---|
| LanceStorage::Local | filesystem path or file://… | none |
| LanceStorage::S3 { region, bucket, endpoint, access_key_id, secret_access_key } | s3://bucket/path | inline fields, or AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars, or IAM instance role |
| LanceStorage::Gcs { project_id, bucket, service_account_json } | gs://bucket/path | inline JSON, or GOOGLE_APPLICATION_CREDENTIALS |
| LanceStorage::Azure { storage_account, container, access_key } | az://container/path | inline key, or AZURE_STORAGE_ACCOUNT_KEY / DefaultAzureCredential |

Setting endpoint = Some("http://…") on S3 also emits allow_http=true so MinIO / localstack work without extra plumbing.

Limitations

  • No Hive-style partitioning in v1. Lance uses fragments internally rather than filesystem-visible partitions, and the partition story is deferred.
  • No schema evolution within a single pipeline_id — the first batch’s schema is assumed to hold for every subsequent batch.
  • Single writer per (dataset_uri, pipeline_id) — concurrent writers conflict on commit and return an error rather than silently serializing.
  • Recovery horizon is bounded by recovery_history_limit. If a pipeline accumulates more uncompacted versions than the limit without a successful recovery, older commits may be invisible to the scan and epochs can be re-executed. Compact regularly or raise the limit.

Troubleshooting

Recovery does not pick up the last committed epoch — Ensure the new sink uses the same pipeline_id as the writer, and that recovery_history_limit covers the relevant version range. A warn! log is emitted if a transaction’s volley.epoch property is malformed.

Build fails with “Could not find protoc” — Lance’s build scripts compile protobuf definitions at build time. Install the protoc binary (apt-get install protobuf-compiler on Debian/Ubuntu, brew install protobuf on macOS) before building.

Example

See volley-examples/examples/memory_to_lance.rs for a complete end-to-end example that runs a MemorySource → LanceSink pipeline, writes 5 trade records to a local Lance dataset via the full Volley runtime (including the final shutdown barrier), then reopens the dataset and prints all rows.

cargo run --example memory_to_lance -p volley-examples --release

AWS S3

The volley-connector-aws-s3 crate provides an S3 source that reads objects triggered by SQS notifications.

Architecture

SQS Queue                    S3 Bucket
(object notifications)       (data files)
      |                          |
      v                          v
SqsNotificationSource        S3BlobReader
      |                          |
      +----------+    +----------+
                 |    |
                 v    v
            BlobSource
                 |
                 v
            Decoder (Parquet / JSON / CSV / Avro)
                 |
                 v
            RecordBatch --> Pipeline

  1. S3 sends object-created notifications to an SQS queue
  2. SqsNotificationSource polls the queue for new notifications
  3. S3BlobReader reads the object data from S3
  4. The shared decoder layer converts the data to Arrow RecordBatch

Key Types

| Type | Description |
|---|---|
| S3Source | Configured S3 source (combines SQS + S3 reader) |
| S3BlobReader | Reads object data from S3 |
| SqsNotificationSource | Polls SQS for S3 event notifications |

Supported Formats

| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |

Format detection is automatic based on file extension, or can be configured explicitly.
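Extension-based detection reduces to a suffix match. A sketch of that rule (detect_format is a hypothetical name; the real logic lives in the shared decoder layer):

```rust
/// Map an object key to a decoder format by extension.
/// Illustrative sketch only, not the connector's actual code.
fn detect_format(object_key: &str) -> Option<&'static str> {
    let ext = object_key.rsplit('.').next()?.to_ascii_lowercase();
    match ext.as_str() {
        "parquet" => Some("parquet"),
        "json" | "jsonl" => Some("json"),
        "csv" => Some("csv"),
        "avro" => Some("avro"),
        _ => None, // unknown extension: fall back to explicit configuration
    }
}

fn main() {
    assert_eq!(detect_format("data/events.PARQUET"), Some("parquet"));
    assert_eq!(detect_format("logs/2026-03-31.jsonl"), Some("json"));
    assert_eq!(detect_format("README"), None);
}
```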

Usage

#![allow(unused)]
fn main() {
use volley_connector_aws_s3::S3Source;

let source = S3Source::builder()
    .sqs_queue_url("https://sqs.us-east-1.amazonaws.com/123456789/my-queue")
    .build()
    .await?;

env.from_source(source)
    .map(transform)
    .to_sink(sink)
    .execute("s3-ingest")
    .await?;
}

AWS credentials are resolved via the standard AWS credential chain (environment variables, IAM role, credential file).

Blob Store Connectors

The volley-connector-blob-store crate provides a cloud-agnostic abstraction for reading from object storage. Azure Blob Storage and Google Cloud Storage are built on this shared foundation.

Shared Architecture

All cloud blob store connectors follow the same pattern:

Cloud Queue/Topic          Cloud Storage
(SQS / Azure Queue /      (S3 / Azure Blob /
 Pub/Sub)                   GCS)
      |                        |
      v                        v
NotificationSource         BlobReader
      |                        |
      +--------+   +-----------+
               |   |
               v   v
          BlobSource
               |
               v
          Decoder (Parquet / JSON / CSV / Avro)
               |
               v
          RecordBatch --> Pipeline

To add a new cloud backend, implement NotificationSource and BlobReader. The decoder layer is shared.
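As a rough sketch of those two extension points, simplified to synchronous, byte-level signatures (the real traits are async and carry Arrow and connector types; every name here is illustrative):

```rust
/// Illustrative, simplified shapes of the two extension points.
struct BlobRef {
    bucket: String,
    key: String,
}

trait NotificationSource {
    /// Poll the cloud queue/topic for newly created objects.
    fn poll(&mut self) -> Vec<BlobRef>;
}

trait BlobReader {
    /// Fetch one object's raw bytes; the shared decoder layer
    /// turns these into Arrow RecordBatches.
    fn read(&self, blob: &BlobRef) -> Vec<u8>;
}

/// A trivial in-memory backend showing how the pieces compose.
struct MemoryBackend {
    pending: Vec<BlobRef>,
}

impl NotificationSource for MemoryBackend {
    fn poll(&mut self) -> Vec<BlobRef> {
        std::mem::take(&mut self.pending)
    }
}

impl BlobReader for MemoryBackend {
    fn read(&self, blob: &BlobRef) -> Vec<u8> {
        format!("contents of {}/{}", blob.bucket, blob.key).into_bytes()
    }
}

fn main() {
    let mut backend = MemoryBackend {
        pending: vec![BlobRef { bucket: "b".into(), key: "k.parquet".into() }],
    };
    let blobs = backend.poll();
    assert_eq!(blobs.len(), 1);
    assert_eq!(backend.read(&blobs[0]), b"contents of b/k.parquet".to_vec());
}
```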

Azure Blob Storage

From volley-connector-azure-blob:

| Type | Description |
|---|---|
| AzureBlobSourceBuilder | Builder with automatic credential discovery |
| AzureBlobSource | Configured Azure Blob source |
| AzureBlobReader | Reads blobs from Azure Blob Storage |
| AzureQueueNotificationSource | Polls Azure Queue Storage for blob notifications |

Enable with feature flag blob-store-azure on volley-connectors.

Authentication

By default, Azure connectors use DefaultAzureCredential which discovers credentials automatically. The credential chain tries, in order:

  1. Environment variables — AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET
  2. Workload Identity — Kubernetes pods with Azure Workload Identity federation
  3. Managed Identity — Azure VM, App Service, or Container Instance identity
  4. Azure CLI — Cached az login credentials (local development)

To override with an explicit storage account key, use with_access_key():

#![allow(unused)]
fn main() {
use volley_connector_azure_blob::AzureBlobSourceBuilder;

// DefaultAzureCredential (recommended for production)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .build(config).await?;

// Explicit access key (override)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .with_access_key("MYACCESSKEY...")
    .build(config).await?;
}

Google Cloud Storage

From volley-connector-gcp-gcs:

| Type | Description |
|---|---|
| GcsSource | Configured GCS source |
| GcsBlobReader | Reads objects from GCS |
| PubSubNotificationSource | Polls Pub/Sub for GCS object notifications |

Enable with feature flag blob-store-gcs on volley-connectors.

Authentication

GCS connectors use Application Default Credentials, which discovers credentials automatically. The credential chain tries, in order:

  1. Environment variable — GOOGLE_APPLICATION_CREDENTIALS pointing to a service account key file
  2. Workload Identity — Kubernetes pods with GKE Workload Identity federation
  3. Compute Engine metadata — GCE VM, Cloud Run, or GKE node identity
  4. gcloud CLI — Cached gcloud auth application-default login credentials (local development)

Usage

#![allow(unused)]
fn main() {
use volley_connector_gcp_gcs::GcsSource;

let source = GcsSource::new("my-gcs-bucket", "my-pubsub-subscription")
    .build(config).await?;
}

Note: The GCS Pub/Sub notification integration is currently experimental. The PubSubNotificationSource depends on the google-cloud-pubsub crate’s streaming pull API, which is not yet fully stabilized. The blob reader and event parser are functional — the gap is in notification polling reliability under sustained load.

Blob Store Sinks

The volley-connectors crate also provides buffered blob store sinks for writing output files to S3 or Azure Blob Storage. These sinks batch records and flush them as files based on record count or time interval.
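The flush trigger reduces to a count-or-time predicate. Sketched with explicit parameters (should_flush is a hypothetical name for illustration):

```rust
use std::time::Duration;

/// Illustrative flush predicate: write a file when either the record
/// budget or the time budget is exhausted, whichever comes first.
fn should_flush(
    buffered_records: usize,
    since_last_flush: Duration,
    max_records: usize,
    flush_interval: Duration,
) -> bool {
    buffered_records >= max_records || since_last_flush >= flush_interval
}

fn main() {
    let max = 100_000;
    let interval = Duration::from_secs(60);
    assert!(!should_flush(10, Duration::from_secs(5), max, interval));
    assert!(should_flush(100_000, Duration::from_secs(5), max, interval)); // count trigger
    assert!(should_flush(10, Duration::from_secs(60), max, interval));     // time trigger
}
```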

Encoders

| Encoder | Output Format | File Extension |
|---|---|---|
| ParquetEncoder | Apache Parquet | .parquet |
| JsonLinesEncoder | JSON Lines (newline-delimited JSON) | .jsonl |
| CsvEncoder | CSV with optional header | .csv |

Usage

#![allow(unused)]
fn main() {
use volley_connectors::blob_store::{BufferedBlobSink, BufferedBlobSinkConfig};
use volley_connectors::blob_store::encoders::ParquetEncoder;
use volley_connectors::blob_store::aws::S3BlobWriter;

let encoder = Box::new(ParquetEncoder::new());
let config = BufferedBlobSinkConfig::new("my-bucket", "output/")
    .with_max_records(100_000)
    .with_flush_interval(Duration::from_secs(60));
let sink = BufferedBlobSink::new(
    Box::new(S3BlobWriter::new(s3_client)),
    encoder,
    config,
);
}

For Azure Blob Storage, use AzureBlobWriter instead of S3BlobWriter:

#![allow(unused)]
fn main() {
use volley_connectors::blob_store::azure::AzureBlobWriter;

let sink = BufferedBlobSink::new(
    Box::new(AzureBlobWriter::new(blob_service_client, "my-container")),
    encoder,
    config,
);
}

Enable with feature flags blob-store-aws or blob-store-azure on volley-connectors.

Supported Source Formats

All blob store sources share the same decoder layer:

| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |

Feature Flags

[dependencies]
volley-connectors = { git = "https://github.com/volley-streams/volley", features = ["blob-store-azure", "blob-store-gcs"] }

See also: AWS S3 for the S3-specific connector.

ADBC (Arrow Database Connectivity)

Volley provides ADBC-based database connectivity via two crates:

  • volley-connector-adbc – A sink that writes Arrow RecordBatches to any database with an ADBC driver (PostgreSQL, BigQuery, Snowflake, Clickhouse, Databricks, MySQL, Redshift, Exasol, and more).
  • volley-enrichment-adbc – A real-time enrichment operator that queries external databases to enrich streaming records with lookup data, backed by an LRU+TTL cache.

ADBC uses a C-API driver model (dlopen/dlsym), so adding support for a new database only requires installing the corresponding ADBC driver shared library – no connector-specific code needed.

Setup

Add crate dependencies

[dependencies]
# Sink (write to a database)
volley-connector-adbc = { git = "https://github.com/volley-streams/volley" }

# Enrichment operator (query a database to enrich records)
volley-enrichment-adbc = { git = "https://github.com/volley-streams/volley" }

Install the ADBC driver shared library

ADBC drivers are platform-specific shared libraries. Install the driver for your target database:

PostgreSQL:

# macOS (Homebrew)
brew install apache-arrow-adbc-driver-postgresql

# Ubuntu/Debian
apt-get install libadbc-driver-postgresql

# Or download from https://github.com/apache/arrow-adbc/releases

BigQuery:

# Download the adbc_driver_bigquery shared library from
# https://github.com/apache/arrow-adbc/releases

The driver name (e.g., "adbc_driver_postgresql") is resolved via platform library search paths (LD_LIBRARY_PATH on Linux, DYLD_LIBRARY_PATH on macOS). Alternatively, pass a full file path to the shared library.

ADBC Sink

The sink writes Arrow RecordBatches to a database table. It supports insert (bulk ingest) and upsert (ON CONFLICT) modes with write-behind buffering.

Basic insert

#![allow(unused)]
fn main() {
use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig, AdbcStreamExt};

let adbc = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb")
    .with_pool_size(4);

let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_batch_size(5000)
    .with_create_table(true);

env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_adbc(sink_config).await?
    .execute("kafka-to-postgres").await?;
}

Upsert mode

Set with_upsert_key() to enable upsert semantics. The sink auto-generates INSERT ... ON CONFLICT (key) DO UPDATE SET SQL:

#![allow(unused)]
fn main() {
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_key("event_id")
    .with_batch_size(5000);
}
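The shape of the auto-generated statement can be sketched from a table name, column list, and key. This is an illustrative reconstruction only; the real generator also handles identifier quoting and driver-specific placeholders:

```rust
/// Illustrative sketch of the generated upsert statement.
fn upsert_sql(table: &str, columns: &[&str], key: &str) -> String {
    let cols = columns.join(", ");
    let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("${i}")).collect();
    let updates: Vec<String> = columns
        .iter()
        .filter(|&&c| c != key) // the conflict key itself is not updated
        .map(|c| format!("{c} = EXCLUDED.{c}"))
        .collect();
    format!(
        "INSERT INTO {table} ({cols}) VALUES ({}) ON CONFLICT ({key}) DO UPDATE SET {}",
        placeholders.join(", "),
        updates.join(", ")
    )
}

fn main() {
    let sql = upsert_sql("events", &["event_id", "name", "value"], "event_id");
    assert_eq!(
        sql,
        "INSERT INTO events (event_id, name, value) VALUES ($1, $2, $3) \
         ON CONFLICT (event_id) DO UPDATE SET name = EXCLUDED.name, value = EXCLUDED.value"
    );
}
```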

For databases with non-standard upsert syntax, provide custom SQL:

#![allow(unused)]
fn main() {
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_query(
        "INSERT INTO events (id, name, value) VALUES ($1, $2, $3) \
         ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, value = EXCLUDED.value"
    );
}

Sink configuration reference

| Method | Default | Description |
|---|---|---|
| AdbcSinkConfig::new(adbc, table) | — | Required: ADBC config and target table name |
| .with_upsert_key(column) | None | Column for upsert (ON CONFLICT) semantics. None = insert-only |
| .with_upsert_query(sql) | None | Custom SQL for upsert. Overrides auto-generated SQL |
| .with_batch_size(n) | 1000 | Number of buffered rows before auto-flush |
| .with_create_table(bool) | false | Auto-create the table from the first batch’s Arrow schema |

Delivery guarantees

  • At-least-once (default) – on_checkpoint() flushes buffered batches. If the pipeline crashes after flushing but before the checkpoint completes, some records may be written twice on recovery.
  • Effectively exactly-once via upsert – When upsert_key is set, duplicate writes are idempotent. Choose a natural key that makes re-delivery safe.
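Why upsert absorbs re-delivery can be seen with a map standing in for the table (illustrative only):

```rust
use std::collections::HashMap;

/// A HashMap standing in for the target table, keyed by the upsert key.
/// Applying the same batch twice leaves the table unchanged, so records
/// replayed after a crash are absorbed rather than duplicated.
fn upsert(table: &mut HashMap<String, String>, batch: &[(&str, &str)]) {
    for (key, value) in batch {
        // INSERT ... ON CONFLICT (key) DO UPDATE, modelled as a map insert
        table.insert(key.to_string(), value.to_string());
    }
}

fn main() {
    let mut table = HashMap::new();
    let batch = [("evt-1", "active"), ("evt-2", "closed")];
    upsert(&mut table, &batch);
    upsert(&mut table, &batch); // replayed batch after a crash
    assert_eq!(table.len(), 2); // no duplicate rows
}
```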

ADBC Enrichment Operator

The enrichment operator queries an external database to enrich streaming records. It extracts lookup keys from each batch, issues a single micro-batch query, and appends enrichment columns to the output. An LRU+TTL cache avoids redundant queries.

Key-column lookup

The simplest mode: specify a stream column and a remote table. The operator generates SELECT [columns] FROM table WHERE target_key IN (k1, k2, ...).

#![allow(unused)]
fn main() {
use volley_enrichment_adbc::{AdbcEnrichConfig, AdbcEnrichStreamExt, EnrichMissStrategy};
use volley_connector_adbc::AdbcConfig;
use std::time::Duration;

let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new(pg, "customers", "customer_id")
            .with_select_columns(vec!["tier", "region", "lifetime_value"])
            .with_prefix("customer_")
            .with_cache_ttl(Duration::from_secs(300))
            .with_on_miss(EnrichMissStrategy::NullFill)
    )
    .filter_expr(col("customer_tier").eq(lit("premium")))
    .to_kafka(sink_config).await?
    .execute("enrich-orders").await?;
}

Parameterized query

For complex lookups (joins, filters, computed columns), provide a SQL template with IN $1:

#![allow(unused)]
fn main() {
let bq = AdbcConfig::new("adbc_driver_bigquery", "bigquery://project-id");

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new_with_query(
            bq,
            "SELECT category, risk_score FROM products WHERE sku IN $1 AND active = true",
            "product_sku"
        )
    )
    .to_adbc(AdbcSinkConfig::new(pg, "enriched_events")).await?
    .execute("enrich-and-store").await?;
}

Miss strategies

When a lookup key has no match in the database:

| Strategy | Behavior |
|---|---|
| NullFill (default) | Add enrichment columns filled with nulls |
| DropRecord | Drop the record entirely |
| PassThrough | Keep the original record without enrichment columns |

Enrichment configuration reference

| Method | Default | Description |
|---|---|---|
| AdbcEnrichConfig::new(adbc, table, key) | — | Key-column lookup mode |
| AdbcEnrichConfig::new_with_query(adbc, sql, key) | — | Parameterized query mode |
| .with_target_key(column) | Same as lookup_key | Remote column to match against (if different from the stream column) |
| .with_select_columns(vec) | All columns | Which columns to fetch from the remote table |
| .with_prefix(prefix) | None | Prefix for enriched columns (e.g., "customer_") |
| .with_cache_ttl(duration) | 5 minutes | Time-to-live for cached entries |
| .with_cache_max_entries(n) | 10,000 | Maximum entries in the LRU cache |
| .with_on_miss(strategy) | NullFill | Strategy for unmatched records |

Cache behavior

The enrichment operator uses an LRU cache with per-entry TTL:

  • Cache hits return the stored enrichment row without querying the database.
  • Cache misses (key not found) are cached as None to avoid repeated lookups for non-existent keys.
  • TTL expiry evicts stale entries so the operator picks up changes in the remote database.
  • On recovery, the cache starts cold and repopulates organically. Enrichment data is external and authoritative, so there is no state to recover.
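The cache behavior above can be sketched with std collections and a logical clock. This is an illustrative model, not the operator's implementation; note how a confirmed miss is cached as None so absent keys are not re-queried:

```rust
use std::collections::HashMap;

/// Illustrative LRU+TTL cache. Time is a logical clock in seconds so
/// the sketch stays deterministic; the real cache uses wall-clock TTL.
struct EnrichCache {
    ttl_secs: u64,
    max_entries: usize,
    // value: (enrichment row or confirmed miss, insert time, last-use time)
    entries: HashMap<String, (Option<String>, u64, u64)>,
}

impl EnrichCache {
    /// Outer None = not cached (query the database).
    /// Some(None) = cached confirmed miss. Some(Some(row)) = cached row.
    fn get(&mut self, key: &str, now: u64) -> Option<Option<String>> {
        let expired = match self.entries.get(key) {
            Some((_, inserted, _)) => now - *inserted >= self.ttl_secs,
            None => return None,
        };
        if expired {
            self.entries.remove(key); // TTL expiry: force a fresh lookup
            return None;
        }
        let (value, _, last_used) = self.entries.get_mut(key).unwrap();
        *last_used = now;
        Some(value.clone())
    }

    fn put(&mut self, key: String, value: Option<String>, now: u64) {
        if self.entries.len() >= self.max_entries && !self.entries.contains_key(&key) {
            // Evict the least recently used entry.
            if let Some(lru) = self
                .entries
                .iter()
                .min_by_key(|(_, (_, _, used))| *used)
                .map(|(k, _)| k.clone())
            {
                self.entries.remove(&lru);
            }
        }
        self.entries.insert(key, (value, now, now));
    }
}

fn main() {
    let mut cache = EnrichCache { ttl_secs: 300, max_entries: 2, entries: HashMap::new() };
    cache.put("c-1".into(), Some("premium".into()), 0);
    cache.put("c-404".into(), None, 0); // confirmed miss, cached as None
    assert_eq!(cache.get("c-1", 10), Some(Some("premium".into())));
    assert_eq!(cache.get("c-404", 10), Some(None)); // hit: known-absent key
    assert_eq!(cache.get("c-1", 300), None);        // TTL expired: query again
}
```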

Connection Configuration Reference

AdbcConfig is shared between the sink and enrichment operator:

| Method | Default | Description |
|---|---|---|
| AdbcConfig::new(driver, uri) | — | Required: driver name/path and connection URI |
| .with_option(key, value) | {} | Add a driver-specific key-value option |
| .with_pool_size(n) | 4 | Connection pool size (round-robin) |

Driver name resolution: If the driver name contains / or \, it is treated as a file path. Otherwise, it is resolved via platform library search paths (e.g., LD_LIBRARY_PATH, DYLD_LIBRARY_PATH).
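That rule amounts to a one-line check; a sketch of the documented behavior (is_driver_path is a hypothetical name):

```rust
/// Illustrative version of the documented rule: names containing a path
/// separator are loaded as file paths; everything else is resolved via
/// the platform library search path.
fn is_driver_path(name: &str) -> bool {
    name.contains('/') || name.contains('\\')
}

fn main() {
    assert!(is_driver_path("/usr/local/lib/libadbc_driver_postgresql.so"));
    assert!(is_driver_path(r"C:\drivers\adbc_driver_postgresql.dll"));
    assert!(!is_driver_path("adbc_driver_postgresql")); // resolved via LD_LIBRARY_PATH etc.
}
```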

Common driver names:

| Database | Driver Name |
|---|---|
| PostgreSQL | adbc_driver_postgresql |
| BigQuery | adbc_driver_bigquery |
| Snowflake | adbc_driver_snowflake |
| SQLite | adbc_driver_sqlite |
| Flight SQL | adbc_driver_flightsql |

Observability

| Metric | Type | Description |
|---|---|---|
| adbc_sink_rows_written_total | counter | Rows written, by table |
| adbc_sink_flush_duration_seconds | histogram | Flush latency |
| adbc_enrich_cache_hit_total | counter | Cache hits |
| adbc_enrich_cache_miss_total | counter | Cache misses |
| adbc_enrich_cache_size | gauge | Current cache entry count |
| adbc_enrich_query_duration_seconds | histogram | Micro-batch query latency |
| adbc_enrich_query_errors_total | counter | Failed enrichment queries |

Integration Testing

The ADBC integration tests require Docker (for PostgreSQL via testcontainers) and the adbc_driver_postgresql shared library.

Set the ADBC_POSTGRESQL_DRIVER environment variable if the driver is not on the default library search path:

# Point to the driver if not on LD_LIBRARY_PATH
export ADBC_POSTGRESQL_DRIVER=/usr/local/lib/libadbc_driver_postgresql.so

# Run the sink integration tests
cargo test -p volley-connector-adbc --test sink_integration -- --ignored

# Run the enrichment integration tests
cargo test -p volley-enrichment-adbc --test enrich_integration -- --ignored

Tests use volley-test-harness::containers::PostgresContainer to spin up a PostgreSQL instance automatically.

Troubleshooting

“Failed to load ADBC driver”

  • Verify the driver shared library is installed and on the library search path.
  • On macOS: check DYLD_LIBRARY_PATH. On Linux: check LD_LIBRARY_PATH.
  • Try passing the full file path instead of just the driver name.

“Failed to create ADBC connection”

  • Verify the connection URI is correct and the database is reachable.
  • Check that driver-specific options (credentials, project ID, etc.) are set via with_option().

Slow enrichment queries

  • Increase cache_ttl and cache_max_entries to reduce query frequency.
  • Add an index on the lookup column in the remote database.
  • Use with_select_columns() to fetch only the columns you need.

Duplicate rows after recovery

  • Use with_upsert_key() to make writes idempotent on a natural key.
  • Without upsert, the sink provides at-least-once delivery.

Kubernetes Operator

Volley includes a Kubernetes operator that automates deployment and management of Volley stream processing applications. Define a VolleyApplication custom resource and the operator handles StatefulSets, persistent storage, health checks, and Prometheus integration.

Prerequisites

  • Kubernetes cluster (v1.26+)
  • prometheus-operator (optional, for automatic metrics scraping)
  • KEDA (optional, for autoscaling)

Installing the Operator

Build and deploy the operator:

# Build the operator binary
cargo build --release -p volley-k8s-operator

# Generate the CRD manifest
./target/release/volley-k8s-operator generate-crd > volley-crd.json

# Apply the CRD to your cluster
kubectl apply -f volley-crd.json

# Run the operator (in-cluster or locally for development)
./target/release/volley-k8s-operator

Deploying a Volley Application

Create a VolleyApplication resource:

apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  # Container image (required)
  image: registry.example.com/my-volley-app:v1.0

  # Resource requirements (required)
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"

  # RocksDB state storage
  state:
    storageClassName: fast-ssd
    size: 10Gi

  # Checkpoint storage
  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi

  # Environment variables
  env:
    - name: RUST_LOG
      value: "info"
    - name: KAFKA_BOOTSTRAP_SERVERS
      value: "kafka.default.svc:9092"
Save the manifest as my-pipeline.yaml and apply it:

kubectl apply -f my-pipeline.yaml

The operator creates the following resources automatically:

| Resource | Purpose |
|---|---|
| StatefulSet | Runs the app with stable pod identity and persistent volumes |
| PVC (state) | Persistent storage for RocksDB state (/var/lib/volley/state) |
| PVC (checkpoints) | Persistent storage for checkpoints (/var/lib/volley/checkpoints) |
| Service (headless) | StatefulSet DNS for stable network identity |
| Service (metrics) | Exposes the Prometheus metrics port |
| ServiceMonitor | Prometheus-operator auto-discovery for metrics scraping |
| PodDisruptionBudget | Prevents involuntary eviction during checkpointing |

Health Checks

The operator automatically configures liveness and readiness probes backed by Volley’s built-in HealthReporter:

| Probe | Path | Behaviour |
|---|---|---|
| Liveness | /health (port 8080) | Returns healthy when pipeline is Running or Starting |
| Readiness | /ready (port 8080) | Returns ready only when pipeline is Running |

Customize the health check configuration:

spec:
  health:
    port: 8080
    livenessPath: /health
    readinessPath: /ready
    initialDelaySeconds: 30
    periodSeconds: 10

Observability

Metrics are exposed on port 9090 by default. When prometheus-operator is installed, the operator creates a ServiceMonitor for automatic scraping:

spec:
  observability:
    metricsPort: 9090
    metricsPath: /metrics
    serviceMonitor:
      enabled: true
      interval: 15s
      labels:
        release: prometheus   # match your Prometheus selector
    otlpEndpoint: http://otel-collector.monitoring:4317  # optional tracing

Key Volley metrics available for dashboards and alerting:

| Metric | Type | Description |
|---|---|---|
| volley.source.records_polled | Counter | Records read from source |
| volley.records.processed | Counter | Records processed per operator |
| volley.stage.latency_ms | Histogram | Per-operator processing latency |
| volley.checkpoint.duration_ms | Histogram | Checkpoint completion time |
| volley.channel.utilization | Gauge | Backpressure indicator (0.0-1.0) |
| volley.kafka.consumer_lag | Gauge | Kafka consumer group lag |
| volley.pipeline.uptime_seconds | Gauge | Pipeline uptime |

Node Isolation

Following Flink best practice, each Volley application should run on dedicated nodes. The operator supports node selectors, tolerations, and pod anti-affinity:

spec:
  isolation:
    # Schedule only on nodes labelled for stream processing
    nodeSelector:
      volley.io/pool: stream-processing
    # Tolerate the dedicated taint
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule
    # One pod per node (enabled by default)
    podAntiAffinity: true

Distributed Execution (Horizontal Scaling)

To run a pipeline across multiple pods, add the cluster section to your spec.

First, create a ReadWriteMany PVC backed by your cloud provider’s managed NFS (AWS EFS, GCP Filestore, Azure Files):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs  # your RWX storage class
  resources:
    requests:
      storage: 50Gi

Then reference it in your VolleyApplication:

spec:
  replicas: 3
  cluster:
    maxKeyGroups: 256                    # virtual partitions (default, power of 2)
    flightPort: 50051                    # Arrow Flight port (default)
    sharedCheckpoints:
      claimName: volley-shared-ckpt      # the PVC you created above

When cluster is present, the operator automatically:

| Action | Detail |
|---|---|
| Injects VOLLEY_* env vars | VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME, VOLLEY_SHARED_CHECKPOINT_DIR |
| Adds Flight port | Container port 50051 + headless service port for inter-pod communication |
| Mounts shared PVC | The existing ReadWriteMany PVC is mounted into all pods at /mnt/volley/checkpoints |

The Volley runtime reads these env vars via ClusterConfig::from_env() and automatically enters distributed mode.

Shared checkpoint storage uses a pre-provisioned ReadWriteMany PVC (managed NFS from your cloud provider). All workers write checkpoints to the same shared mount, so recovery and rescaling are near-instant — no data transfer needed.

Rescaling is live: change spec.replicas and the coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
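For illustration, the injected environment contract can be sketched as follows. This is a Python approximation of what ClusterConfig::from_env() presumably does on the Rust side; the dictionary field names are illustrative, while the variable names and the 256/50051 defaults come from the table above.

```python
# Illustration of the env-var contract the operator injects (Python sketch;
# the runtime itself reads these in Rust via ClusterConfig::from_env()).
# Field names are illustrative; variable names and the 256/50051 defaults
# come from the documentation above.
import os

def read_cluster_config(env=os.environ):
    if "VOLLEY_NUM_WORKERS" not in env:
        return None  # no cluster section: run in single-node mode
    return {
        "num_workers": int(env["VOLLEY_NUM_WORKERS"]),
        "headless_service": env["VOLLEY_HEADLESS_SERVICE"],
        "max_key_groups": int(env.get("VOLLEY_MAX_KEY_GROUPS", "256")),
        "flight_port": int(env.get("VOLLEY_FLIGHT_PORT", "50051")),
        "app_name": env["VOLLEY_APP_NAME"],
        "shared_checkpoint_dir": env.get("VOLLEY_SHARED_CHECKPOINT_DIR"),
    }
```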

Autoscaling with KEDA (Future)

The operator has built-in support for KEDA autoscaling. When enabled, it creates a ScaledObject that can scale based on Kafka consumer lag, Prometheus metrics, or CPU/memory:

spec:
  autoscaling:
    enabled: true
    minReplicas: 1
    maxReplicas: 10
    triggers:
      # Scale on Kafka consumer lag
      - type: kafka
        metadata:
          bootstrapServers: kafka.default.svc:9092
          consumerGroup: my-pipeline-group
          topic: input-events
          lagThreshold: "1000"
      # Scale on backpressure (channel utilization)
      - type: prometheus
        metadata:
          serverAddress: http://prometheus.monitoring:9090
          query: avg(volley_channel_utilization{job="my-pipeline"})
          threshold: "0.8"
      # Scale on CPU
      - type: cpu
        metadata:
          value: "70"

Checking Application Status

# List all Volley applications
kubectl get volleyapplications
# or use the short name:
kubectl get vapp

# Check status
kubectl describe vapp my-pipeline

# View operator logs
kubectl logs deployment/volley-k8s-operator

The status subresource reports the current phase (Pending, Running, Degraded, Failed), replica counts, and a human-readable message.

Full Spec Reference

apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  image: registry.example.com/my-app:v1.0   # required
  imagePullPolicy: IfNotPresent              # default: IfNotPresent
  imagePullSecrets:
    - name: registry-secret

  resources:                                  # required
    requests: { cpu: "2", memory: "4Gi" }
    limits:   { cpu: "4", memory: "8Gi" }

  replicas: 1                                 # default: 1
  args: ["--config", "/etc/volley/config.yaml"]

  env:
    - name: RUST_LOG
      value: "info"
    - name: DB_PASSWORD
      valueFrom:
        secretKeyRef: { name: db-creds, key: password }

  state:
    storageClassName: fast-ssd
    size: 10Gi
    mountPath: /var/lib/volley/state          # default

  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi
    mountPath: /var/lib/volley/checkpoints    # default

  observability:
    metricsPort: 9090                         # default: 9090
    metricsPath: /metrics                     # default: /metrics
    serviceMonitor:
      enabled: true                           # default: true
      interval: 15s                           # default: 15s
      labels: { release: prometheus }
    otlpEndpoint: http://otel-collector:4317

  health:
    port: 8080                                # default: 8080
    livenessPath: /health                     # default: /health
    readinessPath: /ready                     # default: /ready
    initialDelaySeconds: 30                   # default: 30
    periodSeconds: 10                         # default: 10

  isolation:
    nodeSelector: { volley.io/pool: stream-processing }
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule
    podAntiAffinity: true                     # default: true

  kafka:
    bootstrapServers: kafka:9092
    consumerGroup: my-group
    topics: [input-events]

  autoscaling:
    enabled: false                            # default: false
    minReplicas: 1
    maxReplicas: 10
    triggers:
      - type: kafka
        metadata:
          bootstrapServers: kafka:9092
          consumerGroup: my-group
          topic: input-events
          lagThreshold: "1000"

  cluster:                                      # distributed execution (optional)
    maxKeyGroups: 256                           # default: 256 (power of 2)
    flightPort: 50051                           # default: 50051
    sharedCheckpoints:
      claimName: volley-shared-ckpt             # existing ReadWriteMany PVC
      mountPath: /mnt/volley/checkpoints        # default

  labels: {}                                  # additional labels on all resources
  annotations: {}                             # additional annotations on all resources

Volley Capacity Planning Guide

This guide helps pipeline authors size CPU, memory, and storage for Volley deployments, and provides PromQL queries for ongoing capacity monitoring.


Throughput Reference

Benchmark data from volley-examples/examples/benchmark_sustained.rs on a single node:

| Configuration | Throughput | Notes |
|---|---|---|
| Single record, parallelism=1 | ~850K rec/s | Baseline, no batching |
| Batch=1000, parallelism=1 | ~3.5M rec/s | Batching only |
| Batch=1000, parallelism=4 | ~8.7M rec/s | Batching + parallelism |
| Sustained 10s, batch=1000, parallelism=4 | ~9.4M rec/s | Steady-state throughput |

These numbers reflect in-memory processing with simple operators. Real-world throughput will be lower depending on:

  • Operator complexity (aggregations, joins, windowing)
  • State backend I/O (RocksDB reads/writes)
  • Connector latency (Kafka, blob storage)
  • Checkpoint frequency

CPU Sizing

| Throughput target | CPU requests | CPU limits | Notes |
|---|---|---|---|
| < 100K rec/s | 500m | 1 | Light pipelines, simple transforms |
| 100K–1M rec/s | 1 | 2 | Moderate load, stateful operators |
| 1M–5M rec/s | 2 | 4 | High throughput, batched processing |
| > 5M rec/s | 4 | 8 | Maximum throughput, parallelism=4+ |

Volley runs on the Tokio async runtime. CPU is primarily consumed by:

  • Record deserialization/serialization
  • Operator logic (map, filter, aggregate, window)
  • RocksDB compaction (background threads)
  • Kafka consumer polling and producer flushing

Recommendation: Start with 1 CPU request and monitor container_cpu_usage_seconds_total. Scale up if sustained utilization exceeds 70%.
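As a rough illustration, the sizing table above can be encoded as a lookup. This Python sketch is a starting-point heuristic only; the tier boundaries are taken directly from the table, and real workloads should be validated against observed utilization.

```python
# The CPU sizing table above as a lookup. Illustrative starting points only;
# validate against observed container_cpu_usage_seconds_total in production.
CPU_TIERS = [
    # (upper bound in rec/s, CPU request, CPU limit)
    (100_000,   "500m", "1"),
    (1_000_000, "1",    "2"),
    (5_000_000, "2",    "4"),
]

def cpu_sizing(target_rec_per_sec):
    """Return (request, limit) for a target sustained throughput."""
    for upper_bound, request, limit in CPU_TIERS:
        if target_rec_per_sec < upper_bound:
            return request, limit
    return "4", "8"  # > 5M rec/s tier

# cpu_sizing(250_000) -> ("1", "2")
```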


Memory Sizing

| Workload | Memory requests | Memory limits | Notes |
|---|---|---|---|
| Stateless transforms | 256Mi | 512Mi | No state backend |
| Light stateful (< 100K keys) | 512Mi | 1Gi | Small RocksDB footprint |
| Medium stateful (100K–1M keys) | 1Gi | 2Gi | RocksDB block cache |
| Heavy stateful (> 1M keys) | 2Gi | 4Gi | Large working set |

Memory is consumed by:

  • Channel buffers: In-flight records between pipeline stages
  • RocksDB block cache: Cached SST blocks for read performance (defaults to ~50% of available memory)
  • Arrow RecordBatch buffers: Columnar data in transit
  • Kafka consumer buffers: Pre-fetched messages
  • Window pending index: Active (key, window) pairs tracked by window operators (~100 bytes each)
  • Aggregate LRU cache: In-memory accumulator cache for keyed aggregations

Recommendation: Set memory limits to 2x requests. Monitor container_memory_working_set_bytes and watch for OOM kills.

Bounding Stateful Operator Memory

For high-cardinality keyed workloads, use the built-in LRU mechanisms to cap in-memory state:

  • Aggregate operators: keyed_stream.with_max_cache_entries(n) — bounds the in-memory accumulator cache. Evicted entries are flushed to RocksDB.
  • Window operators: windowed_stream.with_max_pending_windows(n) — bounds the in-memory window index. Evicted entries are persisted to the state backend and scanned during watermark advance.

As a rule of thumb, each aggregate cache entry is ~200-500 bytes (depends on accumulator type), and each pending window entry is ~100 bytes. Size the limits based on your available memory budget:

aggregate_memory ≈ max_cache_entries × 300 bytes
window_memory    ≈ max_pending_windows × 100 bytes

Monitor volley_state_cache_evictions_total and volley_window_pending_evictions_total — a sustained high eviction rate means the LRU is undersized and incurring excessive RocksDB I/O.
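The two formulas above can be inverted into a quick sizing helper. This is a back-of-envelope Python sketch using the per-entry estimates quoted in this guide, not the Volley API; the actual knobs are with_max_cache_entries and with_max_pending_windows.

```python
# Back-of-envelope LRU sizing from the rule-of-thumb byte costs above.
# These helpers are illustrative only; the actual Volley knobs are
# with_max_cache_entries(n) and with_max_pending_windows(n).
AGG_BYTES_PER_ENTRY = 300  # midpoint of the ~200-500 byte estimate
WIN_BYTES_PER_ENTRY = 100

def max_cache_entries(budget_bytes):
    """Aggregate LRU entries that fit in a memory budget."""
    return budget_bytes // AGG_BYTES_PER_ENTRY

def max_pending_windows(budget_bytes):
    """Pending-window index entries that fit in a memory budget."""
    return budget_bytes // WIN_BYTES_PER_ENTRY

# Example: 64 MiB for the aggregate cache, 16 MiB for the window index.
agg_limit = max_cache_entries(64 * 1024 * 1024)    # 223696 entries
win_limit = max_pending_windows(16 * 1024 * 1024)  # 167772 entries
```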


RocksDB Storage Sizing

Growth Formula

state_size = key_count × avg_value_size × amplification_factor

Where:

  • key_count: Number of unique keys in your keyed state
  • avg_value_size: Average serialized value size (JSON encoding adds ~30% overhead)
  • amplification_factor: RocksDB space amplification, typically 1.1–1.5x depending on compaction

Checkpoint Storage

checkpoint_storage = state_size × checkpoint_retention_count

RocksDB checkpoints use hardlinks to SST files, so incremental growth is small when data doesn’t change between checkpoints. However, as data mutates, old SST files are retained for older checkpoints.

PVC Sizing Recommendations

| State size | State PVC | Checkpoint PVC | Retention |
|---|---|---|---|
| < 1 GB | 5Gi | 5Gi | 3 checkpoints |
| 1–10 GB | 20Gi | 20Gi | 3 checkpoints |
| 10–50 GB | 100Gi | 100Gi | 2 checkpoints |
| > 50 GB | 2× state size | 2× state size | 2 checkpoints |

Recommendation: Provision at least 2× your expected state size for headroom. Monitor actual usage with the PromQL queries below and adjust.
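Putting the growth and retention formulas together, a back-of-envelope calculation might look like this. The 1M-key, 512-byte workload is a hypothetical example, and the 1.3 amplification factor is the midpoint of the 1.1–1.5 range given above.

```python
# Back-of-envelope storage sizing from the formulas in this section.
# The 1M-key / 512-byte workload below is a hypothetical example.
GIB = 1024 ** 3

def state_size(key_count, avg_value_bytes, amplification=1.3):
    """state_size = key_count x avg_value_size x amplification_factor."""
    return key_count * avg_value_bytes * amplification

def checkpoint_storage(state_bytes, retention=3):
    """checkpoint_storage = state_size x checkpoint_retention_count."""
    return state_bytes * retention

def pvc_size(state_bytes, headroom=2.0):
    """Provision at least 2x expected state size."""
    return state_bytes * headroom

state = state_size(1_000_000, 512)  # ~0.62 GiB
ckpt = checkpoint_storage(state)    # ~1.86 GiB for 3 retained checkpoints
pvc = pvc_size(state)               # ~1.24 GiB; round up to 5Gi per the table
```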

Helm Values Example

state:
  enabled: true
  size: "20Gi"
  storageClassName: gp3   # Use provisioned IOPS for write-heavy workloads

checkpoints:
  enabled: true
  size: "20Gi"

Capacity Monitoring PromQL Queries

Throughput

# Records ingested per second by pipeline
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

# Records processed per second by pipeline
sum(rate(volley_records_processed_total[5m])) by (job_name)

# Processing ratio (should be close to 1.0)
sum(rate(volley_records_processed_total[5m])) by (job_name)
/
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

Backpressure

# Channel utilization by stage (>0.8 = backpressure)
avg(volley_channel_utilization) by (job_name, stage)

# Stages with sustained backpressure
avg_over_time(volley_channel_utilization[15m]) > 0.8

Kafka Consumer Lag

# Total consumer lag by pipeline
sum(volley_kafka_consumer_lag) by (job_name)

# Lag growth rate (positive = falling behind)
deriv((sum(volley_kafka_consumer_lag) by (job_name))[15m:1m])

Latency

# p99 stage latency
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# p50 stage latency
histogram_quantile(0.5, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# Checkpoint duration p99
histogram_quantile(0.99, sum(rate(volley_checkpoint_duration_ms_bucket[5m])) by (le, job_name))

Error Rate

# Error rate by type
sum(rate(volley_errors_total[5m])) by (job_name, error_type)

# Source errors by connector
sum(rate(volley_source_errors_total[5m])) by (job_name, source_id)

# Sink errors by connector
sum(rate(volley_sink_errors_total[5m])) by (job_name, sink_id)

# Error ratio (for SLO tracking)
sum(rate(volley_errors_total[5m])) by (job_name)
/
clamp_min(sum(rate(volley_records_processed_total[5m])) by (job_name), 1)

Resource Utilization

# Watermark lag (event-time vs. wall-clock)
max(volley_watermark_lag_ms) by (job_name)

# Sink flush duration p99
histogram_quantile(0.99, sum(rate(volley_sink_flush_duration_ms_bucket[5m])) by (le, job_name))

KEDA Autoscaling Reference

Example KEDA trigger configurations for the Helm chart:

Scale on Kafka Consumer Lag

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 8
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: "kafka:9092"
        consumerGroup: "my-pipeline"
        topic: "input-topic"
        lagThreshold: "10000"    # Scale up when lag exceeds 10K messages

Scale on CPU

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 4
  triggers:
    - type: cpu
      metadata:
        type: Utilization
        value: "70"              # Scale up at 70% CPU utilization

Scale on Custom Prometheus Metric

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 8
  triggers:
    - type: prometheus
      metadata:
        serverAddress: "http://prometheus:9090"
        query: "sum(volley_channel_utilization{job_name='my-pipeline'}) / count(volley_channel_utilization{job_name='my-pipeline'})"
        threshold: "0.8"         # Scale up when average saturation > 80%

Capacity Planning Checklist

  1. Estimate throughput: What is your expected input rate (records/sec)?
  2. Size CPU/memory: Use the tables above as a starting point
  3. Estimate state size: key_count × avg_value_size × 1.5
  4. Size PVCs: 2× expected state size for both state and checkpoint volumes
  5. Configure autoscaling: Set KEDA triggers based on lag or saturation
  6. Deploy and monitor: Use the PromQL queries above to validate sizing
  7. Iterate: Adjust resources based on actual utilization patterns

Observability

Volley provides built-in observability via the volley_core::observability module: Prometheus metrics, OpenTelemetry tracing, structured logging, and health reporting. Observability auto-initializes at pipeline startup with zero-config defaults.

Metrics

Volley exposes more than 20 metrics in the Prometheus exposition format. All metrics include node_id and job_name labels.

| Metric | Type | Labels | Description |
|---|---|---|---|
| volley.source.records_polled | Counter | source_id | Records read from source |
| volley.records.processed | Counter | operator_id | Records processed per operator |
| volley.stage.latency_ms | Histogram | stage | Per-operator processing latency |
| volley.checkpoint.duration_ms | Histogram | | Checkpoint completion time |
| volley.checkpoint.epoch | Gauge | | Current checkpoint epoch |
| volley.checkpoint.failures | Counter | | Checkpoint failure count |
| volley.channel.utilization | Gauge | stage | Backpressure indicator (0.0-1.0) |
| volley.watermark.lag_ms | Gauge | | Event-time lag |
| volley.sink.records_written | Counter | sink_id | Records written per sink |
| volley.sink.flush.duration_ms | Histogram | sink_id | Sink flush latency |
| volley.pipeline.uptime_seconds | Gauge | | Pipeline uptime |
| volley.pipeline.health | Gauge | | Health state (0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown) |
| volley.errors | Counter | error_type | Errors by type |
| volley.source.errors | Counter | source_id | Per-connector source errors |
| volley.sink.errors | Counter | sink_id | Per-connector sink errors |
| volley.kafka.consumer_lag | Gauge | | Kafka consumer group lag |
| volley.kafka.commit.duration_ms | Histogram | | Kafka commit latency |
| volley.kafka.commit.failures | Counter | | Kafka commit failures |
| volley.blob.objects_read | Counter | | Blob source throughput |
| volley.blob.bytes_read | Counter | | Blob I/O volume |
| volley.state.cache_evictions | Counter | operator_id | Aggregate LRU cache evictions |
| volley.window.pending_evictions | Counter | | Window pending index LRU evictions |
| volley.window.pending_size | Gauge | | In-memory pending window entry count |
| volley.checkpoint.gc_cleaned | Counter | | Old checkpoints cleaned by GC |

ML Inference Metrics

When using volley-ml operators (.infer(), .embed(), .classify(), .infer_remote()), additional inference-specific metrics are emitted:

| Metric | Type | Labels | Description |
|---|---|---|---|
| volley.ml.inference.duration_ms | Histogram | backend, model_name | Per-batch inference latency (backend call only) |
| volley.ml.inference.batch_rows | Histogram | backend, model_name | Rows per inference batch (batch utilization) |
| volley.ml.model_load.duration_ms | Histogram | backend, model_name | Model load time (emitted once at startup) |
| volley.ml.inference.errors | Counter | backend, model_name, error_type | Inference errors by backend and type |
| volley.ml.http.request.duration_ms | Histogram | endpoint | HTTP round-trip latency for remote model servers |
| volley.ml.http.queue_depth | Gauge | endpoint | Available concurrency permits (inverse of queue depth) |
| volley.ml.http.errors | Counter | endpoint, status_code | HTTP errors from remote model servers |

The backend label is "onnx" or "candle". The model_name label is the model path or HuggingFace repo ID passed to the operator config.

Golden Signals PromQL

# Latency (p99)
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# Traffic (records/sec)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

# Errors (rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)

# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)

# Saturation (channel utilization)
avg(volley_channel_utilization) by (job_name, stage)

# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)

ML Inference PromQL

# Inference latency (p99) by backend
histogram_quantile(0.99, sum(rate(volley_ml_inference_duration_ms_bucket[5m])) by (le, backend))

# Inference throughput (rows/sec)
sum(rate(volley_ml_inference_batch_rows_sum[5m])) by (backend)

# Inference error rate
sum(rate(volley_ml_inference_errors_total[5m])) by (backend, error_type)

# HTTP model server latency (p95)
histogram_quantile(0.95, sum(rate(volley_ml_http_request_duration_ms_bucket[5m])) by (le, endpoint))

State & Window PromQL

# Aggregate cache eviction rate (high = LRU undersized)
rate(volley_state_cache_evictions_total[5m])

# Window pending index eviction rate
rate(volley_window_pending_evictions_total[5m])

# In-memory pending window count
volley_window_pending_size

# Checkpoint GC cleanup rate
rate(volley_checkpoint_gc_cleaned_total[5m])

Tracing

OpenTelemetry integration via OTLP gRPC export with tracing-opentelemetry bridge:

  • Resource attributes include service.name and node.id
  • Graceful fallback if OTLP endpoint is unavailable
  • Trace-log correlation: trace_id and span_id injected into JSON log lines when a span is active

Configure the OTLP endpoint:

# In VolleyApplication CRD
spec:
  observability:
    otlpEndpoint: http://otel-collector.monitoring:4317

Per-Record Tracing

Volley supports per-record trace propagation with configurable sampling. When enabled, individual records flowing through the pipeline carry OpenTelemetry trace context. Each operator creates a child span showing processing time, input/output row counts, and optional payload previews.

Configuration

use volley_core::observability::{TracingConfig, SamplingStrategy};

StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(TracingConfig {
        sampling: SamplingStrategy::Ratio(0.01), // trace 1% of records
        max_payload_bytes: 1024,                 // max JSON preview size
        capture_payload: true,                   // enable input/output previews
    })
    .filter(|r| true)
    .map(|r| Ok(r))
    .to_sink(sink)
    .execute("my-job")
    .await?;

Sampling Strategies

| Strategy | Description | Use case |
|---|---|---|
| Always | Trace every record | Development, debugging |
| Never | Disable tracing | Production default |
| Ratio(f64) | Trace a random fraction | Production with sampling (e.g., 0.01 = 1%) |

Non-sampled records pay near-zero overhead (a single branch check).
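The sampling decision itself is cheap. Here is a sketch of the three strategies in illustrative Python; Volley's SamplingStrategy is a Rust enum, so nothing below is the actual API.

```python
# Sketch of the three sampling strategies (illustrative Python, not the
# Volley API; Volley's SamplingStrategy is a Rust enum).
import random

def should_sample(strategy, ratio=0.0, rng=random):
    if strategy == "always":
        return True
    if strategy == "never":
        return False
    # "ratio": one comparison per record, so non-sampled records pay
    # only this branch check.
    return rng.random() < ratio

rng = random.Random(42)  # seeded for reproducibility
sampled = sum(should_sample("ratio", 0.01, rng) for _ in range(100_000))
# sampled lands near 1000 (about 1% of 100,000 records)
```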

Span Hierarchy

For a pipeline source -> map -> filter -> sink, a sampled record produces:

volley.record (root span)
├── volley.source        — source output metadata
├── volley.operator [0]  — map operator (input/output rows, timing, preview)
├── volley.operator [1]  — filter operator
└── volley.sink          — sink write (timing, errors)

Each span includes attributes:

| Attribute | Description |
|---|---|
| volley.operator.name | Auto-generated operator name (e.g., operator-0) |
| volley.operator.index | Position in the operator chain |
| volley.record.event_time_ms | Record's event timestamp |
| volley.record.input_rows | Input batch row count |
| volley.record.output_rows | Output batch row count |

When capture_payload is enabled, input and output JSON previews are attached as span events.

Window and Aggregate Outputs

Window and aggregate operators accumulate multiple input records and emit new result records. These outputs get a new trace with span links back to the input traces:

volley.window.output (new root, links to input traces)
├── volley.operator [next]
└── volley.sink

The volley.window.input_trace_count attribute shows how many sampled input records contributed to the aggregation.

Kafka Trace Context Propagation

When using the Kafka source, Volley automatically extracts W3C traceparent headers from consumed messages. If an upstream service traced a request that produced a Kafka message, the trace continues through Volley.

Similarly, the Kafka sink injects traceparent headers into produced messages, propagating trace context to downstream consumers.

This enables full end-to-end distributed tracing across service boundaries:

upstream service → Kafka → Volley pipeline → Kafka → downstream service
     trace-id=abc          trace-id=abc              trace-id=abc
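The propagated header follows the W3C Trace Context format (version, trace ID, span ID, flags, hyphen-separated). A minimal round-trip sketch shows what the source extracts and the sink injects; Volley does this internally, and the IDs below are the example values from the W3C spec.

```python
# Minimal W3C traceparent round-trip: what the Kafka source extracts and the
# sink injects (sketch only; Volley handles this internally). The IDs below
# are the example values from the W3C Trace Context spec.
def format_traceparent(trace_id, span_id, sampled=True):
    return "00-{}-{}-{}".format(trace_id, span_id, "01" if sampled else "00")

def parse_traceparent(header):
    version, trace_id, span_id, flags = header.split("-")
    assert version == "00" and len(trace_id) == 32 and len(span_id) == 16
    return trace_id, span_id, flags == "01"

hdr = format_traceparent("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7")
# parse_traceparent(hdr) recovers the same trace_id and span_id
```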

Structured Logging

ObservabilityConfig supports a log_format field with Text (default) and Json modes:

ObservabilityConfig::new()
    .with_json_logging()
    .init();

JSON mode produces structured output compatible with Loki, Elasticsearch, and CloudWatch Logs. When OpenTelemetry is active, each log line includes trace_id and span_id fields for cross-referencing with distributed traces.

Useful RUST_LOG filters

# Default (info): lifecycle events, checkpoint completions, source/sink creation
RUST_LOG=info

# Debug state backend, window, and watermark behavior
RUST_LOG=info,volley_core::state=debug,volley_core::operators::window=debug

# Debug file decoding (Parquet, JSON Lines, CSV)
RUST_LOG=info,volley_connectors::blob_store::decoders=debug

# Deep debugging: per-record RocksDB get/put/delete
RUST_LOG=info,volley_core::state::rocks=trace

# Debug protobuf decode with batch context
RUST_LOG=info,volley_core::operators::protobuf=debug

Health Reporting

The HealthReporter tracks pipeline state with atomic transitions:

| State | Liveness | Readiness | Description |
|---|---|---|---|
| Starting | healthy | not ready | Pipeline initializing |
| Running | healthy | ready | Normal operation |
| Degraded | healthy | not ready | Partial failure (with reason) |
| Failed | unhealthy | not ready | Complete failure (with reason) |
| ShuttingDown | unhealthy | not ready | Graceful shutdown in progress |

The volley.pipeline.health gauge emits the state as a numeric value on every transition, enabling Prometheus alerting.

SLO Recording Rules

Prometheus recording rules are provided at deploy/prometheus/recording-rules.yaml:

  • volley:availability:ratio_rate5m — availability SLI ratio
  • volley:latency_sli:ratio_rate5m — latency SLI (fraction under 100ms)
  • volley:error_budget:remaining — remaining error budget against 99.9% target

Alerting Rules

Burn-rate alerting rules at deploy/prometheus/alerting-rules.yaml:

  • VolleyHighErrorBurnRate (critical) — 14.4x burn over 1h
  • VolleySlowErrorBurnRate (warning) — 1x burn over 3d
  • VolleyPipelineFailed (critical) — health==3 for 1m
  • VolleyPipelineDegraded (warning) — health==2 for 5m

Kubernetes Integration

The K8s operator automatically creates a ServiceMonitor for Prometheus scraping. See Kubernetes Operator for ServiceMonitor configuration.

Volley SRE Gap Analysis

Date: 2026-03-30
Reference framework: msitarzewski/agency-agents engineering-sre.md


Important Context: Framework vs. Service

Volley is a stream processing framework (library + Kubernetes operator), not a deployed service. This analysis evaluates:

  1. Framework enablement — How well does Volley equip pipeline authors to follow SRE practices?
  2. Framework development practices — How well does Volley’s own CI/testing/release process follow SRE principles?

Org-level concerns (on-call rotations, blameless culture documentation, MTTR tracking) are out of scope — those belong to teams operating Volley-based pipelines, not the framework itself.


Executive Summary

| Pillar | Score | Verdict |
|---|---|---|
| SLOs & Error Budgets | C+ (55%) | SLI-ready metrics + Prometheus recording/alerting rule templates + SLO definition template; no runtime error budget gauge yet |
| Observability | A- (88%) | Strong metrics + tracing + structured JSON logging + alerting rule templates + trace-log correlation + per-connector error counters; missing dashboards |
| Toil Reduction | B- (65%) | CI + K8s operator + Dockerfile + Helm chart; no CD pipeline or benchmark regression tracking |
| Chaos Engineering | C- (40%) | Checkpoint recovery unit-tested + fault injection integration tests for checkpoint/RocksDB failures; no Kafka fault injection |
| Capacity Planning | B- (65%) | KEDA autoscaling scaffolded, benchmarks exist, capacity planning guide with sizing tables and PromQL queries |
| Progressive Rollouts | F (5%) | Operator uses default StatefulSet RollingUpdate; no canary or automated rollback |
| Incident Response Primitives | C+ (55%) | PipelineHealth enum + K8s probes + volley.pipeline.health gauge + alerting rules; missing severity-to-SLO mapping docs |

Overall: Volley has a strong observability foundation, solid Kubernetes integration, SLO recording rules, alerting templates, structured logging with trace-log correlation, a Dockerfile, Helm chart, SLO definition template, fault injection tests, and capacity planning guide. Remaining gaps are dashboard templates, CD pipeline, progressive rollout support, and Kafka fault injection.


Scoring Methodology

| Grade | Range | Meaning |
|---|---|---|
| A | 90-100% | Production-ready, matches SRE framework recommendations |
| B | 70-89% | Strong foundation, minor gaps |
| C | 50-69% | Partial coverage, significant gaps |
| D | 25-49% | Minimal coverage, major gaps |
| F | 0-24% | Absent or negligible |

Criteria per pillar: API completeness, built-in tooling, documentation for pipeline authors, integration with ecosystem tooling (Prometheus, Grafana, K8s).


Pillar 1: SLOs & Error Budgets — C+ (55%)

Current State

Volley exposes metrics that serve as natural SLI candidates (volley-observability/src/metrics.rs):

| Metric | Type | SLI Use |
|---|---|---|
| volley.records.processed | Counter | Availability — total processed records |
| volley.errors | Counter | Availability — error count for SLI ratio |
| volley.stage.latency_ms | Histogram | Latency — processing duration distribution |
| volley.pipeline.uptime_seconds | Gauge | Availability — pipeline uptime |
| volley.checkpoint.failures | Counter | Reliability — checkpoint failure rate |

The PipelineHealth enum (volley-observability/src/health.rs) provides binary health states (Starting, Running, Degraded, Failed, ShuttingDown) but these are probe-level signals, not SLO-aligned measurements.

What Exists

Prometheus recording rules are provided at deploy/prometheus/recording-rules.yaml computing:

  • volley:availability:ratio_rate5m and volley:availability:ratio_rate30d — availability SLI ratios
  • volley:latency_sli:ratio_rate5m — latency SLI (fraction under 100ms)
  • volley:error_budget:remaining — remaining error budget against 99.9% target
  • volley:throughput:rate5m and volley:saturation:avg — capacity signals

Multi-window burn rate alerting rules are provided at deploy/prometheus/alerting-rules.yaml:

  • VolleyHighErrorBurnRate (critical) — 14.4x burn over 1h confirmed by 5m window
  • VolleySlowErrorBurnRate (warning) — 1x burn over 3d confirmed by 6h window

Remaining Gaps

  • No SLO definition template or configuration format for pipeline authors to declare per-pipeline targets
  • No runtime error budget gauge metric in volley-observability (the recording rule computes it in Prometheus, but no native Rust gauge)
  • No documentation connecting error budget exhaustion to operational decisions (e.g., deployment freeze policy)

Recommendations

(P1) Add an error_budget_remaining gauge to volley-observability that pipeline authors can enable to expose real-time budget consumption natively from the runtime, complementing the Prometheus recording rule.

(P1) Provide an SLO definition template format (YAML) that pipeline authors can use to declare per-pipeline availability and latency targets, which can be consumed by the Prometheus rules.

(P2) Document an error budget policy template: when budget < 25%, freeze non-critical deployments; when budget < 10%, enter incident response mode.
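For reference, the proposed gauge could compute remaining budget from the observed error ratio as follows. This is a sketch that mirrors the semantics of the volley:error_budget:remaining recording rule against a 99.9% target, not existing Volley code.

```python
# One way the proposed error_budget_remaining gauge could be computed
# (sketch; mirrors the volley:error_budget:remaining recording rule
# against a 99.9% availability target).
def error_budget_remaining(error_ratio, slo_target=0.999):
    budget = 1.0 - slo_target          # e.g. 0.1% of records may fail
    return 1.0 - error_ratio / budget  # 1.0 = untouched, 0.0 = exhausted

remaining = error_budget_remaining(0.0005)  # half the budget burned: ~0.5
freeze_deploys = remaining < 0.25           # policy gate from (P2) above
```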


Pillar 2: Observability — A- (88%)

Current State

Metrics (volley-observability/src/metrics.rs): 15+ named metric constants covering sources, operators, checkpoints, channels, watermarks, sinks, pipeline health, Kafka, and blob storage. All use node_id/job_name labels. Three helper functions (increment_counter, set_gauge, record_histogram) enforce consistent labeling.

Tracing (volley-observability/src/tracing_init.rs): OpenTelemetry integration via OTLP gRPC export with tracing-opentelemetry bridge. Resource attributes include service.name and node.id. Graceful fallback if OTLP endpoint unavailable.

Health (volley-observability/src/health.rs): HealthReporter with atomic state tracking, liveness/readiness semantics. K8s probes configured in operator’s StatefulSet builder.

ServiceMonitor (volley-operator/src/crd.rs): Auto-created by operator with configurable scrape interval and labels.

What Exists

  • Structured JSON logging: ObservabilityConfig supports a log_format field with Text (default) and Json modes. When set to Json, tracing_subscriber::fmt::json() is used for structured output compatible with log aggregation systems (Loki, Elasticsearch, CloudWatch Logs). Pipeline authors enable it via .with_json_logging().
  • Alerting rule templates: Prometheus alerting rules are provided at deploy/prometheus/alerting-rules.yaml covering error burn rate (fast + slow), pipeline health state, checkpoint failures, channel saturation, Kafka consumer lag, and p99 latency.

What Was Added

  • Trace-log correlation: TraceContextFormat in volley-core/src/observability/trace_context.rs safely injects trace_id and span_id fields into JSON log output via serde_json round-trip when an OpenTelemetry span is active, enabling cross-referencing between logs and distributed traces.
  • Per-connector error counters: volley.source.errors and volley.sink.errors metrics defined in volley-observability/src/metrics.rs with source_id/sink_id labels, and incremented in volley-core/src/builder/runtime.rs on poll/write/flush failures. connector_id() method added to Source, Sink, DynSource, and DynSink traits, implemented across Kafka, blob store, and Iceberg connectors.
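A hypothetical sketch of the connector_id() hook described above, on a deliberately simplified Source trait — the trait shape and method signatures are assumptions for illustration, not Volley's actual API:

```rust
/// Simplified source trait: connector_id() gives error counters a stable
/// source_id label; a default keeps custom sources compiling unchanged.
trait Source {
    fn connector_id(&self) -> &str {
        "unknown" // default for sources that opt out
    }
    fn poll(&mut self) -> Result<Vec<u8>, String>;
}

struct KafkaLikeSource {
    topic: String,
}

impl Source for KafkaLikeSource {
    fn connector_id(&self) -> &str {
        &self.topic // e.g. used as the source_id label on volley.source.errors
    }
    fn poll(&mut self) -> Result<Vec<u8>, String> {
        Ok(vec![])
    }
}
```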

Remaining Gaps

  • No dashboard templates: No Grafana dashboard JSON mapping the four golden signals to Volley metrics.

Recommendations

(P0) Provide a golden signals Grafana dashboard template using Volley’s actual metrics:

| Golden Signal | Volley Metric | PromQL |
|---|---|---|
| Latency | volley.stage.latency_ms | histogram_quantile(0.99, rate(volley_stage_latency_ms_bucket[5m])) |
| Traffic | volley.source.records_polled | sum(rate(volley_source_records_polled_total[5m])) by (job_name) |
| Errors | volley.errors | sum(rate(volley_errors_total[5m])) by (job_name, error_type) |
| Saturation | volley.channel.utilization | avg(volley_channel_utilization) by (job_name, stage) |

(P1) Trace-log correlation — addressed: TraceContextFormat now injects trace_id and span_id into JSON log output when an OpenTelemetry span is active (see What Was Added).

(P2) Per-connector error counters — addressed: volley.source.errors and volley.sink.errors are defined with source_id/sink_id labels and wired into the runtime (see What Was Added).


Pillar 3: Toil Reduction — B- (65%)

Current State

CI pipeline (.github/workflows/ci.yml): Format checking (cargo fmt), linting (cargo clippy with default + kafka features), testing (default + kafka variants), benchmark compilation check. Uses Rust cache for faster builds.

K8s operator (volley-operator/): Automates StatefulSet, headless Service, metrics Service, ServiceMonitor, PodDisruptionBudget, and KEDA ScaledObject creation from a single VolleyApplication CRD. Server-side apply with idempotent reconciliation.

Checkpoint cleanup: RecoveryCoordinator handles automated old checkpoint cleanup.

What Exists

  • Dockerfile: A multi-stage Dockerfile is provided at deploy/docker/Dockerfile with a Rust builder stage and minimal distroless runtime stage. Pipeline authors can extend or customize it for their specific binaries.
  • Helm chart: A Helm chart exists at deploy/helm/volley/ that templates the VolleyApplication CRD with values.yaml covering all CRD fields (resources, storage, observability, health, autoscaling, Kafka, isolation).

Remaining Gaps

  • Dockerfile not referenced in documentation: The README and operator docs do not mention the Dockerfile or describe how pipeline authors should customize it for their applications.
  • Helm chart has no production-hardening overlays: No example values files for common deployment topologies (small/medium/large, multi-AZ).
  • No CD pipeline: CI exists but no automated release workflow (crate publishing, image building).
  • No benchmark regression tracking: Benchmarks compile-checked but results not tracked over time.

Recommendations

(P1) Add a CD workflow (.github/workflows/release.yml) triggered on git tags that builds container images and publishes to a registry.
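A minimal sketch of such a workflow — the registry, tag scheme, and action versions below are assumptions, not existing project configuration:

```yaml
# Sketch of .github/workflows/release.yml
name: release
on:
  push:
    tags: ["v*"]
jobs:
  image:
    runs-on: ubuntu-latest
    permissions:
      packages: write
      contents: read
    steps:
      - uses: actions/checkout@v4
      - uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/build-push-action@v5
        with:
          file: deploy/docker/Dockerfile
          push: true
          tags: ghcr.io/${{ github.repository }}:${{ github.ref_name }}
```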

(P1) Add production-hardened example values files to the Helm chart (e.g., values-production.yaml with resource limits, anti-affinity, KEDA autoscaling).

(P2) Add benchmark regression detection in CI using criterion with stored baselines.


Pillar 4: Chaos Engineering — D (30%)

Current State

Checkpoint recovery is covered by unit tests, and the HealthReporter's transitions through the Starting -> Running -> Degraded -> Failed states are verified. The operator handles reconciliation errors with retry backoff (15s requeue on failure).

Gaps

  • No fault injection tests simulating Kafka broker disconnect, disk full, or OOM conditions.
  • No integration tests exercising the Degraded -> Running recovery path under realistic failure.
  • No Chaos Mesh or LitmusChaos experiment definitions for K8s-deployed pipelines.

Recommendations

(P1) Add integration tests for framework resilience:

  • Checkpoint directory becomes read-only mid-pipeline
  • Kafka broker disconnect during exactly-once transaction commit
  • RocksDB write failure during state flush
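One way to make such failures deterministic in tests is a fault-injection seam rather than real disk faults — a std-only sketch (the types here are hypothetical, not Volley's checkpoint API):

```rust
/// Checkpoint store whose writes can be failed on demand, so a test can
/// drive the error path (and the Degraded transition) deterministically.
struct FaultyCheckpointStore {
    read_only: bool,
    checkpoints: Vec<u64>,
}

impl FaultyCheckpointStore {
    fn write_checkpoint(&mut self, epoch: u64) -> Result<(), String> {
        if self.read_only {
            return Err(format!("checkpoint {epoch} failed: store is read-only"));
        }
        self.checkpoints.push(epoch);
        Ok(())
    }
}
```

A test flips `read_only` mid-run and asserts the pipeline surfaces the failure instead of silently dropping the checkpoint.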

(P1) Provide Chaos Mesh experiment templates for K8s-deployed Volley applications:

apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: volley-pod-kill
spec:
  action: pod-kill
  mode: one
  selector:
    labelSelectors:
      app.kubernetes.io/managed-by: volley-operator
  scheduler:
    cron: "@every 30m"

(P2) Add toxiproxy integration to Kafka integration tests for network degradation simulation.


Pillar 5: Capacity Planning — C (45%)

Current State

Autoscaling: KEDA support fully scaffolded in operator (AutoscalingSpec in CRD) with Kafka lag, Prometheus metric, and CPU-based triggers. ScaledObject is created during reconciliation.

Benchmarks: benchmark_throughput and benchmark_sustained examples demonstrate 9.4M rec/s sustained on a single node.

Resource management: CRD requires explicit resources.requests and resources.limits for CPU/memory.

Gaps

  • No sizing guide mapping resource allocation to expected throughput.
  • No documentation on RocksDB storage growth patterns.
  • No tested KEDA trigger threshold examples.
  • No resource utilization dashboards.

Recommendations

(P1) Create a capacity planning guide covering:

  • CPU/memory requirements per records/sec (based on benchmark data)
  • RocksDB storage growth formula: key_count * avg_value_size * checkpoint_retention_count
  • PVC sizing guidance based on state size and checkpoint interval
  • Key PromQL queries for capacity monitoring:
    • Throughput: rate(volley_source_records_polled_total[5m])
    • Backpressure: volley_channel_utilization > 0.8
    • Input backlog: volley_kafka_consumer_lag

(P2) Document tested KEDA trigger configurations with proven thresholds (e.g., scale at consumer lag > 10000).
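For illustration, a KEDA Kafka-lag trigger using that example threshold — the broker address, consumer group, and topic are placeholders:

```yaml
# Sketch of a ScaledObject trigger section; thresholds need validation
# against real workloads before being documented as "tested".
triggers:
  - type: kafka
    metadata:
      bootstrapServers: kafka:9092
      consumerGroup: payments-aggregation
      topic: payments
      lagThreshold: "10000"   # scale out when consumer lag exceeds 10000
```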


Pillar 6: Progressive Rollouts — F (5%)

Current State

The operator creates StatefulSets with a configurable replicas count but uses the default RollingUpdate strategy. No canary, blue-green, or partition-based rollout mechanism exists. imagePullPolicy is configurable but no image tag pinning is enforced.

Gaps

  • No updateStrategy field in the CRD spec.
  • No partition-based canary rollout support.
  • No automated rollback on health check failure post-update.
  • No Argo Rollouts or Flagger integration.

Recommendations

(P1) Add an updateStrategy field to VolleyApplicationSpec CRD with partition-based canary support:

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct UpdateStrategySpec {
    /// Partition value for canary rollouts. Pods with ordinal >= partition
    /// get the new version. Set to replicas-1 to canary a single pod.
    #[serde(default)]
    pub partition: Option<i32>,
}
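Following the doc comment above, the number of pods that receive the new version for a given partition value works out as below (the helper is illustrative, not operator code):

```rust
/// StatefulSet partition semantics: pods with ordinal >= partition are
/// updated, so `replicas - partition` pods run the new version.
fn pods_on_new_version(replicas: i32, partition: i32) -> i32 {
    (replicas - partition).max(0)
}

// With replicas = 5: partition = 4 canaries one pod; partition = 0 rolls all 5.
```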

(P2) Provide an Argo Rollouts AnalysisTemplate example that checks volley_errors_total rate and volley_stage_latency_ms p99 after each rollout step.


Pillar 7: Incident Response Primitives — C+ (55%)

Current State

Health states: PipelineHealth enum provides 5 states — Starting, Running, Degraded (with reason string), Failed (with reason string), ShuttingDown. These map to K8s liveness (healthy = Starting|Running) and readiness (ready = Running only).

Health gauge: volley.pipeline.health gauge is emitted on every state transition via HealthReporter, enabling Prometheus alerting on health state changes. Values: 0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown.

Operator status: CRD status tracks phases (Pending, Running, Degraded, Failed) with readyReplicas count and human-readable message.

Probes: Liveness at /health (port 8080), readiness at /ready, with configurable initialDelaySeconds and periodSeconds.
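The state machine, gauge values, and probe semantics described above can be restated as a sketch — the enum shape is inferred from the description, not copied from volley-core:

```rust
/// Sketch of the documented health states (reason strings on Degraded/Failed).
enum PipelineHealth {
    Starting,
    Running,
    Degraded(String),
    Failed(String),
    ShuttingDown,
}

impl PipelineHealth {
    /// Value emitted on the volley.pipeline.health gauge.
    fn gauge_value(&self) -> u8 {
        match self {
            PipelineHealth::Starting => 0,
            PipelineHealth::Running => 1,
            PipelineHealth::Degraded(_) => 2,
            PipelineHealth::Failed(_) => 3,
            PipelineHealth::ShuttingDown => 4,
        }
    }
    /// Liveness: healthy while Starting or Running.
    fn is_live(&self) -> bool {
        matches!(self, PipelineHealth::Starting | PipelineHealth::Running)
    }
    /// Readiness: ready only when Running.
    fn is_ready(&self) -> bool {
        matches!(self, PipelineHealth::Running)
    }
}
```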

Alerting rules: deploy/prometheus/alerting-rules.yaml includes VolleyPipelineFailed (critical, health==3 for 1m) and VolleyPipelineDegraded (warning, health==2 for 5m).

Remaining Gaps

  • No documentation mapping severity levels to PipelineHealth states and SLO impact.
  • No runbook links in alert annotations (placeholder URLs only).

Recommendations

(P1) Document severity-to-health mapping for pipeline authors:

| Severity | PipelineHealth | SLO Impact | Response |
|---|---|---|---|
| SEV1 | Failed | SLO breached, pipeline stopped | Immediate: investigate root cause, restore from checkpoint |
| SEV2 | Degraded | Error budget burning above normal | Urgent: identify degradation source (e.g., checkpoint failures) |
| SEV3 | Running with elevated latency | Within budget but trending | Investigate: check channel.utilization, watermark.lag_ms |

(P1) Add runbook URLs to the existing alerting rules in deploy/prometheus/alerting-rules.yaml, linking to operational documentation for each failure mode.
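For example, an annotated rule might look like the sketch below — the runbook URL is a placeholder; the expression and duration match the VolleyPipelineFailed rule described above:

```yaml
- alert: VolleyPipelineFailed
  expr: volley_pipeline_health == 3
  for: 1m
  labels:
    severity: critical
  annotations:
    summary: "Pipeline {{ $labels.job_name }} reported Failed"
    runbook_url: https://example.com/runbooks/volley-pipeline-failed
```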


Prioritized Implementation Roadmap

P0 — Completed

  1. SLO recording rule templates — deploy/prometheus/recording-rules.yaml
  2. Burn rate alerting rule templates — deploy/prometheus/alerting-rules.yaml
  3. Structured JSON logging — ObservabilityConfig.log_format with LogFormat::Json
  4. Multi-stage Dockerfile — deploy/docker/Dockerfile
  5. Helm chart — deploy/helm/volley/
  6. Pipeline health gauge — volley.pipeline.health emitted by HealthReporter
  7. Health state alerting rules — VolleyPipelineFailed, VolleyPipelineDegraded in alerting rules

P0 — Remaining

  1. Golden signals dashboard template (Grafana JSON)
  2. Severity-to-PipelineHealth mapping documentation

P1 — Framework maturity

  1. Runtime error budget gauge metric in volley-observability
  2. SLO definition template format for per-pipeline targets — deploy/prometheus/slo-template.yaml
  3. Trace-log correlation (trace_id in structured log output) — volley-core/src/observability/trace_context.rs with TraceContextFormat
  4. Fault injection integration tests (checkpoint, RocksDB failures) — volley-core/tests/fault_injection_test.rs (9 tests)
  5. Chaos Mesh experiment templates
  6. Capacity planning guide (sizing, storage growth, key PromQL) — docs/src/operations/capacity-planning.md
  7. updateStrategy CRD field for partition-based canary rollouts
  8. Runbook URLs in alerting rule annotations
  9. CD workflow for framework releases
  10. Production-hardened Helm values examples

P2 — Excellence

  1. Per-connector error counters (volley.source.errors, volley.sink.errors) — constants in volley-observability/src/metrics.rs, wired up in volley-core/src/builder/runtime.rs with connector_id() on Source/Sink traits
  2. Benchmark regression detection in CI
  3. Toxiproxy integration for Kafka fault injection
  4. Argo Rollouts AnalysisTemplate example
  5. Tested KEDA trigger threshold documentation
  6. Error budget policy template documentation

Appendix A: Full Metric Inventory

All metrics defined in volley-observability/src/metrics.rs:

| Constant | Metric Name | Type | Labels | SRE Use |
|---|---|---|---|---|
| SOURCE_RECORDS_POLLED | volley.source.records_polled | Counter | node_id, job_name, source_id | Traffic golden signal |
| RECORDS_PROCESSED | volley.records.processed | Counter | node_id, job_name, operator_id | Throughput, SLI denominator |
| STAGE_LATENCY_MS | volley.stage.latency_ms | Histogram | node_id, job_name, stage | Latency golden signal, SLI |
| CHECKPOINT_DURATION_MS | volley.checkpoint.duration_ms | Histogram | node_id, job_name | Checkpoint health |
| CHECKPOINT_EPOCH | volley.checkpoint.epoch | Gauge | node_id, job_name | Progress tracking |
| CHECKPOINT_FAILURES | volley.checkpoint.failures | Counter | node_id, job_name | Reliability, alerting |
| CHANNEL_UTILIZATION | volley.channel.utilization | Gauge | node_id, job_name, stage | Saturation golden signal |
| WATERMARK_LAG_MS | volley.watermark.lag_ms | Gauge | node_id, job_name | Event-time lag |
| SINK_RECORDS_WRITTEN | volley.sink.records_written | Counter | node_id, job_name, sink_id | Output throughput |
| SINK_FLUSH_DURATION_MS | volley.sink.flush.duration_ms | Histogram | node_id, job_name, sink_id | Sink latency |
| PIPELINE_UPTIME_SECONDS | volley.pipeline.uptime_seconds | Gauge | node_id, job_name | Availability |
| PIPELINE_HEALTH | volley.pipeline.health | Gauge | node_id, job_name | Health state (0-4), alerting on Failed/Degraded |
| ERRORS_TOTAL | volley.errors | Counter | node_id, job_name, error_type | Error golden signal, SLI numerator |
| KAFKA_CONSUMER_LAG | volley.kafka.consumer_lag | Gauge | node_id, job_name | Backlog, autoscaling trigger |
| KAFKA_COMMIT_DURATION_MS | volley.kafka.commit.duration_ms | Histogram | node_id, job_name | Kafka commit latency |
| KAFKA_COMMIT_FAILURES | volley.kafka.commit.failures | Counter | node_id, job_name | Exactly-once reliability |
| BLOB_OBJECTS_READ | volley.blob.objects_read | Counter | node_id, job_name | Blob source throughput |
| BLOB_BYTES_READ | volley.blob.bytes_read | Counter | node_id, job_name | Blob I/O volume |

Label constants: LABEL_NODE_ID, LABEL_JOB_NAME, LABEL_STAGE, LABEL_OPERATOR_ID, LABEL_SOURCE_ID, LABEL_SINK_ID, LABEL_ERROR_TYPE

Appendix B: Reference Prometheus Rules

See Pillar 1 for SLO recording rules and burn rate alerts, and Pillar 7 for health state alerting rules.

Appendix C: Golden Signals PromQL Reference

# Latency (p50, p99)
histogram_quantile(0.5, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))

# Traffic (records/sec by pipeline)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)

# Errors (error rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)

# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)

# Saturation (channel utilization, higher = more backpressure)
avg(volley_channel_utilization) by (job_name, stage)

# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)

Examples

Volley includes runnable examples in the volley-examples crate. Each demonstrates a different feature or pipeline pattern.

Running Examples

# Run any example
cargo run --example <name> -p volley-examples

# Run benchmarks in release mode
cargo run --example <name> -p volley-examples --release

Available Examples

| Example | Description |
|---|---|
| in_memory_pipeline | Basic pipeline with filter, key_by, window, and aggregate using in-memory source |
| kafka_pipeline | Kafka source to Kafka sink with windowed aggregation |
| windowed_pipeline | Event-time windowing with watermarks and late data handling |
| observable_pipeline | Pipeline with observability (Prometheus metrics, OpenTelemetry tracing) |
| protobuf_to_delta | Protobuf decode -> Delta Lake sink with partitioned writes |
| nested_proto_partitioned | Nested protobuf types with Hive-style partitioned writes |
| benchmark_throughput | Single-shot throughput benchmark across batch sizes and parallelism levels |
| benchmark_sustained | Sustained throughput benchmark (10s run, batch=1000, parallelism=4) |

Prerequisites

Most examples work out of the box. Some have additional requirements:

  • kafka_pipeline — requires a running Kafka broker at localhost:9092 and cmake installed
  • protobuf_to_delta, nested_proto_partitioned — requires a writable path for Delta table output
  • Benchmarks — run with --release for meaningful numbers

Walkthrough

See Your First Pipeline for a step-by-step walkthrough of the in_memory_pipeline example.

Your First Pipeline

This walkthrough covers the in_memory_pipeline example, which demonstrates the core Volley API.

What It Does

The pipeline:

  1. Reads events from an in-memory iterator
  2. Filters records
  3. Partitions by a key field
  4. Groups into 5-minute tumbling windows
  5. Computes a sum aggregation
  6. Collects results in memory

Key Concepts

Creating the Environment

let env = StreamExecutionEnvironment::new();

This creates the pipeline builder. All pipeline construction starts here.

Adding a Source

env.from_iter(events)

from_iter() creates a DataStream from a Rust iterator of StreamRecord values. For production use, you’d use from_kafka() or a blob store source.

Applying Operators

.filter_expr(col("amount").gt(lit(100)))

Operators on a DataStream: filter_expr, map, flat_map, apply_operator.

Keying and Windowing

.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(300)))
.aggregate(AggregationType::Sum, "amount")

.key_by() transitions to a KeyedStream (hash-partitioned by key). .window() transitions to WindowedKeyedStream. .aggregate() computes the result per window per key.

Executing

.collect()
.execute("aggregation-job")
.await?;

.collect() attaches an in-memory sink. .execute() starts the pipeline as concurrent Tokio tasks and returns when all data is processed.

Run It

cargo run --example in_memory_pipeline -p volley-examples

Expected output:

Pipeline 'aggregation-job' started
  Source: MemorySource (4 records)
  Operators: filter_expr → key_by → window → aggregate
  Sink: MemorySink
Processing complete: 4 records in, 2 aggregated results out

Next Steps

Contributing

Contributions to Volley are welcome! This page summarizes the development workflow. See the full Contributing Guide for complete details.

Development Setup

git clone https://github.com/volley-streams/volley.git
cd volley
cargo build --workspace
cargo test --workspace --exclude volley-python

System Dependencies

| Dependency | Required by | macOS | Ubuntu |
|---|---|---|---|
| cmake | volley-connector-kafka | brew install cmake | apt install cmake |
| libcurl4-openssl-dev | volley-connector-kafka | (included) | apt install libcurl4-openssl-dev |
| protobuf-compiler | volley-scheduler | brew install protobuf | apt install protobuf-compiler |

Run volley doctor to check your environment.

Code Style

  • Formatting: cargo fmt --all
  • Linting: cargo clippy --workspace -- -D warnings
  • Tests: cargo test --workspace --exclude volley-python

All warnings are errors in CI (RUSTFLAGS: -Dwarnings).

Submitting Changes

  1. Fork the repository and create a feature branch
  2. Make your changes with clear, focused commits
  3. Ensure cargo fmt, cargo clippy, and cargo test all pass
  4. Open a pull request against main

Project Structure

See Crate Map for a per-crate breakdown of the 15-crate workspace.

Crate Map

Volley is a 15-crate Cargo workspace. See the full Crate Map for detailed per-crate documentation.

Status: stable = production-ready | experimental = API may change | placeholder = stub/future work

Core

| Crate | Description | Key Types | Status |
|---|---|---|---|
| volley-core | Stream processing engine with built-in observability and optional RocksDB state | StreamExecutionEnvironment, DataStream, KeyedStream, Operator, StreamRecord, ObservabilityConfig, RocksDbBackend | stable |
| volley-derive | Procedural macros | impl_to_arrow! | stable |

Connectors

| Crate | Description | Key Types | Status |
|---|---|---|---|
| volley-connectors | Umbrella crate (memory + feature-gated re-exports) | MemorySource, MemorySink | stable |
| volley-connector-kafka | Kafka source + sink (exactly-once) | KafkaSource, KafkaSink, KafkaSourceConfig | stable |
| volley-connector-blob-store | Cloud blob store abstraction + decoders | NotificationSource, BlobReader, BlobWriter | stable |
| volley-connector-aws-s3 | AWS S3 + SQS source | S3Source, S3BlobReader, SqsNotificationSource | stable |
| volley-connector-azure-blob | Azure Blob + Queue source | AzureBlobSource, AzureBlobReader | stable |
| volley-connector-gcp-gcs | GCS + Pub/Sub source | GcsSource, GcsBlobReader, PubSubNotificationSource | stable |
| volley-connector-delta | Delta Lake sink (exactly-once) | DeltaSink, DeltaSinkConfig | stable |
| volley-connector-iceberg | Apache Iceberg sink (REST catalog) | IcebergSink, IcebergSinkConfig | stable |

Infrastructure

| Crate | Description | Status |
|---|---|---|
| volley-k8s-operator | Kubernetes operator for VolleyApplication CRD | stable |
| volley-cli | CLI tool (volley new, volley doctor) | stable |
| volley-scheduler | Distributed execution (Ballista integration) | experimental |
| volley-python | PyO3 Python bindings | placeholder |

Other

| Crate | Description | Status |
|---|---|---|
| volley-examples | Runnable examples and benchmarks | stable |