Introduction
Volley is a Rust library for building streaming pipelines — read from Kafka or a bucket, filter and aggregate with SQL-style expressions, write to Kafka, Delta, Iceberg, or a database, and have every record counted exactly once even when things crash. Pipelines are single Rust binaries: no JVM, no separate job server, and every operator is checked by the compiler before it ever runs.
If you’ve used Flink or Kafka Streams, the shape is familiar — a fluent DataStream builder, event-time windows, checkpoints. If you’ve used DataFusion, Polars, or DuckDB, the operators are familiar — expressions over Arrow RecordBatches. Volley is the place those two worlds meet.
The API is type-safe at compile time, the docs are structured for both humans and AI coding agents, and every example below is runnable out of the box.
Key Features
- High Performance — Native Arrow batching with write-behind state caching. Validated via NEXMark benchmarks
- Exactly-Once Semantics — Checkpoint-based fault tolerance with aligned barrier snapshotting
- Flink-like API — Fluent DataStream API with typestate-enforced compile-time safety
- Expression Engine — Vectorized filter_expr(), select_expr(), aggregate_expr() powered by DataFusion. You write the same kinds of predicates and projections you’d write in SQL (col("price").gt(lit(100)), sum(col("amount"))) and they run column-at-a-time on native Arrow types
- Event-Time Windowing — Tumbling, sliding, and session windows with watermark tracking
- State Management — RocksDB backend with write-behind cache and hardlink checkpoints
- Intra-Node Parallelism — Hash-partitioned parallel operator instances
- Kafka Integration — Source and sink with exactly-once transactional semantics
- Cloud Blob Sources — S3+SQS and Azure Blob+Queue with Parquet/JSON/CSV/Avro
- Cloud Blob Sinks — S3, Azure Blob, and GCS with Parquet/JSON Lines/CSV
- Table Format Sinks — Delta Lake and Apache Iceberg with exactly-once commits
- Kubernetes Operator — VolleyApplication CRD with autoscaling, health checks, and Prometheus
Built On
| Component | Purpose |
|---|---|
| Apache Arrow | In-memory columnar format |
| DataFusion | Query engine and expression evaluation |
| RocksDB | State management and checkpointing |
| Tokio | Async runtime for task-per-stage execution |
Architecture
+-----------------------------------------------------+
| Stream Processing Layer (Rust) |
| +- Fluent Builder API (DataStream, KeyedStream) |
| +- Multi-Row RecordBatch Operators |
| +- Watermarks & Event Time |
| +- Windowing (Tumbling, Sliding, Session) |
| +- Intra-Node Parallelism (hash-partitioned) |
| +- Checkpoint Coordinator |
+-----------------------+-----------------------------+
|
+-----------------------v-----------------------------+
| Async Runtime (Tokio) |
| +- Task-per-Stage Execution |
| +- Bounded Channels (Backpressure) |
| +- Graceful Shutdown |
+-----------------------+-----------------------------+
|
+-----------------------v-----------------------------+
| State Management (RocksDB) |
| +- Write-Behind Accumulator Cache |
| +- BatchStateBackend (multi_get / WriteBatch) |
| +- Per-Key State Isolation |
| +- Hardlink Checkpoints |
+-----------------------+-----------------------------+
|
+-----------------------v-----------------------------+
| Connectors |
| +- Kafka (exactly-once, feature-gated) |
| +- Cloud Blob Stores (S3, Azure, GCS) |
| +- Delta Lake, Iceberg (exactly-once sinks) |
| +- Memory (batched source, counting sink) |
+-----------------------------------------------------+
Performance
Volley uses the NEXMark benchmark suite, a widely used benchmark for stream processing systems, for performance evaluation. See volley-benchmark/ for methodology, results, and the Flink comparison baseline.
Connectors
Sources
| Connector | Formats | Status |
|---|---|---|
| Kafka | JSON, Protobuf, Avro (Schema Registry) | Available (feature-gated) |
| AWS S3 + SQS | Parquet, JSON, CSV, Avro | Available |
| Azure Blob + Queue | Parquet, JSON, CSV, Avro | Available |
| Memory/Iterator | Arrow RecordBatch | Available |
Sinks
| Connector | Formats | Status |
|---|---|---|
| Kafka | JSON, Protobuf, Avro (Schema Registry) | Available (feature-gated, transactional) |
| AWS S3 · Azure Blob · GCS | Parquet, JSON Lines, CSV | Available (BufferedBlobSink) |
| Delta Lake | Parquet | Available (exactly-once) |
| Apache Iceberg | Parquet | Available (REST catalog) |
| Memory (collect) | Arrow RecordBatch | Available |
Next Steps
- Installation — set up your environment
- Quick Start — build your first pipeline in 5 minutes
- Architecture Overview — understand how Volley works
- API Reference — auto-generated rustdoc for all crates
Installation
Rust Toolchain
Volley requires a stable Rust toolchain. Install via rustup:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
rustup default stable
System Dependencies
Most crates build with a standard Rust toolchain. Some connectors need additional system packages:
| Dependency | Required by | macOS | Ubuntu |
|---|---|---|---|
| cmake | volley-connector-kafka (rdkafka) | brew install cmake | apt install cmake |
| protobuf-compiler | volley-scheduler | brew install protobuf | apt install protobuf-compiler |
Note: libcurl and OpenSSL are vendored and statically linked by rdkafka, so no system packages are needed for them.
Run volley doctor to check your environment for missing dependencies.
Install the CLI
curl -fsSL https://raw.githubusercontent.com/volley-streams/volley/main/install.sh | sh
Or with Cargo:
cargo install --git https://github.com/volley-streams/volley volley-cli
Adding Volley to a Project
Scaffold a new pipeline
# Interactive mode — prompts for source, sink, and processing pattern
volley new my-pipeline
# Non-interactive with explicit flags (comma-separate to chain operators)
volley new --source kafka --sink s3 --sink-format delta --processing filter my-pipeline
volley new --source kafka --sink kafka --processing filter,ml_inference my-enriched
# Built-in templates bypass source/sink prompts
volley new --template observability-demo my-demo
volley new --template ml-pipeline my-classifier
cd my-pipeline
volley doctor
cargo build && cargo run
Built-in templates: observability-demo, ml-pipeline. Without --template, the CLI prompts for source (kafka, s3, azure-blob), sink (kafka, s3, azure-blob), and processing operators (filter, keyed_agg, windowed_agg, ml_inference, adbc_enrichment, custom). Select multiple operators to chain them in the generated pipeline.
Add to an existing project
Add Volley crates to your Cargo.toml:
[dependencies]
volley-core = { git = "https://github.com/volley-streams/volley", features = ["state-rocksdb"] }
volley-connector-kafka = { git = "https://github.com/volley-streams/volley" }
Note: Volley currently uses git dependencies. crates.io publishing is planned — see the roadmap.
Feature Flags
The volley-connectors crate uses feature flags to control which connectors are compiled:
| Feature | Connectors Included |
|---|---|
| state-rocksdb | RocksDB state backend (default, ~1.5 GB C++ build) |
| blob-store | Cloud blob store abstraction |
| blob-store-aws | AWS S3 + SQS source/sink |
| blob-store-azure | Azure Blob + Queue source/sink |
| blob-store-gcs | GCS sink (source removed in v1.1.0) |
| file-formats | Parquet, JSON, CSV, Avro decoders |
| table-formats | Delta Lake + Iceberg sinks |
Tip: If you don’t need the RocksDB state backend (e.g. you’re only developing a Kafka connector), use default-features = false and enable only the features you need to save ~1.5 GB in build artifacts.
Troubleshooting
cmake not found
error: failed to run custom build command for `rdkafka-sys`
--- stderr
CMake not found
Install cmake: brew install cmake (macOS) or apt install cmake (Ubuntu).
RocksDB compilation slow or failing
RocksDB is a C++ dependency that takes ~3–5 minutes to compile on first build. If you don’t need state management (e.g., stateless transformations only), disable it:
volley-core = { git = "...", default-features = false }
protobuf-compiler not found
Only needed if building volley-scheduler. Install: brew install protobuf (macOS) or apt install protobuf-compiler (Ubuntu).
volley doctor reports issues
volley doctor checks for system dependencies, Docker availability, and toolchain versions. Address each reported issue individually. Common fixes:
- Docker not running: Start Docker Desktop or the Docker daemon
- Rust toolchain outdated: rustup update stable
- Missing system packages: See the System Dependencies table above
Verify Your Setup
# Build the workspace
cargo build --workspace
# Run tests
cargo test --workspace --exclude volley-python
# Run a quick example
cargo run --example in_memory_pipeline -p volley-examples
Quick Start
Build your first Volley pipeline in 5 minutes.
Basic In-Memory Pipeline
This pipeline reads sample trade data from memory, filters high-value trades, and collects results:
use std::sync::Arc;

use arrow::array::{Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use volley_connectors::memory::{MemorySink, MemorySource};
use volley_core::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("symbol", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
        Field::new("quantity", DataType::Int64, false),
    ]));

    let records = vec![
        StreamRecord::new(RecordBatch::try_new(schema.clone(), vec![
            Arc::new(StringArray::from(vec!["AAPL"])),
            Arc::new(Float64Array::from(vec![150.0])),
            Arc::new(Int64Array::from(vec![100])),
        ]).unwrap()),
        StreamRecord::new(RecordBatch::try_new(schema.clone(), vec![
            Arc::new(StringArray::from(vec!["GOOG"])),
            Arc::new(Float64Array::from(vec![280.0])),
            Arc::new(Int64Array::from(vec![50])),
        ]).unwrap()),
    ];

    let sink = MemorySink::new();
    // `handle()` returns a shared reference to the sink's buffer that
    // stays live after `to_sink()` consumes the sink itself.
    let output = sink.handle();

    let report = StreamExecutionEnvironment::new()
        .from_source(MemorySource::new(records))
        .filter_expr(col("price").gt(lit(200.0)))
        .to_sink(sink)
        .execute("filter-example")
        .await?;

    // `execute` returns an `ExecutionReport { records_written, epochs_committed }`.
    // The filtered rows come back via the sink handle.
    println!(
        "{} rows, {} epochs",
        report.records_written, report.epochs_committed
    );
    println!("kept {} records", output.lock().unwrap().len());
    Ok(())
}
Run the full in-memory example from the repo:
cargo run --example in_memory_pipeline -p volley-examples
Expected output (abridged):
=== Volley in-memory DAG pipeline ===
Input: 5 trades
Execution report: 0 records written, 1 epochs committed
Filtered output (3 records):
1. GOOG @ $280 x 50
2. MSFT @ $310 x 75
3. GOOG @ $290 x 150
Windowed Aggregation
Add keyed windowing and expression-based aggregation (requires the state-rocksdb feature):
use std::sync::Arc;
use std::time::Duration;

use volley_core::prelude::*;
use volley_connectors::memory::{MemorySink, MemorySource};

#[tokio::main]
async fn main() -> Result<()> {
    // `records` is a `Vec<StreamRecord>` built the same way as in the basic
    // example above, here with `user_id` and `amount` columns (construction
    // omitted for brevity).

    // `RocksDbBackend` implements `BatchStateBackend` directly, so no
    // `KeyedStateBackend` wrapper is needed at the builder API.
    let tmp = tempfile::tempdir().unwrap();

    StreamExecutionEnvironment::new()
        .from_source(MemorySource::new(records))
        .filter_expr(col("amount").gt(lit(100)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(300)))
        .aggregate_expr(
            vec![sum(col("amount"))],
            RocksDbBackend::open(tmp.path().join("state")).unwrap(),
        )
        .to_sink(MemorySink::new())
        .execute("aggregation-job")
        .await?;
    Ok(())
}
Kafka Source to Kafka Sink
A real-world pipeline reading from Kafka, aggregating, and writing back to Kafka:
use std::sync::Arc;
use std::time::Duration;

use arrow::datatypes::{DataType, Field, Schema};
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Utf8, false),
        Field::new("amount", DataType::Float64, false),
    ]));

    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema);
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");
    let tmp = tempfile::tempdir().unwrap();

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter_expr(col("amount").gt(lit(0.0)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate_expr(
            vec![sum(col("amount"))],
            RocksDbBackend::open(tmp.path().join("state")).unwrap(),
        )
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;
    Ok(())
}
For end-to-end exactly-once between Kafka topics (source offsets committed inside the sink’s Kafka transaction), swap in the exactly-once source/sink pair:
let (stream, cgm) = env.from_kafka_exactly_once(source_config).await?;
stream
    // ... operators ...
    .to_kafka_exactly_once(KafkaSinkConfig::new_transactional(
        "localhost:9092", "output", "my-pipeline-txn",
    ), cgm)
    .await?
    .execute("eos-job")
    .await?;
Note: Kafka connectors require cmake to be installed on your system. See Installation for details.
What’s Happening
- StreamExecutionEnvironment::new() — Creates the pipeline builder
- .from_source() / .from_kafka() — Attaches a source, producing a DataStream
- .filter_expr() / .select_expr() — Applies vectorised operators to the stream
- .key_by() — Partitions the stream by key, producing a KeyedStream
- .window() — Assigns records to time windows, producing a WindowedKeyedStream
- .aggregate_expr() — Computes an aggregation per window per key
- .to_kafka() / .to_sink() — Attaches a sink (transitions to DataStream<HasSink>)
- .execute() — Starts the pipeline as concurrent Tokio tasks
The type system enforces valid construction at compile time. You can only call .aggregate_expr() on a KeyedStream or a WindowedKeyedStream, not on a raw DataStream, and you can only call .execute() on a DataStream<HasSink>.
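To make the .window() step more concrete, here is a minimal sketch of tumbling-window assignment. It assumes windows are aligned to the Unix epoch and are half-open intervals [start, end); the function name tumbling_window is hypothetical, and Volley's TumblingWindows may differ in alignment or representation.

```rust
/// Returns the `[start, end)` bounds, in milliseconds, of the tumbling
/// window that contains `event_time_ms`.
///
/// Assumption: windows are aligned to timestamp 0 and never overlap.
fn tumbling_window(event_time_ms: i64, size_ms: i64) -> (i64, i64) {
    // `rem_euclid` keeps the remainder non-negative, so timestamps before
    // the epoch still land in the window that starts at or before them.
    let start = event_time_ms - event_time_ms.rem_euclid(size_ms);
    (start, start + size_ms)
}
```

With a 60-second window, an event at 125,000 ms falls into the window covering 120,000 to 180,000 ms. Every record with the same key and the same window bounds contributes to the same aggregate.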
Next Steps
- Architecture Overview — understand how Volley works under the hood
- Examples — browse all runnable examples
- Operators Guide — write custom operators
- Connectors Guide — write custom connectors
Architecture Overview
A Volley pipeline is a graph of stages — one source, some operators, one or more sinks. You describe the graph in Rust, and Volley runs it as concurrent Tokio tasks connected by bounded channels. This page explains what happens once you call .execute().
Pipeline Builder (Compile-Time)
You build the graph with a fluent API. The order of calls matches the order of data flow:
StreamExecutionEnvironment::new()
    .from_source(...)
    .filter_expr(...)
    .key_by(...)
    .window(...)
    .aggregate_expr(..., state_backend)
    .to_sink(...)
    .execute("job-name")
Runtime Execution (Tokio Tasks)
At runtime, each stage runs as an independent Tokio task. Stages communicate via bounded tokio::sync::mpsc channels. Backpressure propagates naturally: when a downstream channel is full, the upstream task’s send waits (yielding to the Tokio scheduler rather than blocking a thread) until the receiver frees a slot.
+--------+ channel +----------+ channel +------+
| Source | ---------> | Operator | ---------> | Sink |
| Task | RecordBatch | Task | RecordBatch | Task |
+--------+ +----------+ +------+
| | |
| CheckpointBarrier | on_checkpoint() | on_checkpoint()
| (in-band) | flush state | commit writes
v v v
+---------------------------------------------------------+
| Checkpoint Coordinator |
| Injects barriers -> collects acks -> snapshots state |
+---------------------------------------------------------+
Checkpoint barriers and watermarks flow in-band alongside data records through the same channels. This is how Volley achieves aligned checkpoints without pausing the pipeline.
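As a rough illustration of how a bounded channel produces backpressure, the sketch below uses the standard library's sync_channel as a stand-in for tokio::sync::mpsc. That substitution is an assumption made to keep the example dependency-free, and the function name accepted_before_full is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Pushes up to `n` items into a bounded channel that nobody is draining
/// and returns how many were accepted before the channel reported "full".
fn accepted_before_full(capacity: usize, n: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for item in 0..n {
        match tx.try_send(item) {
            Ok(()) => accepted += 1,
            // A real stage would wait here until the downstream stage
            // makes room; that wait is the backpressure.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

Once the buffer holds `capacity` items, the producer cannot make progress until the consumer receives one, so a slow sink slows every stage upstream of it.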
Intra-Node Parallelism
After .key_by(), records are hash-partitioned across N parallel operator instances. Each instance has its own state namespace. Checkpoint barriers are aligned across all partitions before snapshot.
                          +-- Partition 0 --> [Operator Instance 0] --+
Source --> HashPartitioner -- Partition 1 --> [Operator Instance 1] --> Merge --> Sink
                          +-- Partition 2 --> [Operator Instance 2] --+
Parallelism is set via .with_parallelism(N) on the stream builder.
Distributed Execution (Horizontal Scaling)
For workloads that exceed a single node, Volley supports distributed execution across multiple K8s pods. Enable it by passing a ClusterConfig:
StreamExecutionEnvironment::new()
    .with_cluster(ClusterConfig::new(worker_id, num_workers, headless_service, app_name))
    .from_source(source)
    // ... same pipeline API ...
    .execute("job").await?;
In distributed mode, keyed operators use a two-level hashing scheme:
- Key group: hash(key) % max_key_groups (256 virtual partitions by default)
- Worker assignment: contiguous key group ranges assigned to each worker
Records destined for remote workers are shuffled via Arrow Flight DoExchange. Non-keyed operators (filter, map) run locally with no shuffle.
Worker 0 Worker 1
┌──────────────────────┐ ┌──────────────────────┐
│ Source │ │ Source │
│ ↓ │ │ ↓ │
│ ShuffleRouter ──────►│── Arrow Flight ──►│◄── ShuffleRouter │
│ ↓ │◄── Arrow Flight ──│ │
│ BarrierAligner │ │ BarrierAligner │
│ ↓ │ │ ↓ │
│ Sink │ │ Sink │
└──────────────────────┘ └──────────────────────┘
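The two-level hashing scheme described above can be sketched as follows. The hash function (the standard library's DefaultHasher) and the range formula are assumptions chosen for illustration; Volley's actual hashing and range assignment may differ, and both function names are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Level 1: map a key to one of `max_key_groups` virtual partitions.
fn key_group<K: Hash>(key: &K, max_key_groups: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % max_key_groups
}

/// Level 2: map a key group to a worker so that every worker owns one
/// contiguous range of key groups (assumed formula).
fn worker_for_group(group: u64, num_workers: u64, max_key_groups: u64) -> u64 {
    group * num_workers / max_key_groups
}
```

With 256 key groups and 2 workers, groups 0 to 127 map to worker 0 and groups 128 to 255 map to worker 1. Because keys are bound to key groups rather than to workers, changing the worker count only moves whole key-group ranges and never re-hashes individual keys.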
K8s primitives handle coordination:
- Leader election: K8s Lease API — one worker becomes coordinator
- Checkpoint metadata: stored in K8s ConfigMaps (no OwnerReference, survives redeployment)
- Checkpoint data: shared filesystem (EFS/Filestore/Azure Files) via ReadWriteMany PVC
- Worker discovery: StatefulSet headless service DNS
- Live rescaling: checkpoint + reassignment, no state migration needed (shared FS)
Single-node mode remains the default with zero overhead when ClusterConfig is absent.
Graceful Shutdown
The source stops producing, in-flight data drains through the pipeline, and each stage then shuts down in order.
Source Task --> channel(capacity) --> Operator Task --> channel(capacity) --> Sink Task
                        ^                                       ^
                        |                                       |
                  backpressure                            backpressure
                 (bounded send)                          (bounded send)
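The drain-then-stop behaviour can be modelled with ordinary threads and bounded channels. This is a sketch under the assumption that closing the upstream sender is the shutdown signal; it uses std threads instead of Tokio tasks, and run_and_drain is a hypothetical name.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs a source -> operator -> sink chain and returns what the sink saw.
fn run_and_drain(input: Vec<i64>) -> Vec<i64> {
    let (src_tx, op_rx) = sync_channel::<i64>(2);
    let (op_tx, sink_rx) = sync_channel::<i64>(2);

    let source = thread::spawn(move || {
        for value in input {
            src_tx.send(value).unwrap();
        }
        // `src_tx` is dropped here: the source has stopped producing.
    });

    let operator = thread::spawn(move || {
        // This loop ends only after every buffered item has been received
        // and the upstream sender is gone, so nothing in flight is lost.
        for value in op_rx {
            op_tx.send(value * 2).unwrap();
        }
        // `op_tx` is dropped here, which lets the sink finish in turn.
    });

    let sink = thread::spawn(move || sink_rx.into_iter().collect::<Vec<i64>>());

    source.join().unwrap();
    operator.join().unwrap();
    sink.join().unwrap()
}
```

Each stage exits only after its input channel is both closed and empty, which is why shutdown proceeds in pipeline order.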
Deep Dives
- Data Model — Arrow RecordBatch, StreamRecord, StreamElement
- Pipeline Builder — type-state pattern and compile-time safety
- Exactly-Once Semantics — checkpoint barriers and recovery
- State Management — write-behind cache, RocksDB, and checkpoints
Data Model
Volley moves your data in batches, not one row at a time. A batch is an Arrow RecordBatch: a chunk of rows stored in columnar form, passed between stages without copying. Operators see whole batches so they can work on many rows at once — the same technique a query engine uses to keep the CPU busy.
If you’ve used DataFusion, Polars, or DuckDB, this is the same RecordBatch you already know. If not: think “a page of a columnar table” — each column is a contiguous array, and operators like filter and sum work over the whole column in one pass.
StreamRecord
The StreamRecord wrapper adds metadata to each batch:
StreamRecord {
batch: RecordBatch, // columnar Arrow data (1 or more rows)
event_time_ms: Option<i64>, // event timestamp (for windowing)
trace: Option<RecordTraceContext>, // per-record trace context (for distributed tracing)
}
- batch — the actual payload, a columnar Arrow RecordBatch
- event_time_ms — event timestamp used by window assignment and watermark tracking
- trace — optional trace context for per-record distributed tracing. Set by the runtime’s sampler or extracted from source message headers (e.g., Kafka traceparent). When present, the runtime creates OpenTelemetry child spans at each pipeline stage.
StreamElement
Records share channels with control signals. The StreamElement enum wraps all possible messages:
StreamElement = Data(StreamRecord)
| Barrier(CheckpointBarrier)
| Watermark(Watermark)
- Data — normal data records flowing through the pipeline
- Barrier — checkpoint coordination signals injected by the Checkpoint Coordinator
- Watermark — event-time progress markers for window triggering
Barriers and watermarks flow in-band alongside data records through the same bounded channels. This is what enables aligned checkpoints without pausing the pipeline.
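To show what "in-band" means for a stage, the sketch below models the three message kinds with a simplified enum and records how many rows a stage had seen when each barrier arrived. The Element, StageLog, and consume names are hypothetical stand-ins, and payloads are reduced to plain integers so no Arrow dependency is needed.

```rust
/// Simplified stand-in for `StreamElement`.
#[derive(Debug, Clone, PartialEq)]
enum Element {
    Data(Vec<i64>), // stands in for a StreamRecord (one batch of rows)
    Barrier(u64),   // checkpoint epoch
    Watermark(i64), // event-time progress in milliseconds
}

/// What a stage learned from its input channel.
#[derive(Debug, Default, PartialEq)]
struct StageLog {
    rows_seen: usize,
    /// (epoch, rows seen before that barrier arrived)
    barrier_cuts: Vec<(u64, usize)>,
    watermark_ms: Option<i64>,
}

/// Processes elements strictly in arrival order, as a stage would when
/// reading from a single channel.
fn consume(elements: &[Element]) -> StageLog {
    let mut log = StageLog::default();
    for element in elements {
        match element {
            Element::Data(rows) => log.rows_seen += rows.len(),
            Element::Barrier(epoch) => log.barrier_cuts.push((*epoch, log.rows_seen)),
            Element::Watermark(ts) => log.watermark_ms = Some(*ts),
        }
    }
    log
}
```

Because the barrier travels in the same ordered channel as the data, the rows counted before it form a well-defined cut: everything ahead of the barrier belongs to the checkpoint, everything behind it does not.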
Why Arrow?
- Columnar format — efficient for analytical operations (aggregations, filters)
- Zero-copy — data passes between pipeline stages without serialization
- Multi-row batching — operators process many rows at once, amortizing per-record overhead
- Ecosystem interop — Arrow is the standard for DataFusion, Parquet, Delta Lake, Iceberg, and most analytics tooling
Pipeline Builder
You build a Volley pipeline with a fluent chain: source first, then operators, then a sink. The builder enforces two ordering rules at compile time: you must partition a stream with key_by() before windowing it, and you must attach a sink before calling .execute(). If you try to window an unkeyed stream or execute a pipeline that still has no sink, the code simply won’t compile.
This compile-time safety is built on Rust’s type system (a pattern called “typestate” — see the details below if you’re curious). Most of the time you don’t need to think about it; you just follow the chain.
The chain
StreamExecutionEnvironment --> DataStream<NoSink> --> KeyedStream --> WindowedKeyedStream
| | | |
from_source() filter_expr() key_by() window()
from_kafka() select_expr() aggregate_expr()
filter()
map()
flat_map()
apply_operator()
DataStream<NoSink> --> DataStream<HasSink> --> execute()
| |
to_sink() execute()
to_kafka()
Each step returns a different type, and each type exposes only the operations that make sense at that point:
| Type | What you have | What you can do next |
|---|---|---|
| DataStream<NoSink> | An unkeyed stream | filter_expr, select_expr, filter, map, flat_map, apply_operator, key_by, to_sink, to_kafka, with_tracing, with_observability |
| KeyedStream | A stream partitioned by a key | window, aggregate_expr, to_sink, to_kafka, with_tracing, with_observability |
| WindowedKeyedStream | Keyed and windowed | aggregate_expr |
| DataStream<HasSink> | A sink is attached and the pipeline is ready to run | execute |
aggregate_expr() works on both KeyedStream (global aggregation across all records) and WindowedKeyedStream (per-window aggregation). The closure-based operators (filter, map, flat_map) still live on DataStream<NoSink> for cases where you need per-record logic that doesn’t fit a DataFusion expression.
Example
StreamExecutionEnvironment::new()
    // Returns DataStream<NoSink>
    .from_source(source)
    // Still DataStream<NoSink>: vectorized filter
    .filter_expr(col("amount").gt(lit(100)))
    // Returns KeyedStream
    .key_by(col("user_id"))
    // Returns WindowedKeyedStream
    .window(TumblingWindows::of(Duration::from_secs(60)))
    // `aggregate_expr` takes a Vec of aggregates + a `BatchStateBackend`
    // (`RocksDbBackend` implements the trait directly, so no wrapper is needed).
    // Consumes WindowedKeyedStream, returns DataStream<NoSink>.
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    // Returns DataStream<HasSink>
    .to_sink(sink)
    // `execute` returns `ExecutionReport { records_written, epochs_committed }`.
    .execute("my-job")
    .await?;
Parallelism
Set the parallelism level on the environment. After .key_by(), records are hash-partitioned across N parallel operator instances:
StreamExecutionEnvironment::new()
    .set_parallelism(4)
    .from_source(...)
    // ...
Each parallel instance has its own state namespace. Checkpoint barriers are aligned across all partitions.
Typestate mechanics
The type progression above is an application of the “typestate” pattern: each builder method consumes self and returns a new type that only exposes the next set of valid operations. DataStream<NoSink> and DataStream<HasSink> share the same struct with a marker type parameter; .to_sink() flips the marker from NoSink to HasSink, which is the only variant that has an .execute() method. You can read the compile errors as a guide — if the compiler says a method doesn’t exist on your stream, you probably need to add (or remove) a step earlier in the chain.
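For readers who have not met the pattern before, here is a minimal, self-contained typestate sketch. The Stream type and its methods are hypothetical and far simpler than Volley's DataStream; only the NoSink/HasSink marker idea is taken from the description above.

```rust
use std::marker::PhantomData;

// Marker types: they carry no data and exist only at the type level.
struct NoSink;
struct HasSink;

struct Stream<State> {
    stages: Vec<String>,
    _state: PhantomData<State>,
}

/// Starts a pipeline description with a single source stage.
fn source(name: &str) -> Stream<NoSink> {
    Stream { stages: vec![format!("source:{}", name)], _state: PhantomData }
}

impl Stream<NoSink> {
    /// Adding an operator keeps the stream in the `NoSink` state.
    fn filter(mut self, name: &str) -> Stream<NoSink> {
        self.stages.push(format!("filter:{}", name));
        self
    }

    /// Attaching a sink consumes `self` and flips the marker to `HasSink`.
    fn to_sink(mut self, name: &str) -> Stream<HasSink> {
        self.stages.push(format!("sink:{}", name));
        Stream { stages: self.stages, _state: PhantomData }
    }
}

impl Stream<HasSink> {
    /// Only defined for `HasSink`, so a sink-less pipeline cannot call it.
    fn execute(self) -> Vec<String> {
        self.stages
    }
}
```

Writing source("memory").filter("price").execute() is rejected by the compiler because execute is not defined for Stream<NoSink>; inserting .to_sink("memory") before it makes the chain compile.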
Exactly-Once Semantics
Exactly-once means a record affects your output exactly one time, no matter what goes wrong — a pod gets killed, the network drops, a broker restarts. Replayed records don’t show up twice. Dropped records don’t disappear.
Volley gets there with periodic checkpoints. A coordinator injects a marker (a “barrier”) into the source; the barrier flows through the pipeline alongside your data, and every operator and sink records its state when the barrier passes. If something fails, Volley restarts from the last completed checkpoint and each sink uses an idempotent commit strategy so replayed writes don’t duplicate. The rest of this page walks through that protocol.
Checkpoint Barrier Protocol
1. Coordinator injects CheckpointBarrier(epoch=N) into source
2. Barrier flows through pipeline in-band with data
3. Each operator receives barrier:
   a. Calls on_checkpoint() hook (flush caches, prepare state)
   b. Forwards barrier downstream
   c. State backend snapshots via RocksDB hardlink
4. Sink receives barrier:
   a. Commits buffered writes
   b. Acknowledges barrier to coordinator
5. Coordinator marks epoch N complete when all sinks ack
The key insight: barriers flow in-band alongside data records. No global pause is needed. Each operator processes the barrier when it arrives, then continues processing data.
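Step 5 of the protocol, and the "last complete checkpoint" lookup used during recovery, can be sketched with a small bookkeeping struct. The Coordinator type below is a hypothetical model that tracks acknowledgements only; it is not Volley's Checkpoint Coordinator.

```rust
use std::collections::{BTreeMap, BTreeSet};

struct Coordinator {
    /// Names of every sink that must acknowledge a barrier.
    sinks: BTreeSet<String>,
    /// epoch -> sinks that have acknowledged it so far.
    acks: BTreeMap<u64, BTreeSet<String>>,
}

impl Coordinator {
    fn new(sinks: &[&str]) -> Self {
        Coordinator {
            sinks: sinks.iter().map(|s| s.to_string()).collect(),
            acks: BTreeMap::new(),
        }
    }

    /// Records an acknowledgement and returns true once every registered
    /// sink has acknowledged `epoch`. Unknown sink names are ignored.
    fn ack(&mut self, epoch: u64, sink: &str) -> bool {
        let acked = self.acks.entry(epoch).or_default();
        if self.sinks.contains(sink) {
            acked.insert(sink.to_string());
        }
        *acked == self.sinks
    }

    /// The most recent epoch that every sink acknowledged, if any.
    fn last_complete(&self) -> Option<u64> {
        self.acks
            .iter()
            .rev()
            .find(|(_, acked)| **acked == self.sinks)
            .map(|(epoch, _)| *epoch)
    }
}
```

An epoch that only some sinks have acknowledged is never treated as complete, so recovery falls back to the previous fully acknowledged epoch.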
Per-Sink Commit Strategies
Each sink type uses a different mechanism to achieve exactly-once:
| Sink | Commit Strategy | Recovery |
|---|---|---|
| Kafka | Transaction commit on barrier (+ send_offsets_to_transaction for Kafka↔Kafka EOS) | Abort + replay on recovery |
| Delta Lake | Epoch tag in commit metadata | Skip already-committed epochs |
| Iceberg | Snapshot commit via REST catalog | Epoch tracking in catalog |
Kafka↔Kafka End-to-End Exactly-Once
When both source and sink are Kafka, Volley supports the full Flink-style EOS protocol via librdkafka’s send_offsets_to_transaction:
let (stream, cgm) = StreamExecutionEnvironment::new()
    .from_kafka_exactly_once(source_config).await?;
stream
    .filter_expr(col("status").eq(lit("active")))
    .to_kafka_exactly_once(sink_config, cgm).await?
    .execute("eos-pipeline").await?;
How It Works
- Setup: from_kafka_exactly_once() returns a (DataStream, ConsumerGroupMetadata) tuple. The ConsumerGroupMetadata (CGM) is a handle to the Kafka consumer group that the source belongs to. Pass it to to_kafka_exactly_once() so the sink can commit consumer offsets atomically with produced records.
- Barrier injection: When the checkpoint coordinator injects a barrier, the source records its current Kafka partition offsets in barrier.source_offsets.
- Barrier alignment: If the pipeline has parallel branches, the BarrierAligner merges source offsets from all branches before forwarding the barrier to the sink. (The BarrierAligner is an internal component that ensures all parallel inputs to an operator have delivered the same barrier epoch before proceeding.)
- Atomic commit: When the sink receives the barrier, it:
  - Calls producer.send_offsets_to_transaction(offsets, cgm), which stages the consumer group offset advance inside the producer’s open transaction
  - Calls producer.commit_transaction(), which atomically commits both the produced output records and the consumer offset advance
- No auto-commit: In EOS mode, the sink disables batch-size and deadline-driven auto-commits. The only commit trigger is a checkpoint barrier. This means CheckpointConfig::interval must be shorter than transaction.timeout.ms (a producer setting that the broker enforces; default 60s), or the broker will abort the transaction and fence the producer.
- Recovery: On restart, the consumer group’s last committed offset (written atomically with the transaction) determines where to resume. The runtime skips file-based offset restoration for transactional sources.
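The barrier alignment step can be sketched as a small state machine. This is a hypothetical model, not the internal BarrierAligner: it only tracks which inputs have delivered the barrier, and it assumes offsets from different branches are merged by keeping the highest offset per partition.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Source offsets modelled as partition -> offset.
type Offsets = BTreeMap<i32, i64>;

struct Aligner {
    num_inputs: usize,
    epoch: Option<u64>,
    arrived: BTreeSet<usize>,
    merged: Offsets,
}

impl Aligner {
    fn new(num_inputs: usize) -> Self {
        Aligner { num_inputs, epoch: None, arrived: BTreeSet::new(), merged: Offsets::new() }
    }

    /// Feeds a barrier from one input. Returns the epoch and merged offsets
    /// once every input has delivered the barrier for that epoch.
    fn on_barrier(&mut self, input: usize, epoch: u64, offsets: &Offsets) -> Option<(u64, Offsets)> {
        if self.epoch != Some(epoch) {
            // First barrier of a new epoch: start a fresh alignment round.
            self.epoch = Some(epoch);
            self.arrived.clear();
            self.merged.clear();
        }
        self.arrived.insert(input);
        for (partition, offset) in offsets {
            // Assumed merge rule: keep the highest offset per partition.
            let slot = self.merged.entry(*partition).or_insert(*offset);
            *slot = (*slot).max(*offset);
        }
        if self.arrived.len() < self.num_inputs {
            return None; // still waiting for the other inputs
        }
        self.epoch = None;
        self.arrived.clear();
        Some((epoch, std::mem::take(&mut self.merged)))
    }
}
```

Only the final arrival yields a result, which mirrors the rule that the sink sees one barrier per epoch carrying the offsets of every branch.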
Recovery
When a failure occurs, records may be in various stages of processing:
Checkpoint N          In-flight records          Failure
     |                                             |
     v                                             v
[committed] ---[r1]---[r2]---[r3]---[r4]---[r5]--- X
     ^                        ^
 known good            these are lost
The RecoveryCoordinator restores to the last known-good state:
- Find last complete checkpoint — the most recent epoch where all sinks acknowledged their commit
- Restore source offsets — rewind sources to the offsets recorded at that checkpoint (e.g., Kafka partition offsets, S3 file positions)
- Restore operator state — load RocksDB hardlink snapshots from the checkpoint directory, restoring window state, aggregation accumulators, and any custom operator state
- Resume processing — records between the checkpoint and the failure (r1–r5 above) are replayed from the source
Replayed records don’t produce duplicates because each sink type has an idempotent commit strategy:
- Kafka: The incomplete transaction from the failed run is aborted. Replayed records are written in a new transaction. Consumers with isolation.level=read_committed never see the aborted records.
- Delta Lake: The sink checks the Delta log for the current epoch’s pipeline_id. If an epoch was already committed, the write is skipped entirely.
- Iceberg: The sink reads the latest snapshot’s epoch from catalog metadata. Already-committed epochs are skipped.
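The "skip already-committed epochs" idea behind the Delta Lake and Iceberg strategies can be sketched as follows. EpochSink is a hypothetical in-memory model: the last committed epoch lives in a struct field here, whereas the real sinks read it from the Delta log or the catalog.

```rust
struct EpochSink {
    last_committed_epoch: Option<u64>,
    committed_rows: Vec<String>,
}

impl EpochSink {
    fn new() -> Self {
        EpochSink { last_committed_epoch: None, committed_rows: Vec::new() }
    }

    /// Commits `rows` under `epoch`. Returns false, and writes nothing,
    /// when that epoch (or a later one) was already committed.
    fn commit(&mut self, epoch: u64, rows: &[&str]) -> bool {
        if let Some(last) = self.last_committed_epoch {
            if epoch <= last {
                return false; // replay after recovery: already durable
            }
        }
        self.committed_rows.extend(rows.iter().map(|row| row.to_string()));
        self.last_committed_epoch = Some(epoch);
        true
    }
}
```

If a crash happens after the sink committed epoch 2 but before the coordinator recorded it, the replayed commit for epoch 2 returns false and the output still contains each row once.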
Operator Lifecycle
The on_checkpoint() hook gives operators a chance to prepare for the snapshot:
fn on_checkpoint(&mut self, epoch: u64) {
    // Flush any buffered state to RocksDB
    // The state backend will snapshot after this returns
}
For stateful operators using the write-behind cache, on_checkpoint() flushes dirty keys from the in-memory HashMap to RocksDB via BatchStateBackend::write_batch().
State Management
Aggregations, windows, and any custom operator you write can keep per-key state that survives restarts. Volley stores that state in RocksDB on disk, but during normal processing your operator reads and writes an in-memory cache; RocksDB is touched only on a cache miss or when a checkpoint fires. The result: stateful operators run at close to in-memory speed and still recover exactly where they left off after a crash.
The rest of this page walks through how the cache, the backend, and the checkpoint snapshot fit together.
Architecture
+----------------------------------+
| AggregateOperator |
| +----------------------------+ |
| | Write-Behind Cache (HashMap)| |
| | key -> Accumulator | |
| +----------------------------+ |
| | on_checkpoint() |
| v |
| +----------------------------+ |
| | BatchStateBackend (RocksDB)| |
| | WriteBatch for dirty keys | |
| | multi_get for bulk reads | |
| +----------------------------+ |
| | hardlink snapshot |
| v |
| +----------------------------+ |
| | Checkpoint Directory | |
| | /checkpoints/epoch-N/ | |
| +----------------------------+ |
+----------------------------------+
Write-Behind Cache
During normal processing, state reads and writes go to an in-memory HashMap cache. The cache holds accumulator values keyed by partition key. This keeps RocksDB I/O off the hot path: the backend is read only on a cache miss and written only when a checkpoint flushes dirty keys.
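A minimal sketch of the write-behind idea, assuming a running-sum accumulator per key. The WriteBehindCache type is hypothetical, and a BTreeMap stands in for RocksDB so the example needs no external crates.

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

struct WriteBehindCache {
    cache: HashMap<String, i64>,    // hot, in-memory accumulators
    dirty: HashSet<String>,         // keys changed since the last flush
    backend: BTreeMap<String, i64>, // stand-in for the RocksDB backend
}

impl WriteBehindCache {
    fn new() -> Self {
        WriteBehindCache { cache: HashMap::new(), dirty: HashSet::new(), backend: BTreeMap::new() }
    }

    /// Adds `amount` to the accumulator for `key`, touching only memory
    /// unless the key is missing from the cache.
    fn add(&mut self, key: &str, amount: i64) {
        if !self.cache.contains_key(key) {
            // Cache miss: load the persisted value (or start from zero).
            let stored = self.backend.get(key).copied().unwrap_or(0);
            self.cache.insert(key.to_string(), stored);
        }
        *self.cache.get_mut(key).unwrap() += amount;
        self.dirty.insert(key.to_string());
    }

    /// Flushes every dirty key to the backend; returns how many were written.
    fn on_checkpoint(&mut self) -> usize {
        let flushed = self.dirty.len();
        for key in self.dirty.drain() {
            let value = self.cache[&key];
            self.backend.insert(key, value);
        }
        flushed
    }
}
```

Between checkpoints the backend is never written, however many updates arrive; a key updated a thousand times still costs one backend write at the next checkpoint.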
BatchStateBackend
When a checkpoint barrier arrives:
- on_checkpoint() is called on the operator
- Dirty keys from the cache are flushed to RocksDB via WriteBatch (a single atomic write)
- RocksDB creates a hardlink snapshot to the checkpoint directory
- The snapshot is nearly instant because hardlinks don’t copy data
Key operations:
- write_batch() — Atomic batch write of dirty keys
- multi_get() — Bulk read for cache misses (rare during steady-state)
- Hardlink checkpoints — O(1) snapshot via filesystem hardlinks to SST files
Per-Key State Isolation
After .key_by(), each parallel operator instance has its own state namespace. Keys are hash-partitioned across instances, so state never conflicts between parallel operators.
Checkpoint Storage
/checkpoints/
epoch-1/ # hardlink snapshot
epoch-2/ # hardlink snapshot
epoch-3/ # latest checkpoint
RocksDB checkpoints use hardlinks to SST files. Incremental growth is small when data doesn’t change between checkpoints. As data mutates, old SST files are retained for older checkpoints.
See the Capacity Planning guide for storage sizing recommendations.
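To see why a hardlink snapshot is cheap and why older files stay on disk, the sketch below hard-links every file in a directory into a checkpoint directory using only the standard library. It illustrates the filesystem mechanism, not RocksDB's checkpoint API; hardlink_snapshot is a hypothetical helper, and both directories must be on the same filesystem.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hard-links every regular file in `live_dir` into `checkpoint_dir` and
/// returns how many files were linked. No file contents are copied.
fn hardlink_snapshot(live_dir: &Path, checkpoint_dir: &Path) -> io::Result<usize> {
    fs::create_dir_all(checkpoint_dir)?;
    let mut linked = 0;
    for entry in fs::read_dir(live_dir)? {
        let entry = entry?;
        if entry.file_type()?.is_file() {
            // Both paths now name the same on-disk data.
            fs::hard_link(entry.path(), checkpoint_dir.join(entry.file_name()))?;
            linked += 1;
        }
    }
    Ok(linked)
}
```

If the live directory later deletes a file, for example after compaction, the checkpoint's link keeps the data alive. That is the reason disk usage grows as data changes between retained checkpoints.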
Window State LRU
Window operators (WindowOperator, ExprWindowOperator) maintain an in-memory index called pending_windows that tracks active (key, window) pairs. Unlike the aggregate cache (which is a hot copy of data also in RocksDB), this index is the only record of which windows exist and need to fire on watermark advance.
To bound memory without data loss, the window LRU uses an evict-to-backend strategy:
+----------------------------------+
| WindowOperator |
| +----------------------------+ |
| | pending_windows (LRU) | |
| | key:window_start -> (key, | |
| | window) | |
| +----------------------------+ |
| | on eviction |
| v |
| +----------------------------+ |
| | State Backend (RocksDB) | |
| | pending_index/{pending_key}| |
| +----------------------------+ |
| | on watermark advance |
| v |
| Scan persisted entries for |
| windows ready to fire |
+----------------------------------+
- When the LRU is full, evicted entries are persisted to a pending_index/ namespace in the state backend
- On watermark advance, persisted entries are scanned (only when evictions have occurred, tracked by a has_evicted flag)
- Fired windows are deleted from both the LRU and the state backend
This ensures no window is orphaned — every pending window eventually fires or is flushed at end-of-stream, regardless of whether it was evicted from the in-memory cache.
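The evict-to-backend idea can be sketched as follows. This is a simplified, std-only illustration under stated assumptions: `PendingWindows`, `track`, and `on_watermark` are hypothetical names, the backend is a `BTreeMap` standing in for the pending_index/ namespace, and eviction is by insertion order rather than true LRU recency.

```rust
use std::collections::{BTreeMap, HashMap, VecDeque};

/// (key, window_start); the stored value is the window end.
type PendingKey = (String, u64);

struct PendingWindows {
    capacity: usize,
    in_memory: HashMap<PendingKey, u64>,
    order: VecDeque<PendingKey>,        // oldest insertion first (simplified LRU)
    backend: BTreeMap<PendingKey, u64>, // stand-in for the pending_index/ namespace
    has_evicted: bool,
}

impl PendingWindows {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        PendingWindows {
            capacity,
            in_memory: HashMap::new(),
            order: VecDeque::new(),
            backend: BTreeMap::new(),
            has_evicted: false,
        }
    }

    /// Register a pending window; when full, persist the oldest entry instead of dropping it.
    fn track(&mut self, key: &str, window_start: u64, window_end: u64) {
        let pk = (key.to_string(), window_start);
        if self.in_memory.contains_key(&pk) || self.backend.contains_key(&pk) {
            return;
        }
        if self.in_memory.len() == self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                if let Some(end) = self.in_memory.remove(&oldest) {
                    self.backend.insert(oldest, end);
                    self.has_evicted = true;
                }
            }
        }
        self.order.push_back(pk.clone());
        self.in_memory.insert(pk, window_end);
    }

    /// Return every window whose end is at or before the watermark, from memory
    /// and (only if something was evicted) from the backend, deleting it from both.
    fn on_watermark(&mut self, watermark: u64) -> Vec<PendingKey> {
        let mut fired: Vec<PendingKey> = self
            .in_memory
            .iter()
            .filter(|(_, end)| **end <= watermark)
            .map(|(pk, _)| pk.clone())
            .collect();
        if self.has_evicted {
            fired.extend(
                self.backend
                    .iter()
                    .filter(|(_, end)| **end <= watermark)
                    .map(|(pk, _)| pk.clone()),
            );
        }
        for pk in &fired {
            self.in_memory.remove(pk);
            self.backend.remove(pk);
        }
        self.order.retain(|pk| !fired.contains(pk));
        self.has_evicted = !self.backend.is_empty();
        fired.sort();
        fired
    }
}
```

With a capacity of 2 and three tracked windows, one entry lands in the backend, yet all three fire once the watermark reaches their end, so no window is orphaned.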
Configure via with_max_pending_windows(n) on WindowedKeyedStream. When
unset, the index is unbounded (default, no behavior change from prior releases).
See the design document for full details on trade-offs and the evict-to-backend strategy.
Recovery
On pipeline restart from a checkpoint, the RecoveryCoordinator restores source offsets and opens the RocksDB backend from the checkpoint directory. Stateful operators then lazily recover their in-memory structures on the first process() call:
- Aggregate operators (ExprAggregateOperator, AggregateOperator) use StateBackend::scan_prefix() (backed by RocksDB’s prefix_iterator) to enumerate all keys with persisted accumulators. The cache and pending_keys maps are rebuilt from the scan results.
- Window operators (WindowOperator, ExprWindowOperator) persist a WindowOperatorMeta struct (containing pending_windows and current_watermark) to a well-known state key during on_checkpoint(). On recovery, this metadata is loaded to restore window tracking state and watermark progress.
Recovery is lazy rather than eager: fresh pipelines (no state in RocksDB) pay no cost, and recovery happens only once before the first record is processed.
Horizontal Scaling
When a pipeline outgrows a single node — too many Kafka partitions, too much keyed state to fit in one pod’s RAM, or you need a pod failure not to stop processing — you can run the same pipeline across multiple Kubernetes pods. The code doesn’t change: you add a ClusterConfig and deploy more replicas. Volley hashes keys to consistent “key groups,” assigns ranges of groups to each worker, and shuffles records between workers over Apache Arrow Flight. If you’re not using it, single-node mode stays the default with zero overhead.
This page covers the design. The step-by-step deployment guide lives in Horizontal Scaling Guide.
Design Principles
- Symmetric workers — every pod runs the same binary. No dedicated driver node.
- K8s-native — uses Lease API for leader election, ConfigMaps for checkpoint metadata, StatefulSet DNS for discovery. No ZooKeeper.
- Single-node is free — when ClusterConfig is absent, no distributed code paths execute.
- Shared filesystem — checkpoint data on EFS/Filestore/Azure Files. No state migration needed for rescaling.
Two-Level Key Hashing
Records are partitioned across workers using a two-level scheme:
hash(key) → key_group → worker
- Key group: hash(key) % max_key_groups (default 256). This is a virtual partition that never changes.
- Worker assignment: contiguous ranges of key groups are assigned to workers. Rescaling reassigns ranges without rehashing.
256 key groups, 3 workers:
Worker 0: key groups [0, 86) — 86 groups
Worker 1: key groups [86, 171) — 85 groups
Worker 2: key groups [171, 256) — 85 groups
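This design, borrowed from Apache Flink’s KeyGroupRangeAssignment, keeps rescaling cheap: keys are never rehashed, and only key groups whose range owner changes have to move. With the contiguous ranges shown above, scaling from 2 to 3 workers reassigns 127 of the 256 key groups (about half); the rest stay on their current worker.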
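The two-level scheme can be sketched as below. The range formula is the one used by Flink's KeyGroupRangeAssignment as far as I know it, and it is an assumption that Volley uses the same arithmetic; the function names are hypothetical, and `DefaultHasher` is only a placeholder for whatever hash Volley applies.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Level 1: map a key to its key group (virtual partition).
/// DefaultHasher is a placeholder; its output is not guaranteed stable across Rust releases.
fn key_group(key: &str, max_key_groups: u32) -> u32 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % max_key_groups as u64) as u32
}

/// Level 2: half-open [start, end) range of key groups owned by `worker`.
fn key_group_range(max_key_groups: u32, num_workers: u32, worker: u32) -> (u32, u32) {
    let start = (worker * max_key_groups + num_workers - 1) / num_workers;
    let end = ((worker + 1) * max_key_groups - 1) / num_workers + 1;
    (start, end)
}

/// Inverse lookup: which worker owns a given key group.
fn worker_for_group(group: u32, max_key_groups: u32, num_workers: u32) -> u32 {
    group * num_workers / max_key_groups
}
```

For 256 key groups and 3 workers, `key_group_range` yields [0, 86), [86, 171), and [171, 256), matching the split listed above.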
Arrow Flight Shuffle
When a keyed operator runs in distributed mode, records are shuffled between workers via Apache Arrow Flight DoExchange:
Worker 0 Worker 1
┌──────────────────────┐ ┌──────────────────────┐
│ Source │ │ Source │
│ ↓ │ │ ↓ │
│ ShuffleRouter │ Arrow Flight │ ShuffleRouter │
│ ├─ local partition ─┼─────────────────→ │ ├─ local partition │
│ └─ remote → Flight ─┼─────────────────→ │ └─ remote → Flight│
│ │ ←──────────────── │ │
│ BarrierAligner │ │ BarrierAligner │
│ (local + remote) │ │ (local + remote) │
│ ↓ │ │ ↓ │
│ Sink │ │ Sink │
└──────────────────────┘ └──────────────────────┘
- ShuffleRouter replaces PartitionRouter for keyed operators. Routes via key_group → worker → local mpsc or Flight sender.
- Non-keyed operators (filter, map, flat_map) run locally — no shuffle.
- BarrierAligner merges both local partition outputs and remote Flight inputs.
- Barriers and watermarks are broadcast to all local partitions AND all remote workers.
Wire Format
StreamElement variants are encoded as FlightData with a discriminator byte in app_metadata[0]:
| Byte | Variant | data_body | app_metadata[1..] |
|---|---|---|---|
| 0x01 | Data | Arrow IPC RecordBatch | JSON record metadata |
| 0x02 | Barrier | empty | JSON CheckpointBarrier |
| 0x03 | Watermark | empty | JSON Watermark |
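A sketch of how the discriminator byte could be written and read. Only the byte values and the layout of app_metadata come from the table above; `ElementKind`, both function names, and the JSON payload in the usage note are hypothetical, and the Arrow IPC body is left out.

```rust
/// The three StreamElement kinds, tagged with their discriminator byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ElementKind {
    Data = 0x01,
    Barrier = 0x02,
    Watermark = 0x03,
}

/// Build app_metadata: discriminator byte first, JSON payload after it.
fn encode_app_metadata(kind: ElementKind, json: &str) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + json.len());
    out.push(kind as u8);
    out.extend_from_slice(json.as_bytes());
    out
}

/// Split app_metadata back into the element kind and its JSON payload.
fn decode_app_metadata(bytes: &[u8]) -> Result<(ElementKind, &str), String> {
    let (&tag, rest) = bytes.split_first().ok_or("empty app_metadata")?;
    let kind = match tag {
        0x01 => ElementKind::Data,
        0x02 => ElementKind::Barrier,
        0x03 => ElementKind::Watermark,
        other => return Err(format!("unknown discriminator 0x{:02x}", other)),
    };
    let json = std::str::from_utf8(rest).map_err(|e| e.to_string())?;
    Ok((kind, json))
}
```

Encoding a barrier with a made-up payload such as `{"epoch":42}` produces a buffer whose first byte is 0x02; decoding rejects empty buffers and unknown discriminators instead of guessing.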
K8s-Native Coordination
Leader Election
One worker acquires a K8s Lease (coordination.k8s.io/v1) and becomes the coordinator. The coordinator handles checkpoint injection and key group assignment. It still processes data normally — coordination is lightweight.
If the leader dies, the lease expires and another worker takes over.
Checkpoint Metadata
Completed checkpoint metadata is stored in K8s ConfigMaps:
- {app}-ckpt-epoch-{N} — per-epoch metadata (workers, state paths, source offsets)
- {app}-ckpt-latest — pointer to the latest completed epoch
ConfigMaps have no OwnerReference, so they survive CRD deletion. This allows recovery after redeployment.
Shared Filesystem Checkpoints
Checkpoint data is written to a shared filesystem (AWS EFS, GCP Filestore, Azure Files) mounted as a ReadWriteMany PVC. All workers write to and read from the same mount.
/mnt/volley/checkpoints/
epoch-42/
worker-0/ ← RocksDB SST files
worker-1/
worker-2/
Recovery is near-instant: the pod reads the checkpoint directory directly. No download step.
Worker Discovery
Workers discover each other via StatefulSet headless service DNS:
{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}
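Expanding the DNS pattern for every replica is a one-liner. The helper name `peer_addresses` is hypothetical, as are the app name, namespace, and port used in the usage note.

```rust
/// Build the Flight address of every worker from the StatefulSet naming pattern
/// {app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}.
fn peer_addresses(app: &str, namespace: &str, replicas: u32, flight_port: u16) -> Vec<String> {
    (0..replicas)
        .map(|ordinal| {
            format!(
                "{0}-{1}.{0}-headless.{2}.svc:{3}",
                app, ordinal, namespace, flight_port
            )
        })
        .collect()
}
```

For example, `peer_addresses("orders", "prod", 3, 50051)` returns three addresses, the second being `orders-1.orders-headless.prod.svc:50051`.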
Live Rescaling
Since all workers share the same filesystem, rescaling doesn’t require state migration:
- Coordinator detects replica count change
- Triggers a checkpoint — all workers flush in-memory state to shared FS
- Computes new KeyGroupAssignment and diffs against the old one
- Broadcasts RescaleEvent via watch channel
- Workers stop processing lost key groups, load state for gained ones from the shared mount
- Resume — no restart, no data transfer
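The assignment diff in the steps above can be sketched per worker. This assumes contiguous range assignment where the owner of a key group is `group * num_workers / max_key_groups` (the Flink-style formula, not confirmed for Volley); `RescaleDiff` and `diff_for_worker` are hypothetical names.

```rust
/// Owner of a key group under contiguous range assignment (assumed formula).
fn owner(group: u32, max_key_groups: u32, num_workers: u32) -> u32 {
    group * num_workers / max_key_groups
}

/// Key groups a worker picks up and gives away during a rescale.
#[derive(Debug, Default, PartialEq)]
struct RescaleDiff {
    gained: Vec<u32>,
    lost: Vec<u32>,
}

/// Compare the old and new assignment for one worker.
fn diff_for_worker(worker: u32, max_key_groups: u32, old_workers: u32, new_workers: u32) -> RescaleDiff {
    let mut diff = RescaleDiff::default();
    for group in 0..max_key_groups {
        let before = owner(group, max_key_groups, old_workers) == worker;
        let after = owner(group, max_key_groups, new_workers) == worker;
        match (before, after) {
            (false, true) => diff.gained.push(group),
            (true, false) => diff.lost.push(group),
            _ => {} // unchanged: either still owned or never owned
        }
    }
    diff
}
```

A worker would stop processing the groups in `lost` and load state for those in `gained` from the shared mount; groups in neither list need no action.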
Key Types
| Type | Location | Purpose |
|---|---|---|
| ClusterConfig | cluster::config | Worker ID, num workers, headless service, app name, flight port |
| KeyGroupAssignment | cluster::assignment | Maps key groups to workers with contiguous range assignment |
| KeyGroupRange | cluster::assignment | A [start, end) range of key groups owned by one worker |
| ShuffleRouter | cluster::shuffle | Routes records to local or remote destinations |
| ShuffleFlightService | cluster::flight::server | Arrow Flight DoExchange server |
| ShuffleFlightClient | cluster::flight::client | Arrow Flight DoExchange client with connection pooling |
| LeaseLeaderElector | cluster::coordinator::leader | K8s Lease-based leader election |
| DistributedCheckpointCoordinator | cluster::coordinator::checkpoint_coordinator | ConfigMap-based checkpoint metadata |
| RescaleCoordinator | cluster::rescale | Checkpoint + key group reassignment |
Writing Operators
Operators are the steps in the middle of your pipeline — they filter, reshape, and aggregate records as they flow from source to sink. Most of what you need is already built in. This guide shows you the built-in operators to reach for first, and how to drop down to closures or a custom Operator trait when the built-ins can’t express what you need.
Expression-Based Operators (Recommended)
Expression operators use DataFusion expressions for vectorized, multi-row processing. These are the default choice — they operate on whole Arrow RecordBatches at once instead of one row at a time:
| Method | Description |
|---|---|
| .filter_expr(expr) | Keep records matching an expression (e.g., col("price").gt(lit(100))) |
| .select_expr(exprs) | Project/rename columns (e.g., vec![col("user_id"), col("total").alias("sum")]) |
| .key_by(expr) | Partition by a key column (e.g., col("user_id")) — transitions to KeyedStream |
| .window(window_type) | Assign records to time windows (on KeyedStream) — transitions to WindowedKeyedStream |
| .aggregate_expr(aggs, state_backend) | Windowed aggregation. First arg is a Vec<AggregateExprDef>; second is any BatchStateBackend (e.g., RocksDbBackend directly — no wrapper needed). |
Example
use std::time::Duration;
use volley_core::prelude::*;
// source: impl Source, sink: impl DynSink (see the Quick Start for full examples)
let tmp = tempfile::tempdir().unwrap();
StreamExecutionEnvironment::new()
.from_source(source)
.filter_expr(col("price").gt(lit(100)))
.select_expr(vec![col("user_id"), col("price"), col("quantity")])
.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(60)))
.aggregate_expr(
vec![sum(col("price"))],
RocksDbBackend::open(tmp.path().join("state")).unwrap(),
)
.to_sink(sink)
.execute("expr-pipeline")
.await?;
Available Aggregation Functions
| Function | Description |
|---|---|
| sum(expr) | Sum of values |
| count(expr) | Count of values |
| avg(expr) | Average of values |
| min(expr) | Minimum value |
| max(expr) | Maximum value |
All are imported via volley_core::prelude::*. Expression helpers col() and lit() are also in the prelude.
Closure-Based Operators
Use closure operators when you need access to StreamRecord metadata (like event_time_ms or the per-record trace context) or when the logic doesn’t fit a DataFusion expression. They receive one StreamRecord at a time:
| Method | Description |
|---|---|
| .filter(fn) | Keep records matching a predicate |
| .map(fn) | Transform each record |
| .flat_map(fn) | Transform each record into zero or more records |
| .apply_operator(op) | Apply a custom Operator implementation |
// Drop records whose event time is older than a cutoff — a good fit for
// a closure because `event_time_ms` lives on StreamRecord, not the batch.
stream.filter(move |r| r.event_time_ms.map_or(false, |t| t > cutoff))
Note: Closure operators process one StreamRecord at a time. For bulk column operations on the record payload, expression operators (filter_expr, select_expr) are more efficient because they use DataFusion’s vectorized kernels on the full RecordBatch.
Custom Operators
Implement the Operator trait from volley-core:
use volley_core::prelude::*;

struct MyOperator;

impl Operator for MyOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Transform the record
        // Return zero or more output records
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Optional: flush any buffered state before checkpoint
    }
}
Apply it to a stream:
StreamExecutionEnvironment::new()
    .from_source(source)
    .apply_operator(MyOperator)
    .to_sink(sink)
    .execute("my-job")
    .await?;
Multi-Row RecordBatch Processing
Operators receive StreamRecord which wraps an Arrow RecordBatch. A batch may contain one or more rows. Your operator should handle multi-row batches correctly:
fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
    let batch = &record.batch;
    let num_rows = batch.num_rows();
    for row in 0..num_rows {
        // Process every row, not just row 0
        // ...
    }
    // Return zero or more output records
    vec![record]
}
Stateful Operators
For operators that maintain state across records, use the on_checkpoint() hook to flush state before a checkpoint snapshot:
use std::collections::HashMap;
use arrow::array::StringArray; // assumes the arrow crate is a direct dependency
use volley_core::prelude::*;

struct CountingOperator {
    counts: HashMap<String, u64>,
}

impl Operator for CountingOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Count every row in the batch by its "key" column
        // (assumes a Utf8 "key" column exists; null keys are skipped)
        let keys = record.batch
            .column_by_name("key")
            .and_then(|col| col.as_any().downcast_ref::<StringArray>());
        if let Some(keys) = keys {
            for key in keys.iter().flatten() {
                *self.counts.entry(key.to_string()).or_default() += 1;
            }
        }
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Flush state to durable storage here
        // The state backend will snapshot after this returns
    }
}
See State Management for details on the write-behind cache and RocksDB backend.
Type-State Safety
The Pipeline Builder enforces valid operator chains at compile time. You can only call .aggregate_expr() on a WindowedKeyedStream, not on a plain DataStream. You can only call .execute() on a DataStream<HasSink>.
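The typestate pattern behind this can be shown with a toy builder. It is only a sketch of the technique: the `DataStream`, `NoSink`, and `HasSink` types here are simplified stand-ins defined in the example, not Volley's real types, and the stages are recorded as strings.

```rust
use std::marker::PhantomData;

/// Marker types encoding the builder state in the type parameter.
struct NoSink;
struct HasSink;

struct DataStream<S> {
    stages: Vec<String>,
    _state: PhantomData<S>,
}

impl DataStream<NoSink> {
    fn from_source(name: &str) -> Self {
        DataStream { stages: vec![format!("source({})", name)], _state: PhantomData }
    }

    fn filter(mut self, predicate: &str) -> Self {
        self.stages.push(format!("filter({})", predicate));
        self
    }

    /// Attaching a sink changes the type, which unlocks `execute`.
    fn to_sink(mut self, name: &str) -> DataStream<HasSink> {
        self.stages.push(format!("sink({})", name));
        DataStream { stages: self.stages, _state: PhantomData }
    }
}

impl DataStream<HasSink> {
    /// Only defined once a sink is attached; calling it earlier is a compile error.
    fn execute(self, job: &str) -> String {
        format!("{}: {}", job, self.stages.join(" -> "))
    }
}
```

Because `execute` exists only on `DataStream<HasSink>`, writing `DataStream::<NoSink>::from_source("kafka").execute("job")` fails to compile with a "no method named `execute`" error, which is the kind of guarantee described above.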
Writing Connectors
Connectors are how Volley reads from and writes to external systems. Volley provides built-in connectors for Kafka, cloud blob stores, Delta Lake, and Iceberg. You can also write your own.
Source and Sink Traits
// Source: produces StreamRecords
trait Source: Send {
    async fn poll(&mut self) -> Option<StreamRecord>;
    fn connector_id(&self) -> &str;
}

// Sink: consumes StreamRecords
trait Sink: Send {
    async fn write(&mut self, record: StreamRecord) -> Result<()>;
    async fn flush(&mut self) -> Result<()>;
    fn on_checkpoint(&mut self, epoch: u64);
    fn connector_id(&self) -> &str;
}
Sources produce StreamRecord values via poll(). Sinks consume them via write() and flush(). Of the two traits as shown, only Sink exposes on_checkpoint(); source offsets are restored by the RecoveryCoordinator on restart.
Cloud Blob Store Pattern
All cloud blob store connectors follow a shared architecture:
Cloud Queue/Topic Cloud Storage
(SQS / Azure Queue) (S3 / Azure Blob)
| |
v v
NotificationSource BlobReader
| |
+--------+ +-----------+
| |
v v
BlobSource (volley-connector-blob-store)
|
v
Decoder (Parquet / JSON / CSV / Avro)
|
v
RecordBatch --> Pipeline
To add a new cloud backend, implement two traits:
- NotificationSource — polls a queue for new object notifications
- BlobReader — reads object data from cloud storage
The decoder layer (Parquet, JSON, CSV, Avro -> Arrow RecordBatch) is shared across all cloud backends.
Feature Flags
The volley-connectors umbrella crate uses feature flags:
| Feature | What it includes |
|---|---|
| kafka | KafkaSource, KafkaSink |
| blob-store | BlobSource, decoders |
| blob-store-aws | S3Source, SqsNotificationSource, S3BlobWriter |
| blob-store-azure | AzureBlobSource, AzureQueueNotificationSource, AzureBlobWriter |
| blob-store-gcs | GcsBlobWriter (sink only; source removed in v1.1.0) |
| file-formats | Parquet, JSON, CSV, Avro decoders |
| table-formats | Delta Lake + Iceberg sinks |
Existing Connectors
- Kafka — source and sink with exactly-once semantics
- Delta Lake — sink with epoch-tagged commits
- Iceberg — sink via REST catalog
- AWS S3 — source via SQS notifications
- Blob Store — Azure Blob source via Azure Queue notifications; S3, Azure Blob, and GCS sinks (GCS is sink only)
Windowing
Streams never end, so “the average price” only makes sense over a time range: the last minute, the last hour, this user’s current session. Windowing is how you slice an unbounded stream into finite chunks that aggregations can actually compute over.
Volley gives you three shapes of window — tumbling, sliding, and session — all driven by event time (the time the record happened) rather than wall-clock time (when it arrived). This means late or out-of-order data still lands in the right window.
The shortest path: .key_by(col("user_id")).window(TumblingWindows::of(Duration::from_secs(60))).aggregate_expr(vec![sum(col("amount"))], state_backend). Read on for how to pick the window shape and tune watermarks and late data.
Window Types
Tumbling Windows
Fixed-size, non-overlapping windows:
.window(TumblingWindows::of(Duration::from_secs(60)))
Each record belongs to exactly one window.
Sliding Windows
Fixed-size, overlapping windows:
.window(SlidingWindows::of(
    Duration::from_secs(60), // window size
    Duration::from_secs(10), // slide interval
))
Each record may belong to multiple windows.
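To make "multiple windows" concrete, here is a std-only sketch of sliding-window assignment. The function name and the use of signed millisecond timestamps are assumptions made for the example; Volley's own assigner may align or clamp windows differently.

```rust
/// All [start, end) windows of length `size_ms`, advancing by `slide_ms`,
/// that contain `event_time_ms`. Returned in ascending start order.
fn sliding_windows(event_time_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<(i64, i64)> {
    // Latest window start at or before the event, aligned to the slide interval.
    let last_start = event_time_ms - event_time_ms.rem_euclid(slide_ms);
    let mut windows = Vec::new();
    let mut start = last_start;
    // Walk backwards while the window still covers the event.
    while start > event_time_ms - size_ms {
        windows.push((start, start + size_ms));
        start -= slide_ms;
    }
    windows.reverse();
    windows
}
```

With a 60 s size and a 10 s slide, an event at t = 65 s falls into six windows, from [10 s, 70 s) through [60 s, 120 s). Setting the slide equal to the size reduces this to the single tumbling window.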
Session Windows
Dynamic windows that merge on activity:
.window(SessionWindows::with_gap(Duration::from_secs(30)))
A new session starts when the gap between records exceeds the threshold.
Event-Time Semantics
Event stream: --[e1 t=1]--[e2 t=3]--[e3 t=7]--[watermark t=10]--[e4 t=5]-->
TumblingWindows(size=5):
Window [0, 5): e1, e2 -- fires at watermark t=10 (window end <= 10)
Window [5, 10): e3 -- fires at watermark t=10
e4 (t=5): late data -- routed to side output if allowed_lateness passed
SessionWindows(gap=3):
Session 1: e1(t=1), e2(t=3) -- gap(3-1)=2 < 3, merged
Session 2: e3(t=7) -- gap(7-3)=4 >= 3, new session
Windows fire when a watermark advances past the window’s end time.
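The timeline above can be replayed with a small std-only sketch. The function names are hypothetical, events are reduced to bare timestamps, and the session rule follows the diagram (a gap greater than or equal to the configured gap starts a new session); real session windows are tracked per key and merged incrementally.

```rust
use std::collections::BTreeMap;

/// Tumbling window [start, end) containing `t`.
fn tumbling_window(t: u64, size: u64) -> (u64, u64) {
    let start = t - t % size;
    (start, start + size)
}

/// Group event times into tumbling windows and return the windows
/// whose end is at or before the watermark, i.e. the ones that fire.
fn fired_windows(events: &[u64], size: u64, watermark: u64) -> Vec<((u64, u64), Vec<u64>)> {
    let mut by_window: BTreeMap<(u64, u64), Vec<u64>> = BTreeMap::new();
    for &t in events {
        by_window.entry(tumbling_window(t, size)).or_default().push(t);
    }
    by_window
        .into_iter()
        .filter(|(window, _)| window.1 <= watermark)
        .collect()
}

/// Split event times (sorted ascending) into sessions.
fn sessions(sorted_events: &[u64], gap: u64) -> Vec<Vec<u64>> {
    let mut out: Vec<Vec<u64>> = Vec::new();
    for &t in sorted_events {
        let starts_new = match out.last().and_then(|s| s.last()) {
            Some(&prev) => t - prev >= gap,
            None => true,
        };
        if starts_new {
            out.push(vec![t]);
        } else {
            out.last_mut().expect("non-empty when not starting new").push(t);
        }
    }
    out
}
```

For events at t = 1, 3, 7 with size 5 and watermark 10, both [0, 5) and [5, 10) fire; with gap 3 the same events form the sessions {1, 3} and {7}.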
Watermarks
Watermarks track event-time progress. The BoundedOutOfOrdernessGenerator allows configurable out-of-orderness before advancing the watermark:
.with_watermark_generator(
    BoundedOutOfOrdernessGenerator::new(Duration::from_secs(5))
)
This allows records to arrive up to 5 seconds late before the watermark advances past them.
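The bounded out-of-orderness rule is commonly implemented as "highest event time seen, minus the bound". The sketch below assumes that rule; `WatermarkSketch` and its methods are hypothetical and are not the API of BoundedOutOfOrdernessGenerator.

```rust
/// Tracks the highest event time seen and derives the watermark from it.
struct WatermarkSketch {
    max_out_of_orderness_ms: u64,
    max_event_time_ms: Option<u64>,
}

impl WatermarkSketch {
    fn new(max_out_of_orderness_ms: u64) -> Self {
        WatermarkSketch { max_out_of_orderness_ms, max_event_time_ms: None }
    }

    /// Record an event; out-of-order events never move the maximum backwards.
    fn observe(&mut self, event_time_ms: u64) {
        self.max_event_time_ms = Some(match self.max_event_time_ms {
            Some(seen) => seen.max(event_time_ms),
            None => event_time_ms,
        });
    }

    /// Watermark = max event time minus the allowed out-of-orderness.
    fn current_watermark(&self) -> Option<u64> {
        self.max_event_time_ms
            .map(|t| t.saturating_sub(self.max_out_of_orderness_ms))
    }

    /// True when an event's timestamp is already behind the watermark.
    fn is_behind_watermark(&self, event_time_ms: u64) -> bool {
        match self.current_watermark() {
            Some(watermark) => event_time_ms < watermark,
            None => false,
        }
    }
}
```

With a 5 s bound, observing an event at 10 s puts the watermark at 5 s; a straggler at 8 s leaves it unchanged, and an event stamped 4 s would already be behind it.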
Late Data
Records that arrive after the watermark has passed their window are considered late. Configure side_output_late_data() with a tag and attach a side output sink via with_late_data_sink() to route late events to a separate destination (e.g., a dead-letter queue):
let late_tag = OutputTag::new("late-events");
StreamExecutionEnvironment::new()
    .from_source(source)
    .with_watermarks(WatermarkConfig::new(Duration::from_secs(5)))
    .with_late_data_sink(late_events_sink) // side output sink
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .allowed_lateness(Duration::from_secs(10))
    .side_output_late_data(late_tag)
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    .to_sink(main_sink)
    .execute("job")
    .await?;
Late events are written to the side output sink as StreamRecords with the schema from late_event_schema(): event_data (JSON string), event_time, watermark, window_start, window_end, lateness_ms. The side output sink participates in checkpoint barriers for exactly-once semantics.
Without with_late_data_sink(), late events are logged and discarded.
Batch Processing Performance
The window operator is designed to exploit Arrow’s columnar batch processing. When a RecordBatch arrives with N rows, the operator avoids per-row state backend I/O through two mechanisms:
Write-behind accumulator cache
Accumulator state (partial aggregation results per key per window) is cached in an in-memory HashMap. During process(), the operator reads and updates accumulators entirely in memory. State is only flushed to RocksDB when a checkpoint barrier arrives — the same write-behind pattern used by the non-windowed aggregate_expr() operator.
This means a batch of 1000 rows touching 50 unique (key, window) pairs does zero RocksDB I/O during processing. The only state backend calls happen on cache misses after recovery, or when the pending_windows LRU evicts entries.
Group-then-update strategy
Instead of processing each row individually, the operator groups row indices by (key, window) and then calls update_batch() once per group with a multi-row Arrow array:
Batch: 4000 rows, 100 keys, SlidingWindows(10s, 2s) → 5 windows per row
Step 1 — Assign & group:
For each row, compute (key, window) pairs → HashMap<(key, window), [row indices]>
Result: ~500 groups (100 keys × 5 windows), ~40 rows per group
Step 2 — Batch update:
For each group:
arrow::compute::take(value_column, group_indices) → multi-row array
accumulator.update_batch([multi_row_array]) → one call, not 40
This reduces 20,000 update_batch() calls (4000 rows × 5 windows) to 500 calls with ~40-row arrays each.
Batch-level event time fast path: When all rows share the same event time (no timestamp_column configured), window assignment is computed once for the entire batch rather than per-row. The operator then only groups by key, which is even cheaper.
Session window fallback: Session windows require per-row sequential processing because each event can trigger a merge of overlapping sessions. The batch path is used for tumbling and sliding windows only.
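The group-then-update strategy can be sketched without Arrow. In this illustration plain slices stand in for Arrow columns, indexing by row replaces `arrow::compute::take`, and a running sum replaces the accumulator; `group_rows` and `sum_by_group` are hypothetical names.

```rust
use std::collections::HashMap;

/// Step 1: group row indices by (key, window_start) for sliding windows.
fn group_rows(
    keys: &[&str],
    event_times_ms: &[i64],
    size_ms: i64,
    slide_ms: i64,
) -> HashMap<(String, i64), Vec<usize>> {
    let mut groups: HashMap<(String, i64), Vec<usize>> = HashMap::new();
    for (row, (key, &t)) in keys.iter().zip(event_times_ms).enumerate() {
        // Walk back over every window start that still covers this row.
        let mut start = t - t.rem_euclid(slide_ms);
        while start > t - size_ms {
            groups
                .entry(((*key).to_string(), start))
                .or_insert_with(Vec::new)
                .push(row);
            start -= slide_ms;
        }
    }
    groups
}

/// Step 2: one accumulator update per group instead of one per (row, window) pair.
fn sum_by_group(
    values: &[f64],
    groups: &HashMap<(String, i64), Vec<usize>>,
) -> HashMap<(String, i64), f64> {
    groups
        .iter()
        .map(|(group, rows)| {
            let total = rows.iter().map(|&r| values[r]).sum::<f64>();
            (group.clone(), total)
        })
        .collect()
}
```

For a batch of 4000 rows over 100 keys that all share one timestamp, with a 10 s window sliding by 2 s, `group_rows` produces 500 groups covering 20,000 (row, window) pairs, so `sum_by_group` performs 500 updates.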
Memory Management
The window operator tracks active (key, window) pairs in an in-memory index called pending_windows. With high-cardinality keys or wide/sliding windows, this index can grow large.
Bounding Window Memory with LRU
Use with_max_pending_windows() to cap the in-memory index size. Evicted entries are persisted to the state backend and scanned during watermark advancement, so no data is lost:
stream
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .with_max_pending_windows(100_000) // bound in-memory window index
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
When unset (the default), the index is unbounded — identical to previous releases.
Sizing guidance:
| Key cardinality | Windows per key | Recommended limit | Approx. memory |
|---|---|---|---|
| < 10K | Any | Unbounded (default) | < 10 MB |
| 10K–100K | 1 (tumbling) | 100,000 | ~10 MB |
| 100K–1M | 1 (tumbling) | 500,000 | ~50 MB |
| Any | 10+ (sliding) | Multiply by panes | Varies |
Each pending entry is ~100 bytes. Evicted entries add a small RocksDB I/O cost on watermark advance.
Note: with_max_pending_windows() is not recommended with session windows. Session window merging requires scanning the pending index for overlapping sessions, and evicted entries may cause additional state backend I/O during merges.
Monitoring
Monitor the volley_window_pending_size gauge and volley_window_pending_evictions_total counter to tune the limit. A high eviction rate indicates the LRU is too small for the workload. See Observability for the full metrics reference.
Per-Record Tracing
Volley can trace individual records as they flow through a pipeline, creating OpenTelemetry spans at each processing stage. This gives you a complete picture of what happened to a record: which operators processed it, how long each step took, what the input and output looked like, and whether any errors occurred.
Quick Start
use volley_core::prelude::*;
use volley_core::observability::{TracingConfig, SamplingStrategy};

let results = StreamExecutionEnvironment::new()
    .from_source(my_source)
    .with_tracing(TracingConfig {
        sampling: SamplingStrategy::Ratio(0.01),
        ..Default::default()
    })
    .map(|record| {
        // Your transformation
        Ok(record)
    })
    .to_sink(my_sink)
    .execute("traced-pipeline")
    .await?;
Open Jaeger or Grafana Tempo to see traces. Each sampled record appears as a trace with child spans for source, operators, and sink.
How It Works
- Sampling decision happens at the source. Based on your SamplingStrategy, the runtime decides whether to trace each record.
- Root span is created for sampled records (volley.record). The span’s context is attached to the StreamRecord.trace field.
- Child spans are created automatically at each pipeline stage (operators, sink). No code changes needed in your operators.
- Payload previews (optional) capture a JSON snapshot of the first row at each operator’s input and output, size-capped to max_payload_bytes.
- Cleanup happens at pipeline exit. Any root spans for records that were filtered or lost to errors are ended.
Sampling
At production throughput (thousands of records per second), tracing every record would overwhelm your tracing backend. Use Ratio sampling:
| Throughput | Suggested ratio | Traces/sec |
|---|---|---|
| 1K rec/s | 0.10 (10%) | ~100 |
| 10K rec/s | 0.01 (1%) | ~100 |
| 100K rec/s | 0.001 (0.1%) | ~100 |
Use Always only during development or debugging.
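The table follows from ratio = target traces per second / throughput. The sketch below computes that ratio and shows one way to make a ratio decision. Both function names are hypothetical, and the hash-based decision is a deterministic, std-only substitute for the random-number comparison a runtime would normally use.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Ratio that yields roughly `target_traces_per_sec` at the given throughput, capped at 1.0.
fn suggested_ratio(records_per_sec: f64, target_traces_per_sec: f64) -> f64 {
    if records_per_sec <= 0.0 {
        return 1.0;
    }
    (target_traces_per_sec / records_per_sec).min(1.0)
}

/// Keep a record when its hash falls in the lowest `ratio` fraction of the u64 range.
fn should_sample(record_id: u64, ratio: f64) -> bool {
    if ratio >= 1.0 {
        return true;
    }
    if ratio <= 0.0 {
        return false;
    }
    let mut hasher = DefaultHasher::new();
    record_id.hash(&mut hasher);
    (hasher.finish() as f64) < ratio * (u64::MAX as f64)
}
```

Targeting about 100 traces per second, `suggested_ratio` gives 0.1 at 1K rec/s, 0.01 at 10K rec/s, and 0.001 at 100K rec/s, the same ratios as the table.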
Window and Aggregate Traces
Window and aggregate operators accumulate input records into new output records. The output gets its own trace, linked back to the input traces via OpenTelemetry span links. This preserves lineage without creating misleading parent-child relationships.
The volley.window.input_trace_count attribute tells you how many sampled records contributed to each aggregation result.
Kafka Trace Propagation
When using Kafka connectors, trace context propagates automatically via W3C traceparent headers:
- Kafka source extracts traceparent from message headers
- Kafka sink injects traceparent into produced messages
A traced request in an upstream service continues its trace through Volley and into downstream consumers. The runtime respects upstream sampling decisions (parent-based sampling).
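For reference, a W3C traceparent value has four dash-separated lowercase-hex fields: version, a 32-character trace ID, a 16-character parent ID, and 2-character flags, where bit 0 of the flags is the upstream sampling decision. The parser below is a sketch for version 00 only; `TraceParent` and `parse_traceparent` are hypothetical names, not Volley's API.

```rust
/// Parsed fields of a version-00 W3C traceparent header.
#[derive(Debug, PartialEq)]
struct TraceParent {
    trace_id: String,
    parent_id: String,
    sampled: bool,
}

/// True when `s` is exactly `len` lowercase hex characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Parse "00-<trace-id>-<parent-id>-<flags>"; returns None for anything malformed.
fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let parts: Vec<&str> = header.trim().split('-').collect();
    if parts.len() != 4 || parts[0] != "00" {
        return None;
    }
    if !is_lower_hex(parts[1], 32) || !is_lower_hex(parts[2], 16) || !is_lower_hex(parts[3], 2) {
        return None;
    }
    // All-zero trace or parent IDs are invalid per the spec.
    if parts[1].bytes().all(|b| b == b'0') || parts[2].bytes().all(|b| b == b'0') {
        return None;
    }
    let flags = u8::from_str_radix(parts[3], 16).ok()?;
    Some(TraceParent {
        trace_id: parts[1].to_string(),
        parent_id: parts[2].to_string(),
        sampled: (flags & 0x01) == 0x01,
    })
}
```

Parent-based sampling then amounts to honouring the `sampled` field when a header is present and falling back to the local sampling strategy when it is not.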
Configuration Reference
| Field | Type | Default | Description |
|---|---|---|---|
| sampling | SamplingStrategy | Never | Which records to trace |
| max_payload_bytes | usize | 1024 | Max JSON preview size per span event |
| capture_payload | bool | true | Whether to attach input/output previews |
Viewing Traces
Volley emits traces via the OTLP gRPC exporter configured in ObservabilityConfig. Point it at your collector:
.with_observability(ObservabilityConfig::new()
    .with_node_id("node-1")
    .with_otlp_endpoint("http://otel-collector:4317"))
.with_tracing(TracingConfig {
    sampling: SamplingStrategy::Ratio(0.01),
    ..Default::default()
})
Compatible backends: Jaeger, Grafana Tempo, Datadog, Honeycomb, or any OTLP-compatible collector.
Error Handling
When a record fails during processing (e.g., a transformation returns an error), the record’s trace span is:
- Marked with otel.status_code = ERROR
- Annotated with the error message via span.record_error()
- Ended at the point of failure
Failed records are visible in your tracing backend by filtering on otel.status_code = ERROR. This makes tracing a useful debugging tool even when records are dropped or filtered.
Performance Impact
Tracing adds minimal overhead when sampling is configured appropriately:
| Sampling | Overhead | Notes |
|---|---|---|
| Never | ~0% | No spans created; trace context still propagated for Kafka |
| Ratio(0.001) | < 1% | Recommended for high-throughput production |
| Ratio(0.01) | ~1-2% | Good for staging or moderate-throughput production |
| Always | ~5-10% | Development and debugging only |
The overhead comes from span creation and OTLP export, not from sampling decisions (which are a single random number comparison).
Troubleshooting
No traces appearing in the backend
- Verify with_observability() is configured with the correct otlp_endpoint
- Check that the OTLP collector is reachable from your application
- Confirm sampling is not set to Never (the default)
- Check collector logs for rejected spans (often caused by resource limits)
Traces appear but are missing operators
- Ensure with_tracing() is called before operators are added to the pipeline
- Custom operators using apply_operator() create spans automatically; no manual instrumentation needed
Kafka traces not connecting to upstream services
- The upstream producer must inject W3C traceparent headers into Kafka message headers
- If the upstream doesn’t send traceparent, Volley creates a new root trace (no parent linkage)
ML Inference
Enrich every record with a model prediction — sentiment scores on chat messages, fraud scores on transactions, embeddings on product descriptions — without leaving your pipeline. The volley-ml crate gives you stream operators that call ONNX or Candle models in-process, or forward batches to an external model server (vLLM, Triton, LiteLLM). The ergonomics match the rest of Volley: a config builder, a method on your stream, and the model output becomes extra Arrow columns downstream.
Backend Configuration
Configure the ML backend once and share it across operators:
use std::sync::Arc;
use volley_ml::prelude::*;

// CPU-only ONNX (simplest)
let ml = Arc::new(MlBackendConfig::onnx_cpu());

// ONNX on CUDA GPU with limits
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)
    .with_num_threads(4)
    .with_memory_limit(4 * 1024 * 1024 * 1024));

// Candle on CPU (pure Rust, no C++ toolchain)
let ml = Arc::new(MlBackendConfig::candle_cpu());

// Candle on Metal (macOS GPU)
let ml = Arc::new(MlBackendConfig::candle_metal());

// ONNX on macOS Metal / Neural Engine via CoreML
let ml = Arc::new(MlBackendConfig::onnx_coreml());

// CoreML with specific compute units and model format
let ml = Arc::new(MlBackendConfig::onnx_coreml()
    .with_coreml_compute_units(CoreMLComputeUnits::CpuAndGpu)
    .with_coreml_model_format(CoreMLModelFormat::MLProgram));

// Candle with explicit architecture and HF revision
let ml = Arc::new(MlBackendConfig::candle_cpu()
    .with_architecture(CandleArchitecture::ModernBert)
    .with_hf_revision("v1.0"));
Loading ONNX Models from HuggingFace Hub
Pass a HuggingFace repo ID as the model path — the backend downloads and caches the ONNX model and tokenizer automatically:
let ml = Arc::new(MlBackendConfig::onnx_cpu());
stream
    .embed(EmbeddingConfig::new(
        "sentence-transformers/all-MiniLM-L6-v2", // HF repo ID
        "text",
        ml.clone(),
    ))
    .await?
Models are cached in ~/.cache/huggingface/hub/ and reused across runs.
Revision Pinning
Pin a specific revision with @:
EmbeddingConfig::new("user/model@v1.0", "text", ml.clone())
Or set it on the config:
let ml = Arc::new(MlBackendConfig::onnx_cpu()
    .with_hf_revision("abc123"));
ONNX Model Variants
Many repos include optimized variants. Select one with with_onnx_model_file():
let ml = Arc::new(MlBackendConfig::onnx_cpu()
    .with_onnx_model_file("onnx/model_O2.onnx")); // O2 optimized
Common variants:
- onnx/model.onnx — standard (default)
- onnx/model_O2.onnx — O2 optimized
- onnx/model_qint8_arm64.onnx — quantized for ARM64
Operators
Embedding (.embed())
Generate vector embeddings from a text column:
stream
    .embed(EmbeddingConfig::new("models/e5-small.onnx", "text", ml.clone())
        .with_output_column("embedding")
        .with_embedding_dim(384))
    .await?
Appends a FixedSizeList<Float32> column to the output.
Classification (.classify())
Classify records and get label + confidence score:
stream
    .classify(ClassifyConfig::new("models/fraud.onnx", "features", ml.clone())
        .with_label_column("is_fraud")
        .with_score_column("fraud_score")
        .with_labels(vec!["legitimate", "fraud"]))
    .await?
Appends Utf8 label and Float32 score columns.
Generic Inference (.infer())
Run any ONNX model on selected columns:
stream
    .infer(InferenceConfig::new("models/custom.onnx", ml.clone())
        .with_input_columns(vec!["feature_1", "feature_2"])
        .with_output_columns(vec![
            ("prediction", DataType::Float32),
        ]))
    .await?
External Model Server (.infer_remote())
Call OpenAI-compatible APIs (vLLM, TGI, LiteLLM):
stream
    .infer_remote(RemoteInferenceConfig::new(
        "http://localhost:8000/v1/embeddings",
        ApiFormat::OpenAI,
    )
    .with_input_columns(vec!["text"])
    .with_output_columns(vec![("embedding", DataType::Float32)])
    .with_max_concurrent(32)
    .with_timeout(Duration::from_secs(10)))
    .await?
Candle Backend
The Candle backend provides pure-Rust inference using HuggingFace’s Candle library. No C++ toolchain required.
Loading from HuggingFace Hub
Pass a HuggingFace repo ID as the model path — the backend downloads and caches model weights, config, and tokenizer automatically:
let ml = Arc::new(MlBackendConfig::candle_cpu());
stream
    .embed(EmbeddingConfig::new(
        "sentence-transformers/all-MiniLM-L6-v2", // HF repo ID
        "text",
        ml.clone(),
    ))
    .await?
Pin a specific revision with @:
EmbeddingConfig::new("user/model@v1.0", "text", ml.clone())
Or set it on the config:
let ml = Arc::new(MlBackendConfig::candle_cpu()
    .with_hf_revision("abc123"));
Loading from Local Files
Point to a local directory containing model.safetensors, config.json, and optionally tokenizer.json:
stream
    .embed(EmbeddingConfig::new("/models/my-bert", "text", ml.clone()))
    .await?
Supported Architectures
The backend auto-detects the architecture from the model’s config.json (model_type field):
Encoders
| model_type | Architecture | Enum Variant | Use Cases |
|---|---|---|---|
| bert, roberta | BERT | Bert | Embeddings, classification |
| distilbert | DistilBERT | DistilBert | Faster embeddings (6-layer distilled) |
| nomic_bert | NomicBERT | NomicBert | Nomic Embed text embeddings |
| jina_bert | JinaBERT | JinaBert | Jina AI embeddings (ALiBi attention) |
| modernbert | ModernBERT | ModernBert | Improved BERT with rotary embeddings |
| deberta-v2 | DeBERTa V2 | DebertaV2 | High-accuracy classification |
| xlm-roberta | XLM-RoBERTa | XlmRoberta | Multilingual embeddings |
Decoders
| model_type | Architecture | Enum Variant | Use Cases |
|---|---|---|---|
| llama | Llama | Llama | Classification, inference |
| mistral | Mistral | Mistral | Classification, inference |
| gemma2 | Gemma 2 | Gemma2 | Classification, inference |
| phi3 | Phi-3 | Phi3 | Small/efficient inference |
| deepseek_v2 | DeepSeek V2 | DeepSeekV2 | MoE inference |
Encoder models output [batch, hidden_size] (CLS token extraction). Decoder
models output [batch, vocab_size] (last-token logits).
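The two output shapes come from picking a different sequence position per model family. The sketch below illustrates that selection on plain nested vectors. It assumes each sequence is stored as [seq_len][width] rows with no padding; `pool_batch` is a hypothetical name, and a real backend would work on tensors and respect the attention mask.

```rust
/// Hypothetical illustration, not a Volley API.
/// `batch[i]` is one sequence stored as [seq_len][width] rows (no padding assumed).
/// Returns one row per sequence, i.e. a [batch, width] result.
fn pool_batch(batch: &[Vec<Vec<f32>>], decoder: bool) -> Vec<Vec<f32>> {
    batch
        .iter()
        // Encoders keep position 0 (the CLS token); decoders keep the last
        // position (the last-token logits). Empty sequences are skipped.
        .filter_map(|seq| if decoder { seq.last() } else { seq.first() })
        .cloned()
        .collect()
}
```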
Override auto-detection with with_architecture():
let ml = Arc::new(MlBackendConfig::candle_cpu()
    .with_architecture(CandleArchitecture::ModernBert));
Tokenizer Integration
When the tokenizers feature is enabled and the model has a tokenizer.json, text columns are automatically tokenized before the forward pass, so no manual tokenization is needed. For HuggingFace Hub models, Volley also checks onnx/tokenizer.json when the repo keeps shared tokenizer assets under the onnx/ subdirectory. Enable the feature alongside the Candle backend:
volley-ml = { version = "1.0.0", features = ["candle", "tokenizers"] }
For .classify(), the backend must return per-class logits. Volley validates that the model output width matches the configured label count and raises an operator error when a backend returns encoder embeddings instead of classifier logits.
GPU Acceleration
GPU inference can be 10–100× faster than CPU for transformer models, depending on model size and batch size. Volley supports CUDA (NVIDIA) and Metal (macOS) GPUs.
CUDA (NVIDIA GPUs)
ONNX Runtime: CUDA is handled at runtime via execution providers — no
compile-time feature flag needed. Use MlBackendConfig::onnx_cuda(device_id):
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)); // GPU 0
If CUDA is unavailable, the ONNX Runtime silently falls back to CPU.
Candle: Requires the cuda compile-time feature (pulls in cudarc CUDA
bindings):
volley-ml = { version = "1.0.0", features = ["candle", "tokenizers", "cuda"] }
let ml = Arc::new(MlBackendConfig::candle_cuda(0));
Prerequisites: NVIDIA driver and CUDA toolkit must be installed. Verify
with nvidia-smi or volley doctor.
Metal (macOS GPUs)
Requires the metal compile-time feature:
volley-ml = { version = "1.0.0", features = ["candle", "tokenizers", "metal"] }
let ml = Arc::new(MlBackendConfig::candle_metal());
CoreML (ONNX on macOS)
The CoreML execution provider accelerates ONNX models on macOS using Metal GPU
and/or the Apple Neural Engine. Requires the coreml compile-time feature:
volley-ml = { version = "1.0.0", features = ["onnx", "coreml"] }
let ml = Arc::new(MlBackendConfig::onnx_coreml());
If CoreML is unavailable (e.g., on Linux), the ONNX Runtime silently falls back to CPU.
Partial operator coverage: CoreML does not support every ONNX operation. Unsupported ops automatically fall back to CPU within the same session. For example, DistilBERT has 387 graph nodes but only 17 are CoreML-eligible. This means most computation still runs on CPU, and the CoreML compilation overhead can make the “GPU” path slower than pure CPU for small models or small batch sizes. Use GPU=cpu to benchmark both paths. Models with convolution-heavy architectures (vision models) tend to have better CoreML coverage than attention-heavy transformers.
Compute units control which hardware CoreML dispatches to:
| Value | Hardware |
|---|---|
| All (default) | CPU + GPU + Neural Engine |
| CpuAndGpu | CPU + Metal GPU only |
| CpuAndNeuralEngine | CPU + Neural Engine only |
| CpuOnly | CPU only (debugging) |
let ml = Arc::new(MlBackendConfig::onnx_coreml()
    .with_coreml_compute_units(CoreMLComputeUnits::CpuAndGpu));
Model format controls the internal CoreML representation:
| Format | Requirement | Notes |
|---|---|---|
| NeuralNetwork (default) | macOS 10.15+ | Broadest compatibility |
| MLProgram | macOS 12+ | More operators, potentially faster |
let ml = Arc::new(MlBackendConfig::onnx_coreml()
    .with_coreml_model_format(CoreMLModelFormat::MLProgram));
GPU Detection
Check GPU availability at runtime without loading a model:
let info = volley_ml::gpu::detect_gpu();
println!("{}", info.summary);
// "CUDA: disabled (enable `cuda` feature); Metal: available; CoreML: available"

if info.cuda_available {
    println!("Using GPU with {} CUDA device(s)", info.cuda_device_count);
}
if info.coreml_available {
    println!("CoreML available for ONNX Metal acceleration");
}
The volley doctor CLI command also reports GPU status.
Feature Flags
Add volley-ml to your Cargo.toml with the features you need. The entries below are alternatives; keep a single volley-ml line in your manifest:
[dependencies]
# ONNX Runtime backend (CPU)
volley-ml = { version = "1.0.0", features = ["onnx"] }
# ONNX on CUDA GPU (runtime EP registration, no compile-time flag)
volley-ml = { version = "1.0.0", features = ["onnx"] }
# Candle backend with tokenizers (pure Rust, CPU)
volley-ml = { version = "1.0.0", features = ["candle", "tokenizers"] }
# Candle on CUDA GPU
volley-ml = { version = "1.0.0", features = ["candle", "tokenizers", "cuda"] }
# Candle on macOS Metal GPU
volley-ml = { version = "1.0.0", features = ["candle", "tokenizers", "metal"] }
# ONNX on macOS CoreML (Metal GPU + Neural Engine)
volley-ml = { version = "1.0.0", features = ["onnx", "coreml"] }
# External model servers
volley-ml = { version = "1.0.0", features = ["remote-http"] }
Backend Comparison
| | ONNX Runtime (onnx) | Candle (candle) |
|---|---|---|
| Language | C++ via FFI | Pure Rust |
| Model format | ONNX (universal) | SafeTensors / HF Hub |
| Model loading | Local file or HuggingFace Hub | Local file or HuggingFace Hub |
| Tokenizer | External (user handles) | Built-in (tokenizers feature) |
| GPU | CUDA, TensorRT, CoreML (coreml feature) | CUDA, Metal |
| Best for | Any ONNX-exportable model | HF transformer models |
| Build | Needs C++ toolchain (auto-downloaded) | Pure cargo build |
Performance Tuning
Session Pooling (ONNX)
By default, a single ONNX session is shared across all inference calls. For concurrent pipelines, pool multiple sessions to eliminate lock contention:
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)
    .with_session_pool_size(4)); // 4 sessions, round-robin
Each session is an independent model copy. A pool of 2–4 is usually sufficient.
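The idea behind round-robin pooling can be sketched with the standard library alone. This is an illustration under assumptions, not Volley's implementation: `SessionPool` is a hypothetical type and `S` stands in for an ONNX session. Each call locks only one session, so concurrent callers rarely wait on the same lock.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical pool; `S` stands in for an inference session.
struct SessionPool<S> {
    sessions: Vec<Mutex<S>>,
    next: AtomicUsize,
}

impl<S> SessionPool<S> {
    fn new(sessions: Vec<S>) -> Self {
        assert!(!sessions.is_empty(), "pool needs at least one session");
        Self {
            sessions: sessions.into_iter().map(Mutex::new).collect(),
            next: AtomicUsize::new(0),
        }
    }

    /// Index of the session the next call should use (round-robin).
    fn next_index(&self) -> usize {
        self.next.fetch_add(1, Ordering::Relaxed) % self.sessions.len()
    }

    /// Run `f` against the next session in rotation; only that session is locked.
    fn with_session<R>(&self, f: impl FnOnce(&mut S) -> R) -> R {
        let idx = self.next_index();
        let mut guard = self.sessions[idx].lock().unwrap();
        f(&mut *guard)
    }
}
```

With a pool of one, every caller contends on the same mutex, which is the lock contention the default configuration exhibits under concurrency.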
Embedding Cache
Enable an LRU cache to skip inference for repeated text:
stream
    .embed(EmbeddingConfig::new("model.onnx", "text", ml.clone())
        .with_cache(10_000)) // Cache up to 10,000 unique embeddings
    .await?
Identical input strings are served from cache. This can eliminate 50–90% of model calls in pipelines with repeated product names, log messages, etc.
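For intuition, an LRU cache keyed by input text can be sketched as follows. This is a simplified illustration, not Volley's cache: `EmbeddingCache` and its fields are hypothetical, and the recency update is O(n) for brevity where a production cache would use a linked structure.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical LRU cache mapping input text to its embedding.
struct EmbeddingCache {
    capacity: usize,
    map: HashMap<String, Vec<f32>>,
    order: VecDeque<String>, // front = least recently used
    hits: u64,
    misses: u64,
}

impl EmbeddingCache {
    fn new(capacity: usize) -> Self {
        Self { capacity, map: HashMap::new(), order: VecDeque::new(), hits: 0, misses: 0 }
    }

    /// Return the cached embedding, or run `model` once and remember the result.
    fn get_or_compute(&mut self, text: &str, model: impl FnOnce(&str) -> Vec<f32>) -> Vec<f32> {
        if let Some(found) = self.map.get(text) {
            let found = found.clone();
            self.hits += 1;
            // Move the key to the back: it is now the most recently used.
            if let Some(pos) = self.order.iter().position(|k| k.as_str() == text) {
                let key = self.order.remove(pos).unwrap();
                self.order.push_back(key);
            }
            return found;
        }
        self.misses += 1;
        let computed = model(text);
        if self.capacity == 0 {
            return computed; // caching disabled
        }
        if self.map.len() >= self.capacity {
            // Evict the least recently used entry to stay within capacity.
            if let Some(oldest) = self.order.pop_front() {
                self.map.remove(&oldest);
            }
        }
        self.map.insert(text.to_string(), computed.clone());
        self.order.push_back(text.to_string());
        computed
    }
}
```

The hit ratio `hits / (hits + misses)` is the fraction of model calls the cache saved.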
Half-Precision Inference (FP16 / BF16)
Load Candle model weights in half precision to roughly halve weight memory and speed up GPU inference on hardware with tensor cores:
let ml = Arc::new(MlBackendConfig::candle_cuda(0)
    .with_dtype(ModelDType::F16));
On CPU, F16/BF16 are automatically promoted to F32 since most CPUs lack native half-precision arithmetic.
Remote Batch Accumulation
When upstream batches are small, buffer them before sending to the model server:
RemoteInferenceConfig::new("http://model-server:8000/v1/embeddings", ApiFormat::OpenAI)
    .with_batch_accumulation(64, Duration::from_millis(50))
    .with_max_concurrent(16)
This sends a single HTTP request once 64 rows accumulate or 50ms elapse, reducing round trips.
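The "size or time, whichever comes first" rule can be sketched like this. It is a minimal illustration, not Volley's code: `BatchAccumulator` is a hypothetical type, and the caller passes the current time explicitly so the behavior is easy to test.

```rust
use std::time::{Duration, Instant};

/// Hypothetical accumulator: flushes on row count or on the age of the oldest row.
struct BatchAccumulator<T> {
    max_rows: usize,
    max_wait: Duration,
    rows: Vec<T>,
    first_row_at: Option<Instant>,
}

impl<T> BatchAccumulator<T> {
    fn new(max_rows: usize, max_wait: Duration) -> Self {
        Self { max_rows, max_wait, rows: Vec::new(), first_row_at: None }
    }

    /// Buffer one row; returns a full batch once `max_rows` rows are buffered.
    fn push(&mut self, row: T, now: Instant) -> Option<Vec<T>> {
        if self.first_row_at.is_none() {
            self.first_row_at = Some(now);
        }
        self.rows.push(row);
        if self.rows.len() >= self.max_rows { self.flush() } else { None }
    }

    /// Call periodically; returns a partial batch once the oldest row has waited `max_wait`.
    fn poll(&mut self, now: Instant) -> Option<Vec<T>> {
        match self.first_row_at {
            Some(start) if now.duration_since(start) >= self.max_wait => self.flush(),
            _ => None,
        }
    }

    fn flush(&mut self) -> Option<Vec<T>> {
        self.first_row_at = None;
        if self.rows.is_empty() { None } else { Some(std::mem::take(&mut self.rows)) }
    }
}
```

With `BatchAccumulator::new(64, Duration::from_millis(50))`, each returned batch would correspond to one HTTP request.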
Running the Example
The ML inference example works out of the box with zero configuration. It auto-downloads a sentiment classification model from HuggingFace Hub and auto-detects Metal (CoreML) on macOS:
# Just works — downloads model from Hub, auto-detects Metal on macOS
cargo run --example ml_inference_pipeline -p volley-examples --features ml
Override defaults with environment variables:
- MODEL_PATH=./my-model.onnx: use a local ONNX model instead of downloading from Hub
- GPU=cpu: force CPU backend (disables Metal/CoreML auto-detection)
Recent Bug Fixes
- Hub tokenizer resolution: tokenizer download now checks sibling paths (e.g., onnx/tokenizer.json) when tokenizer.json is not at the repo root.
- Softmax normalization: classification scores now use softmax normalization, producing proper [0, 1] probabilities instead of raw logits.
CLI Template
Scaffold an ML pipeline project with one command:
volley new --template ml-pipeline my-classifier
cd my-classifier
This generates a Kafka-to-Kafka pipeline with ONNX classification:
- src/main.rs: Reads text from Kafka, classifies with .classify(), writes enriched records to output topic
- Cargo.toml: Includes volley-ml with onnx and tokenizers features
- .env.example: Kafka connection and MODEL_PATH configuration
Edit src/main.rs to customize:
- Change the model: Update MODEL_PATH and the label list in .classify()
- Swap operators: Replace .classify() with .embed() for embeddings or .infer() for generic inference
- Enable GPU: Change MlBackendConfig::onnx_cpu() to MlBackendConfig::onnx_cuda(0)
- Use Candle: Switch to MlBackendConfig::candle_cpu() for pure-Rust inference
- Add remote inference: Chain .infer_remote(RemoteInferenceConfig::new(...))
Integration Testing
Volley provides the volley-test-harness crate — a shared test framework inspired by Apache Flink’s testing patterns. It gives you containers, assertion harnesses, and a standardized connector test suite so you can validate pipelines end-to-end with real infrastructure.
Architecture
The test harness is layered — each layer builds on the one below it:
┌──────────────────────────────────────────────────┐
│ End-to-End Tests (tests/) │
│ Full pipelines with real connectors │
├──────────────────────────────────────────────────┤
│ Connector Framework (connector/) │
│ ConnectorTestContext + source/sink suites │
├──────────────────────────────────────────────────┤
│ Assertion Harnesses (assertions/) │
│ MetricsSnapshot, TraceCapture, ThroughputRecorder│
├──────────────────────────────────────────────────┤
│ TestCluster (cluster.rs) │
│ In-process pipeline runner + observability │
├──────────────────────────────────────────────────┤
│ Container Management (containers/) │
│ Kafka, MinIO with retry logic + ARM64 images │
└──────────────────────────────────────────────────┘
Quick Start
Add volley-test-harness as a dev-dependency in your connector crate:
[dev-dependencies]
volley-test-harness = { path = "../volley-test-harness" }
async-trait = { workspace = true }
arrow = { workspace = true }
anyhow = { workspace = true }
Container Management
Shared testcontainers with retry logic and one-container-per-binary lifecycle via OnceCell. Containers start lazily on first use and are cleaned up when the test process exits.
Kafka
use volley_test_harness::containers::KafkaContainer;

// Starts a native ARM64 Kafka container on first call.
let servers = KafkaContainer::bootstrap_servers().await;

// Each test gets an isolated topic.
let topic = KafkaContainer::unique_topic("my-test");
KafkaContainer::create_topic(&topic, 1).await;

// Seed test data.
KafkaContainer::produce_json(&topic, &[
    serde_json::json!({"name": "alice", "score": 42}),
]).await;

// Verify output.
let messages = KafkaContainer::consume_json(&topic, 1, 10).await;
assert_eq!(messages[0]["name"], "alice");
MinIO (S3-compatible)
use volley_test_harness::containers::{MinioContainer, MINIO_ACCESS_KEY, MINIO_SECRET_KEY};

let endpoint = MinioContainer::endpoint().await;
let bucket = MinioContainer::unique_bucket("test");
MinioContainer::create_bucket(&bucket).await;
Polaris (Apache Iceberg REST catalog)
use volley_test_harness::containers::PolarisContainer;

// Starts a Polaris container backed by MinIO. MinIO must be started first.
let catalog_url = PolarisContainer::catalog_url().await;

// Each test gets an isolated catalog name.
let catalog = PolarisContainer::unique_catalog("my-test");
PolarisContainer::create_catalog(&catalog).await;
PolarisContainer depends on MinioContainer for the underlying storage — start MinioContainer before calling any PolarisContainer method.
Connector Test Framework
Like Flink’s Connector Testing Framework — implement ConnectorTestContext for your connector and get a standardized source/sink test suite for free.
Implementing ConnectorTestContext
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use volley_test_harness::connector::ConnectorTestContext;

struct MyConnectorTestContext { /* ... */ }

#[async_trait]
impl ConnectorTestContext for MyConnectorTestContext {
    fn name(&self) -> &str { "MyConnector" }

    fn test_schema(&self) -> Arc<Schema> { todo!() }

    async fn seed_test_data(&self, records: &[RecordBatch]) -> anyhow::Result<()> {
        // Write records into the external system.
        todo!()
    }

    async fn create_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Create a source pointing at the seeded data.
        todo!()
    }

    async fn create_sink(&self) -> anyhow::Result<Option<Box<dyn DynSink>>> {
        // Create a sink writing to a test destination.
        todo!()
    }

    async fn read_sink_results(&self) -> anyhow::Result<Vec<RecordBatch>> {
        // Read back what the sink wrote for verification.
        todo!()
    }

    async fn create_empty_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Optional: source pointing at an empty data set.
        Ok(None) // Return None to skip the empty-source test.
    }
}
Running the Suites
#[tokio::test]
async fn my_connector_source_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_source_suite(&ctx).await;
}

#[tokio::test]
async fn my_connector_sink_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_sink_suite(&ctx).await;
}
The source suite runs three tests:
- test_basic_read — seeds 5 records, verifies the source returns data
- test_read_all_records — seeds 10 records, reads until all consumed
- test_empty_source — verifies graceful handling of empty data sets
The sink suite runs three tests:
- test_basic_write: writes records, flushes, verifies they’re readable
- test_write_and_flush: multiple writes followed by flush
- test_checkpoint_triggers_flush: verifies on_checkpoint() commits data
See volley-connector-kafka/tests/connector_suite.rs for the reference implementation.
TestCluster
An in-process pipeline execution wrapper with built-in observability capture. Registers an InMemorySpanExporter as the global OpenTelemetry tracer so spans created during pipeline execution are captured for assertions.
use volley_test_harness::cluster::TestCluster;

let cluster = TestCluster::new("my-test");

let pipeline = StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(cluster.tracing_config()) // 100% sampling, in-memory export
    .with_operator(my_operator)
    .with_operator_id("my-op")
    .to_sink(sink);
pipeline.execute("my-pipeline").await?;

// Inspect captured spans.
let traces = cluster.traces();
println!("Captured {} spans", traces.span_count());
traces.print_timing_summary();
Assertion Harnesses
TraceCapture
Assert on OpenTelemetry spans captured by the TestCluster:
let traces = cluster.traces();

// Verify span counts.
traces.assert_span_count("volley.operator", 10);

// Verify trace structure.
traces.assert_valid_tree(); // No orphan spans.

// Print per-operator timing breakdown.
traces.print_timing_summary();
The timing summary shows per-span-name breakdown with operator identity:
Span Count Total ms Avg ms Max ms
--------------------------------------------------------------------------------
volley.record 10 559.2 55.9 100.3
volley.sink 8 72.0 9.0 11.0
volley.operator (ml-embed) 8 56.8 7.1 25.9
volley.operator (status-filter) 10 1.1 0.1 0.8
volley.operator (proto-decode) 10 0.8 0.1 0.5
volley.source 10 0.4 0.0 0.3
Use .with_operator_id("name") on your pipeline to assign human-readable names.
MetricsSnapshot
Scrape and assert on Prometheus metrics:
let metrics = MetricsSnapshot::scrape(prometheus_port).await?;
metrics.assert_counter_exists("volley_source_records_polled");
metrics.assert_counter_gte("volley_records_processed_total", 100.0);
metrics.assert_gauge_in_range("volley_pipeline_health", 1.0, 1.0);
ThroughputRecorder
Measure and assert on pipeline throughput:
let mut recorder = ThroughputRecorder::new();
recorder.start();
pipeline.execute("bench").await?;
recorder.stop(record_count);

recorder.assert_throughput_gte(100.0);    // min records/sec
recorder.assert_elapsed_under_secs(30.0); // max wall time
println!("{:.1} records/sec", recorder.records_per_second().unwrap());
Test Data Generators
The data module provides test data factories:
use volley_test_harness::data::*;

// Protobuf messages (serialized bytes).
// Every 5th record is a failure (HTTP 503), rest are HTTP 200.
let protos = generate_crawl_results(50);

// Arrow RecordBatch (for non-protobuf tests).
let batch = generate_crawl_batch(50);

// The schema used by both generators.
let schema = crawl_result_schema();
The CrawlResult type has impl_to_arrow! so it works with auto_decode_operator::<CrawlResult>("payload").
Running Tests
All integration tests require Docker for testcontainers.
# Kafka connector suite (6 tests, ~45s)
cargo test -p volley-connector-kafka --test connector_suite -- --nocapture
# Iceberg connector suite (Polaris + MinIO, ~60s)
cargo test -p volley-connector-iceberg --test connector_suite -- --nocapture
# E2E golden path — full pipeline with windowing, metrics, traces, and ML (1 test, ~30s)
cargo test -p volley-test-harness --test e2e_golden_path -- --nocapture
# With Metal GPU disabled (CPU fallback)
cargo test -p volley-test-harness --test e2e_golden_path --no-default-features -- --nocapture
Where Tests Live
Connector-specific integration tests belong in their connector crate, importing shared infrastructure from volley-test-harness:
| Crate | Tests | Purpose |
|---|---|---|
| volley-connector-kafka | tests/connector_suite.rs | Kafka source/sink through ConnectorTestContext |
| volley-connector-iceberg | tests/connector_suite.rs | Iceberg source/sink through ConnectorTestContext (Polaris + MinIO) |
| volley-test-harness | tests/e2e_golden_path.rs | Cross-connector end-to-end pipeline test |
The pattern: volley-test-harness provides the framework, each connector crate owns its own integration tests.
Golden Path Test Coverage
The E2E golden path test (e2e_golden_path.rs) exercises the full pipeline:
- Correctness: Kafka protobuf → decode → filter → ML embed → Kafka sink, asserting output records match expectations
- Windowed aggregation: tumbling window over the event stream with watermark advancement
- Throughput: ThroughputRecorder asserts sustained records/sec
- Metrics: MetricsSnapshot scrapes the embedded Prometheus endpoint and asserts counters (volley_records_processed_total, volley_source_records_polled, pipeline health gauge)
- Traces: TraceCapture from TestCluster asserts span count, valid parent–child tree, and per-operator timing via print_timing_summary()
Horizontal Scaling Guide
Run a single Volley pipeline across several Kubernetes pods when one machine can’t keep up — too many Kafka partitions, too much keyed state, or you need a pod failure to leave processing intact. You don’t change your pipeline code; you add a ClusterConfig, a shared ReadWriteMany checkpoint volume, and a replicas count in the operator CR. Volley handles key distribution, shuffling between pods, and live rescaling.
This page is the deployment checklist. For the design behind it (key groups, Arrow Flight shuffle, leader election), see Horizontal Scaling in Architecture.
Prerequisites
- Kubernetes cluster (v1.26+)
- A ReadWriteMany storage class (AWS EFS, GCP Filestore, Azure Files)
- The Volley K8s operator deployed (see Kubernetes Operator)
- Your pipeline built with the distributed feature enabled
When to Use Horizontal Scaling
Single-node mode handles most workloads. Consider horizontal scaling when:
- Your key space is too large for one node’s memory (state doesn’t fit in RAM + RocksDB)
- You need fault tolerance across nodes (pod failure shouldn’t stop the pipeline)
- Your source has more partitions than one node can consume (e.g., 100+ Kafka partitions)
If your bottleneck is CPU on stateless operators (filter, map), scale vertically first. Horizontal scaling adds network overhead for the key shuffle.
Step 1: Enable the Distributed Feature
Add the distributed feature to your pipeline’s Cargo.toml:
[dependencies]
volley-core = { version = "0.8", features = ["distributed"] }
Step 2: Configure ClusterConfig
In your pipeline code, add with_cluster() to the environment:
use volley_core::prelude::*;

// In production, use ClusterConfig::from_env() to auto-detect from K8s env vars
let cluster = ClusterConfig::from_env();

let mut env = StreamExecutionEnvironment::new();
if let Some(config) = cluster {
    env = env.with_cluster(config);
}

env.from_source(source)
    .key_by(col("user_id"))
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    .to_sink(sink)
    .execute("my-pipeline")
    .await?;
The pipeline API is identical — with_cluster() is the only addition. When ClusterConfig is absent (e.g., running locally), the pipeline runs in single-node mode with zero overhead.
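To illustrate the "absent means single-node" behavior, here is a sketch of environment-based detection. It is not the real ClusterConfig: `ClusterSettings`, its fields, and `cluster_from_env` are hypothetical, and the fallback values (256 key groups, port 50051) are taken from the defaults shown in the operator CR. The variable names are the VOLLEY_* ones the operator injects.

```rust
/// Hypothetical stand-in for the real cluster config; field names are assumptions.
#[derive(Debug, PartialEq)]
struct ClusterSettings {
    num_workers: u32,
    max_key_groups: u32,
    flight_port: u16,
}

/// `lookup` abstracts `std::env::var` so the logic can run without touching
/// the process environment.
fn cluster_from_env(lookup: impl Fn(&str) -> Option<String>) -> Option<ClusterSettings> {
    // No worker count means we are not running under the operator: single-node mode.
    let num_workers = lookup("VOLLEY_NUM_WORKERS")?.parse().ok()?;
    let max_key_groups = lookup("VOLLEY_MAX_KEY_GROUPS")
        .and_then(|v| v.parse().ok())
        .unwrap_or(256);
    let flight_port = lookup("VOLLEY_FLIGHT_PORT")
        .and_then(|v| v.parse().ok())
        .unwrap_or(50051);
    Some(ClusterSettings { num_workers, max_key_groups, flight_port })
}
```

In a real binary the lookup would be `cluster_from_env(|k| std::env::var(k).ok())`.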
Step 3: Create a Shared Checkpoint PVC
All workers need access to the same checkpoint directory. Create a ReadWriteMany PVC:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
  namespace: my-pipeline
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs  # AWS EFS, or your cloud's RWX storage class
  resources:
    requests:
      storage: 50Gi
kubectl apply -f shared-pvc.yaml
Step 4: Deploy with the K8s Operator
Create a VolleyApplication with the cluster section:
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: my-pipeline
spec:
  image: registry.example.com/my-pipeline:v1.0
  replicas: 3
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"
  cluster:
    maxKeyGroups: 256  # default, power of 2
    flightPort: 50051  # default
    sharedCheckpoints:
      claimName: volley-shared-ckpt  # the PVC from step 3
  state:
    size: 10Gi  # per-worker local RocksDB
  checkpoints:
    size: 5Gi  # per-worker local checkpoint staging
The operator automatically:
- Injects VOLLEY_* environment variables (VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME)
- Adds the Arrow Flight port (50051) to the container and headless service
- Mounts the shared checkpoint PVC at /mnt/volley/checkpoints
Step 5: Verify
# Check pods are running
kubectl get pods -n my-pipeline
# NAME READY STATUS RESTARTS AGE
# my-pipeline-0 1/1 Running 0 2m
# my-pipeline-1 1/1 Running 0 1m
# my-pipeline-2 1/1 Running 0 30s
# Check env vars
kubectl exec my-pipeline-0 -n my-pipeline -- env | grep VOLLEY
# VOLLEY_NUM_WORKERS=3
# VOLLEY_HEADLESS_SERVICE=my-pipeline-headless.my-pipeline.svc
# VOLLEY_MAX_KEY_GROUPS=256
# VOLLEY_FLIGHT_PORT=50051
# VOLLEY_APP_NAME=my-pipeline
# Check Flight port on headless service
kubectl get svc my-pipeline-headless -n my-pipeline -o jsonpath='{.spec.ports[*].name}'
# health flight
# Check shared checkpoint mount
kubectl exec my-pipeline-0 -n my-pipeline -- ls /mnt/volley/checkpoints
Scaling
Change the replica count to scale:
kubectl patch vapp my-pipeline -n my-pipeline --type merge -p '{"spec":{"replicas":5}}'
The coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
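A sketch of what recomputing key group assignments can look like, assuming contiguous ranges split evenly across workers. The hash (FNV-1a) and the range formula are illustrative choices, not confirmed details of Volley's runtime, and all function names are hypothetical.

```rust
use std::ops::Range;

/// FNV-1a, used here only because it is deterministic and dependency-free.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for b in bytes {
        hash ^= *b as u64;
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

/// A key always maps to the same key group, regardless of cluster size.
fn key_group(key: &str, max_key_groups: u32) -> u32 {
    (fnv1a(key.as_bytes()) % max_key_groups as u64) as u32
}

/// Contiguous slice of key groups owned by `worker` (0-based ordinal).
fn owned_range(worker: u32, num_workers: u32, max_key_groups: u32) -> Range<u32> {
    let start = worker * max_key_groups / num_workers;
    let end = (worker + 1) * max_key_groups / num_workers;
    start..end
}

/// Which worker owns a given key group at the current cluster size.
fn owner(group: u32, num_workers: u32, max_key_groups: u32) -> u32 {
    (0..num_workers)
        .find(|w| owned_range(*w, num_workers, max_key_groups).contains(&group))
        .expect("every key group has an owner")
}
```

Under these assumptions, scaling from 3 to 5 workers with 256 key groups moves key group 160 from worker 1 to worker 3, while the key-to-group mapping itself is unchanged. Only the state for reassigned groups has to be picked up by a different worker.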
Monitoring
Key metrics to watch in distributed mode:
| Metric | What it tells you |
|---|---|
| volley_shuffle_local_records_total | Records staying on this worker |
| volley_shuffle_remote_records_total{target_worker="N"} | Records sent to each remote worker |
| volley_flight_send_errors_total | Transport errors (indicates worker connectivity issues) |
| volley_flight_reconnects_total{target_worker} | Successful reconnects after a HalfOpen probe. A steady trickle is a peer that keeps flapping; a single spike lines up with one pod restart |
| volley_circuit_breaker_state{target_worker} | 0=Closed, 1=Open, 2=HalfOpen per remote worker |
| volley_circuit_breaker_trips_total{target_worker} | Closed → Open transitions |
| volley_circuit_breaker_probes_succeeded_total{target_worker} | HalfOpen → Closed transitions; ratio with the trips counter is the reconnect success rate |
| volley_cluster_worker_id | This worker’s ordinal |
| volley_cluster_num_workers | Total cluster size |
| volley_cluster_key_groups_assigned | Key groups owned by this worker |
| volley_barrier_alignment_wait_ms | Time to align barriers across all inputs |
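The circuit breaker metrics are easier to read with the state machine in mind. The sketch below models the three states and the two counters from the table. The trip condition (a fixed number of consecutive send errors) and all type and method names are assumptions for illustration, not Volley's implementation.

```rust
/// Numeric values match the gauge encoding: 0=Closed, 1=Open, 2=HalfOpen.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BreakerState {
    Closed = 0,
    Open = 1,
    HalfOpen = 2,
}

/// Hypothetical per-peer breaker.
struct Breaker {
    state: BreakerState,
    consecutive_failures: u32,
    failure_threshold: u32, // assumption: trip after N consecutive send errors
    trips_total: u64,
    probes_succeeded_total: u64,
}

impl Breaker {
    fn new(failure_threshold: u32) -> Self {
        Self {
            state: BreakerState::Closed,
            consecutive_failures: 0,
            failure_threshold,
            trips_total: 0,
            probes_succeeded_total: 0,
        }
    }

    fn on_send_error(&mut self) {
        match self.state {
            BreakerState::Closed => {
                self.consecutive_failures += 1;
                if self.consecutive_failures >= self.failure_threshold {
                    self.state = BreakerState::Open; // Closed -> Open counts as a trip
                    self.trips_total += 1;
                }
            }
            // A failed probe returns to Open without counting a new trip.
            BreakerState::HalfOpen => self.state = BreakerState::Open,
            BreakerState::Open => {}
        }
    }

    /// Called once the cool-down has elapsed and a probe is about to be sent.
    fn begin_probe(&mut self) {
        if self.state == BreakerState::Open {
            self.state = BreakerState::HalfOpen;
        }
    }

    fn on_probe_ok(&mut self) {
        if self.state == BreakerState::HalfOpen {
            self.state = BreakerState::Closed; // HalfOpen -> Closed
            self.consecutive_failures = 0;
            self.probes_succeeded_total += 1;
        }
    }

    fn gauge(&self) -> u8 {
        self.state as u8
    }
}
```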
Troubleshooting
Pipeline stalls after a pod restart: Check if the BarrierAligner is waiting for a dead input. The Flight server should clean up stale connections via TCP keepalive (10s). If the stall persists, check volley_flight_send_errors_total for rising error counts. After a peer pod restart, the HalfOpen probe rebuilds the outbound tonic::Channel, re-registers the shuffle channel, and replays every unacknowledged frame from the client’s 8 MiB per-peer outbox onto the fresh stream (the server deduplicates via its last_delivered_seq, so surviving frames land exactly once). Confirm that volley_flight_reconnects_total{target_worker} increments and volley_circuit_breaker_state returns to 0 (Closed). Because replay is at-least-once and the server deduplicates, the epoch in flight at the moment of the crash often recovers without aborting; check that volley_dag_sink_aborts_total stays flat across the restart window. Longer outages (a reconnect that runs past the 2PC deadline) still fall back to a cluster-wide abort, with the next epoch committing cleanly.
Outbox backpressure (sender appears to stall per-peer): The client’s outbox is byte-bounded (8 MiB / peer by default; see DEFAULT_OUTBOX_MAX_BYTES). When a remote consumer goes silent — network partition, GC pause, OOM — the encoder parks on the byte budget until ACKs trim the backlog. Symptoms: volley_shuffle_remote_records_total{target_worker="N"} flatlines while other targets keep climbing. Root-cause by looking at the peer’s volley_flight_recv_messages_total and its drain-task logs; the outbox itself is doing the right thing by pushing back instead of running the worker out of memory.
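The interaction between the byte budget, ACK trimming, and replay deduplication can be sketched as below. This is a simplified model with hypothetical types (`Outbox`, `Receiver`) and a tiny budget in place of the 8 MiB default. It is not Volley's implementation; the real encoder parks instead of returning `None`.

```rust
use std::collections::VecDeque;

/// Hypothetical sender-side outbox: holds unacknowledged frames up to a byte budget.
struct Outbox {
    max_bytes: usize,
    used_bytes: usize,
    next_seq: u64,
    frames: VecDeque<(u64, Vec<u8>)>, // oldest first
}

impl Outbox {
    fn new(max_bytes: usize) -> Self {
        Self { max_bytes, used_bytes: 0, next_seq: 0, frames: VecDeque::new() }
    }

    /// Returns the assigned sequence number, or None when the budget is
    /// exhausted and the caller has to wait for ACKs (backpressure).
    fn try_push(&mut self, frame: Vec<u8>) -> Option<u64> {
        if self.used_bytes + frame.len() > self.max_bytes {
            return None;
        }
        let seq = self.next_seq;
        self.next_seq += 1;
        self.used_bytes += frame.len();
        self.frames.push_back((seq, frame));
        Some(seq)
    }

    /// Drop every frame with a sequence number <= `acked`, freeing budget.
    fn ack(&mut self, acked: u64) {
        while self.frames.front().map_or(false, |(seq, _)| *seq <= acked) {
            let (_, frame) = self.frames.pop_front().unwrap();
            self.used_bytes -= frame.len();
        }
    }

    /// Frames to resend after a reconnect: everything still unacknowledged.
    fn replay(&self) -> Vec<(u64, Vec<u8>)> {
        self.frames.iter().cloned().collect()
    }
}

/// Hypothetical receiver side: ignores frames it has already delivered.
#[derive(Default)]
struct Receiver {
    last_delivered_seq: Option<u64>,
    delivered: Vec<Vec<u8>>,
}

impl Receiver {
    /// Returns true if the frame was new; replayed duplicates are dropped.
    fn deliver(&mut self, seq: u64, frame: Vec<u8>) -> bool {
        if self.last_delivered_seq.map_or(false, |last| seq <= last) {
            return false;
        }
        self.last_delivered_seq = Some(seq);
        self.delivered.push(frame);
        true
    }
}
```

When the peer stops acknowledging, `try_push` keeps returning `None`, which is the flatlined per-target counter described above.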
Uneven record distribution: Check volley_shuffle_remote_records_total per target worker. The key group assignment is contiguous — if your key distribution is skewed, some workers will handle more traffic. Consider increasing maxKeyGroups for finer granularity.
Checkpoint failures: Verify the shared PVC is mounted and writable on all pods:
kubectl exec my-pipeline-0 -n my-pipeline -- touch /mnt/volley/checkpoints/test && echo OK
DNS resolution failures at startup: Workers discover each other via StatefulSet DNS. If pods start before DNS propagates, the runtime retries with exponential backoff (100ms to 5s). Check logs for “Waiting for remote worker” messages.
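For reference, a capped exponential backoff over the 100ms to 5s range could be computed like this. The doubling factor is an assumption (the text only gives the bounds), and `backoff_delay` is a hypothetical helper.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): 100ms doubled each time, capped at 5s.
fn backoff_delay(attempt: u32) -> Duration {
    const BASE: Duration = Duration::from_millis(100);
    const MAX: Duration = Duration::from_secs(5);
    // Clamp the exponent so the multiplier cannot overflow.
    let factor = 2u32.saturating_pow(attempt.min(16));
    BASE.saturating_mul(factor).min(MAX)
}
```

Under that assumption the cap is reached on the seventh attempt, so a worker that is still unreachable keeps retrying every 5 seconds.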
Kafka
Volley provides a Kafka source and sink via the volley-connector-kafka crate. The sink defaults to non-transactional idempotent mode for maximum throughput, with opt-in exactly-once transactional semantics.
Setup
Add the Kafka connector crate to your Cargo.toml:
[dependencies]
volley-connector-kafka = { git = "https://github.com/volley-streams/volley" }
System dependencies: cmake is required for building rdkafka. OpenSSL and libcurl are vendored and statically linked — no additional system packages needed. See Installation.
Authentication
Use KafkaSecurityConfig to configure authentication for both sources and sinks. The same config struct works with .with_security() on either KafkaSourceConfig or KafkaSinkConfig.
Confluent Cloud / SASL_SSL
The most common setup for managed Kafka (Confluent Cloud, AWS MSK Serverless, Aiven):
use volley_connector_kafka::{KafkaSourceConfig, KafkaSinkConfig, KafkaSecurityConfig};

let security = KafkaSecurityConfig::sasl_ssl("API_KEY", "API_SECRET");

let source_config = KafkaSourceConfig::new(
    "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "events", "my-group", schema,
).with_security(security.clone());

let sink_config = KafkaSinkConfig::new(
    "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "output",
).with_security(security);
This sets security.protocol=SASL_SSL and sasl.mechanism=PLAIN, which is the correct combination for Confluent Cloud and most SASL-based providers.
SCRAM Authentication
For brokers that use SCRAM (e.g., AWS MSK with IAM disabled, self-hosted clusters):
let security = KafkaSecurityConfig::sasl_ssl("username", "password")
    .with_sasl_mechanism("SCRAM-SHA-256");
Supported SASL mechanisms: PLAIN (default), SCRAM-SHA-256, SCRAM-SHA-512.
mTLS (Mutual TLS)
For clusters that authenticate clients via X.509 certificates:
let security = KafkaSecurityConfig::ssl()
    .with_ssl_ca("/etc/kafka/ca.pem")
    .with_ssl_cert("/etc/kafka/client.pem")
    .with_ssl_key("/etc/kafka/client.key");

let source_config = KafkaSourceConfig::new("broker:9093", "events", "my-group", schema)
    .with_security(security);
Key Types
| Type | Description |
|---|---|
| KafkaSourceConfig | Configuration for the Kafka consumer (brokers, topic, group, format, batch size) |
| KafkaSinkConfig | Configuration for the Kafka producer (brokers, topic, format, delivery mode) |
| KafkaSecurityConfig | Shared authentication/TLS config for source and sink |
| KafkaSourceFormat | Message format enum: Json (default), Protobuf, Avro |
| KafkaSinkFormat | Serialization format enum: Json (default), Protobuf, Avro |
| KafkaSource | Source that polls Kafka for records |
| KafkaSink | Sink that writes records to Kafka |
| KafkaEnvExt | Extension trait for StreamExecutionEnvironment |
| KafkaStreamExt | Extension trait for DataStream |
| ConfluentSchemaRegistry | Confluent Schema Registry HTTP client (from volley-connectors) |
| env.from_kafka_protobuf::<M>() | Trait method: Kafka source + ProtobufDecodeOperator |
| env.from_kafka_avro() | Trait method: Kafka source + AvroDecodeOperator + Schema Registry |
Example
use std::sync::Arc;
use std::time::Duration;

use arrow::datatypes::{DataType, Field, Schema};
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Utf8, false),
        Field::new("amount", DataType::Float64, false),
    ]));

    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema);
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");
    let tmp = tempfile::tempdir().unwrap();

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter_expr(col("amount").gt(lit(0.0)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate_expr(
            vec![sum(col("amount"))],
            RocksDbBackend::open(tmp.path().join("state")).unwrap(),
        )
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;

    Ok(())
}
For end-to-end exactly-once Kafka (source offsets committed inside
the sink’s Kafka transaction), swap in the _exactly_once pair:
let (stream, cgm) = env.from_kafka_exactly_once(source_config).await?;
stream
    // ... operators ...
    .to_kafka_exactly_once(
        KafkaSinkConfig::new_transactional("localhost:9092", "output", "agg-txn"),
        cgm,
    )
    .await?
    .execute("kafka-eos")
    .await?;
Protobuf Format
For Protobuf-encoded Kafka messages, use KafkaSourceConfig::new_protobuf() and the from_kafka_protobuf trait method on KafkaEnvExt:
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};
mod proto {
    include!(concat!(env!("OUT_DIR"), "/events.rs"));
}
let config = KafkaSourceConfig::new_protobuf("localhost:9092", "events", "my-group")
    .with_batch_size(100);
let stream = StreamExecutionEnvironment::new()
    .from_kafka_protobuf::<proto::Event>(config).await?;
The source emits raw binary payloads in a Binary “payload” column. The ProtobufDecodeOperator (wired automatically by from_kafka_protobuf) decodes them into typed Arrow columns using the compiled prost type. The type M must implement ToArrow, typically via the impl_to_arrow! proc-macro from volley-derive.
Scaffolding: volley new with Kafka source and protobuf format generates build.rs, proto/events.proto, and prost dependencies.
Avro Format (Schema Registry)
For Avro-encoded Kafka messages with Confluent Schema Registry, use KafkaSourceConfig::new_avro() and the from_kafka_avro trait method:
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};
let config = KafkaSourceConfig::new_avro(
    "localhost:9092",
    "events",
    "my-group",
    "http://localhost:8081", // Schema Registry URL
)
    .with_schema_registry_auth("user", "password") // optional basic auth
    .with_batch_size(100);
let stream = StreamExecutionEnvironment::new()
    .from_kafka_avro(config).await?;
The source emits raw binary payloads. The AvroDecodeOperator (wired automatically by from_kafka_avro):
- Strips the Confluent 5-byte wire format header ([0x00][4-byte schema ID][payload])
- Fetches the Avro schema from the Schema Registry by ID (cached after first fetch)
- Decodes the Avro binary into apache_avro::types::Value
- Converts directly to Arrow arrays using typed ArrayBuilders (no JSON intermediary)
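The first step above can be sketched with the standard library alone. This is an illustration of the Confluent framing (magic byte 0x00, then a 4-byte big-endian schema ID, then the Avro payload), not Volley's actual decoder; the function name split_confluent_frame is made up for this example.

```rust
/// Illustrative only: splits a Confluent-framed message into (schema_id, payload).
/// Returns None when the message is shorter than the 5-byte header or the
/// magic byte is not 0x00.
fn split_confluent_frame(message: &[u8]) -> Option<(u32, &[u8])> {
    const HEADER_LEN: usize = 5;
    if message.len() < HEADER_LEN || message[0] != 0x00 {
        return None;
    }
    // The schema ID is stored big-endian in bytes 1..5.
    let schema_id = u32::from_be_bytes([message[1], message[2], message[3], message[4]]);
    Some((schema_id, &message[HEADER_LEN..]))
}
```

A decoder built this way would look up the returned schema ID in the registry (or its cache) and hand the remaining bytes to the Avro reader.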
The Arrow schema is inferred from the Avro schema. Supported type mappings:
| Avro Type | Arrow Type |
|---|---|
| boolean | Boolean |
| int | Int32 |
| long | Int64 |
| float | Float32 |
| double | Float64 |
| string | Utf8 |
| bytes | Binary |
| fixed | Binary |
| enum | Utf8 |
| union(null, T) | T (nullable) |
Plain Avro (No Schema Registry)
For Avro messages without the Confluent wire format, use AvroDecodeOperator directly with a static schema:
use volley_connectors::decode::avro::AvroDecodeOperator;
let avro_schema = apache_avro::Schema::parse_str(r#"{ ... }"#).unwrap();
let stream = env.from_kafka(config).await?
    .with_operator(AvroDecodeOperator::with_static_schema("payload", avro_schema));
Schema Evolution
For registry-backed Kafka sources with automatic schema evolution, use KafkaSourceConfig::new_dynamic() and from_kafka_with_schema_evolution(). This automatically chains the appropriate decode operator (Avro, Protobuf, or JSON Schema) based on the configured format and resolves schemas from the registry at runtime.
use std::sync::Arc;
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig, KafkaSourceFormat};
use volley_connectors::decode::{SchemaEvolutionConfig, SchemaResolutionConfig};
use volley_connectors::schema::{ConfluentSchemaRegistry, SubjectNameStrategy};
use volley_connectors::decode::wire::ConfluentWireFormat;
use volley_core::schema::SchemaEvolutionMode;
use volley_core::error_policy::ErrorPolicy;
// Registry-backed Avro source with schema evolution
let config = KafkaSourceConfig::new_dynamic("localhost:9092", "orders", "my-group")
    .with_format(KafkaSourceFormat::Avro)
    .with_schema_evolution(SchemaEvolutionConfig {
        resolution: SchemaResolutionConfig {
            registry: Arc::new(ConfluentSchemaRegistry::new("http://localhost:8081")),
            wire_format: Arc::new(ConfluentWireFormat),
            subject_strategy: SubjectNameStrategy::TopicName {
                topic: "orders".into(),
                is_key: false,
            },
        },
        evolution_mode: SchemaEvolutionMode::Adaptive,
        error_policy: ErrorPolicy::Skip,
    });
let stream = env.from_kafka_with_schema_evolution(config).await?;
The new_dynamic() constructor does not require an upfront Arrow schema: the schema is resolved from the registry at runtime, based on the schema ID carried in each message. The with_schema_evolution() builder method configures:
- Registry: Which schema registry to query (e.g., ConfluentSchemaRegistry).
- Wire format: How to extract the schema ID from message bytes (e.g., ConfluentWireFormat for the standard 5-byte header).
- Subject strategy: How to resolve topic names to registry subjects (e.g., TopicName appends -value or -key).
- Evolution mode: Strict (default, reject changes) or Adaptive (accept backward-compatible changes).
- Error policy: Fail (abort on decode error), Skip (drop failed records), or DeadLetterQueue (route failures to a DLQ sink).
A keyed variant is also available: env.from_kafka_with_schema_evolution_keyed(config).
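To make the TopicName subject strategy concrete, here is a minimal sketch of the naming rule described above (the topic name plus a -value or -key suffix). The helper topic_name_subject is hypothetical and only mirrors the convention; it is not part of the Volley API.

```rust
/// Illustrative only: derives a registry subject from a topic name the way
/// the TopicName strategy is described above ("<topic>-key" or "<topic>-value").
fn topic_name_subject(topic: &str, is_key: bool) -> String {
    let suffix = if is_key { "key" } else { "value" };
    format!("{}-{}", topic, suffix)
}
```

With the configuration shown earlier (topic "orders", is_key: false), value schemas would be looked up under the subject orders-value.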
Batch Reads
By default, KafkaSource reads one message per poll_next() call (single-row RecordBatch). For high-throughput pipelines, enable batch reads:
let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema)
    .with_batch_size(100) // accumulate up to 100 messages
    .with_batch_timeout(Duration::from_millis(50)); // wait 50ms for next message
The source accumulates up to batch_size messages, concatenating them into a single multi-row RecordBatch. All downstream operators handle multi-row batches correctly. Partial batches are returned on timeout (no blocking).
Batch metadata: event_time_ms is set to the max timestamp in the batch (correct for watermark advancement). For per-row event times in windowed operations, configure timestamp_column on your WindowConfig.
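The accumulation rule can be sketched with a standard-library channel standing in for the Kafka consumer. This is a simplified model under stated assumptions, not the KafkaSource implementation: the Message type and both function names are invented for the example, and the timeout is applied to each wait for the next message, as the comment on with_batch_timeout suggests.

```rust
use std::sync::mpsc::Receiver;
use std::time::Duration;

/// Hypothetical stand-in for a consumed Kafka message.
struct Message {
    timestamp_ms: i64,
    payload: Vec<u8>,
}

/// Collects up to `batch_size` messages. Blocks for the first message, then
/// waits at most `timeout` for each following one. On timeout (or when the
/// sender is gone) the partial batch is returned instead of blocking further.
fn collect_batch(rx: &Receiver<Message>, batch_size: usize, timeout: Duration) -> Vec<Message> {
    let mut batch = Vec::with_capacity(batch_size);
    if batch_size == 0 {
        return batch;
    }
    match rx.recv() {
        Ok(first) => batch.push(first),
        Err(_) => return batch,
    }
    while batch.len() < batch_size {
        match rx.recv_timeout(timeout) {
            Ok(next) => batch.push(next),
            Err(_) => break, // timed out or disconnected: return what we have
        }
    }
    batch
}

/// Batch-level event time: the maximum message timestamp, or None if empty.
fn batch_event_time_ms(batch: &[Message]) -> Option<i64> {
    batch.iter().map(|m| m.timestamp_ms).max()
}
```

Taking the maximum timestamp keeps the watermark moving forward with the newest record in the batch, which is why per-row event times need a separate timestamp column for windowing.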
Consumer Tuning
Fine-tune the Kafka consumer’s fetch behavior for throughput vs. latency:
let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema)
    .with_batch_size(500)
    .with_fetch_min_bytes(32_768) // wait for 32KB of data (default: 1)
    .with_fetch_max_wait_ms(100) // broker waits up to 100ms to fill (default: 500)
    .with_max_partition_fetch_bytes(2_097_152); // 2MB per partition (default: 1MB)
| Method | Default | Effect |
|---|---|---|
| with_fetch_min_bytes(bytes) | 1 | Minimum bytes the broker accumulates before responding. Higher values reduce fetch requests at the cost of latency. |
| with_fetch_max_wait_ms(ms) | 500 | Max time the broker waits to fill fetch_min_bytes. Acts as an upper bound on added latency. |
| with_max_partition_fetch_bytes(bytes) | 1,048,576 (1MB) | Max data returned per partition per fetch. Increase for partitions with large messages. |
| with_property(key, value) | none | Escape hatch for any rdkafka consumer setting not covered above. |
Tip: For high-throughput pipelines, combine with_batch_size() with with_fetch_min_bytes() so the broker pre-aggregates data and the source fills batches faster.
Retry on Transient Errors
Both the source and sink support optional retry with exponential backoff for transient broker errors (e.g., broker unavailable, request timeouts, leader changes, network exceptions). Configure via with_retry():
use std::time::Duration;
use volley_core::resilience::RetryConfig;
// Source with retry
let source_config = KafkaSourceConfig::new("localhost:9092", "topic", "group", schema)
    .with_retry(
        RetryConfig::new()
            .with_max_retries(5)
            .with_initial_backoff(Duration::from_millis(200))
            .with_max_backoff(Duration::from_secs(30)),
    );
// Sink with retry
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "txn-1")
    .with_retry(
        RetryConfig::new()
            .with_max_retries(3)
            .with_initial_backoff(Duration::from_millis(100)),
    );
When retry is not configured (the default), errors propagate immediately — existing behavior is unchanged.
Source behavior: Both single-message and batch accumulation paths retry recv() calls. Subsequent messages within a batch (after the first) are not retried — a partial batch is returned instead.
Sink behavior: Individual send() calls are retried. Only the send is retried, not the surrounding transaction. If retries are exhausted, the transaction is aborted (existing behavior).
Metrics: volley_retry_attempts_total counter with component label (kafka_source or kafka_sink).
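For intuition about how with_initial_backoff and with_max_backoff interact, the sketch below computes a capped exponential delay. It assumes the delay doubles on every attempt; the source does not state Volley's multiplier or whether jitter is applied, so treat backoff_delay as a hypothetical model rather than RetryConfig's actual schedule.

```rust
use std::time::Duration;

/// Illustrative only: delay before retry number `attempt` (0-based),
/// computed as initial * 2^attempt and capped at `max`.
/// The doubling factor is an assumption made for this sketch.
fn backoff_delay(initial: Duration, max: Duration, attempt: u32) -> Duration {
    // Saturate instead of overflowing for very large attempt numbers.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    initial
        .checked_mul(factor)
        .map_or(max, |delay| delay.min(max))
}
```

Under this model, the source configuration above (200 ms initial, 30 s cap, 5 retries) would wait 200 ms, 400 ms, 800 ms, 1.6 s, and 3.2 s, so the cap would only matter with a higher retry count.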
Sink Delivery Modes
The Kafka sink supports three delivery modes, from lowest overhead to strongest guarantees:
Fire-and-Forget (acks=0)
No acknowledgment from the broker. Highest throughput, but messages can be lost:
let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
    .with_acks("0");
When acks is "0", idempotence is automatically disabled since the broker never confirms delivery.
Idempotent (default)
The default mode. The producer uses Kafka’s idempotent delivery (enable.idempotence=true) to guarantee no duplicates from producer retries, without the overhead of transactions:
let sink_config = KafkaSinkConfig::new("localhost:9092", "output");
This is the right choice for most pipelines. It guarantees at-least-once delivery with no duplicate writes from the producer side, and has no per-batch transaction commit overhead.
Transactional (exactly-once)
Opt-in via new_transactional(). Wraps writes in Kafka transactions for exactly-once semantics:
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");
See Exactly-Once Semantics below for details.
Producer Tuning
Fine-tune the Kafka producer for throughput vs. latency:
let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
    .with_linger_ms(20) // batch for 20ms before sending (default: 5)
    .with_compression("lz4") // compress batches (default: "none")
    .with_acks("1"); // leader-only ack (default: "all")
| Method | Default | Effect |
|---|---|---|
| with_acks(acks) | "all" | "all" = full ISR ack, "1" = leader only, "0" = fire-and-forget. |
| with_linger_ms(ms) | 5 | How long rdkafka waits to batch messages before sending. Higher values increase batching and throughput at the cost of latency. |
| with_compression(codec) | "none" | Compression codec: "none", "gzip", "snappy", "lz4", "zstd". LZ4 and Zstd offer the best throughput-to-ratio tradeoff. |
| with_property(key, value) | none | Escape hatch for any rdkafka producer setting not covered above. |
Tip: For high-throughput sinks, with_linger_ms(20) + with_compression("lz4") is a good starting point. This lets rdkafka batch more messages per broker request and compress them in a single pass.
Exactly-Once Semantics
The Kafka sink supports exactly-once delivery when configured with new_transactional():
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");
How it works:
- Records are buffered during processing
- On checkpoint barrier, the sink commits the Kafka transaction
- On failure recovery, uncommitted transactions are aborted
- Processing resumes from the last checkpoint, replaying records
The consumer group offset is committed as part of the transaction, ensuring source and sink are in sync.
Note: Transactional mode adds a commit round-trip per checkpoint. For pipelines where at-least-once is acceptable, the default idempotent mode (KafkaSinkConfig::new()) avoids this overhead entirely.
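The four steps under "How it works" can be modelled in a few lines of plain Rust. The sketch below is a toy in-memory model, not the KafkaSink: TxnSinkModel and its methods are invented here to show why committing records and the source offset together means a replay after a crash adds no duplicates.

```rust
/// Toy model of a transactional sink (illustrative, not the Volley type).
#[derive(Default)]
struct TxnSinkModel {
    committed: Vec<String>,  // records visible to downstream readers
    committed_offset: usize, // source offset stored with the last commit
    pending: Vec<String>,    // records written inside the open transaction
    pending_offset: usize,   // source offset reached by the open transaction
}

impl TxnSinkModel {
    /// Buffers one record inside the open transaction.
    fn write(&mut self, record: &str) {
        self.pending.push(record.to_string());
        self.pending_offset += 1;
    }

    /// Checkpoint barrier: records and source offset become visible together.
    fn commit(&mut self) {
        self.committed.append(&mut self.pending);
        self.committed_offset = self.pending_offset;
    }

    /// Crash recovery: aborts the open transaction and returns the source
    /// offset to resume reading from.
    fn recover(&mut self) -> usize {
        self.pending.clear();
        self.pending_offset = self.committed_offset;
        self.committed_offset
    }
}
```

In this model, a record written after the last barrier is discarded by recover() and written again during replay, so it lands in the committed output exactly once.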
Trace Context Propagation
The Kafka source and sink automatically propagate W3C Trace Context (traceparent header) for distributed tracing:
- Source: If a consumed message has a traceparent header, the record inherits that trace context. The runtime’s sampler respects upstream sampling decisions (parent-based sampling).
- Sink: If a record being written has trace context, a traceparent header is injected into the produced Kafka message.
This works automatically when per-record tracing is enabled via with_tracing(). No additional configuration is needed for Kafka trace propagation.
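For readers unfamiliar with the header, the sketch below parses a version-00 traceparent value as defined by W3C Trace Context: version, 32-hex trace ID, 16-hex parent ID, and 2-hex flags, joined by hyphens, with bit 0 of the flags meaning "sampled". The TraceParent struct and parse_traceparent function are illustrative names, not Volley types.

```rust
/// Parsed fields of a W3C traceparent header (illustrative type).
#[derive(Debug, PartialEq)]
struct TraceParent {
    trace_id: String,
    parent_id: String,
    sampled: bool,
}

/// True when `s` is exactly `len` lowercase hexadecimal characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Parses a version-00 traceparent value such as
/// "00-<32 hex trace id>-<16 hex parent id>-<2 hex flags>".
fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let parts: Vec<&str> = header.trim().split('-').collect();
    if parts.len() != 4 || parts[0] != "00" {
        return None;
    }
    let (trace_id, parent_id, flags) = (parts[1], parts[2], parts[3]);
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(parent_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    // All-zero IDs are invalid per the specification.
    if trace_id.bytes().all(|b| b == b'0') || parent_id.bytes().all(|b| b == b'0') {
        return None;
    }
    let flags = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent {
        trace_id: trace_id.to_string(),
        parent_id: parent_id.to_string(),
        sampled: (flags & 0x01) == 0x01,
    })
}
```

The sampled flag is what a parent-based sampler would consult when deciding whether to keep tracing a consumed record.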
Consumer Lag Monitoring
The volley.kafka.consumer_lag metric (queried in Prometheus as volley_kafka_consumer_lag) tracks consumer group lag. Use it as the query for KEDA autoscaling:
sum(volley_kafka_consumer_lag) by (job_name)
See Capacity Planning for KEDA trigger configuration.
Delta Lake
Write your stream to a Delta Lake table — queryable by Spark, DataFusion, Databricks, Trino, and anything else that speaks Delta. Pick this sink when downstream consumers want ACID tables with time travel and schema evolution rather than raw files. Volley commits Parquet files on every checkpoint and tags each commit with an epoch so replays after a crash never duplicate rows.
Key Types
| Type | Description |
|---|---|
| DeltaSink | Sink that writes Parquet data to a Delta table |
| DeltaSinkConfig | Configuration (table URI, partitioning, buffer size, file size) |
| PartitionSpec | Specification for Hive-style partitioned writes |
Usage
use volley_connector_delta::{DeltaSink, DeltaSinkConfig};
let config = DeltaSinkConfig::new("s3://my-bucket/output-table");
let sink = DeltaSink::new(config).await?;
env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("delta-writer")
    .await?;
Configuration Reference
let config = DeltaSinkConfig::new("s3://my-bucket/output-table")
    .with_pipeline_id("my-pipeline") // idempotent commit tag (default: "volley-default")
    .with_max_records(50_000) // records buffered before flush (default: 10,000)
    .with_target_file_size(256 * 1024 * 1024) // target Parquet file size (default: 128 MB)
    .with_partition_spec(partition_spec); // Hive-style partitioning (default: none)
| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Identifies the pipeline in commit metadata for idempotent writes. Use a unique ID per pipeline writing to the same table |
| with_max_records() | 10,000 | Number of records buffered in memory before writing a Parquet file. Higher values produce fewer, larger files |
| with_target_file_size() | 128 MB | Target size for individual Parquet files. The sink flushes when buffered data approaches this size |
| with_partition_spec() | None | Enables Hive-style partitioned writes; see Partitioned Writes |
Partitioned Writes
DeltaSink supports Hive-style partitioned writes for efficient query pruning:
use volley_connector_delta::PartitionSpec;
let config = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_partition_spec(PartitionSpec::columns(vec![
        "date".to_string(),
        "region".to_string(),
    ]));
This produces a directory layout like:
s3://my-bucket/events/
date=2026-03-31/region=us-east-1/part-0001.parquet
date=2026-03-31/region=eu-west-1/part-0002.parquet
Partition column guidelines:
- Use low-cardinality columns (date, region, status) to avoid excessive small files
- Partition columns must exist in the record schema
- Order matters: put the most selective column first for query engines
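The directory layout shown above follows the usual Hive convention of one column=value segment per partition column, in the order the columns were declared. The sketch below builds such a relative path for a single row; hive_partition_path is a hypothetical helper, and it skips the value escaping that real writers apply to special characters.

```rust
/// Illustrative only: builds the Hive-style relative path for one row.
/// `row` holds (column name, value) pairs; returns None when a partition
/// column is missing from the row, mirroring the rule that partition
/// columns must exist in the record schema.
fn hive_partition_path(partition_columns: &[&str], row: &[(&str, &str)]) -> Option<String> {
    let mut segments = Vec::with_capacity(partition_columns.len());
    for column in partition_columns {
        let (_, value) = row.iter().find(|(name, _)| name == column)?;
        segments.push(format!("{}={}", column, value));
    }
    Some(segments.join("/"))
}
```

Every distinct combination of partition values produces its own directory, which is why high-cardinality partition columns lead to many small files.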
Exactly-Once Semantics
Delta commits include an epoch tag in the commit metadata. On recovery:
- The sink reads the Delta log to find the latest committed epoch for this pipeline_id
- If the current epoch was already committed, the write is skipped
- Replayed records after a failure never produce duplicate data
The pipeline_id scopes idempotency — multiple pipelines can write to the same table without conflicts as long as each uses a unique ID.
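The epoch check can be modelled as a map from pipeline_id to the highest committed epoch. This is a simplified in-memory sketch (the real sink reads this information from the Delta log); CommitLog and its commit method are names invented for the illustration.

```rust
use std::collections::HashMap;

/// Toy stand-in for the epoch tags recorded in the table's commit metadata.
#[derive(Default)]
struct CommitLog {
    last_epoch: HashMap<String, u64>,
}

impl CommitLog {
    /// Returns true if the write was applied, false if it was skipped
    /// because this pipeline already committed the epoch (or a later one).
    fn commit(&mut self, pipeline_id: &str, epoch: u64) -> bool {
        match self.last_epoch.get(pipeline_id) {
            Some(&committed) if epoch <= committed => false,
            _ => {
                self.last_epoch.insert(pipeline_id.to_string(), epoch);
                true
            }
        }
    }
}
```

Because the lookup is keyed by pipeline_id, two pipelines with distinct IDs never shadow each other's epochs, while two pipelines sharing an ID would see each other's commits as replays.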
Storage Backends
Delta tables can be stored on any Delta-compatible storage:
| Storage | URI Format | Credentials |
|---|---|---|
| AWS S3 | s3://bucket/path | Default credential chain (env, ~/.aws/credentials, instance profile) |
| Azure Blob | az://container/path | DefaultAzureCredential chain |
| Local filesystem | /path/to/table | File permissions |
Performance Tuning
| Scenario | Recommendation |
|---|---|
| High throughput (>100K rec/s) | Increase with_max_records(50_000) to reduce commit frequency |
| Large records (>1 KB each) | Decrease with_max_records() or increase with_target_file_size() |
| Many partitions | Monitor file count; consider periodic compaction |
| Low latency requirements | Decrease with_max_records() for more frequent flushes |
Table Compaction
With many partitions and frequent checkpoints, Delta tables accumulate small files that degrade read performance. Run compaction periodically:
-- In a query engine (Spark, DataFusion, etc.)
OPTIMIZE 's3://my-bucket/events';
Automated compaction within the Volley sink is planned for a future release.
Troubleshooting
“Table not found” or empty table
- Delta tables are created automatically on first write if they don’t exist
- Verify the storage URI is correct and credentials have write access
Duplicate data after recovery
- Ensure each pipeline uses a unique pipeline_id
- If two pipelines share the same ID, epoch deduplication may skip valid writes
Small files accumulating
- Increase with_max_records() to buffer more records per file
- Run OPTIMIZE periodically from a query engine
- Consider adjusting checkpoint interval to reduce commit frequency
Slow writes to S3
- S3 has higher latency per PUT than local disk; increase with_target_file_size() to reduce PUT count
- Ensure the application runs in the same AWS region as the S3 bucket
Apache Iceberg
Write your stream to an Apache Iceberg table registered in a REST catalog (Polaris, Tabular, Gravitino). Pick this sink when your organization runs on Iceberg — snapshot isolation, hidden partitioning, and engine-agnostic reads from Trino, Spark, DuckDB, and more. Volley buffers records into Parquet data files and commits a new Iceberg snapshot on every checkpoint; the epoch is written into the snapshot metadata so a replay after a crash skips already-committed epochs.
Key Types
| Type | Description |
|---|---|
| IcebergSink | Sink that writes Parquet data files to an Iceberg table |
| IcebergSinkConfig | Configuration (catalog URI, warehouse, namespace, table, storage) |
| IcebergStorage | Storage backend enum (S3, Azure, GCS) |
Usage
use volley_connector_iceberg::{IcebergSink, IcebergSinkConfig, IcebergStorage};
let config = IcebergSinkConfig::new(
    "http://polaris:8181", // REST catalog URI
    "my-warehouse", // catalog warehouse
    vec!["db".into()], // namespace
    "events", // table name
    IcebergStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,
    },
);
let sink = IcebergSink::new(config).await?;
env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("iceberg-writer")
    .await?;
Configuration Reference
Required Parameters
| Parameter | Description |
|---|---|
| catalog_uri | REST catalog endpoint (e.g., http://polaris:8181) |
| catalog_warehouse | Warehouse identifier in the catalog |
| namespace | Iceberg namespace as a list of strings (e.g., ["db", "schema"]) |
| table_name | Target table name |
| storage | Storage backend; see Storage Backends |
Optional Parameters
let config = IcebergSinkConfig::new(catalog_uri, warehouse, namespace, table, storage)
    .with_credential("client-id:client-secret") // OAuth2 credential for Polaris
    .with_max_records(10_000) // buffer size before flush (default: 10,000)
    .with_flush_interval(Duration::from_secs(30)); // time-based flush trigger
| Method | Default | Description |
|---|---|---|
| with_credential() | None | OAuth2 credential for authenticated catalogs (Polaris, Tabular) |
| with_max_records() | 10,000 | Number of records buffered before writing a Parquet data file |
| with_flush_interval() | None | Time-based flush trigger; writes a file even if max_records hasn’t been reached |
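The interplay of the two flush triggers in the table above can be written as a small predicate. This is a sketch of the documented behaviour, not the sink's code: should_flush is a hypothetical function, and the rule that an empty buffer is never flushed is an assumption made for the example.

```rust
use std::time::Duration;

/// Illustrative only: decides whether buffered records should be written
/// out as a Parquet data file.
fn should_flush(
    buffered: usize,
    max_records: usize,
    since_last_flush: Duration,
    flush_interval: Option<Duration>,
) -> bool {
    if buffered == 0 {
        return false; // assumption: nothing to write, so never flush
    }
    if buffered >= max_records {
        return true; // size-based trigger
    }
    // Time-based trigger, only active when an interval is configured.
    flush_interval.map_or(false, |interval| since_last_flush >= interval)
}
```

With no interval configured, a slow stream holds its records until max_records is reached (or, per the exactly-once description, until the next checkpoint commit).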
Storage Backends
S3
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "my-iceberg-data".into(),
    endpoint: None, // use None for AWS, Some("http://localhost:9000") for MinIO
}
AWS credentials are resolved via the default credential chain (environment variables, ~/.aws/credentials, instance profile).
For local development with MinIO:
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "test-bucket".into(),
    endpoint: Some("http://localhost:9000".into()),
}
Azure Blob Storage
IcebergStorage::Azure {
    storage_account: "mystorageaccount".into(),
    container: "iceberg-data".into(),
    access_key: None, // uses DefaultAzureCredential when None
}
When access_key is None, authentication uses Azure’s DefaultAzureCredential chain (environment variables, managed identity, Azure CLI).
Google Cloud Storage
IcebergStorage::Gcs {
    project_id: "my-gcp-project".into(),
    bucket: "iceberg-data".into(),
}
GCS credentials are resolved via Application Default Credentials (GOOGLE_APPLICATION_CREDENTIALS or metadata server).
Exactly-Once Semantics
The Iceberg sink achieves exactly-once via snapshot commits with epoch tracking:
- Records are buffered in memory and written as Parquet data files
- On checkpoint barrier, a new Iceberg snapshot is committed via the REST catalog
- The checkpoint epoch is stored in the snapshot’s summary metadata
- On recovery, the sink reads the latest snapshot’s epoch and skips already-committed epochs
This means replayed records after a failure never produce duplicate data in the table.
REST Catalog Compatibility
Volley uses the Iceberg REST catalog spec for table management. Compatible catalogs:
| Catalog | Notes |
|---|---|
| Apache Polaris | Recommended. Use with_credential() for OAuth2 auth |
| Tabular | Hosted REST catalog. Use with_credential() |
| AWS Glue (via REST adapter) | Requires a REST adapter in front of Glue |
| Gravitino | REST-compatible catalog |
Troubleshooting
“Connection refused” to catalog URI
- Verify the catalog is running and reachable from your application
- Check firewall rules and network policies (especially in Kubernetes)
“Unauthorized” or “Forbidden” from catalog
- Ensure with_credential() is set with a valid OAuth2 client credential
- For Polaris: verify the principal has TABLE_WRITE_DATA privilege on the target table
“Table not found”
- Iceberg tables must be created in the catalog before the sink writes to them
- Verify namespace and table_name match the catalog’s table identifier exactly
Small files accumulating
- Increase with_max_records() to buffer more records per file
- Set with_flush_interval() to a longer duration
- Consider running Iceberg’s rewrite_data_files procedure periodically
Lance
Write your stream into a Lance dataset — the columnar format behind LanceDB, built in Rust and optimised for ML, vector search, and random-access reads. Pick this sink when downstream consumers want fast point lookups or vector indexes rather than scan-style analytics. Volley stages each batch of records, tags it with the checkpoint epoch, and commits atomically through Lance’s transaction API so replays after a crash never duplicate data.
Key Types
| Type | Description |
|---|---|
| LanceSink | Sink that writes Arrow records to a Lance dataset |
| LanceSinkConfig | Builder-style configuration (URI, pipeline id, file sizing, storage, recovery limit) |
| LanceStorage | Enum configuring the object store backend (Local, S3, Gcs, Azure) |
| LanceStreamExt | Umbrella extension trait adding DataStream::to_lance(config) |
Quick Start
Attach the sink to a pipeline via the LanceStreamExt trait re-exported
from volley_connectors:
use volley_connectors::blob_store::lance::{LanceStreamExt, LanceSinkConfig, LanceStorage};
use volley_core::prelude::*;
let config = LanceSinkConfig::new("s3://my-bucket/datasets/events")
    .with_pipeline_id("events-v1")
    .with_storage(LanceStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,
        access_key_id: None,
        secret_access_key: None,
    });
StreamExecutionEnvironment::new()
    .from_kafka(kafka_cfg).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_lance(config).await?
    .execute("kafka-to-lance")
    .await?;
Enable the umbrella feature in your Cargo.toml:
volley-connectors = { version = "0.8", features = ["lance-sink"] }
Or depend on the standalone crate directly:
volley-connector-lance = "0.8"
Configuration Reference
let config = LanceSinkConfig::new("s3://bucket/path")
    .with_pipeline_id("my-pipeline") // default: "volley-default"
    .with_storage(LanceStorage::Local) // default: Local
    .with_max_rows_per_file(1_048_576) // default: 1_048_576
    .with_max_rows_per_group(1024) // default: 1024
    .with_max_bytes_per_file(90 * 1024 * 1024 * 1024) // default: 90 GB
    .with_max_records_buffer(100_000) // default: 100_000
    .with_recovery_history_limit(256); // default: 256
| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Stable identifier embedded in commit metadata. Used for idempotent recovery; use a unique value per pipeline writing to the same dataset |
| with_storage() | LanceStorage::Local | Object store backend; see Storage Backends |
| with_max_rows_per_file() | 1,048,576 | Maximum rows per Lance data file. Larger values produce fewer, bigger files |
| with_max_rows_per_group() | 1,024 | Maximum rows per row group (columnar encoding unit) |
| with_max_bytes_per_file() | 90 GB | Soft per-file size ceiling. Lance has a hard 100 GB limit on S3 |
| with_max_records_buffer() | 100,000 | Soft in-memory row budget between checkpoints. Exceeding it logs a warning but does not force an untagged flush; all commits wait for the next checkpoint barrier so the epoch tag is always present |
| with_recovery_history_limit() | 256 | Maximum number of historical Lance transactions scanned on startup to recover last_committed_epoch. Increase if you retain a long uncompacted history |
Exactly-Once Semantics
LanceSink uses a two-phase commit built on Lance’s native transaction
API:
- Stage. Buffered batches are materialized through InsertBuilder::execute_uncommitted_stream, producing an uncommitted Transaction that represents the staged data files without making them visible.
- Tag. The sink sets the transaction’s transaction_properties field to {"volley.pipeline_id": id, "volley.epoch": N} before committing. These key-value pairs are persisted to the Lance transaction file on disk.
- Commit. CommitBuilder::execute(transaction) atomically publishes the new dataset version.
On startup, LanceSink::new calls Dataset::get_transactions to fetch up
to recovery_history_limit recent transactions, scans each one for a
matching volley.pipeline_id, and takes the maximum volley.epoch as
last_committed_epoch. After the restart, any checkpoint barrier at or
below that epoch is dropped as a duplicate.
Note: Lance 4.0’s CommitBuilder::with_transaction_properties stores the map on the builder but does not forward it to the staged Transaction during execute. To work around this, the sink writes the properties directly to Transaction.transaction_properties (a public field on the struct) before calling execute. This is a tracked upstream issue; when Lance fixes it, the workaround can be replaced with the documented builder method.
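The startup scan described above can be sketched over plain property maps. This is an illustration, not LanceSink's code: recover_last_epoch is a hypothetical function, each transaction is reduced to its key-value properties, and the slice is assumed to be ordered newest first. The property keys volley.pipeline_id and volley.epoch are the ones named in this section.

```rust
use std::collections::HashMap;

/// Illustrative only: scans at most `history_limit` recent transactions
/// (assumed newest first) and returns the highest epoch tagged with
/// `pipeline_id`. Transactions with a malformed epoch are ignored.
fn recover_last_epoch(
    recent_txns: &[HashMap<String, String>],
    pipeline_id: &str,
    history_limit: usize,
) -> Option<u64> {
    recent_txns
        .iter()
        .take(history_limit)
        .filter(|props| props.get("volley.pipeline_id").map(String::as_str) == Some(pipeline_id))
        .filter_map(|props| props.get("volley.epoch")?.parse::<u64>().ok())
        .max()
}
```

The take(history_limit) call is where the recovery horizon comes from: a commit older than the scanned window is simply not seen, so its epoch could be executed again.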
Storage Backends
Lance delegates I/O to object_store under the hood, and LanceStorage
maps cleanly onto its configuration keys.
| Variant | URI | Credentials |
|---|---|---|
| LanceStorage::Local | filesystem path or file://… | none |
| LanceStorage::S3 { region, bucket, endpoint, access_key_id, secret_access_key } | s3://bucket/path | inline fields, or AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars, or IAM instance role |
| LanceStorage::Gcs { project_id, bucket, service_account_json } | gs://bucket/path | inline JSON, or GOOGLE_APPLICATION_CREDENTIALS |
| LanceStorage::Azure { storage_account, container, access_key } | az://container/path | inline key, or AZURE_STORAGE_ACCOUNT_KEY / DefaultAzureCredential |
Setting endpoint = Some("http://…") on S3 also emits allow_http=true
so MinIO / localstack work without extra plumbing.
Limitations
- No Hive-style partitioning in v1. Lance uses fragments internally rather than filesystem-visible partitions, and the partition story is deferred.
- No schema evolution within a single pipeline_id: the first batch’s schema is assumed to hold for every subsequent batch.
- Single writer per (dataset_uri, pipeline_id): concurrent writers conflict on commit and return an error rather than silently serializing.
- Recovery horizon is bounded by recovery_history_limit. If a pipeline accumulates more uncompacted versions than the limit without a successful recovery, older commits may be invisible to the scan and epochs can be re-executed. Compact regularly or raise the limit.
Troubleshooting
Recovery does not pick up the last committed epoch
— Ensure the new sink uses the same pipeline_id as the writer, and
that recovery_history_limit covers the relevant version range. A
warn! log is emitted if a transaction’s volley.epoch property is
malformed.
Build fails with “Could not find protoc”
— Lance’s build scripts compile protobuf definitions at build time.
Install the protoc binary (apt-get install protobuf-compiler on
Debian/Ubuntu, brew install protobuf on macOS) before building.
Example
The standalone memory_to_lance example was retired in the 1.0
example cleanup (connector-specific examples moved out of
volley-examples). The shape is identical to any other sink:
use volley_connector_lance::LanceStreamExt;
use volley_connectors::memory::MemorySource;
use volley_core::prelude::*;
StreamExecutionEnvironment::new()
    .from_source(MemorySource::new(records))
    .to_lance(config).await? // `LanceStreamExt` installs the sink
    .execute("trades-to-lance")
    .await?;
to_lance(config) commits on every checkpoint barrier and on the final
shutdown barrier — bounded sources are covered even if a periodic
checkpoint hasn’t fired. For partitioning, vector-index creation, and
compaction, see the tracked roadmap in TODOS.md.
AWS S3
Ingest files from an S3 bucket as a streaming source. Drop new objects in the bucket, and S3’s event notifications push their keys to an SQS queue; the volley-connector-aws-s3 crate polls that queue, reads each object, decodes it (Parquet, JSON, CSV, or Avro), and emits Arrow record batches. Use this when your upstream system writes files to S3 rather than publishing to Kafka.
Architecture
      SQS Queue               S3 Bucket
(object notifications)      (data files)
          |                       |
          v                       v
SqsNotificationSource       S3BlobReader
          |                       |
          +----------+ +----------+
                     | |
                     v v
                 BlobSource
                      |
                      v
    Decoder (Parquet / JSON / CSV / Avro)
                      |
                      v
          RecordBatch --> Pipeline
- S3 sends object-created notifications to an SQS queue
- SqsNotificationSource polls the queue for new notifications
- S3BlobReader reads the object data from S3
- The shared decoder layer converts the data to Arrow RecordBatch
Key Types
| Type | Description |
|---|---|
| S3Source | Configured S3 source (combines SQS + S3 reader) |
| S3BlobReader | Reads object data from S3 |
| SqsNotificationSource | Polls SQS for S3 event notifications |
Supported Formats
| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |
Format detection is automatic based on file extension, or can be configured explicitly.
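Extension-based detection amounts to a lookup on the object key's suffix, following the table above. The sketch below is an assumption-laden illustration rather than the connector's code: BlobFormat and detect_format are invented names, and matching the extension case-insensitively is a choice made for the example.

```rust
/// Illustrative format enum mirroring the supported-formats table.
#[derive(Debug, PartialEq, Clone, Copy)]
enum BlobFormat {
    Parquet,
    Json,
    Csv,
    Avro,
}

/// Illustrative only: picks a decoder from the object key's file extension.
/// Returns None for keys without a recognised extension, in which case the
/// format would have to be configured explicitly.
fn detect_format(object_key: &str) -> Option<BlobFormat> {
    let (_, extension) = object_key.rsplit_once('.')?;
    match extension.to_ascii_lowercase().as_str() {
        "parquet" => Some(BlobFormat::Parquet),
        "json" | "jsonl" => Some(BlobFormat::Json),
        "csv" => Some(BlobFormat::Csv),
        "avro" => Some(BlobFormat::Avro),
        _ => None,
    }
}
```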
Usage
use volley_connector_aws_s3::S3Source;
use volley_core::prelude::*;
let source = S3Source::builder()
    .sqs_queue_url("https://sqs.us-east-1.amazonaws.com/123456789/my-queue")
    .build()
    .await?;
env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("s3-ingest")
    .await?;
AWS credentials are resolved via the standard AWS credential chain (environment variables, IAM role, credential file).
Blob Store Connectors
The volley-connector-blob-store crate provides a cloud-agnostic abstraction for reading from object storage. Azure Blob Storage and Google Cloud Storage are built on this shared foundation.
Shared Architecture
All cloud blob store connectors follow the same pattern:
Cloud Queue/Topic            Cloud Storage
(SQS / Azure Queue)          (S3 / Azure Blob)
        |                         |
        v                         v
NotificationSource            BlobReader
        |                         |
        +--------+    +-----------+
                 |    |
                 v    v
               BlobSource
                   |
                   v
    Decoder (Parquet / JSON / CSV / Avro)
                   |
                   v
         RecordBatch --> Pipeline
To add a new cloud backend, implement NotificationSource and BlobReader. The decoder layer is shared.
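To make the division of labour concrete, here is a minimal in-memory sketch of the two roles. The trait names carry a `Sketch` suffix because their shapes are assumptions: the real NotificationSource and BlobReader traits are very likely async and have different signatures.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical, synchronous stand-in for the notification side of a backend.
pub trait NotificationSourceSketch {
    /// Return the keys of objects announced since the last poll.
    fn poll(&mut self) -> Vec<String>;
}

/// Hypothetical, synchronous stand-in for the storage side of a backend.
pub trait BlobReaderSketch {
    /// Fetch the raw bytes for one object key, if the object exists.
    fn read(&self, key: &str) -> Option<Vec<u8>>;
}

/// In-memory "queue" of pending object keys.
pub struct InMemoryQueue(pub VecDeque<String>);

/// In-memory "bucket": object key -> bytes.
pub struct InMemoryStore(pub HashMap<String, Vec<u8>>);

impl NotificationSourceSketch for InMemoryQueue {
    fn poll(&mut self) -> Vec<String> {
        self.0.drain(..).collect()
    }
}

impl BlobReaderSketch for InMemoryStore {
    fn read(&self, key: &str) -> Option<Vec<u8>> {
        self.0.get(key).cloned()
    }
}

/// What a shared source does with any backend: poll for keys, read each
/// object, and hand the bytes on (here: return them) for decoding.
/// Keys whose object cannot be read are skipped.
pub fn drain_once(
    queue: &mut dyn NotificationSourceSketch,
    store: &dyn BlobReaderSketch,
) -> Vec<(String, Vec<u8>)> {
    queue
        .poll()
        .into_iter()
        .filter_map(|key| store.read(&key).map(|bytes| (key, bytes)))
        .collect()
}
```

Because `drain_once` only sees the two traits, a new backend needs no changes to the decoding path, which is the point of the shared layer.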
Azure Blob Storage
From volley-connector-azure-blob:
| Type | Description |
|---|---|
| AzureBlobSourceBuilder | Builder with automatic credential discovery |
| AzureBlobSource | Configured Azure Blob source |
| AzureBlobReader | Reads blobs from Azure Blob Storage |
| AzureQueueNotificationSource | Polls Azure Queue Storage for blob notifications |
Enable with feature flag blob-store-azure on volley-connectors.
Authentication
By default, Azure connectors use DefaultAzureCredential which discovers credentials automatically. The credential chain tries, in order:
- Environment variables: AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET
- Workload Identity: Kubernetes pods with Azure Workload Identity federation
- Managed Identity: Azure VM, App Service, or Container Instance identity
- Azure CLI: cached az login credentials (local development)
To override with an explicit storage account key, use with_access_key():
use volley_connector_azure_blob::AzureBlobSourceBuilder;

// DefaultAzureCredential (recommended for production)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .build(config).await?;

// Explicit access key (override)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .with_access_key("MYACCESSKEY...")
    .build(config).await?;
Google Cloud Storage
GCS is supported as a sink only. The previous Pub/Sub-driven source was
removed in v1.1.0 because the google-cloud-pubsub streaming pull API was not
stable enough for production use.
From volley-connector-gcp-gcs:
| Type | Description |
|---|---|
| GcsBlobWriter | Writes objects to GCS (for use with BufferedBlobSink) |
Enable with feature flag blob-store-gcs on volley-connectors. GCS credentials
are discovered through Application Default Credentials
(GOOGLE_APPLICATION_CREDENTIALS, GKE Workload Identity, GCE metadata, or
gcloud auth application-default login).
Blob Store Sinks
The volley-connectors crate also provides buffered blob store sinks for writing output files to S3, Azure Blob Storage, or GCS. These sinks batch records and flush them as files based on record count or time interval.
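The "record count or time interval" rule can be sketched as a small policy object. This is an illustration only: `FlushPolicy` is a hypothetical type, not part of volley-connectors, and the clock is passed in explicitly so the behaviour is easy to test.

```rust
use std::time::{Duration, Instant};

/// Decides when a buffered sink should cut a new output file.
/// Hypothetical helper, not the crate's implementation.
pub struct FlushPolicy {
    max_records: usize,
    flush_interval: Duration,
    buffered: usize,
    last_flush: Instant,
}

impl FlushPolicy {
    pub fn new(max_records: usize, flush_interval: Duration, now: Instant) -> Self {
        Self { max_records, flush_interval, buffered: 0, last_flush: now }
    }

    /// Record `n` newly buffered rows and report whether a flush is due.
    pub fn on_records(&mut self, n: usize, now: Instant) -> bool {
        self.buffered += n;
        self.should_flush(now)
    }

    /// A flush is due when either threshold is crossed and there is data to write.
    pub fn should_flush(&self, now: Instant) -> bool {
        self.buffered > 0
            && (self.buffered >= self.max_records
                || now.duration_since(self.last_flush) >= self.flush_interval)
    }

    /// Reset the counters after the file has been written; returns rows flushed.
    pub fn mark_flushed(&mut self, now: Instant) -> usize {
        self.last_flush = now;
        std::mem::take(&mut self.buffered)
    }
}
```

Skipping the flush when the buffer is empty avoids writing zero-row files during quiet periods; whether the real sink does the same is not stated here.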
Encoders
| Encoder | Output Format | File Extension |
|---|---|---|
| ParquetEncoder | Apache Parquet | .parquet |
| JsonLinesEncoder | JSON Lines (newline-delimited JSON) | .jsonl |
| CsvEncoder | CSV with optional header | .csv |
Usage
use std::time::Duration;
use volley_connectors::blob_store::{BufferedBlobSink, BufferedBlobSinkConfig};
use volley_connectors::blob_store::encoders::ParquetEncoder;
use volley_connectors::blob_store::aws::S3BlobWriter;

let encoder = Box::new(ParquetEncoder::new());

// Flush a file after 100,000 records or 60 seconds, whichever comes first.
let config = BufferedBlobSinkConfig::new("my-bucket", "output/")
    .with_max_records(100_000)
    .with_flush_interval(Duration::from_secs(60));

// `s3_client` is an AWS S3 client constructed elsewhere.
let sink = BufferedBlobSink::new(
    Box::new(S3BlobWriter::new(s3_client)),
    encoder,
    config,
);
For Azure Blob Storage, use AzureBlobWriter instead of S3BlobWriter:
use volley_connectors::blob_store::azure::AzureBlobWriter;

let sink = BufferedBlobSink::new(
    Box::new(AzureBlobWriter::new(blob_service_client, "my-container")),
    encoder,
    config,
);
For Google Cloud Storage, use GcsBlobWriter:
use volley_connectors::blob_store::gcs::GcsBlobWriter;

let sink = BufferedBlobSink::new(
    Box::new(GcsBlobWriter::new(gcs_client)),
    encoder,
    config,
);
Enable with feature flags blob-store-aws, blob-store-azure, or blob-store-gcs on volley-connectors.
Supported Source Formats
All blob store sources share the same decoder layer:
| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |
Feature Flags
[dependencies]
volley-connectors = { git = "https://github.com/volley-streams/volley", features = ["blob-store-azure", "blob-store-gcs"] }
See also: AWS S3 for the S3-specific connector.
ADBC (Arrow Database Connectivity)
Volley provides ADBC-based database connectivity via two crates:
- volley-connector-adbc – A sink that writes Arrow RecordBatches to any database with an ADBC driver (PostgreSQL, BigQuery, Snowflake, ClickHouse, Databricks, MySQL, Redshift, Exasol, and more).
- volley-enrichment-adbc – A real-time enrichment operator that queries external databases to enrich streaming records with lookup data, backed by an LRU+TTL cache.
ADBC uses a C-API driver model (dlopen/dlsym), so adding support for a new database only requires installing the corresponding ADBC driver shared library – no connector-specific code needed.
Setup
Add crate dependencies
[dependencies]
# Sink (write to a database)
volley-connector-adbc = { git = "https://github.com/volley-streams/volley" }
# Enrichment operator (query a database to enrich records)
volley-enrichment-adbc = { git = "https://github.com/volley-streams/volley" }
Install the ADBC driver shared library
ADBC drivers are platform-specific shared libraries. Install the driver for your target database:
PostgreSQL:
# macOS (Homebrew)
brew install apache-arrow-adbc-driver-postgresql
# Ubuntu/Debian
apt-get install libadbc-driver-postgresql
# Or download from https://github.com/apache/arrow-adbc/releases
BigQuery:
# Download the adbc_driver_bigquery shared library from
# https://github.com/apache/arrow-adbc/releases
The driver name (e.g., "adbc_driver_postgresql") is resolved via platform library search paths (LD_LIBRARY_PATH on Linux, DYLD_LIBRARY_PATH on macOS). Alternatively, pass a full file path to the shared library.
ADBC Sink
The sink writes Arrow RecordBatches to a database table. It supports insert (bulk ingest) and upsert (ON CONFLICT) modes with write-behind buffering.
Basic insert
use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig, AdbcStreamExt};

let adbc = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb")
    .with_pool_size(4);

let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_batch_size(5000)
    .with_create_table(true);

env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_adbc(sink_config).await?
    .execute("kafka-to-postgres").await?;
Upsert mode
Set with_upsert_key() to enable upsert semantics. The sink auto-generates INSERT ... ON CONFLICT (key) DO UPDATE SET SQL:
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_key("event_id")
    .with_batch_size(5000);
For composite (multi-column) keys, use with_upsert_keys():
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_keys(vec!["tenant_id", "event_id"])
    .with_batch_size(5000);
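For intuition about the auto-generated statement, the following sketch builds a PostgreSQL-style upsert from a column list and one or more key columns. `build_upsert_sql` is a hypothetical helper; the exact SQL the sink emits, and how it quotes identifiers, may differ.

```rust
/// Build `INSERT ... ON CONFLICT (keys) DO UPDATE SET ...` for a
/// PostgreSQL-style dialect. Hypothetical helper for illustration.
/// Identifiers are inserted verbatim; a real implementation would quote them.
pub fn build_upsert_sql(table: &str, columns: &[&str], key_columns: &[&str]) -> String {
    // Positional placeholders: $1, $2, ...
    let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("${i}")).collect();

    // Every non-key column is overwritten with the incoming value.
    let updates: Vec<String> = columns
        .iter()
        .filter(|c| !key_columns.contains(*c))
        .map(|c| format!("{c} = EXCLUDED.{c}"))
        .collect();

    // If all columns are key columns there is nothing to update.
    let conflict_action = if updates.is_empty() {
        "DO NOTHING".to_string()
    } else {
        format!("DO UPDATE SET {}", updates.join(", "))
    };

    format!(
        "INSERT INTO {table} ({}) VALUES ({}) ON CONFLICT ({}) {conflict_action}",
        columns.join(", "),
        placeholders.join(", "),
        key_columns.join(", "),
    )
}
```

Calling it with columns `event_id, name, value` and key `event_id` yields a statement of the same shape as the custom SQL example in this section.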
For databases with non-standard upsert syntax, provide custom SQL:
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_query(
        "INSERT INTO events (id, name, value) VALUES ($1, $2, $3) \
         ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, value = EXCLUDED.value"
    );
Sink configuration reference
| Method | Default | Description |
|---|---|---|
| AdbcSinkConfig::new(adbc, table) | – | Required: ADBC config and target table name |
| .with_upsert_key(column) | None | Single column for upsert (ON CONFLICT) semantics. None = insert-only |
| .with_upsert_keys(vec) | None | Multiple columns for composite upsert (ON CONFLICT). Overrides with_upsert_key |
| .with_upsert_query(sql) | None | Custom SQL for upsert. Overrides auto-generated SQL |
| .with_batch_size(n) | 1000 | Number of buffered rows before auto-flush |
| .with_create_table(bool) | false | Auto-create the table from the first batch’s Arrow schema |
Delivery guarantees
- At-least-once (default) – on_checkpoint() flushes buffered batches. If the pipeline crashes after flushing but before the checkpoint completes, some records may be written twice on recovery.
- Effectively exactly-once via upsert – When upsert_keys is set (via with_upsert_key() or with_upsert_keys()), duplicate writes are idempotent. Choose a natural key that makes re-delivery safe.
ADBC Enrichment Operator
The enrichment operator queries an external database to enrich streaming records. It extracts lookup keys from each batch, issues a single micro-batch query, and appends enrichment columns to the output. An LRU+TTL cache avoids redundant queries.
Key-column lookup
The simplest mode: specify a stream column and a remote table. The operator generates SELECT [columns] FROM table WHERE target_key IN (k1, k2, ...).
use volley_enrichment_adbc::{AdbcEnrichConfig, AdbcEnrichStreamExt, EnrichMissStrategy};
use volley_connector_adbc::AdbcConfig;
use std::time::Duration;

let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new(pg, "customers", "customer_id")
            .with_select_columns(vec!["tier", "region", "lifetime_value"])
            .with_prefix("customer_")
            .with_cache_ttl(Duration::from_secs(300))
            .with_on_miss(EnrichMissStrategy::NullFill)
    )
    .filter_expr(col("customer_tier").eq(lit("premium")))
    .to_kafka(sink_config).await?
    .execute("enrich-orders").await?;
Parameterized query
For complex lookups (joins, filters, computed columns), provide a SQL template with IN $1:
// `pg` is the PostgreSQL AdbcConfig from the key-column example above;
// AdbcSinkConfig comes from volley_connector_adbc.
let bq = AdbcConfig::new("adbc_driver_bigquery", "bigquery://project-id");

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new_with_query(
            bq,
            "SELECT category, risk_score FROM products WHERE sku IN $1 AND active = true",
            "product_sku"
        )
    )
    .to_adbc(AdbcSinkConfig::new(pg, "enriched_events")).await?
    .execute("enrich-and-store").await?;
Miss strategies
When a lookup key has no match in the database:
| Strategy | Behavior |
|---|---|
| NullFill (default) | Add enrichment columns filled with nulls |
| DropRecord | Drop the record entirely |
| PassThrough | Keep the original record without enrichment columns |
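The three strategies can be pictured per record as below. This is a simplified, row-at-a-time sketch with hypothetical types (`MissStrategy`, `OutputRow`); the real operator works on Arrow batches and uses EnrichMissStrategy.

```rust
/// Local stand-in for the operator's miss strategy enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MissStrategy {
    NullFill,
    DropRecord,
    PassThrough,
}

/// One output record: the lookup key plus optional enrichment columns.
#[derive(Debug, PartialEq, Eq)]
pub struct OutputRow {
    pub key: String,
    /// `None` = no enrichment columns at all;
    /// `Some(cols)` = columns present, each possibly null.
    pub enrichment: Option<Vec<Option<String>>>,
}

/// Combine one record with its lookup result.
/// `lookup` is `None` when the database had no row for the key.
/// Returns `None` when the record is dropped.
pub fn apply_miss_strategy(
    key: &str,
    lookup: Option<Vec<String>>,
    enrichment_width: usize,
    strategy: MissStrategy,
) -> Option<OutputRow> {
    let enrichment: Option<Vec<Option<String>>> = match (lookup, strategy) {
        // A match always wins, whatever the strategy.
        (Some(values), _) => Some(values.into_iter().map(Some).collect()),
        (None, MissStrategy::NullFill) => Some(vec![None; enrichment_width]),
        (None, MissStrategy::DropRecord) => return None,
        (None, MissStrategy::PassThrough) => None,
    };
    Some(OutputRow { key: key.to_string(), enrichment })
}
```

NullFill keeps the output schema stable for downstream operators, which is a plausible reason for it being the default.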
Enrichment configuration reference
| Method | Default | Description |
|---|---|---|
| AdbcEnrichConfig::new(adbc, table, key) | – | Key-column lookup mode |
| AdbcEnrichConfig::new_with_query(adbc, sql, key) | – | Parameterized query mode |
| .with_target_key(column) | Same as lookup_key | Remote column to match against (if different from the stream column) |
| .with_select_columns(vec) | All columns | Which columns to fetch from the remote table |
| .with_prefix(prefix) | None | Prefix for enriched columns (e.g., "customer_") |
| .with_cache_ttl(duration) | 5 minutes | Time-to-live for cached entries |
| .with_cache_max_entries(n) | 10,000 | Maximum entries in the LRU cache |
| .with_on_miss(strategy) | NullFill | Strategy for unmatched records |
| .with_prefetch(bool) | false | Overlap DB queries with batch processing for reduced latency |
Supported key types
The lookup key column can be Utf8 (string), Int32, or Int64. Integer keys are interpolated into SQL without quoting and converted to strings internally for cache lookups.
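A minimal sketch of how typed keys might turn into the IN-list of a micro-batch query follows. `LookupKey` and `build_lookup_query` are hypothetical, and escaping single quotes by doubling them is an assumption about how string keys are made safe.

```rust
/// Lookup key values for the three supported key column types.
/// Hypothetical stand-in type for illustration.
#[derive(Debug, Clone, PartialEq)]
pub enum LookupKey {
    Utf8(String),
    Int32(i32),
    Int64(i64),
}

impl LookupKey {
    /// SQL literal: integers are emitted bare; strings are single-quoted
    /// with embedded quotes doubled (assumed escaping rule).
    pub fn to_sql_literal(&self) -> String {
        match self {
            LookupKey::Utf8(s) => format!("'{}'", s.replace('\'', "''")),
            LookupKey::Int32(v) => v.to_string(),
            LookupKey::Int64(v) => v.to_string(),
        }
    }

    /// String form used as the cache key, whatever the column type.
    pub fn cache_key(&self) -> String {
        match self {
            LookupKey::Utf8(s) => s.clone(),
            LookupKey::Int32(v) => v.to_string(),
            LookupKey::Int64(v) => v.to_string(),
        }
    }
}

/// Build `SELECT cols FROM table WHERE target_key IN (k1, k2, ...)`.
/// An empty `select_columns` slice means "all columns".
pub fn build_lookup_query(
    table: &str,
    target_key: &str,
    select_columns: &[&str],
    keys: &[LookupKey],
) -> String {
    let projection = if select_columns.is_empty() {
        "*".to_string()
    } else {
        select_columns.join(", ")
    };
    let in_list: Vec<String> = keys.iter().map(LookupKey::to_sql_literal).collect();
    format!(
        "SELECT {projection} FROM {table} WHERE {target_key} IN ({})",
        in_list.join(", ")
    )
}
```

Issuing one query per batch instead of one per record is what keeps the lookup cost proportional to the number of distinct uncached keys.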
Cache behavior
The enrichment operator uses an LRU cache with per-entry TTL:
- Cache hits return the stored enrichment row without querying the database.
- Negative results (key not found in the database) are cached as None to avoid repeated lookups for non-existent keys.
- TTL expiry evicts stale entries so the operator picks up changes in the remote database.
- On recovery, the cache starts cold and repopulates organically. Enrichment data is external and authoritative, so there is no state to recover.
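The cache rules above fit in a small structure. The sketch below is not the operator's implementation: `LruTtlCache` and `Probe` are hypothetical, recency is tracked with a simple deque (O(n) per touch), and the clock is injected so expiry can be tested.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

/// Result of a cache probe.
#[derive(Debug, PartialEq, Eq)]
pub enum Probe<V> {
    /// Key is cached; `None` inside means "known to be absent from the database".
    Hit(Option<V>),
    /// Key is unknown or its entry expired: the caller must query the database.
    Miss,
}

/// LRU cache with a per-entry TTL and negative caching (illustrative only).
pub struct LruTtlCache<V> {
    ttl: Duration,
    max_entries: usize,
    entries: HashMap<String, (Option<V>, Instant)>,
    /// Keys ordered from least to most recently used.
    recency: VecDeque<String>,
}

impl<V: Clone> LruTtlCache<V> {
    pub fn new(ttl: Duration, max_entries: usize) -> Self {
        Self { ttl, max_entries, entries: HashMap::new(), recency: VecDeque::new() }
    }

    pub fn get(&mut self, key: &str, now: Instant) -> Probe<V> {
        // Copy out the value and whether it has outlived its TTL.
        let state = self.entries.get(key).map(|(value, inserted)| {
            (value.clone(), now.duration_since(*inserted) >= self.ttl)
        });
        match state {
            None => Probe::Miss,
            Some((_, true)) => {
                self.remove(key); // expired: evict and force a fresh lookup
                Probe::Miss
            }
            Some((value, false)) => {
                self.touch(key);
                Probe::Hit(value)
            }
        }
    }

    /// Store a lookup result; `None` records that the key does not exist.
    pub fn insert(&mut self, key: &str, value: Option<V>, now: Instant) {
        self.remove(key);
        // Evict least recently used entries until there is room.
        while self.entries.len() >= self.max_entries {
            match self.recency.pop_front() {
                Some(oldest) => {
                    self.entries.remove(&oldest);
                }
                None => break,
            }
        }
        self.entries.insert(key.to_string(), (value, now));
        self.recency.push_back(key.to_string());
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }

    fn touch(&mut self, key: &str) {
        self.recency.retain(|k| k != key);
        self.recency.push_back(key.to_string());
    }

    fn remove(&mut self, key: &str) {
        self.entries.remove(key);
        self.recency.retain(|k| k != key);
    }
}
```

Distinguishing `Hit(None)` from `Miss` is what makes negative caching work: the first means "do not ask the database again yet", the second means "ask now".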
Connection Configuration Reference
AdbcConfig is shared between the sink and enrichment operator:
| Method | Default | Description |
|---|---|---|
| AdbcConfig::new(driver, uri) | – | Required: driver name/path and connection URI |
| AdbcConfig::sqlite(path) | – | Convenience: SQLite driver with database path |
| AdbcConfig::flight_sql(uri) | – | Convenience: Flight SQL driver with gRPC endpoint |
| .with_option(key, value) | {} | Add a driver-specific key-value option |
| .with_pool_size(n) | 4 | Connection pool size (round-robin) |
| .with_health_check(config) | enabled, 30s, SELECT 1 | Connection health check configuration |
Driver name resolution: If the driver name contains / or \, it is treated as a file path. Otherwise, it is resolved via platform library search paths (e.g., LD_LIBRARY_PATH, DYLD_LIBRARY_PATH).
Common driver names:
| Database | Driver Name |
|---|---|
| PostgreSQL | adbc_driver_postgresql |
| BigQuery | adbc_driver_bigquery |
| Snowflake | adbc_driver_snowflake |
| SQLite | adbc_driver_sqlite |
| Flight SQL | adbc_driver_flightsql |
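The driver name resolution rule described in this section is simple enough to state as code. `DriverLocation` and `resolve_driver` are hypothetical names used only to illustrate the rule.

```rust
use std::path::PathBuf;

/// Where the loader should look for an ADBC driver (illustrative type).
#[derive(Debug, PartialEq, Eq)]
pub enum DriverLocation {
    /// Load exactly this shared library file.
    Path(PathBuf),
    /// Let the platform loader search its library paths for this name.
    SearchByName(String),
}

/// A driver string containing `/` or `\` is a file path; anything else is a name.
pub fn resolve_driver(driver: &str) -> DriverLocation {
    if driver.contains('/') || driver.contains('\\') {
        DriverLocation::Path(PathBuf::from(driver))
    } else {
        DriverLocation::SearchByName(driver.to_string())
    }
}
```

So `"adbc_driver_postgresql"` goes through the library search path, while `"/usr/local/lib/libadbc_driver_postgresql.so"` is loaded directly.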
Observability
| Metric | Type | Description |
|---|---|---|
| adbc_sink_rows_written_total | counter | Rows written, by table |
| adbc_sink_flush_duration_seconds | histogram | Flush latency |
| adbc_connection_reconnects_total | counter | Pool connections replaced by health check |
| adbc_enrich_cache_hit_total | counter | Cache hits |
| adbc_enrich_cache_miss_total | counter | Cache misses |
| adbc_enrich_cache_size | gauge | Current cache entry count |
| adbc_enrich_query_duration_seconds | histogram | Micro-batch query latency |
| adbc_enrich_query_errors_total | counter | Failed enrichment queries |
Integration Testing
The ADBC integration tests require Docker (for PostgreSQL via testcontainers) and the adbc_driver_postgresql shared library.
Set the ADBC_POSTGRESQL_DRIVER environment variable if the driver is not on the default library search path:
# Point to the driver if not on LD_LIBRARY_PATH
export ADBC_POSTGRESQL_DRIVER=/usr/local/lib/libadbc_driver_postgresql.so
# Run the sink integration tests
cargo test -p volley-connector-adbc --test sink_integration -- --ignored
# Run the enrichment integration tests
cargo test -p volley-enrichment-adbc --test enrich_integration -- --ignored
Tests use volley-test-harness::containers::PostgresContainer to spin up a PostgreSQL instance automatically.
Troubleshooting
“Failed to load ADBC driver”
- Verify the driver shared library is installed and on the library search path.
- On macOS: check DYLD_LIBRARY_PATH. On Linux: check LD_LIBRARY_PATH.
- Try passing the full file path instead of just the driver name.
“Failed to create ADBC connection”
- Verify the connection URI is correct and the database is reachable.
- Check that driver-specific options (credentials, project ID, etc.) are set via with_option().
Slow enrichment queries
- Increase cache_ttl and cache_max_entries to reduce query frequency.
- Add an index on the lookup column in the remote database.
- Use with_select_columns() to fetch only the columns you need.
Duplicate rows after recovery
- Use with_upsert_key() to make writes idempotent on a natural key.
- Without upsert, the sink provides at-least-once delivery.
ScyllaDB / Cassandra
Volley provides ScyllaDB (and Cassandra-compatible) integration via two crates:
- volley-connector-scylla – A sink that writes Arrow RecordBatches to a ScyllaDB table using micro-batch CQL writes, with full 2PC support and first-class vector column handling.
- volley-enrichment-scylla – A real-time enrichment operator that queries ScyllaDB to enrich streaming records, with LRU+TTL caching, compound key support, and ANN vector similarity search.
Both crates use the scylla Rust driver, which provides native token-aware routing and efficient CQL batch execution.
Targets: ScyllaDB 5.x / 6.x and Apache Cassandra 4.x / 5.x.
Setup
Add crate dependencies
[dependencies]
# Sink (write to ScyllaDB)
volley-connector-scylla = { git = "https://github.com/volley-streams/volley" }
# Enrichment operator (query ScyllaDB to enrich records)
volley-enrichment-scylla = { git = "https://github.com/volley-streams/volley" }
Prerequisites
A running ScyllaDB (or Cassandra) cluster is required. For local development, start a single node with Docker:
docker run --rm -d -p 9042:9042 --name scylla scylladb/scylla:6.1
ScyllaDB Sink
The sink buffers incoming rows and flushes them as CQL batch statements. It supports append mode (INSERT), upsert mode (INSERT IF NOT EXISTS), and checkpoint-aligned 2PC.
Basic append
use volley_connector_scylla::{ScyllaConfig, ScyllaSinkConfig, ScyllaStreamExt};

let scylla = ScyllaConfig::new(vec!["127.0.0.1:9042"])
    .with_keyspace("my_keyspace");

let sink_config = ScyllaSinkConfig::new(scylla, "events")
    .with_batch_size(500)
    .with_create_table(true);

env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_scylla(sink_config).await?
    .execute("kafka-to-scylla").await?;
Upsert mode
Use ScyllaSinkConfig::new_upsert() for idempotent writes. Duplicate records
with the same primary key are silently discarded:
let sink_config = ScyllaSinkConfig::new_upsert(scylla, "events", vec!["event_id"])
    .with_batch_size(500);
For tables with a composite primary key, pass all key columns:
let sink_config = ScyllaSinkConfig::new_upsert(scylla, "events", vec!["tenant_id", "event_id"])
    .with_batch_size(500);
Auto-create table
When with_create_table(true) is set, the sink issues a CREATE TABLE IF NOT EXISTS
DDL statement on startup. The table schema is derived from the Arrow schema of the
first batch. Arrow-to-CQL type mappings:
| Arrow type | CQL type |
|---|---|
| Int32 | int |
| Int64 | bigint |
| Float32 | float |
| Float64 | double |
| Utf8 / LargeUtf8 | text |
| Boolean | boolean |
| Timestamp(ms, ...) | timestamp |
| FixedSizeList<Float16/32/64, N> | vector<float, N> |
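The mapping table can be read as a function from Arrow type to CQL type. The sketch below uses a tiny local `ArrowType` enum instead of the arrow crate so that it stands alone; `cql_type` and `create_table_ddl` are hypothetical, and the way the real sink chooses the primary key for the generated table is not specified here, so it is passed in explicitly.

```rust
/// Minimal local mirror of the Arrow types in the table above
/// (not the arrow crate's DataType).
#[derive(Debug, Clone, PartialEq)]
pub enum ArrowType {
    Int32,
    Int64,
    Float16,
    Float32,
    Float64,
    Utf8,
    LargeUtf8,
    Boolean,
    TimestampMillis,
    FixedSizeList(Box<ArrowType>, usize),
}

/// Map an Arrow type to its CQL type name, or `None` if the table has no entry.
pub fn cql_type(ty: &ArrowType) -> Option<String> {
    let cql = match ty {
        ArrowType::Int32 => "int".to_string(),
        ArrowType::Int64 => "bigint".to_string(),
        ArrowType::Float32 => "float".to_string(),
        ArrowType::Float64 => "double".to_string(),
        ArrowType::Utf8 | ArrowType::LargeUtf8 => "text".to_string(),
        ArrowType::Boolean => "boolean".to_string(),
        ArrowType::TimestampMillis => "timestamp".to_string(),
        // Fixed-size lists of floats become CQL vectors of the same length.
        ArrowType::FixedSizeList(child, n) => match &**child {
            ArrowType::Float16 | ArrowType::Float32 | ArrowType::Float64 => {
                format!("vector<float, {n}>")
            }
            _ => return None,
        },
        // A bare Float16 column has no row in the mapping table.
        ArrowType::Float16 => return None,
    };
    Some(cql)
}

/// Assemble a `CREATE TABLE IF NOT EXISTS` statement from (name, type) pairs.
/// `primary_key` is supplied by the caller (assumption for this sketch).
pub fn create_table_ddl(
    table: &str,
    columns: &[(&str, ArrowType)],
    primary_key: &[&str],
) -> Option<String> {
    let mut defs = Vec::with_capacity(columns.len());
    for (name, ty) in columns {
        defs.push(format!("{name} {}", cql_type(ty)?));
    }
    Some(format!(
        "CREATE TABLE IF NOT EXISTS {table} ({}, PRIMARY KEY ({}))",
        defs.join(", "),
        primary_key.join(", "),
    ))
}
```

Returning `None` for an unmapped type lets the caller fail before any DDL is sent, rather than creating a table with a missing column.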
Vector columns
ScyllaDB’s native vector<float> type is fully supported. Arrow
FixedSizeList columns with Float16, Float32, or Float64 children are
automatically serialised as vector values. This is the recommended way to
store embedding vectors for ANN search:
// Assumes the arrow crate is available as a direct dependency.
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

// Arrow schema with a vector embedding column
let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Utf8, false),
    Field::new(
        "embedding",
        DataType::FixedSizeList(
            Arc::new(Field::new("item", DataType::Float32, false)),
            384,
        ),
        false,
    ),
]));

let sink_config = ScyllaSinkConfig::new(scylla, "embeddings")
    .with_create_table(true)
    .with_vector_index("embedding"); // CREATE CUSTOM INDEX for ANN
Sink configuration reference
| Method | Default | Description |
|---|---|---|
| ScyllaSinkConfig::new(config, table) | – | Append mode. Required: ScyllaDB config and target table name |
| ScyllaSinkConfig::new_upsert(config, table, keys) | – | Upsert mode. keys lists partition + clustering key columns |
| .with_batch_size(n) | 500 | Number of buffered rows before auto-flush |
| .with_create_table(bool) | false | Auto-create the table from the first batch’s Arrow schema |
| .with_vector_index(column) | None | Create a ScyllaDB custom ANN index on this vector column |
| .with_ttl(seconds) | None | Per-row TTL in seconds (USING TTL n on every INSERT) |
Delivery guarantees
- Append mode (default): at-least-once. On checkpoint, buffered rows are flushed. If the pipeline crashes after the flush but before the checkpoint completes, the rows may be written again on recovery.
- Upsert mode: effectively exactly-once for tables with a natural primary key. Duplicate INSERTs on recovery are idempotent because ScyllaDB discards rows whose primary key already exists.
- 2PC (transactional pipelines): prepare_commit stages the CQL batch; finalize_commit executes it; abort_commit discards it. Sinks in a multi-sink DAG all commit atomically per epoch.
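The 2PC lifecycle can be modelled as a small state machine. This is a sketch under simplifying assumptions: `TwoPhaseSink` is hypothetical, rows are plain strings, at most one epoch is staged at a time, and "executing the batch" is represented by appending to a `committed` vector.

```rust
/// Toy sink that stages rows per checkpoint epoch (illustrative only).
#[derive(Debug, Default)]
pub struct TwoPhaseSink {
    /// Rows received since the last prepare.
    buffer: Vec<String>,
    /// Rows frozen by `prepare_commit`, tagged with their epoch.
    staged: Option<(u64, Vec<String>)>,
    /// Rows made durable by `finalize_commit`.
    pub committed: Vec<String>,
}

impl TwoPhaseSink {
    pub fn write(&mut self, row: &str) {
        self.buffer.push(row.to_string());
    }

    /// Phase 1: freeze everything buffered so far under `epoch`.
    pub fn prepare_commit(&mut self, epoch: u64) {
        self.staged = Some((epoch, std::mem::take(&mut self.buffer)));
    }

    /// Phase 2: execute the staged batch for `epoch`.
    /// Returns false (and changes nothing) if no batch is staged for that epoch.
    pub fn finalize_commit(&mut self, epoch: u64) -> bool {
        match self.staged.take() {
            Some((staged_epoch, rows)) if staged_epoch == epoch => {
                self.committed.extend(rows);
                true
            }
            other => {
                self.staged = other; // put back whatever was there
                false
            }
        }
    }

    /// Discard the staged batch for `epoch`; the rows will be replayed
    /// from the source after recovery.
    pub fn abort_commit(&mut self, epoch: u64) {
        if matches!(&self.staged, Some((e, _)) if *e == epoch) {
            self.staged = None;
        }
    }
}
```

Nothing reaches `committed` until `finalize_commit` is called for the matching epoch, which is the property a coordinator relies on to commit several sinks together.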
ScyllaDB Enrichment Operator
The enrichment operator queries ScyllaDB to join lookup data into the stream.
For each incoming batch it extracts unique keys, issues per-key CQL SELECT
queries (batched via async concurrency), and left-joins the results back into
the batch. An LRU+TTL cache avoids redundant queries.
Simple key lookup
use volley_enrichment_scylla::{ScyllaEnrichConfig, ScyllaEnrichStreamExt, EnrichMissStrategy};
use volley_connector_scylla::ScyllaConfig;
use std::time::Duration;

let scylla = ScyllaConfig::new(vec!["127.0.0.1:9042"])
    .with_keyspace("my_keyspace");

env.from_kafka(source_config).await?
    .enrich_with_scylla(
        ScyllaEnrichConfig::new(scylla, "customers", "customer_id")
            .with_select_columns(vec!["tier", "region", "lifetime_value"])
            .with_prefix("customer_")
            .with_cache_ttl(Duration::from_secs(300))
            .with_on_miss(EnrichMissStrategy::NullFill)
    )
    .filter_expr(col("customer_tier").eq(lit("premium")))
    .to_kafka(sink_config).await?
    .execute("enrich-orders").await?;
This issues SELECT tier, region, lifetime_value FROM customers WHERE customer_id = ?
for each unique key in the batch.
Compound key lookup
For tables with a composite primary key, specify all key columns and the corresponding stream columns:
let enrich_config = ScyllaEnrichConfig::new_compound(
    scylla,
    "order_items",
    vec![
        ("order_id", "order_id"), // (stream_column, table_column)
        ("item_sku", "sku"),
    ],
)
.with_select_columns(vec!["price", "weight_kg"])
.with_prefix("item_");
This issues SELECT price, weight_kg FROM order_items WHERE order_id = ? AND sku = ?.
ANN vector similarity search
For vector similarity search, use ScyllaEnrichConfig::new_ann(). The query
vector is read from a designated column in the stream batch:
let enrich_config = ScyllaEnrichConfig::new_ann(
    scylla,
    "product_embeddings", // table
    "embedding",          // vector column in the table
    "query_embedding",    // vector column in the stream batch
)
.with_select_columns(vec!["product_id", "title", "category"])
.with_ann_limit(10) // LIMIT n in the ANN query
.with_prefix("nearest_");
This issues SELECT product_id, title, category FROM product_embeddings ORDER BY embedding ANN OF ? LIMIT 10
for each row in the batch, enabling real-time semantic search enrichment.
Miss strategies
When a lookup key has no match in ScyllaDB:
| Strategy | Behavior |
|---|---|
| NullFill (default) | Add enrichment columns filled with nulls |
| DropRecord | Drop the record entirely |
| PassThrough | Keep the original record without enrichment columns |
Enrichment configuration reference
| Method | Default | Description |
|---|---|---|
| ScyllaEnrichConfig::new(config, table, key) | – | Simple-key lookup mode |
| ScyllaEnrichConfig::new_compound(config, table, key_pairs) | – | Compound-key lookup mode |
| ScyllaEnrichConfig::new_ann(config, table, table_vec_col, stream_vec_col) | – | ANN vector search mode |
| .with_select_columns(vec) | All columns | Columns to fetch from the remote table |
| .with_prefix(prefix) | None | Prefix for enriched columns (e.g., "customer_") |
| .with_cache_ttl(duration) | 5 minutes | Time-to-live for cached lookup entries |
| .with_cache_max_entries(n) | 10,000 | Maximum entries in the LRU cache |
| .with_on_miss(strategy) | NullFill | Strategy for unmatched records |
| .with_prefetch(bool) | false | Overlap ScyllaDB queries with batch processing for reduced latency |
| .with_ann_limit(n) | 5 | Number of ANN results to return per query (ANN mode only) |
Connection configuration reference
ScyllaConfig is shared between the sink and enrichment operator:
| Method | Default | Description |
|---|---|---|
| ScyllaConfig::new(contact_points) | – | Required: list of "host:port" seed nodes |
| .with_keyspace(keyspace) | None | Default keyspace for all queries |
| .with_username(user) | None | CQL authentication username |
| .with_password(pass) | None | CQL authentication password |
| .with_connect_timeout(duration) | 5 s | Connection establishment timeout |
| .with_request_timeout(duration) | 30 s | Per-query timeout |
| .with_consistency(level) | LocalQuorum | CQL consistency level for writes |
Full pipeline example
A Kafka-to-ScyllaDB pipeline that enriches incoming order events with product data before writing to a destination table:
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt};
use volley_connector_scylla::{ScyllaConfig, ScyllaSinkConfig, ScyllaStreamExt};
use volley_enrichment_scylla::{ScyllaEnrichConfig, ScyllaEnrichStreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // One connection config shared by the enrichment operator and the sink.
    let scylla = ScyllaConfig::new(vec!["scylla-node1:9042", "scylla-node2:9042"])
        .with_keyspace("ecommerce")
        .with_username("volley")
        .with_password("secret");

    // `schema` is the Arrow schema of the "orders" topic, defined elsewhere.
    let source_config = KafkaSourceConfig::new(
        "broker:9092",
        "orders",
        "volley-order-processor",
        schema,
    );

    // Upsert on order_id so replays after recovery are idempotent.
    let sink_config = ScyllaSinkConfig::new_upsert(
        scylla.clone(),
        "enriched_orders",
        vec!["order_id"],
    )
    .with_batch_size(1000)
    .with_create_table(true);

    StreamExecutionEnvironment::new()
        .with_checkpoints(CheckpointConfig::new(Duration::from_secs(30)))
        .from_kafka(source_config).await?
        .enrich_with_scylla(
            ScyllaEnrichConfig::new(scylla, "products", "product_id")
                .with_select_columns(vec!["name", "category", "unit_price"])
                .with_prefix("product_")
                .with_cache_ttl(Duration::from_secs(600))
        )
        .filter_expr(col("product_category").eq(lit("electronics")))
        .select_expr(vec![
            col("order_id"),
            col("customer_id"),
            col("product_name"),
            col("product_unit_price"),
            col("quantity"),
            (col("product_unit_price") * col("quantity")).alias("total"),
        ])
        .to_scylla(sink_config).await?
        .execute("orders-enrich-to-scylla")
        .await?;

    Ok(())
}
Observability
| Metric | Type | Description |
|---|---|---|
| scylla_sink_rows_written_total | counter | Rows written, labelled by table |
| scylla_sink_batch_duration_seconds | histogram | CQL batch execution latency |
| scylla_sink_retries_total | counter | Batch retries due to transient errors |
| scylla_enrich_cache_hit_total | counter | Cache hits |
| scylla_enrich_cache_miss_total | counter | Cache misses |
| scylla_enrich_cache_size | gauge | Current cache entry count |
| scylla_enrich_query_duration_seconds | histogram | Per-key query latency |
| scylla_enrich_query_errors_total | counter | Failed enrichment queries |
Integration testing
The integration tests require Docker and a ScyllaDB image. They use
volley-test-harness, which starts a scylladb/scylla container automatically
through testcontainers:
# Run sink integration tests
cargo test -p volley-connector-scylla --test sink_integration -- --ignored
# Run enrichment integration tests
cargo test -p volley-enrichment-scylla --test enrich_integration -- --ignored
Troubleshooting
“Unable to connect to any node”
- Verify the contact points include the correct host and port (default: 9042).
- Check network reachability from the pipeline process to the ScyllaDB nodes.
- For Docker, ensure the container exposes port 9042 and the pipeline connects to the host IP.
“Keyspace does not exist”
- Create the keyspace before starting the pipeline:
CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
Slow enrichment queries
- Increase cache_ttl and cache_max_entries to reduce query frequency.
- Ensure the lookup column is the partition key (queries that filter on non-key columns require ALLOW FILTERING and are slow).
- Use with_prefetch(true) to overlap ScyllaDB I/O with downstream processing.
ANN search returns no results
- Verify a custom index exists on the vector column:
CREATE CUSTOM INDEX ON table_name (vector_col) USING 'StorageAttachedIndex';
- ANN search requires ScyllaDB 6.0+ or the SAI extension on Cassandra 5+.
Kubernetes Operator
Volley includes a Kubernetes operator that automates deployment and management of Volley stream processing applications. Define a VolleyApplication custom resource and the operator handles StatefulSets, persistent storage, health checks, and Prometheus integration.
Prerequisites
- Kubernetes cluster (v1.26+)
- prometheus-operator (optional, for automatic metrics scraping)
- KEDA (optional, for autoscaling)
Installing the Operator
Build and deploy the operator:
# Build the operator binary
cargo build --release -p volley-k8s-operator
# Generate the CRD manifest
./target/release/volley-k8s-operator generate-crd > volley-crd.json
# Apply the CRD to your cluster
kubectl apply -f volley-crd.json
# Run the operator (in-cluster or locally for development)
./target/release/volley-k8s-operator
Deploying a Volley Application
Create a VolleyApplication resource:
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  # Container image (required)
  image: registry.example.com/my-volley-app:v1.0
  # Resource requirements (required)
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"
  # RocksDB state storage
  state:
    storageClassName: fast-ssd
    size: 10Gi
  # Checkpoint storage
  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi
  # Environment variables
  env:
    - name: RUST_LOG
      value: "info"
    - name: KAFKA_BOOTSTRAP_SERVERS
      value: "kafka.default.svc:9092"
kubectl apply -f my-pipeline.yaml
The operator creates the following resources automatically:
| Resource | Purpose |
|---|---|
| StatefulSet | Runs the app with stable pod identity and persistent volumes |
| PVC (state) | Persistent storage for RocksDB state (/var/lib/volley/state) |
| PVC (checkpoints) | Persistent storage for checkpoints (/var/lib/volley/checkpoints) |
| Service (headless) | StatefulSet DNS for stable network identity |
| Service (metrics) | Exposes the Prometheus metrics port |
| ServiceMonitor | Prometheus-operator auto-discovery for metrics scraping |
| PodDisruptionBudget | Prevents involuntary eviction during checkpointing |
Health Checks
The operator automatically configures liveness and readiness probes backed by Volley’s built-in HealthReporter:
| Probe | Path | Behaviour |
|---|---|---|
| Liveness | /health (port 8080) | Returns healthy when pipeline is Running or Starting |
| Readiness | /ready (port 8080) | Returns ready only when pipeline is Running |
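The probe semantics in the table reduce to two small predicates over the pipeline state. In this sketch the `PipelineState` enum is a stand-in: only Starting and Running appear in the table, the other variants are hypothetical, and the 200/503 status codes are an assumption about how HealthReporter answers.

```rust
/// Stand-in for the pipeline lifecycle state.
/// `Failed` and `Stopped` are hypothetical variants added for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineState {
    Starting,
    Running,
    Failed,
    Stopped,
}

/// Liveness: the process is worth keeping while it is starting or running.
pub fn liveness_status(state: PipelineState) -> u16 {
    match state {
        PipelineState::Starting | PipelineState::Running => 200,
        _ => 503,
    }
}

/// Readiness: ready only once the pipeline is actually running.
pub fn readiness_status(state: PipelineState) -> u16 {
    match state {
        PipelineState::Running => 200,
        _ => 503,
    }
}
```

The gap between the two matters during startup and recovery: a pod restoring state is live (so it is not restarted) but not yet ready.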
Customize the health check configuration:
spec:
  health:
    port: 8080
    livenessPath: /health
    readinessPath: /ready
    initialDelaySeconds: 30
    periodSeconds: 10
Observability
Metrics are exposed on port 9090 by default. When prometheus-operator is installed, the operator creates a ServiceMonitor for automatic scraping:
spec:
  observability:
    metricsPort: 9090
    metricsPath: /metrics
    serviceMonitor:
      enabled: true
      interval: 15s
      labels:
        release: prometheus # match your Prometheus selector
    otlpEndpoint: http://otel-collector.monitoring:4317 # optional tracing
Key Volley metrics available for dashboards and alerting:
| Metric | Type | Description |
|---|---|---|
| volley.source.records_polled | Counter | Records read from source |
| volley.records.processed | Counter | Records processed per operator |
| volley.stage.latency_ms | Histogram | Per-operator processing latency |
| volley.checkpoint.duration_ms | Histogram | Checkpoint completion time |
| volley.channel.utilization | Gauge | Backpressure indicator (0.0-1.0) |
| volley.kafka.consumer_lag | Gauge | Kafka consumer group lag |
| volley.pipeline.uptime_seconds | Gauge | Pipeline uptime |
Node Isolation
Following Flink best practice, each Volley application should run on dedicated nodes. The operator supports node selectors, tolerations, and pod anti-affinity:
spec:
  isolation:
    # Schedule only on nodes labelled for stream processing
    nodeSelector:
      volley.io/pool: stream-processing
    # Tolerate the dedicated taint
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule
    # One pod per node (enabled by default)
    podAntiAffinity: true
Distributed Execution (Horizontal Scaling)
To run a pipeline across multiple pods, add a cluster section to your spec and point it at shared checkpoint storage.
First, create a ReadWriteMany PVC backed by your cloud provider’s managed NFS (AWS EFS, GCP Filestore, Azure Files):
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs # your RWX storage class
  resources:
    requests:
      storage: 50Gi
Then reference it in your VolleyApplication:
spec:
  replicas: 3
  cluster:
    maxKeyGroups: 256 # virtual partitions (default, power of 2)
    flightPort: 50051 # Arrow Flight port (default)
    sharedCheckpoints:
      claimName: volley-shared-ckpt # the PVC you created above
When cluster is present, the operator automatically:
| Action | Detail |
|---|---|
| Injects VOLLEY_* env vars | VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME, VOLLEY_SHARED_CHECKPOINT_DIR |
| Adds Flight port | Container port 50051 + headless service port for inter-pod communication |
| Mounts shared PVC | The existing ReadWriteMany PVC is mounted into all pods at /mnt/volley/checkpoints |
The Volley runtime reads these env vars via ClusterConfig::from_env() and automatically enters distributed mode.
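As a rough illustration of what reading these variables involves, here is a minimal sketch. `ClusterEnv` and `cluster_env_from` are hypothetical names, not Volley's actual `ClusterConfig` API, and the rule that a missing `VOLLEY_NUM_WORKERS` means single-node mode is an assumption. The defaults (256, 50051, /mnt/volley/checkpoints) are the ones documented on this page.

```rust
/// Hypothetical mirror of the operator-injected settings (not Volley's real type).
#[derive(Debug, PartialEq)]
pub struct ClusterEnv {
    pub num_workers: u32,
    pub headless_service: String,
    pub max_key_groups: u32,
    pub flight_port: u16,
    pub app_name: String,
    pub shared_checkpoint_dir: String,
}

/// Builds the config from any key lookup, so the same code works with
/// `std::env::var` in a binary and with a closure or map in tests.
pub fn cluster_env_from<F>(lookup: F) -> Result<Option<ClusterEnv>, String>
where
    F: Fn(&str) -> Option<String>,
{
    // Assumption: no VOLLEY_NUM_WORKERS means the pipeline runs single-node.
    let Some(workers) = lookup("VOLLEY_NUM_WORKERS") else {
        return Ok(None);
    };
    let required = |key: &str| lookup(key).ok_or_else(|| format!("{key} is not set"));

    let num_workers = workers
        .parse::<u32>()
        .map_err(|e| format!("VOLLEY_NUM_WORKERS: {e}"))?;
    // Optional values fall back to the defaults documented for the CRD.
    let max_key_groups = match lookup("VOLLEY_MAX_KEY_GROUPS") {
        Some(v) => v.parse::<u32>().map_err(|e| format!("VOLLEY_MAX_KEY_GROUPS: {e}"))?,
        None => 256,
    };
    let flight_port = match lookup("VOLLEY_FLIGHT_PORT") {
        Some(v) => v.parse::<u16>().map_err(|e| format!("VOLLEY_FLIGHT_PORT: {e}"))?,
        None => 50051,
    };

    Ok(Some(ClusterEnv {
        num_workers,
        headless_service: required("VOLLEY_HEADLESS_SERVICE")?,
        max_key_groups,
        flight_port,
        app_name: required("VOLLEY_APP_NAME")?,
        shared_checkpoint_dir: lookup("VOLLEY_SHARED_CHECKPOINT_DIR")
            .unwrap_or_else(|| "/mnt/volley/checkpoints".to_string()),
    }))
}
```

In a real binary the lookup would be `|k| std::env::var(k).ok()`. Passing the lookup in keeps the parsing logic testable without mutating process-wide environment variables.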
Shared checkpoint storage uses a pre-provisioned ReadWriteMany PVC (managed NFS from your cloud provider). All workers write checkpoints to the same shared mount, so recovery and rescaling are near-instant — no data transfer needed.
Rescaling is live: change spec.replicas and the coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
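To make the key group idea concrete, the sketch below hashes a key into one of `maxKeyGroups` virtual partitions and maps each key group to a worker using contiguous ranges. The range-based assignment is borrowed from Flink as an assumption; this page does not specify Volley's actual scheme, and the function names are hypothetical. A production system would also need a hash that is stable across builds, which `DefaultHasher` does not guarantee.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a key to one of `max_key_groups` virtual partitions.
/// The key group of a key never changes when the worker count changes.
pub fn key_group_for<K: Hash>(key: &K, max_key_groups: u32) -> u32 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % u64::from(max_key_groups)) as u32
}

/// Assigns a key group to a worker using contiguous ranges
/// (Flink-style; an assumption, not confirmed Volley behavior).
pub fn worker_for_key_group(key_group: u32, max_key_groups: u32, num_workers: u32) -> u32 {
    ((u64::from(key_group) * u64::from(num_workers)) / u64::from(max_key_groups)) as u32
}
```

Because only the second function depends on the replica count, rescaling moves whole key groups between workers rather than rehashing individual keys. With a shared checkpoint mount, a worker that gains a key group can read that group's state in place.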
Autoscaling with KEDA (Future)
The operator has built-in support for KEDA autoscaling. When enabled, it creates a ScaledObject that can scale based on Kafka consumer lag, Prometheus metrics, or CPU/memory:
spec:
  autoscaling:
    enabled: true
    minReplicas: 1
    maxReplicas: 10
    triggers:
      # Scale on Kafka consumer lag
      - type: kafka
        metadata:
          bootstrapServers: kafka.default.svc:9092
          consumerGroup: my-pipeline-group
          topic: input-events
          lagThreshold: "1000"
      # Scale on backpressure (channel utilization)
      - type: prometheus
        metadata:
          serverAddress: http://prometheus.monitoring:9090
          query: avg(volley_channel_utilization{job="my-pipeline"})
          threshold: "0.8"
      # Scale on CPU
      - type: cpu
        metadata:
          value: "70"
Checking Application Status
# List all Volley applications
kubectl get volleyapplications
# or use the short name:
kubectl get vapp
# Check status
kubectl describe vapp my-pipeline
# View operator logs
kubectl logs deployment/volley-k8s-operator
The status subresource reports the current phase (Pending, Running, Degraded, Failed), replica counts, and a human-readable message.
Full Spec Reference
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  image: registry.example.com/my-app:v1.0 # required
  imagePullPolicy: IfNotPresent # default: IfNotPresent
  imagePullSecrets:
    - name: registry-secret
  resources: # required
    requests: { cpu: "2", memory: "4Gi" }
    limits: { cpu: "4", memory: "8Gi" }
  replicas: 1 # default: 1
  args: ["--config", "/etc/volley/config.yaml"]
  env:
    - name: RUST_LOG
      value: "info"
    - name: DB_PASSWORD
      valueFrom:
        secretKeyRef: { name: db-creds, key: password }
  state:
    storageClassName: fast-ssd
    size: 10Gi
    mountPath: /var/lib/volley/state # default
  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi
    mountPath: /var/lib/volley/checkpoints # default
  observability:
    metricsPort: 9090 # default: 9090
    metricsPath: /metrics # default: /metrics
    serviceMonitor:
      enabled: true # default: true
      interval: 15s # default: 15s
      labels: { release: prometheus }
    otlpEndpoint: http://otel-collector:4317
  health:
    port: 8080 # default: 8080
    livenessPath: /health # default: /health
    readinessPath: /ready # default: /ready
    initialDelaySeconds: 30 # default: 30
    periodSeconds: 10 # default: 10
  isolation:
    nodeSelector: { volley.io/pool: stream-processing }
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule
    podAntiAffinity: true # default: true
  kafka:
    bootstrapServers: kafka:9092
    consumerGroup: my-group
    topics: [input-events]
  autoscaling:
    enabled: false # default: false
    minReplicas: 1
    maxReplicas: 10
    triggers:
      - type: kafka
        metadata:
          bootstrapServers: kafka:9092
          consumerGroup: my-group
          topic: input-events
          lagThreshold: "1000"
  cluster: # distributed execution (optional)
    maxKeyGroups: 256 # default: 256 (power of 2)
    flightPort: 50051 # default: 50051
    sharedCheckpoints:
      claimName: volley-shared-ckpt # existing ReadWriteMany PVC
      mountPath: /mnt/volley/checkpoints # default
  labels: {} # additional labels on all resources
  annotations: {} # additional annotations on all resources
Volley Capacity Planning Guide
This guide helps pipeline authors size CPU, memory, and storage for Volley deployments, and provides PromQL queries for ongoing capacity monitoring.
Throughput Reference
Run the NEXMark benchmarks (volley-benchmark/) to measure throughput on your own hardware.
Real-world throughput will vary depending on:
- Operator complexity (aggregations, joins, windowing)
- State backend I/O (RocksDB reads/writes)
- Connector latency (Kafka, blob storage)
- Checkpoint frequency
CPU Sizing
| Throughput target | CPU requests | CPU limits | Notes |
|---|---|---|---|
| < 100K rec/s | 500m | 1 | Light pipelines, simple transforms |
| 100K–1M rec/s | 1 | 2 | Moderate load, stateful operators |
| 1M–5M rec/s | 2 | 4 | High throughput, batched processing |
| > 5M rec/s | 4 | 8 | Maximum throughput, parallelism=4+ |
Volley uses the Tokio async runtime. CPU is primarily consumed by:
- Record deserialization/serialization
- Operator logic (map, filter, aggregate, window)
- RocksDB compaction (background threads)
- Kafka consumer polling and producer flushing
Recommendation: Start with 1 CPU request and monitor container_cpu_usage_seconds_total. Scale up if sustained utilization exceeds 70%.
Memory Sizing
| Workload | Memory requests | Memory limits | Notes |
|---|---|---|---|
| Stateless transforms | 256Mi | 512Mi | No state backend |
| Light stateful (< 100K keys) | 512Mi | 1Gi | Small RocksDB footprint |
| Medium stateful (100K–1M keys) | 1Gi | 2Gi | RocksDB block cache |
| Heavy stateful (> 1M keys) | 2Gi | 4Gi | Large working set |
Memory is consumed by:
- Channel buffers: In-flight records between pipeline stages
- RocksDB block cache: Cached SST blocks for read performance (defaults to ~50% of available memory)
- Arrow RecordBatch buffers: Columnar data in transit
- Kafka consumer buffers: Pre-fetched messages
- Window pending index: Active (key, window) pairs tracked by window operators (~100 bytes each)
- Aggregate LRU cache: In-memory accumulator cache for keyed aggregations
Recommendation: Set memory limits to 2x requests. Monitor container_memory_working_set_bytes and watch for OOM kills.
Bounding Stateful Operator Memory
For high-cardinality keyed workloads, use the built-in LRU mechanisms to cap in-memory state:
- Aggregate operators: keyed_stream.with_max_cache_entries(n) bounds the in-memory accumulator cache. Evicted entries are flushed to RocksDB.
- Window operators: windowed_stream.with_max_pending_windows(n) bounds the in-memory window index. Evicted entries are persisted to the state backend and scanned during watermark advance.
As a rule of thumb, each aggregate cache entry is ~200-500 bytes (depends on accumulator type), and each pending window entry is ~100 bytes. Size the limits based on your available memory budget:
aggregate_memory ≈ max_cache_entries × 300 bytes
window_memory ≈ max_pending_windows × 100 bytes
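The two rules of thumb above can be turned into a small budget calculator. This is a sketch only: the constants are the approximate per-entry sizes quoted on this page (300 bytes is used as a working value inside the 200-500 byte range), and the function names are hypothetical.

```rust
/// Approximate bytes per aggregate cache entry (working value from the guide).
const AGGREGATE_ENTRY_BYTES: u64 = 300;
/// Approximate bytes per pending window entry (from the guide).
const WINDOW_ENTRY_BYTES: u64 = 100;

/// Estimated in-memory footprint for the given LRU limits.
pub fn stateful_memory_bytes(max_cache_entries: u64, max_pending_windows: u64) -> u64 {
    max_cache_entries * AGGREGATE_ENTRY_BYTES + max_pending_windows * WINDOW_ENTRY_BYTES
}

/// Largest `max_cache_entries` that fits in a byte budget for the aggregate cache.
pub fn max_cache_entries_for(budget_bytes: u64) -> u64 {
    budget_bytes / AGGREGATE_ENTRY_BYTES
}
```

For example, 1,000,000 cache entries plus 500,000 pending windows comes to roughly 350 MB, and a 256 MiB aggregate budget allows about 894,000 cache entries. Accumulators that hold larger values will need a bigger per-entry constant.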
Monitor volley_state_cache_evictions_total and volley_window_pending_evictions_total — a sustained high eviction rate means the LRU is undersized and incurring excessive RocksDB I/O.
RocksDB Storage Sizing
Growth Formula
state_size = key_count × avg_value_size × amplification_factor
Where:
- key_count: Number of unique keys in your keyed state
- avg_value_size: Average serialized value size (JSON encoding adds ~30% overhead)
- amplification_factor: RocksDB space amplification, typically 1.1–1.5x depending on compaction
Checkpoint Storage
checkpoint_storage = state_size × checkpoint_retention_count
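Treat this formula as an upper bound. RocksDB checkpoints use hardlinks to SST files, so a checkpoint adds little storage when data has not changed since the previous one. As data mutates and compaction rewrites SST files, however, older checkpoints keep the old files alive, and usage moves toward the upper bound.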
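A worked instance of the growth and checkpoint formulas, as a sketch with hypothetical function names. The checkpoint figure is the worst case given by the formula, where retained checkpoints share no SST files.

```rust
/// state_size = key_count × avg_value_size × amplification_factor
pub fn state_size_bytes(key_count: u64, avg_value_bytes: u64, amplification: f64) -> u64 {
    (key_count as f64 * avg_value_bytes as f64 * amplification).ceil() as u64
}

/// checkpoint_storage = state_size × checkpoint_retention_count
/// (worst case: no SST files are shared between retained checkpoints)
pub fn checkpoint_storage_bytes(state_size: u64, retention_count: u64) -> u64 {
    state_size * retention_count
}
```

With 1,000,000 keys, 200-byte values, and an amplification factor of 1.5, the state is about 300 MB, and retaining 3 checkpoints needs at most about 900 MB.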
PVC Sizing Recommendations
| State size | State PVC | Checkpoint PVC | Retention |
|---|---|---|---|
| < 1 GB | 5Gi | 5Gi | 3 checkpoints |
| 1–10 GB | 20Gi | 20Gi | 3 checkpoints |
| 10–50 GB | 100Gi | 100Gi | 2 checkpoints |
| > 50 GB | 2× state size | 2× state size | 2 checkpoints |
Recommendation: Provision at least 2× your expected state size for headroom. Monitor actual usage with the PromQL queries below and adjust.
Helm Values Example
state:
  enabled: true
  size: "20Gi"
  storageClassName: gp3 # Use provisioned IOPS for write-heavy workloads
checkpoints:
  enabled: true
  size: "20Gi"
Capacity Monitoring PromQL Queries
Throughput
# Records ingested per second by pipeline
sum(rate(volley_source_records_polled_total[5m])) by (job_name)
# Records processed per second by pipeline
sum(rate(volley_records_processed_total[5m])) by (job_name)
# Processing ratio (should be close to 1.0)
sum(rate(volley_records_processed_total[5m])) by (job_name)
/
sum(rate(volley_source_records_polled_total[5m])) by (job_name)
Backpressure
# Channel utilization by stage (>0.8 = backpressure)
avg(volley_channel_utilization) by (job_name, stage)
# Stages with sustained backpressure
avg_over_time(volley_channel_utilization[15m]) > 0.8
Kafka Consumer Lag
# Total consumer lag by pipeline
sum(volley_kafka_consumer_lag) by (job_name)
# Lag growth rate (positive = falling behind)
deriv(sum(volley_kafka_consumer_lag) by (job_name)[15m:1m])
Latency
# p99 stage latency
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
# p50 stage latency
histogram_quantile(0.5, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
# Checkpoint duration p99
histogram_quantile(0.99, sum(rate(volley_checkpoint_duration_ms_bucket[5m])) by (le, job_name))
Error Rate
# Error rate by type
sum(rate(volley_errors_total[5m])) by (job_name, error_type)
# Source errors by connector
sum(rate(volley_source_errors_total[5m])) by (job_name, source_id)
# Sink errors by connector
sum(rate(volley_sink_errors_total[5m])) by (job_name, sink_id)
# Error ratio (for SLO tracking)
sum(rate(volley_errors_total[5m])) by (job_name)
/
clamp_min(sum(rate(volley_records_processed_total[5m])) by (job_name), 1)
Resource Utilization
# Watermark lag (event-time vs. wall-clock)
max(volley_watermark_lag_ms) by (job_name)
# Sink flush duration p99
histogram_quantile(0.99, sum(rate(volley_sink_flush_duration_ms_bucket[5m])) by (le, job_name))
KEDA Autoscaling Reference
Example KEDA trigger configurations for the Helm chart:
Scale on Kafka Consumer Lag
autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 8
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: "kafka:9092"
        consumerGroup: "my-pipeline"
        topic: "input-topic"
        lagThreshold: "10000" # Scale up when lag exceeds 10K messages
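To build intuition for how a lag threshold translates into replicas, the sketch below uses a simplified model: the threshold is treated as a target average lag per replica, so the desired count is the total lag divided by the threshold, rounded up and clamped to the configured bounds. This is an approximation of average-value autoscaling, not KEDA's implementation. It ignores tolerance bands, stabilization windows, and any cap tied to the topic's partition count. `desired_replicas` is a hypothetical helper.

```rust
/// Simplified model: replicas needed so that average lag per replica
/// stays at or below `lag_threshold`, clamped to [min_replicas, max_replicas].
pub fn desired_replicas(
    total_lag: u64,
    lag_threshold: u64,
    min_replicas: u32,
    max_replicas: u32,
) -> u32 {
    assert!(lag_threshold > 0, "lag_threshold must be positive");
    assert!(min_replicas <= max_replicas, "min must not exceed max");
    // Ceiling division without floating point.
    let raw = (total_lag + lag_threshold - 1) / lag_threshold;
    raw.clamp(u64::from(min_replicas), u64::from(max_replicas)) as u32
}
```

Under this model, a total lag of 35,000 with a threshold of 10,000 asks for 4 replicas, while a lag of 1,000,000 is capped at the configured maximum of 8.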
Scale on CPU
autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 4
  triggers:
    - type: cpu
      metadata:
        type: Utilization
        value: "70" # Scale up at 70% CPU utilization
Scale on Custom Prometheus Metric
autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 8
  triggers:
    - type: prometheus
      metadata:
        serverAddress: "http://prometheus:9090"
        query: "sum(volley_channel_utilization{job_name='my-pipeline'}) / count(volley_channel_utilization{job_name='my-pipeline'})"
        threshold: "0.8" # Scale up when average saturation > 80%
Capacity Planning Checklist
- Estimate throughput: What is your expected input rate (records/sec)?
- Size CPU/memory: Use the tables above as a starting point
- Estimate state size: key_count × avg_value_size × 1.5
- Size PVCs: 2× expected state size for both state and checkpoint volumes
- Configure autoscaling: Set KEDA triggers based on lag or saturation
- Deploy and monitor: Use the PromQL queries above to validate sizing
- Iterate: Adjust resources based on actual utilization patterns
Observability
Find out what your pipeline is doing without reaching for logs. Every Volley pipeline emits Prometheus metrics (throughput, latency, backpressure, consumer lag, checkpoint health), OpenTelemetry traces (per-stage spans, optionally per-record with sampling), structured JSON logs with trace_id for cross-referencing, and HTTP health endpoints for Kubernetes probes. All of it boots with zero config and Just Works™ when you scrape the metrics port and point an OTLP collector at the tracing endpoint.
This page is the reference for what you get. For the “which knobs do I turn?” guide on capacity and sizing, see Capacity Planning.
Metrics
Volley exposes 15+ metrics via Prometheus exposition format. All metrics include node_id and job_name labels.
| Metric | Type | Labels | Description |
|---|---|---|---|
| volley.source.records_polled | Counter | source_id | Records read from source |
| volley.records.processed | Counter | operator_id | Records processed per operator |
| volley.stage.latency_ms | Histogram | stage | Per-operator processing latency |
| volley.checkpoint.duration_ms | Histogram | - | Checkpoint completion time |
| volley.checkpoint.epoch | Gauge | - | Current checkpoint epoch |
| volley.checkpoint.failures | Counter | - | Checkpoint failure count |
| volley.channel.utilization | Gauge | stage | Backpressure indicator (0.0-1.0) |
| volley.watermark.lag_ms | Gauge | - | Event-time lag |
| volley.sink.records_written | Counter | sink_id | Records written per sink |
| volley.sink.flush.duration_ms | Histogram | sink_id | Sink flush latency |
| volley.pipeline.uptime_seconds | Gauge | - | Pipeline uptime |
| volley.pipeline.health | Gauge | - | Health state (0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown) |
| volley.errors | Counter | error_type | Errors by type |
| volley.source.errors | Counter | source_id | Per-connector source errors |
| volley.sink.errors | Counter | sink_id | Per-connector sink errors |
| volley.kafka.consumer_lag | Gauge | - | Kafka consumer group lag |
| volley.kafka.commit.duration_ms | Histogram | - | Kafka commit latency |
| volley.kafka.commit.failures | Counter | - | Kafka commit failures |
| volley.blob.objects_read | Counter | - | Blob source throughput |
| volley.blob.bytes_read | Counter | - | Blob I/O volume |
| volley.state.cache_evictions | Counter | operator_id | Aggregate LRU cache evictions |
| volley.window.pending_evictions | Counter | - | Window pending index LRU evictions |
| volley.window.pending_size | Gauge | - | In-memory pending window entry count |
| volley.checkpoint.gc_cleaned | Counter | - | Old checkpoints cleaned by GC |
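The table lists dotted metric names, while the PromQL queries on this page use underscore names with suffixes such as `_total` and `_bucket`. The mapping implied by those queries can be sketched as follows. `MetricKind` and `prometheus_series` are hypothetical helpers for illustration, and the exporter's real renaming rules are assumed rather than confirmed.

```rust
#[derive(Clone, Copy)]
pub enum MetricKind {
    Counter,
    Gauge,
    Histogram,
}

/// Returns the Prometheus series names to query for a dotted Volley metric name.
pub fn prometheus_series(name: &str, kind: MetricKind) -> Vec<String> {
    // Dots are not valid in classic Prometheus metric names; they become underscores.
    let base = name.replace('.', "_");
    match kind {
        // Counters are queried with a `_total` suffix, e.g. inside rate(...[5m]).
        MetricKind::Counter => vec![format!("{base}_total")],
        // Gauges keep the bare name.
        MetricKind::Gauge => vec![base],
        // Histograms expose bucket, sum, and count series.
        MetricKind::Histogram => vec![
            format!("{base}_bucket"),
            format!("{base}_sum"),
            format!("{base}_count"),
        ],
    }
}
```

So `volley.source.records_polled` is queried as `volley_source_records_polled_total`, and `volley.stage.latency_ms` feeds `histogram_quantile` through `volley_stage_latency_ms_bucket`.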
ML Inference Metrics
When using volley-ml operators (.infer(), .embed(), .classify(), .infer_remote()), additional inference-specific metrics are emitted:
| Metric | Type | Labels | Description |
|---|---|---|---|
| volley.ml.inference.duration_ms | Histogram | backend, model_name | Per-batch inference latency (backend call only) |
| volley.ml.inference.batch_rows | Histogram | backend, model_name | Rows per inference batch (batch utilization) |
| volley.ml.model_load.duration_ms | Histogram | backend, model_name | Model load time (emitted once at startup) |
| volley.ml.inference.errors | Counter | backend, model_name, error_type | Inference errors by backend and type |
| volley.ml.http.request.duration_ms | Histogram | endpoint | HTTP round-trip latency for remote model servers |
| volley.ml.http.queue_depth | Gauge | endpoint | Available concurrency permits (inverse of queue depth) |
| volley.ml.http.errors | Counter | endpoint, status_code | HTTP errors from remote model servers |
The backend label is "onnx" or "candle". The model_name label is the model path or HuggingFace repo ID passed to the operator config.
Golden Signals PromQL
# Latency (p99)
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
# Traffic (records/sec)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)
# Errors (rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)
# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)
# Saturation (channel utilization)
avg(volley_channel_utilization) by (job_name, stage)
# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)
ML Inference PromQL
# Inference latency (p99) by backend
histogram_quantile(0.99, sum(rate(volley_ml_inference_duration_ms_bucket[5m])) by (le, backend))
# Inference throughput (rows/sec)
sum(rate(volley_ml_inference_batch_rows_sum[5m])) by (backend)
# Inference error rate
sum(rate(volley_ml_inference_errors_total[5m])) by (backend, error_type)
# HTTP model server latency (p95)
histogram_quantile(0.95, sum(rate(volley_ml_http_request_duration_ms_bucket[5m])) by (le, endpoint))
State & Window PromQL
# Aggregate cache eviction rate (high = LRU undersized)
rate(volley_state_cache_evictions_total[5m])
# Window pending index eviction rate
rate(volley_window_pending_evictions_total[5m])
# In-memory pending window count
volley_window_pending_size
# Checkpoint GC cleanup rate
rate(volley_checkpoint_gc_cleaned_total[5m])
Tracing
OpenTelemetry integration via OTLP gRPC export with tracing-opentelemetry bridge:
- Resource attributes include service.name and node.id
- Graceful fallback if OTLP endpoint is unavailable
- Trace-log correlation: trace_id and span_id injected into JSON log lines when a span is active
Configure the OTLP endpoint:
# In VolleyApplication CRD
spec:
  observability:
    otlpEndpoint: http://otel-collector.monitoring:4317
Per-Record Tracing
Volley supports per-record trace propagation with configurable sampling. When enabled, individual records flowing through the pipeline carry OpenTelemetry trace context. Each operator creates a child span showing processing time, input/output row counts, and optional payload previews.
Configuration
use volley_core::observability::{TracingConfig, SamplingStrategy};

StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(TracingConfig {
        sampling: SamplingStrategy::Ratio(0.01), // trace 1% of records
        max_payload_bytes: 1024,                 // max JSON preview size
        capture_payload: true,                   // enable input/output previews
    })
    .filter_expr(col("status").eq(lit("active")))
    .select_expr(vec![col("user_id"), col("amount")])
    .to_sink(sink)
    .execute("my-job")
    .await?;
Sampling Strategies
| Strategy | Description | Use case |
|---|---|---|
| Always | Trace every record | Development, debugging |
| Never | Disable tracing | Production default |
| Ratio(f64) | Trace a random fraction | Production with sampling (e.g., 0.01 = 1%) |
Non-sampled records pay near-zero overhead (a single branch check).
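The "single branch check" can be pictured with a small sketch. The `Sampling` enum below is a hypothetical stand-in for `SamplingStrategy`, and the random draw is passed in rather than generated, so the example makes no claim about which random number generator Volley uses.

```rust
/// Hypothetical stand-in for the three sampling strategies in the table above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Sampling {
    Always,
    Never,
    Ratio(f64),
}

impl Sampling {
    /// Decides whether to trace a record.
    /// `draw` is assumed to be a uniform random value in [0, 1).
    pub fn should_sample(self, draw: f64) -> bool {
        match self {
            Sampling::Always => true,
            Sampling::Never => false,
            // One comparison per record; no span is created when it fails.
            Sampling::Ratio(p) => draw < p,
        }
    }
}
```

With `Ratio(0.01)`, roughly one draw in a hundred falls below the threshold, so about 1% of records carry a trace and the rest skip span creation entirely.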
Span Hierarchy
For a pipeline source -> map -> filter -> sink, a sampled record produces:
volley.record (root span)
├── volley.source — source output metadata
├── volley.operator [0] — map operator (input/output rows, timing, preview)
├── volley.operator [1] — filter operator
└── volley.sink — sink write (timing, errors)
Each span includes attributes:
| Attribute | Description |
|---|---|
| volley.operator.name | Auto-generated operator name (e.g., operator-0) |
| volley.operator.index | Position in the operator chain |
| volley.record.event_time_ms | Record’s event timestamp |
| volley.record.input_rows | Input batch row count |
| volley.record.output_rows | Output batch row count |
When capture_payload is enabled, input and output JSON previews are attached as span events.
Window and Aggregate Outputs
Window and aggregate operators accumulate multiple input records and emit new result records. These outputs get a new trace with span links back to the input traces:
volley.window.output (new root, links to input traces)
├── volley.operator [next]
└── volley.sink
The volley.window.input_trace_count attribute shows how many sampled input records contributed to the aggregation.
Kafka Trace Context Propagation
When using the Kafka source, Volley automatically extracts W3C traceparent headers from consumed messages. If an upstream service traced a request that produced a Kafka message, the trace continues through Volley.
Similarly, the Kafka sink injects traceparent headers into produced messages, propagating trace context to downstream consumers.
This enables full end-to-end distributed tracing across service boundaries:
upstream service → Kafka → Volley pipeline → Kafka → downstream service
trace-id=abc trace-id=abc trace-id=abc
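For reference, a W3C traceparent header has the form `version-traceid-parentid-flags`, all lowercase hex. The sketch below parses one from a Kafka header value. It is an illustration of the header format only, not Volley's extraction code, and `TraceParent` and `parse_traceparent` are hypothetical names.

```rust
#[derive(Debug, PartialEq)]
pub struct TraceParent {
    pub trace_id: String,  // 32 lowercase hex chars
    pub parent_id: String, // 16 lowercase hex chars
    pub sampled: bool,     // lowest bit of the flags byte
}

/// Parses a W3C `traceparent` value such as
/// `00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01`.
pub fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let parts: Vec<&str> = header.trim().split('-').collect();
    if parts.len() < 4 {
        return None;
    }
    let (version, trace_id, parent_id, flags) = (parts[0], parts[1], parts[2], parts[3]);
    let is_lower_hex = |s: &str, len: usize| {
        s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
    };
    let all_zero = |s: &str| s.bytes().all(|b| b == b'0');

    // Version "ff" is invalid; version "00" has exactly four fields.
    if !is_lower_hex(version, 2) || version == "ff" {
        return None;
    }
    if version == "00" && parts.len() != 4 {
        return None;
    }
    // All-zero trace and parent IDs are invalid.
    if !is_lower_hex(trace_id, 32) || all_zero(trace_id) {
        return None;
    }
    if !is_lower_hex(parent_id, 16) || all_zero(parent_id) {
        return None;
    }
    if !is_lower_hex(flags, 2) {
        return None;
    }
    let flags = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent {
        trace_id: trace_id.to_string(),
        parent_id: parent_id.to_string(),
        sampled: flags & 0x01 == 0x01,
    })
}
```

A consumer that gets `None` back would typically start a fresh trace instead of failing the record.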
Structured Logging
ObservabilityConfig supports a log_format field with Text (default) and Json modes:
ObservabilityConfig::new()
    .with_json_logging()
    .init();
JSON mode produces structured output compatible with Loki, Elasticsearch, and CloudWatch Logs. When OpenTelemetry is active, each log line includes trace_id and span_id fields for cross-referencing with distributed traces.
Useful RUST_LOG filters
# Default (info): lifecycle events, checkpoint completions, source/sink creation
RUST_LOG=info
# Debug state backend, window, and watermark behavior
RUST_LOG=info,volley_core::state=debug,volley_core::operators::window=debug
# Debug file decoding (Parquet, JSON Lines, CSV)
RUST_LOG=info,volley_connectors::blob_store::decoders=debug
# Deep debugging: per-record RocksDB get/put/delete
RUST_LOG=info,volley_core::state::rocks=trace
# Debug protobuf decode with batch context
RUST_LOG=info,volley_core::operators::protobuf=debug
Health Reporting
The HealthReporter tracks pipeline state with atomic transitions:
| State | Liveness | Readiness | Description |
|---|---|---|---|
| Starting | healthy | not ready | Pipeline initializing |
| Running | healthy | ready | Normal operation |
| Degraded | healthy | not ready | Partial failure (with reason) |
| Failed | unhealthy | not ready | Complete failure (with reason) |
| ShuttingDown | unhealthy | not ready | Graceful shutdown in progress |
The volley.pipeline.health gauge emits the state as a numeric value on every transition, enabling Prometheus alerting.
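The state table and the numeric gauge encoding can be captured in a few lines. This is a sketch of the documented semantics, not the real `HealthReporter`: `Health` and `HealthCell` are hypothetical names, and the use of an `AtomicU8` is an assumption about how atomic transitions might be implemented.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

/// Pipeline states with the numeric values emitted by the health gauge.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Health {
    Starting = 0,
    Running = 1,
    Degraded = 2,
    Failed = 3,
    ShuttingDown = 4,
}

impl Health {
    /// Value reported on the `volley.pipeline.health` gauge.
    pub fn gauge_value(self) -> f64 {
        f64::from(self as u8)
    }
    /// Liveness probe result, per the table above.
    pub fn is_live(self) -> bool {
        matches!(self, Health::Starting | Health::Running | Health::Degraded)
    }
    /// Readiness probe result: only a running pipeline is ready.
    pub fn is_ready(self) -> bool {
        self == Health::Running
    }
    fn from_u8(v: u8) -> Health {
        match v {
            0 => Health::Starting,
            1 => Health::Running,
            2 => Health::Degraded,
            3 => Health::Failed,
            _ => Health::ShuttingDown,
        }
    }
}

/// Lock-free holder for the current state (assumed design).
pub struct HealthCell(AtomicU8);

impl HealthCell {
    pub fn new() -> Self {
        Self(AtomicU8::new(Health::Starting as u8))
    }
    /// Atomically stores the new state and returns the previous one.
    pub fn set(&self, next: Health) -> Health {
        Health::from_u8(self.0.swap(next as u8, Ordering::SeqCst))
    }
    pub fn get(&self) -> Health {
        Health::from_u8(self.0.load(Ordering::SeqCst))
    }
}
```

Returning the previous state from `set` lets a caller emit the gauge only when the state actually changes.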
SLO Recording Rules
Prometheus recording rules are provided at deploy/prometheus/recording-rules.yaml:
- volley:availability:ratio_rate5m - availability SLI ratio
- volley:latency_sli:ratio_rate5m - latency SLI (fraction under 100ms)
- volley:error_budget:remaining - remaining error budget against 99.9% target
Alerting Rules
Burn-rate alerting rules at deploy/prometheus/alerting-rules.yaml:
- VolleyHighErrorBurnRate (critical) - 14.4x burn over 1h
- VolleySlowErrorBurnRate (warning) - 1x burn over 3d
- VolleyPipelineFailed (critical) - health==3 for 1m
- VolleyPipelineDegraded (warning) - health==2 for 5m
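Burn rate is the observed error ratio divided by the error ratio the SLO allows. The sketch below shows the arithmetic behind the 14.4x threshold. It assumes a 30-day (720-hour) SLO period, which this page does not state explicitly, and the function names are hypothetical.

```rust
/// How many times faster than "exactly on budget" errors are being spent.
/// `slo_target` is a fraction such as 0.999.
pub fn burn_rate(error_ratio: f64, slo_target: f64) -> f64 {
    error_ratio / (1.0 - slo_target)
}

/// Fraction of the whole period's error budget consumed by sustaining
/// `burn` for `window_hours` out of a `period_hours` SLO period.
pub fn budget_fraction_consumed(burn: f64, window_hours: f64, period_hours: f64) -> f64 {
    burn * window_hours / period_hours
}
```

Against a 99.9% target, an error ratio of 1.44% is a 14.4x burn. Sustained for one hour of a 720-hour period, that consumes about 2% of the budget, which is why it is treated as page-worthy.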
Kubernetes Integration
The K8s operator automatically creates a ServiceMonitor for Prometheus scraping. See Kubernetes Operator for ServiceMonitor configuration.
Volley SRE Gap Analysis
Date: 2026-03-30
Reference framework: msitarzewski/agency-agents engineering-sre.md
Important Context: Framework vs. Service
Volley is a stream processing framework (library + Kubernetes operator), not a deployed service. This analysis evaluates:
- Framework enablement — How well does Volley equip pipeline authors to follow SRE practices?
- Framework development practices — How well does Volley’s own CI/testing/release process follow SRE principles?
Org-level concerns (on-call rotations, blameless culture documentation, MTTR tracking) are out of scope — those belong to teams operating Volley-based pipelines, not the framework itself.
Executive Summary
| Pillar | Score | Verdict |
|---|---|---|
| SLOs & Error Budgets | C+ (55%) | SLI-ready metrics + Prometheus recording/alerting rule templates + SLO definition template; no runtime error budget gauge yet |
| Observability | A- (88%) | Strong metrics + tracing + structured JSON logging + alerting rule templates + trace-log correlation + per-connector error counters; missing dashboards |
| Toil Reduction | B- (65%) | CI + K8s operator + Dockerfile + Helm chart; no CD pipeline or benchmark regression tracking |
| Chaos Engineering | C- (40%) | Checkpoint recovery unit-tested + fault injection integration tests for checkpoint/RocksDB failures; no Kafka fault injection |
| Capacity Planning | B- (65%) | KEDA autoscaling scaffolded, benchmarks exist, capacity planning guide with sizing tables and PromQL queries |
| Progressive Rollouts | F (5%) | Operator uses default StatefulSet RollingUpdate; no canary or automated rollback |
| Incident Response Primitives | C+ (55%) | PipelineHealth enum + K8s probes + volley.pipeline.health gauge + alerting rules; missing severity-to-SLO mapping docs |
Overall: Volley has a strong observability foundation, solid Kubernetes integration, SLO recording rules, alerting templates, structured logging with trace-log correlation, a Dockerfile, Helm chart, SLO definition template, fault injection tests, and capacity planning guide. Remaining gaps are dashboard templates, CD pipeline, progressive rollout support, and Kafka fault injection.
Scoring Methodology
| Grade | Range | Meaning |
|---|---|---|
| A | 90-100% | Production-ready, matches SRE framework recommendations |
| B | 70-89% | Strong foundation, minor gaps |
| C | 50-69% | Partial coverage, significant gaps |
| D | 25-49% | Minimal coverage, major gaps |
| F | 0-24% | Absent or negligible |
Criteria per pillar: API completeness, built-in tooling, documentation for pipeline authors, integration with ecosystem tooling (Prometheus, Grafana, K8s).
Pillar 1: SLOs & Error Budgets — C (50%)
Current State
Volley exposes metrics that serve as natural SLI candidates (volley-observability/src/metrics.rs):
| Metric | Type | SLI Use |
|---|---|---|
| volley.records.processed | Counter | Availability - total processed records |
| volley.errors | Counter | Availability - error count for SLI ratio |
| volley.stage.latency_ms | Histogram | Latency - processing duration distribution |
| volley.pipeline.uptime_seconds | Gauge | Availability - pipeline uptime |
| volley.checkpoint.failures | Counter | Reliability - checkpoint failure rate |
The PipelineHealth enum (volley-observability/src/health.rs) provides binary health states (Starting, Running, Degraded, Failed, ShuttingDown) but these are probe-level signals, not SLO-aligned measurements.
What Exists
Prometheus recording rules are provided at deploy/prometheus/recording-rules.yaml computing:
- volley:availability:ratio_rate5m and volley:availability:ratio_rate30d - availability SLI ratios
- volley:latency_sli:ratio_rate5m - latency SLI (fraction under 100ms)
- volley:error_budget:remaining - remaining error budget against 99.9% target
- volley:throughput:rate5m and volley:saturation:avg - capacity signals
Multi-window burn rate alerting rules are provided at deploy/prometheus/alerting-rules.yaml:
- VolleyHighErrorBurnRate (critical) - 14.4x burn over 1h confirmed by 5m window
- VolleySlowErrorBurnRate (warning) - 1x burn over 3d confirmed by 6h window
Remaining Gaps
- No SLO definition template or configuration format for pipeline authors to declare per-pipeline targets
- No runtime error budget gauge metric in volley-observability (the recording rule computes it in Prometheus, but no native Rust gauge)
- No documentation connecting error budget exhaustion to operational decisions (e.g., deployment freeze policy)
Recommendations
(P1) Add an error_budget_remaining gauge to volley-observability that pipeline authors can enable to expose real-time budget consumption natively from the runtime, complementing the Prometheus recording rule.
(P1) Provide an SLO definition template format (YAML) that pipeline authors can use to declare per-pipeline availability and latency targets, which can be consumed by the Prometheus rules.
(P2) Document an error budget policy template: when budget < 25%, freeze non-critical deployments; when budget < 10%, enter incident response mode.
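The proposed policy thresholds are simple enough to encode directly, for example in a deployment gate. The sketch below is illustrative only. `BudgetAction` and `action_for` are hypothetical names, and the input is assumed to be the remaining budget as a fraction between 0.0 and 1.0.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum BudgetAction {
    Normal,
    FreezeNonCriticalDeployments,
    IncidentResponse,
}

/// Maps the remaining error budget (fraction, 0.0 to 1.0) to the policy action.
pub fn action_for(budget_remaining: f64) -> BudgetAction {
    if budget_remaining < 0.10 {
        BudgetAction::IncidentResponse
    } else if budget_remaining < 0.25 {
        BudgetAction::FreezeNonCriticalDeployments
    } else {
        BudgetAction::Normal
    }
}
```

Checking the stricter threshold first means a budget below 10% always escalates to incident response rather than stopping at the freeze.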
Pillar 2: Observability — B+ (80%)
Current State
Metrics (volley-observability/src/metrics.rs): 15+ named metric constants covering sources, operators, checkpoints, channels, watermarks, sinks, pipeline health, Kafka, and blob storage. All use node_id/job_name labels. Three helper functions (increment_counter, set_gauge, record_histogram) enforce consistent labeling.
Tracing (volley-observability/src/tracing_init.rs): OpenTelemetry integration via OTLP gRPC export with tracing-opentelemetry bridge. Resource attributes include service.name and node.id. Graceful fallback if OTLP endpoint unavailable.
Health (volley-observability/src/health.rs): HealthReporter with atomic state tracking, liveness/readiness semantics. K8s probes configured in operator’s StatefulSet builder.
ServiceMonitor (volley-operator/src/crd.rs): Auto-created by operator with configurable scrape interval and labels.
What Exists
- Structured JSON logging: ObservabilityConfig supports a log_format field with Text (default) and Json modes. When set to Json, tracing_subscriber::fmt::json() is used for structured output compatible with log aggregation systems (Loki, Elasticsearch, CloudWatch Logs). Pipeline authors enable it via .with_json_logging().
- Alerting rule templates: Prometheus alerting rules are provided at deploy/prometheus/alerting-rules.yaml covering error burn rate (fast + slow), pipeline health state, checkpoint failures, channel saturation, Kafka consumer lag, and p99 latency.
What Was Added
- Trace-log correlation: TraceContextFormat in volley-core/src/observability/trace_context.rs safely injects trace_id and span_id fields into JSON log output via serde_json round-trip when an OpenTelemetry span is active, enabling cross-referencing between logs and distributed traces.
- Per-connector error counters: volley.source.errors and volley.sink.errors metrics defined in volley-observability/src/metrics.rs with source_id/sink_id labels, and incremented in volley-core/src/builder/runtime.rs on poll/write/flush failures. connector_id() method added to Source, Sink, DynSource, and DynSink traits, implemented across Kafka, blob store, and Iceberg connectors.
Remaining Gaps
- No dashboard templates: No Grafana dashboard JSON mapping the four golden signals to Volley metrics.
Recommendations
(P0) Provide a golden signals Grafana dashboard template using Volley’s actual metrics:
| Golden Signal | Volley Metric | PromQL |
|---|---|---|
| Latency | volley.stage.latency_ms | histogram_quantile(0.99, rate(volley_stage_latency_ms_bucket[5m])) |
| Traffic | volley.source.records_polled | sum(rate(volley_source_records_polled_total[5m])) by (job_name) |
| Errors | volley.errors | sum(rate(volley_errors_total[5m])) by (job_name, error_type) |
| Saturation | volley.channel.utilization | avg(volley_channel_utilization) by (job_name, stage) |
(P1) Enable trace-log correlation by configuring the fmt layer with OpenTelemetry trace context propagation, so JSON log lines include trace_id and span_id fields.
(P2) Add per-connector error counters to metrics.rs (volley.source.errors, volley.sink.errors) with source_id/sink_id labels for finer-grained fault attribution.
Pillar 3: Toil Reduction — B- (65%)
Current State
CI pipeline (.github/workflows/ci.yml): Format checking (cargo fmt), linting (cargo clippy with default + kafka features), testing (default + kafka variants), benchmark compilation check. Uses Rust cache for faster builds.
K8s operator (volley-operator/): Automates StatefulSet, headless Service, metrics Service, ServiceMonitor, PodDisruptionBudget, and KEDA ScaledObject creation from a single VolleyApplication CRD. Server-side apply with idempotent reconciliation.
Checkpoint cleanup: RecoveryCoordinator handles automated old checkpoint cleanup.
What Exists
- Dockerfile: A multi-stage Dockerfile is provided at deploy/docker/Dockerfile with a Rust builder stage and minimal distroless runtime stage. Pipeline authors can extend or customize it for their specific binaries.
- Helm chart: A Helm chart exists at deploy/helm/volley/ that templates the VolleyApplication CRD with values.yaml covering all CRD fields (resources, storage, observability, health, autoscaling, Kafka, isolation).
Remaining Gaps
- Dockerfile not referenced in documentation: The README and operator docs do not mention the Dockerfile or describe how pipeline authors should customize it for their applications.
- Helm chart has no production-hardening overlays: No example values files for common deployment topologies (small/medium/large, multi-AZ).
- No CD pipeline: CI exists but no automated release workflow (crate publishing, image building).
- No benchmark regression tracking: Benchmarks compile-checked but results not tracked over time.
Recommendations
(P1) Add a CD workflow (.github/workflows/release.yml) triggered on git tags that builds container images and publishes to a registry.
(P1) Add production-hardened example values files to the Helm chart (e.g., values-production.yaml with resource limits, anti-affinity, KEDA autoscaling).
(P2) Add benchmark regression detection in CI using criterion with stored baselines.
Pillar 4: Chaos Engineering — D (30%)
Current State
Checkpoint recovery is tested via unit tests — the HealthReporter transitions through Starting -> Running -> Degraded -> Failed states and these are verified. The operator handles reconciliation errors with retry backoff (15s requeue on failure).
Gaps
- No fault injection tests simulating Kafka broker disconnect, disk full, or OOM conditions.
- No integration tests exercising the Degraded -> Running recovery path under realistic failure.
- No Chaos Mesh or LitmusChaos experiment definitions for K8s-deployed pipelines.
Recommendations
(P1) Add integration tests for framework resilience:
- Checkpoint directory becomes read-only mid-pipeline
- Kafka broker disconnect during exactly-once transaction commit
- RocksDB write failure during state flush
(P1) Provide Chaos Mesh experiment templates for K8s-deployed Volley applications:
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: volley-pod-kill
spec:
  action: pod-kill
  mode: one
  selector:
    labelSelectors:
      app.kubernetes.io/managed-by: volley-operator
  scheduler:
    cron: "@every 30m"
(P2) Add toxiproxy integration to Kafka integration tests for network degradation simulation.
Pillar 5: Capacity Planning — C (45%)
Current State
Autoscaling: KEDA support fully scaffolded in operator (AutoscalingSpec in CRD) with Kafka lag, Prometheus metric, and CPU-based triggers. ScaledObject is created during reconciliation.
Benchmarks: NEXMark benchmark suite (volley-benchmark/) provides industry-standard throughput, latency, and memory measurements.
Resource management: CRD requires explicit resources.requests and resources.limits for CPU/memory.
Gaps
- No sizing guide mapping resource allocation to expected throughput.
- No documentation on RocksDB storage growth patterns.
- No tested KEDA trigger threshold examples.
- No resource utilization dashboards.
Recommendations
(P1) Create a capacity planning guide covering:
- CPU/memory requirements per records/sec (based on benchmark data)
- RocksDB storage growth formula: key_count * avg_value_size * checkpoint_retention_count
- PVC sizing guidance based on state size and checkpoint interval
- Key PromQL queries for capacity monitoring:
  - Throughput: rate(volley_source_records_polled_total[5m])
  - Backpressure: volley_channel_utilization > 0.8
  - Input backlog: volley_kafka_consumer_lag
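The storage growth formula in the list above can be sketched as a small helper. This is only an illustration of the arithmetic: the function name is hypothetical and is not part of Volley, and because checkpoints are hardlink-based, real disk usage may be lower than this estimate.

```rust
/// Hypothetical helper (not a Volley API): estimates RocksDB state storage
/// using the formula from the capacity planning list:
/// key_count * avg_value_size * checkpoint_retention_count.
/// Saturating multiplication avoids overflow panics on very large inputs.
fn estimate_state_bytes(
    key_count: u64,
    avg_value_size: u64,
    checkpoint_retention_count: u64,
) -> u64 {
    key_count
        .saturating_mul(avg_value_size)
        .saturating_mul(checkpoint_retention_count)
}
```

For example, one million keys with 256-byte values and three retained checkpoints gives 768,000,000 bytes, so a PVC for that pipeline would need headroom above roughly 0.77 GB.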
(P2) Document tested KEDA trigger configurations with proven thresholds (e.g., scale at consumer lag > 10000).
Pillar 6: Progressive Rollouts — F (5%)
Current State
The operator creates StatefulSets with a configurable replicas count but uses the default RollingUpdate strategy. No canary, blue-green, or partition-based rollout mechanism exists. imagePullPolicy is configurable but no image tag pinning is enforced.
Gaps
- No updateStrategy field in the CRD spec.
- No partition-based canary rollout support.
- No automated rollback on health check failure post-update.
- No Argo Rollouts or Flagger integration.
Recommendations
(P1) Add an updateStrategy field to VolleyApplicationSpec CRD with partition-based canary support:
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct UpdateStrategySpec {
    /// Partition value for canary rollouts. Pods with ordinal >= partition
    /// get the new version. Set to replicas-1 to canary a single pod.
    #[serde(default, rename = "partition")]
    pub partition: Option<i32>,
}
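To make the partition semantics concrete, here is a minimal sketch of which pod ordinals would receive the new version for a given partition value. The function name is hypothetical, and treating a missing partition as 0 (update every pod) is an assumption based on the Kubernetes StatefulSet default rather than on the proposed CRD.

```rust
/// Hypothetical helper (not part of the operator): lists the pod ordinals
/// that get the new version, i.e. every ordinal >= partition.
/// Assumption: `None` behaves like partition 0, so all pods are updated.
fn ordinals_updated(replicas: i32, partition: Option<i32>) -> Vec<i32> {
    let replicas = replicas.max(0);
    // Clamp so an out-of-range partition simply yields an empty rollout.
    let start = partition.unwrap_or(0).clamp(0, replicas);
    (start..replicas).collect()
}
```

With 4 replicas, a partition of 3 (replicas-1) updates only pod 3, which is the single-pod canary described in the doc comment. Lowering the partition step by step widens the rollout.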
(P2) Provide an Argo Rollouts AnalysisTemplate example that checks volley_errors_total rate and volley_stage_latency_ms p99 after each rollout step.
Pillar 7: Incident Response Primitives — C+ (55%)
Current State
Health states: PipelineHealth enum provides 5 states — Starting, Running, Degraded (with reason string), Failed (with reason string), ShuttingDown. These map to K8s liveness (healthy = Starting|Running) and readiness (ready = Running only).
Health gauge: volley.pipeline.health gauge is emitted on every state transition via HealthReporter, enabling Prometheus alerting on health state changes. Values: 0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown.
Operator status: CRD status tracks phases (Pending, Running, Degraded, Failed) with readyReplicas count and human-readable message.
Probes: Liveness at /health (port 8080), readiness at /ready, with configurable initialDelaySeconds and periodSeconds.
Alerting rules: deploy/prometheus/alerting-rules.yaml includes VolleyPipelineFailed (critical, health==3 for 1m) and VolleyPipelineDegraded (warning, health==2 for 5m).
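The health states, gauge values, and probe mapping described above can be summarized in a short sketch. The variant names and numeric values come from this section. The method names and the exact enum layout are assumptions made for illustration and may not match the real PipelineHealth type.

```rust
/// Illustrative stand-in for the health enum described above.
#[derive(Debug, Clone, PartialEq)]
enum PipelineHealth {
    Starting,
    Running,
    Degraded(String), // reason
    Failed(String),   // reason
    ShuttingDown,
}

impl PipelineHealth {
    /// Value emitted on the volley.pipeline.health gauge.
    fn gauge_value(&self) -> u8 {
        match self {
            PipelineHealth::Starting => 0,
            PipelineHealth::Running => 1,
            PipelineHealth::Degraded(_) => 2,
            PipelineHealth::Failed(_) => 3,
            PipelineHealth::ShuttingDown => 4,
        }
    }

    /// Liveness: healthy = Starting | Running.
    fn is_live(&self) -> bool {
        matches!(self, PipelineHealth::Starting | PipelineHealth::Running)
    }

    /// Readiness: ready = Running only.
    fn is_ready(&self) -> bool {
        matches!(self, PipelineHealth::Running)
    }
}
```

The alerting rules then reduce to comparisons on the gauge: health == 3 corresponds to Failed and health == 2 to Degraded.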
Remaining Gaps
- No documentation mapping severity levels to PipelineHealth states and SLO impact.
- No runbook links in alert annotations (placeholder URLs only).
Recommendations
(P1) Document severity-to-health mapping for pipeline authors:
| Severity | PipelineHealth | SLO Impact | Response |
|---|---|---|---|
| SEV1 | Failed | SLO breached, pipeline stopped | Immediate: investigate root cause, restore from checkpoint |
| SEV2 | Degraded | Error budget burning above normal | Urgent: identify degradation source (e.g., checkpoint failures) |
| SEV3 | Running with elevated latency | Within budget but trending | Investigate: check channel.utilization, watermark.lag_ms |
(P1) Add runbook URLs to the existing alerting rules in deploy/prometheus/alerting-rules.yaml, linking to operational documentation for each failure mode.
Prioritized Implementation Roadmap
P0 — Completed
- SLO recording rule templates: deploy/prometheus/recording-rules.yaml
- Burn rate alerting rule templates: deploy/prometheus/alerting-rules.yaml
- Structured JSON logging: ObservabilityConfig.log_format with LogFormat::Json
- Multi-stage Dockerfile: deploy/docker/Dockerfile
- Helm chart: deploy/helm/volley/
- Pipeline health gauge: volley.pipeline.health emitted by HealthReporter
- Health state alerting rules: VolleyPipelineFailed, VolleyPipelineDegraded in alerting rules
P0 — Remaining
- Golden signals dashboard template (Grafana JSON)
- Severity-to-PipelineHealth mapping documentation
P1 — Framework maturity
- Runtime error budget gauge metric in volley-observability
- SLO definition template format for per-pipeline targets: deploy/prometheus/slo-template.yaml
- Trace-log correlation (trace_id in structured log output): volley-core/src/observability/trace_context.rs with TraceContextFormat
- Fault injection integration tests (checkpoint, RocksDB failures): volley-core/tests/fault_injection_test.rs (9 tests)
- Chaos Mesh experiment templates
- Capacity planning guide (sizing, storage growth, key PromQL): docs/src/operations/capacity-planning.md
- updateStrategy CRD field for partition-based canary rollouts
- Runbook URLs in alerting rule annotations
- CD workflow for framework releases
- Production-hardened Helm values examples
P2 — Excellence
- Per-connector error counters (volley.source.errors, volley.sink.errors): constants in volley-observability/src/metrics.rs, wired up in volley-core/src/builder/runtime.rs with connector_id() on Source/Sink traits
- Benchmark regression detection in CI
- Toxiproxy integration for Kafka fault injection
- Argo Rollouts AnalysisTemplate example
- Tested KEDA trigger threshold documentation
- Error budget policy template documentation
Appendix A: Full Metric Inventory
All metrics defined in volley-observability/src/metrics.rs:
| Constant | Metric Name | Type | Labels | SRE Use |
|---|---|---|---|---|
| SOURCE_RECORDS_POLLED | volley.source.records_polled | Counter | node_id, job_name, source_id | Traffic golden signal |
| RECORDS_PROCESSED | volley.records.processed | Counter | node_id, job_name, operator_id | Throughput, SLI denominator |
| STAGE_LATENCY_MS | volley.stage.latency_ms | Histogram | node_id, job_name, stage | Latency golden signal, SLI |
| CHECKPOINT_DURATION_MS | volley.checkpoint.duration_ms | Histogram | node_id, job_name | Checkpoint health |
| CHECKPOINT_EPOCH | volley.checkpoint.epoch | Gauge | node_id, job_name | Progress tracking |
| CHECKPOINT_FAILURES | volley.checkpoint.failures | Counter | node_id, job_name | Reliability, alerting |
| CHANNEL_UTILIZATION | volley.channel.utilization | Gauge | node_id, job_name, stage | Saturation golden signal |
| WATERMARK_LAG_MS | volley.watermark.lag_ms | Gauge | node_id, job_name | Event-time lag |
| SINK_RECORDS_WRITTEN | volley.sink.records_written | Counter | node_id, job_name, sink_id | Output throughput |
| SINK_FLUSH_DURATION_MS | volley.sink.flush.duration_ms | Histogram | node_id, job_name, sink_id | Sink latency |
| PIPELINE_UPTIME_SECONDS | volley.pipeline.uptime_seconds | Gauge | node_id, job_name | Availability |
| PIPELINE_HEALTH | volley.pipeline.health | Gauge | node_id, job_name | Health state (0-4), alerting on Failed/Degraded |
| ERRORS_TOTAL | volley.errors | Counter | node_id, job_name, error_type | Error golden signal, SLI numerator |
| KAFKA_CONSUMER_LAG | volley.kafka.consumer_lag | Gauge | node_id, job_name | Backlog, autoscaling trigger |
| KAFKA_COMMIT_DURATION_MS | volley.kafka.commit.duration_ms | Histogram | node_id, job_name | Kafka commit latency |
| KAFKA_COMMIT_FAILURES | volley.kafka.commit.failures | Counter | node_id, job_name | Exactly-once reliability |
| BLOB_OBJECTS_READ | volley.blob.objects_read | Counter | node_id, job_name | Blob source throughput |
| BLOB_BYTES_READ | volley.blob.bytes_read | Counter | node_id, job_name | Blob I/O volume |
Label constants: LABEL_NODE_ID, LABEL_JOB_NAME, LABEL_STAGE, LABEL_OPERATOR_ID, LABEL_SOURCE_ID, LABEL_SINK_ID, LABEL_ERROR_TYPE
Appendix B: Reference Prometheus Rules
See Pillar 1 for SLO recording rules and burn rate alerts, and Pillar 7 for health state alerting rules.
Appendix C: Golden Signals PromQL Reference
# Latency (p50, p99)
histogram_quantile(0.5, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
# Traffic (records/sec by pipeline)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)
# Errors (error rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)
# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)
# Saturation (channel utilization, higher = more backpressure)
avg(volley_channel_utilization) by (job_name, stage)
# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)
NEXMark Benchmarks
NEXMark is the standard workload used to compare stream processing systems. It simulates an online auction — people, bids, auctions, a steady event rate — and defines a handful of queries (projection, windowed aggregation, stream-to-stream join) that together exercise the throughput, latency, and stateful-operator paths that matter in real pipelines.
Volley ships five of those queries as runnable examples so you can measure what your hardware does with your data shape, and compare against Flink or any other system that also publishes NEXMark numbers.
Queries
| Query | Description | What it tests |
|---|---|---|
| Q1 | Currency Conversion | Raw throughput — stateless map/projection |
| Q4 | Average Selling Price per Category | Windowed join between bids and auctions; post-join filtering; min-watermark across dual sources |
| Q5 | Hot Items | Sliding window aggregation under sustained load |
| Q7 | Highest Bid per Session | Session window merging; high-cardinality keyed state; RocksDB under sustained load |
| Q8 | Monitor New Users | Windowed stream-to-stream join |
Running
# Q8 — Windowed join (multi-source pipeline)
cargo run --example nexmark_q8 -p volley-benchmark --release
# Q5 — Sliding window aggregation (recommended starting point)
cargo run --example nexmark_q5 -p volley-benchmark --release
# Q4 — Windowed join (average selling price per category)
cargo run --example nexmark_q4 -p volley-benchmark --release
# Q7 — Session windows (highest bid per session per bidder)
cargo run --example nexmark_q7 -p volley-benchmark --release
# Q1 — Stateless projection baseline
cargo run --example nexmark_q1 -p volley-benchmark --release
Configuration
| Variable | Default | Description |
|---|---|---|
| NEXMARK_DURATION | 10 | Benchmark duration in seconds |
| NEXMARK_BATCH_SIZE | 4000 | Records per Arrow RecordBatch |
| NEXMARK_PARALLELISM | 4 | Parallel operator instances (Q5) |
| NEXMARK_AUCTIONS | 100 | Number of active auctions (Q5) |
| NEXMARK_WINDOW_SIZE | 10 | Window size in seconds (Q5) |
| NEXMARK_WINDOW_SLIDE | 2 | Window slide in seconds (Q5) |
| NEXMARK_INTER_EVENT_MS | 1 | Inter-event time in milliseconds |
Metrics
- Sustained throughput (records/sec) — measured over the full run, not peak
- p50/p99/p999 latency — per-batch processing time from source to sink
- Memory footprint — RSS delta under load
- Recovery time — restore from checkpoint after failure (planned)
Comparison Baseline
When comparing against Apache Flink:
- Use tuned Flink: RocksDB state backend, off-heap memory, equivalent parallelism
- Match batch semantics: set Flink’s table.exec.mini-batch.size to match Volley’s batch size
- Same hardware, same event rate, same total events
- Discard first 10% of measurements for JIT warmup (Flink) and cache warming (both)
See volley-benchmark/README.md for full methodology.
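As a rough illustration of the warmup rule and the latency metrics above, the sketch below drops the first 10% of samples and then computes a percentile with the nearest-rank method. Both function names are hypothetical, and the nearest-rank definition is an assumption: the benchmark crate may compute percentiles differently (for example with a histogram).

```rust
/// Hypothetical helper: drops the first 10% of samples (in arrival order)
/// to exclude JIT warmup and cache warming from the measurement.
fn discard_warmup(samples: &[f64]) -> &[f64] {
    let skip = samples.len() / 10;
    &samples[skip..]
}

/// Hypothetical helper: nearest-rank percentile, with `p` in 0..=100.
/// Returns None for an empty input.
fn percentile(samples: &[f64], p: f64) -> Option<f64> {
    if samples.is_empty() {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    // Smallest rank such that at least p% of samples are at or below it.
    let rank = ((p / 100.0) * sorted.len() as f64).ceil() as usize;
    Some(sorted[rank.clamp(1, sorted.len()) - 1])
}
```

Applying the same discard rule to both systems keeps the comparison fair, since the warmup samples are removed before any percentile is taken.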
Stress testing
In addition to the short benchmark runs above, Volley ships a stress testing suite for long-running pass/fail validation. Stress scenarios run for hours against Kafka-backed pipelines, auto-calibrate the throughput ceiling, and assert hard SLO thresholds.
Scenarios
| Scenario | What it tests |
|---|---|
| stress_sustained | Sliding window aggregation at 80% of calibrated ceiling for 2 h; verifies no throughput drift, latency regression, or memory growth |
| stress_backpressure | Three-phase overload test (baseline → 120% ceiling → recovery); asserts catch-up within 60 s |
| stress_checkpoint_recovery | Crash mid-run and restart from checkpoint; asserts state restoration within 30 s |
| stress_nexmark_q4 | Kafka-backed Q4 (windowed join) under sustained load |
| stress_nexmark_q7 | Kafka-backed Q7 (session windows, high cardinality) under sustained load |
Running
# All scenarios — default 2 h each (requires Docker for Kafka)
./scripts/stress-test.sh
# CI mode — 10 min per scenario
./scripts/stress-test.sh --ci
# Single scenario with custom settings
STRESS_DURATION=30m STRESS_TARGET_RATE=5000 ./scripts/stress-test.sh sustained
Pass/fail thresholds
| Threshold | Default limit |
|---|---|
| Throughput drift | ≤ 5% drop from calibrated operating point |
| p99 latency | ≤ 2× warmup baseline |
| p999 latency | ≤ 5× warmup baseline |
| Memory growth | ≤ 20% RSS increase from warmup to end |
| Checkpoint duration | ≤ 2× warmup baseline |
Reports are written as JSON to target/stress-reports/ and uploaded as a CI artifact when run via the stress.yml GitHub Actions workflow.
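The default limits in the table above amount to a handful of ratio checks. The sketch below shows one way to express them. The struct, field names, and function are hypothetical and are not the stress suite's actual report schema. For simplicity it uses a single baseline snapshot, whereas the suite compares throughput against the calibrated operating point and the other values against the warmup baseline.

```rust
/// Hypothetical snapshot of the values the thresholds are applied to.
#[derive(Debug, Clone, Copy)]
struct Measurements {
    throughput: f64, // records/sec
    p99_ms: f64,
    p999_ms: f64,
    rss_bytes: f64,
    checkpoint_ms: f64,
}

/// Returns the name of every default threshold that `end` violates
/// relative to `baseline`. An empty vector means the run passes.
fn violations(baseline: &Measurements, end: &Measurements) -> Vec<&'static str> {
    let mut failed = Vec::new();
    if end.throughput < baseline.throughput * 0.95 {
        failed.push("throughput drift"); // more than a 5% drop
    }
    if end.p99_ms > baseline.p99_ms * 2.0 {
        failed.push("p99 latency");
    }
    if end.p999_ms > baseline.p999_ms * 5.0 {
        failed.push("p999 latency");
    }
    if end.rss_bytes > baseline.rss_bytes * 1.2 {
        failed.push("memory growth"); // more than a 20% RSS increase
    }
    if end.checkpoint_ms > baseline.checkpoint_ms * 2.0 {
        failed.push("checkpoint duration");
    }
    failed
}
```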
Examples
Runnable examples live in the volley-examples crate. There are six examples,
each demonstrating one DAG shape.
Running
# Memory-only examples (no external deps)
cargo run --example in_memory_pipeline -p volley-examples
cargo run --example windowed_pipeline -p volley-examples
cargo run --example fanout_multisink -p volley-examples
cargo run --example observable_pipeline -p volley-examples
# Kafka (requires a live broker + two topics)
cargo run --example kafka_pipeline -p volley-examples --features kafka
# ML (first run downloads a HuggingFace model; cached thereafter)
cargo run --example ml_inference_pipeline -p volley-examples --features ml --release
What each example shows
| Example | What it teaches | Features |
|---|---|---|
| in_memory_pipeline | Minimum DAG shape: from_source → filter_expr → to_sink, plus the sink.handle() pattern for reading output after execute(). | (none) |
| windowed_pipeline | Tumbling / sliding / session windows over key_by → aggregate_expr, event-time semantics, RocksDB state backend. | (none) |
| fanout_multisink | DAG 1.0 marquee: tee() broadcasts to two branches, each filters differently, both sinks commit atomically in one 2PC epoch. | (none) |
| observable_pipeline | with_observability(...) + HealthReporter + spawn_health_server → /healthz / /readyz / /startupz; Prometheus / OTLP hooks are one uncomment away. | (none) |
| kafka_pipeline | Kafka-to-Kafka exactly-once: from_kafka_exactly_once returns (stream, ConsumerGroupMetadata); to_kafka_exactly_once threads the CGM so source offsets commit inside the sink’s Kafka transaction. | kafka |
| ml_inference_pipeline | Streaming ONNX classification, backend auto-select (CoreML / CUDA / CPU), HuggingFace Hub auto-download. | ml |
NEXMark benchmarks
Throughput and windowed-join benchmarks live in a sibling crate:
cargo run --example nexmark_q1 -p volley-benchmark --release
cargo run --example nexmark_q5 -p volley-benchmark --release
cargo run --example nexmark_q8 -p volley-benchmark --release
See volley-benchmark/README.md
for methodology and baselines.
Walkthrough
See Your first pipeline for a step-by-step of
in_memory_pipeline.
Your first pipeline
This is the shortest useful Volley pipeline: read records from memory, drop the cheap ones, and collect the survivors. It mirrors the in_memory_pipeline example in the repo, so you can run it end-to-end without setting up Kafka or cloud storage.
What it does
- Builds five sample trades as Arrow RecordBatches.
- Filters to price > 200 using a vectorised DataFusion expression.
- Collects the output via a MemorySink.
Key concepts
Create the environment
let env = StreamExecutionEnvironment::new();
All pipeline construction starts from an environment. The same
environment can own multiple branches and sinks — see the
fanout_multisink example for multi-sink DAGs.
Attach a source
env.from_source(MemorySource::new(records))
from_source registers the source node and returns a DataStream<NoSink>.
For production, use from_kafka, from_kafka_exactly_once, or one of
the blob-store source builders.
Apply operators
Expression operators run through DataFusion’s vectorised kernels and are the default choice:
.filter_expr(col("price").gt(lit(200.0)))
.select_expr(vec![col("symbol"), col("price")])
Closure operators (filter, map, flat_map) remain available for
logic that can’t be expressed with columnar expressions — for example,
inspecting StreamRecord metadata like event_time_ms or attaching
custom per-record state. Prefer the expression form when you can.
Key, window, aggregate
.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(300)))
.aggregate_expr(
vec![sum(col("amount"))],
RocksDbBackend::open(tmp.path().join("state")).unwrap(),
)
key_by returns a KeyedStream (hash-partitioned by key), and window turns
that into a WindowedKeyedStream. aggregate_expr consumes the windowed stream
and emits one record per window per key. The second argument is a
BatchStateBackend; RocksDbBackend implements it directly, so you
don’t need the old KeyedStateBackend wrapper.
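To show what "one record per window per key" means for a tumbling window, here is a plain-Rust sketch of the same computation with no Volley types involved. The function names and the tuple layout are hypothetical, and the sketch ignores watermarks, late data, and the state backend. It only illustrates how event times map to windows.

```rust
use std::collections::BTreeMap;

/// Start of the tumbling window containing `event_time_ms`.
/// Panics if `size_ms` is 0.
fn window_start(event_time_ms: u64, size_ms: u64) -> u64 {
    event_time_ms - event_time_ms % size_ms
}

/// Sums `amount` per (key, window start) for events shaped as
/// (key, event_time_ms, amount). Hypothetical helper, not a Volley API.
fn sum_per_window(
    events: &[(&str, u64, f64)],
    size_ms: u64,
) -> BTreeMap<(String, u64), f64> {
    let mut sums = BTreeMap::new();
    for &(key, ts, amount) in events {
        let slot = (key.to_string(), window_start(ts, size_ms));
        *sums.entry(slot).or_insert(0.0) += amount;
    }
    sums
}
```

With a 300-second window (300,000 ms), an event at 299,999 ms falls into the window starting at 0, while an event at 300,000 ms opens the next window, so the same key can produce one output row for each.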
Attach a sink, execute, read the output
let sink = MemorySink::new();
let output = sink.handle(); // shared handle survives the move
let report = stream.to_sink(sink).execute("my-job").await?;
println!("committed epochs: {}", report.epochs_committed);
for r in output.lock().unwrap().iter() { /* ... */ }
execute returns ExecutionReport { records_written, epochs_committed },
not the output records. The sink’s handle() returns an
Arc<Mutex<Vec<StreamRecord>>> that points at the sink’s internal
buffer — lock it after execution to inspect the rows.
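The reason the handle still works after the sink has been moved into the pipeline is ordinary Arc sharing. The sketch below reproduces the pattern with the standard library only. DemoSink and run_pipeline are hypothetical stand-ins for MemorySink and the to_sink/execute calls, not their real implementations.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a memory sink with a shared output buffer.
struct DemoSink {
    buffer: Arc<Mutex<Vec<String>>>,
}

impl DemoSink {
    fn new() -> Self {
        Self { buffer: Arc::new(Mutex::new(Vec::new())) }
    }

    /// Clones the Arc, so the caller keeps access after the sink is moved.
    fn handle(&self) -> Arc<Mutex<Vec<String>>> {
        Arc::clone(&self.buffer)
    }

    fn write(&self, record: &str) {
        self.buffer.lock().unwrap().push(record.to_string());
    }
}

/// Stand-in for a pipeline run: takes ownership of the sink, writes every
/// record, and returns only a count (like a report, not the output rows).
fn run_pipeline(sink: DemoSink, records: &[&str]) -> usize {
    for record in records {
        sink.write(record);
    }
    records.len()
}
```

Calling handle() before run_pipeline(sink, ...) is what matters: once the sink is moved, the cloned Arc is the only remaining way to reach the buffer.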
Run it
cargo run --example in_memory_pipeline -p volley-examples
Expected output (abridged):
=== Volley in-memory DAG pipeline ===
Input: 5 trades
Execution report: 0 records written, 1 epochs committed
Filtered output (3 records):
1. GOOG @ $280 x 50
2. MSFT @ $310 x 75
3. GOOG @ $290 x 150
Next Steps
- Kafka Pipeline — connect to real data
- Operators Guide — write custom operators
- Windowing — deep dive into window types
- All Examples — browse the full examples list
Functions Reference
Volley provides built-in vectorized functions that work with the expression API
(select_expr, filter_expr). These functions operate on Apache Arrow arrays
and are exposed as DataFusion UDFs.
Functions are imported from volley_core::functions or via the prelude:
use volley_core::prelude::*; // imports simhash, hamming_similarity
// or
use volley_core::functions::hash::{simhash, hamming_similarity};
Hash Functions
Functions for computing and comparing hash fingerprints.
| Function | Signature | Description |
|---|---|---|
| simhash | simhash(Utf8 / LargeUtf8) → UInt64 | 64-bit SimHash text fingerprint |
| hamming_similarity | hamming_similarity(UInt64, UInt64) → Float64 | Bitwise similarity between two hashes |
simhash
Computes a 64-bit SimHash fingerprint for text similarity. Similar texts produce fingerprints with a small Hamming distance, which approximates the cosine similarity of the original term-frequency vectors.
Signature: simhash(text: Utf8 | LargeUtf8) → UInt64
Algorithm: Tokenizes text into character trigrams, hashes each trigram with a 64-bit hash, accumulates a weighted bit vector, and thresholds to produce the final fingerprint.
Null handling: Null input produces null output.
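The algorithm described above can be sketched on a single string with the standard library. This is an illustration, not Volley's kernel: the function name is hypothetical, DefaultHasher is an assumed stand-in for whatever 64-bit hash Volley uses (its output is not guaranteed stable across Rust releases), and every trigram occurrence is given a weight of 1. Fingerprints from this sketch will therefore not match the library's.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative SimHash over character trigrams (not Volley's kernel).
fn simhash_sketch(text: &str) -> u64 {
    let chars: Vec<char> = text.chars().collect();
    // One signed counter per output bit.
    let mut counts = [0i32; 64];
    for trigram in chars.windows(3) {
        let mut hasher = DefaultHasher::new();
        trigram.hash(&mut hasher);
        let h = hasher.finish();
        // Each trigram votes +1 for bits set in its hash and -1 otherwise.
        for (bit, count) in counts.iter_mut().enumerate() {
            if (h >> bit) & 1 == 1 {
                *count += 1;
            } else {
                *count -= 1;
            }
        }
    }
    // Threshold: a fingerprint bit is set when its counter is positive.
    counts.iter().enumerate().fold(0u64, |fp, (bit, &count)| {
        if count > 0 { fp | (1u64 << bit) } else { fp }
    })
}
```

Texts shorter than three characters contain no trigrams, so in this sketch they all map to the fingerprint 0. How the real function treats such inputs is not stated here.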
Example:
use volley_core::prelude::*;
use datafusion_expr::col;

// As an expression in a pipeline
stream
    .select_expr(vec![
        col("id"),
        simhash(col("text")).alias("text_hash"),
    ]);

// As a direct Arrow kernel
use volley_core::functions::hash::simhash_kernel;
use arrow::array::StringArray;
let input = StringArray::from(vec!["hello world"]);
let hashes = simhash_kernel(&input).unwrap();
hamming_similarity
Computes the bitwise similarity between two UInt64 values, returning a Float64
between 0.0 (all bits differ) and 1.0 (identical).
Signature: hamming_similarity(a: UInt64, b: UInt64) → Float64
Formula: 1.0 - popcount(a XOR b) / 64.0
For a 64-bit hash, a similarity threshold of 0.95 means at most 3 bits differ.
Null handling: If either input is null, the output is null.
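The formula is small enough to check by hand. The scalar sketch below applies it to a single pair of values. The function name is hypothetical and it is not the Arrow kernel, which operates on whole arrays and propagates nulls.

```rust
/// Scalar version of the formula: 1.0 - popcount(a XOR b) / 64.0.
/// Hypothetical helper for illustration only.
fn hamming_similarity_scalar(a: u64, b: u64) -> f64 {
    1.0 - f64::from((a ^ b).count_ones()) / 64.0
}
```

For instance, 0xDEADBEEF and 0xDEADBEFF differ in exactly one bit, giving 1 - 1/64 = 0.984375. Three differing bits give 0.953125, which still clears a 0.95 threshold, while four give 0.9375, which does not.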
Example:
use volley_core::prelude::*;
use datafusion_expr::{col, lit};

// Filter out records where text hasn't changed enough
stream
    .filter_expr(
        hamming_similarity(
            simhash(col("current_text")),
            simhash(col("previous_text")),
        ).lt(lit(0.95))
    );

// Direct Arrow kernel
use volley_core::functions::hash::hamming_similarity_kernel;
use arrow::array::UInt64Array;
let a = UInt64Array::from(vec![0xDEADBEEFu64]);
let b = UInt64Array::from(vec![0xDEADBEFFu64]);
let similarity = hamming_similarity_kernel(&a, &b).unwrap();
Proto Functions
Functions for decoding binary protobuf data into Arrow columnar format.
| Function | Signature | Description |
|---|---|---|
| decode_proto | decode_proto(Binary / LargeBinary, &ProtoDescriptor) → Struct | Decode serialized protobuf messages into a struct column |
decode_proto
Decodes a binary column containing serialized protobuf messages into an Arrow
Struct column with all fields from the message definition.
Signature: decode_proto(data: Binary | LargeBinary, descriptor: &ProtoDescriptor) → Struct
The ProtoDescriptor is constructed once from a serialized FileDescriptorSet
and a fully-qualified message name. It caches the Arrow schema derived from the
protobuf definition.
Null handling: Null input rows and decode failures produce null struct rows.
Decode failures emit a tracing::warn! but do not stop the pipeline.
Requires: protobuf feature flag on volley-core.
Example:
use volley_core::functions::proto::{decode_proto, ProtoDescriptor};
use datafusion_expr::col;

// DESCRIPTOR_BYTES holds a serialized FileDescriptorSet
let descr = ProtoDescriptor::new(DESCRIPTOR_BYTES, "crawl.CrawlRecord").unwrap();
stream
    .select_expr(vec![
        decode_proto(col("data"), &descr).alias("record"),
    ])
    .select_expr(vec![
        col("record.url"),
        col("record.content"),
    ]);
URL Functions
Functions for URL normalization and extraction.
| Function | Signature | Description |
|---|---|---|
| normalize_url | normalize_url(Utf8 / LargeUtf8) → Utf8 | Normalize a URL |
| normalize_url_with_params | normalize_url_with_params(Utf8 / LargeUtf8, Vec<String>) → Utf8 | Normalize with custom tracking params |
| extract_domain | extract_domain(Utf8 / LargeUtf8) → Utf8 | Extract host/domain from URL |
normalize_url
Normalizes a URL by applying: scheme/host lowercasing, default port removal
(:80 for http, :443 for https), path normalization (/a/../b → /b),
percent-encoding normalization (decode unreserved chars), www. prefix removal,
fragment removal, query parameter sorting, tracking parameter removal, and
trailing slash removal.
Signature: normalize_url(url: Utf8 | LargeUtf8) → Utf8
Default tracking params removed: utm_source, utm_medium, utm_campaign,
utm_term, utm_content, fbclid, gclid, mc_eid, msclkid.
Null handling: Null, empty, and invalid URL inputs produce null output.
Requires: url-functions feature flag on volley-core.
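Two of the steps listed above, tracking parameter removal and query parameter sorting, can be illustrated on a bare query string. This sketch is not Volley's implementation: the function and constant names are hypothetical, it sorts whole key=value pairs lexicographically (the library's exact ordering is not specified here), and it does none of the other normalization steps.

```rust
/// The default blocklist named in this section.
const DEFAULT_TRACKING_PARAMS: [&str; 9] = [
    "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
    "fbclid", "gclid", "mc_eid", "msclkid",
];

/// Hypothetical helper: drops blocklisted parameters from a query string
/// (without the leading '?') and sorts what remains.
fn clean_query(query: &str, blocklist: &[&str]) -> String {
    let mut params: Vec<&str> = query
        .split('&')
        .filter(|pair| !pair.is_empty())
        .filter(|pair| {
            // The key is everything before the first '='.
            let key = pair.split('=').next().unwrap_or("");
            !blocklist.contains(&key)
        })
        .collect();
    params.sort_unstable();
    params.join("&")
}
```

Passing a custom blocklist such as ["ref"] leaves utm_source in place, which mirrors how normalize_url_with_params replaces the defaults rather than extending them.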
Example:
use volley_core::functions::url::normalize_url;
use datafusion_expr::col;

stream.select_expr(vec![normalize_url(col("url")).alias("url")])
normalize_url_with_params
Same as normalize_url but with a custom tracking params blocklist that
fully replaces the defaults.
Signature: normalize_url_with_params(url: Utf8 | LargeUtf8, remove_params: Vec<String>) → Utf8
Example:
use volley_core::functions::url::normalize_url_with_params;
use datafusion_expr::col;

let custom_params = vec!["ref".to_string(), "src".to_string()];
stream.select_expr(vec![
    normalize_url_with_params(col("url"), custom_params).alias("url"),
])
extract_domain
Extracts the full host/domain from a URL string, including subdomains.
Signature: extract_domain(url: Utf8 | LargeUtf8) → Utf8
Null handling: Null, empty, and invalid URL inputs produce null output.
Requires: url-functions feature flag on volley-core.
Example:
use volley_core::functions::url::extract_domain;
use datafusion_expr::col;

// "https://api.v2.example.com/endpoint" → "api.v2.example.com"
stream.select_expr(vec![extract_domain(col("url")).alias("domain")]);

// Direct Arrow kernel
use volley_core::functions::url::extract_domain_kernel;
use arrow::array::StringArray;
let input = StringArray::from(vec!["http://sub.example.com/path"]);
let domains = extract_domain_kernel(&input).unwrap();
Contributing
Contributions to Volley are welcome! This page summarizes the development workflow. See the full Contributing Guide for complete details.
Development Setup
git clone https://github.com/volley-streams/volley.git
cd volley
cargo build --workspace
cargo test --workspace --exclude volley-python
System Dependencies
| Dependency | Required by | macOS | Ubuntu |
|---|---|---|---|
| cmake | volley-connector-kafka | brew install cmake | apt install cmake |
| libcurl4-openssl-dev | volley-connector-kafka | (included) | apt install libcurl4-openssl-dev |
| protobuf-compiler | volley-scheduler | brew install protobuf | apt install protobuf-compiler |
Run volley doctor to check your environment.
Code Style
- Formatting: cargo fmt --all
- Linting: cargo clippy --workspace -- -D warnings
- Tests: cargo test --workspace --exclude volley-python
All warnings are errors in CI (RUSTFLAGS: -Dwarnings).
Submitting Changes
- Fork the repository and create a feature branch
- Make your changes with clear, focused commits
- Ensure cargo fmt, cargo clippy, and cargo test all pass
- Open a pull request against main
Project Structure
See Crate Map for a per-crate breakdown of the 17-crate workspace.
Release Process
Volley follows semantic versioning (SemVer) with a staged release process.
Version Format
All crates in the workspace share a single version defined in the root
Cargo.toml under [workspace.package]. Versions use 3-segment semver
with optional pre-release identifiers:
- X.Y.Z-alpha.N (Alpha): feature-complete for the release scope. API may change. Not for production.
- X.Y.Z-beta.N (Beta): API frozen. Bug fixes only. Community testing encouraged.
- X.Y.Z (Stable): production-ready. Semver guarantees apply.
How to Release
- Bump the version in the [workspace.package] section of Cargo.toml.
- Update CHANGELOG.md: move items from [Unreleased] to a new version section with today’s date.
- Create a PR and merge to main.
- The auto-tag.yml workflow detects the version change and creates a git tag (v{VERSION}).
- The tag triggers release.yml, which creates a GitHub Release with an auto-generated changelog.
Releases with -alpha, -beta, or -rc in the version are automatically marked as pre-releases.
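The pre-release rule is a simple substring check on the version. A minimal sketch follows. The function name is hypothetical, and the actual check lives in the release workflow, whose implementation is not shown here.

```rust
/// Hypothetical helper: true when the version carries one of the
/// pre-release markers named above (-alpha, -beta, or -rc).
fn is_prerelease(version: &str) -> bool {
    ["-alpha", "-beta", "-rc"]
        .iter()
        .any(|marker| version.contains(*marker))
}
```

Under this rule 1.0.0-alpha.1 and 1.0.0-beta.2 are pre-releases, while 1.0.0 is published as a regular release.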
Progression
1.0.0-alpha.1 → 1.0.0-alpha.2 → ... → 1.0.0-beta.1 → 1.0.0-beta.2 → ... → 1.0.0
- Alpha → Alpha: bump the alpha number for each iteration.
- Alpha → Beta: when the API is frozen and only bug fixes remain.
- Beta → Stable: when all known issues are resolved and testing is complete.
Installing
Install the CLI from source:
cargo install --git https://github.com/volley-streams/volley volley-cli --tag v1.0.0-alpha.1
Or from a local checkout:
cargo install --path volley-cli
Crate Publishing
Cargo crate publishing to crates.io is not yet automated. This is planned for a future release.
Crate Map
Volley is a 15-crate Cargo workspace. See the full Crate Map for detailed per-crate documentation.
Status: stable = production-ready | experimental = API may change | placeholder = stub/future work
Core
| Crate | Description | Key Types | Status |
|---|---|---|---|
| volley-core | Stream processing engine with built-in observability and optional RocksDB state | StreamExecutionEnvironment, DataStream, KeyedStream, Operator, StreamRecord, ObservabilityConfig, RocksDbBackend | stable |
| volley-derive | Procedural macros | impl_to_arrow! | stable |
Connectors
| Crate | Description | Key Types | Status |
|---|---|---|---|
| volley-connectors | Umbrella crate (memory + feature-gated re-exports) | MemorySource, MemorySink | stable |
| volley-connector-kafka | Kafka source + sink (exactly-once) | KafkaSource, KafkaSink, KafkaSourceConfig | stable |
| volley-connector-blob-store | Cloud blob store abstraction + decoders | NotificationSource, BlobReader, BlobWriter | stable |
| volley-connector-aws-s3 | AWS S3 + SQS source | S3Source, S3BlobReader, SqsNotificationSource | stable |
| volley-connector-azure-blob | Azure Blob + Queue source | AzureBlobSource, AzureBlobReader | stable |
| volley-connector-gcp-gcs | GCS blob sink (Pub/Sub source removed in v1.1.0) | GcsBlobWriter | stable |
| volley-connector-delta | Delta Lake sink (exactly-once) | DeltaSink, DeltaSinkConfig | stable |
| volley-connector-iceberg | Apache Iceberg sink (REST catalog) | IcebergSink, IcebergSinkConfig | stable |
Infrastructure
| Crate | Description | Status |
|---|---|---|
| volley-k8s-operator | Kubernetes operator for VolleyApplication CRD | stable |
| volley-cli | CLI tool (volley new, volley doctor) | stable |
| volley-scheduler | Distributed execution (Ballista integration) | experimental |
| volley-python | PyO3 Python bindings | placeholder |
Other
| Crate | Description | Status |
|---|---|---|
| volley-examples | Runnable examples and benchmarks | stable |