Introduction
Volley is a high-performance stream processing framework built in Rust on Apache Arrow. It provides a Flink-inspired DataStream API with exactly-once semantics, achieving 9M+ records/sec sustained on a single node.
Volley is designed for both human developers and AI coding agents. The API is type-safe at compile time, the documentation is structured for machine readability, and the examples are runnable out of the box.
Key Features
- High Performance — 9M+ rec/sec sustained with multi-row Arrow batching and write-behind state caching
- Exactly-Once Semantics — Checkpoint-based fault tolerance with aligned barrier snapshotting
- Flink-like API — Fluent DataStream API with typestate-enforced compile-time safety
- Expression Engine — Vectorized filter_expr(), select_expr(), aggregate_expr() powered by DataFusion (without SQL). Multi-type aggregation on native Arrow types (i32, i64, f64, etc.)
- Event-Time Windowing — Tumbling, sliding, and session windows with watermark tracking
- State Management — RocksDB backend with write-behind cache and hardlink checkpoints
- Intra-Node Parallelism — Hash-partitioned parallel operator instances
- Kafka Integration — Source and sink with exactly-once transactional semantics
- Cloud Blob Sources — S3+SQS, Azure Blob+Queue, GCS+Pub/Sub with Parquet/JSON/CSV/Avro
- Table Format Sinks — Delta Lake and Apache Iceberg with exactly-once commits
- Kubernetes Operator — VolleyApplication CRD with autoscaling, health checks, and Prometheus
Built On
| Component | Purpose |
|---|---|
| Apache Arrow | In-memory columnar format |
| DataFusion | Query engine and expression evaluation |
| RocksDB | State management and checkpointing |
| Tokio | Async runtime for task-per-stage execution |
Architecture
+-----------------------------------------------------+
| Stream Processing Layer (Rust) |
| +- Fluent Builder API (DataStream, KeyedStream) |
| +- Multi-Row RecordBatch Operators |
| +- Watermarks & Event Time |
| +- Windowing (Tumbling, Sliding, Session) |
| +- Intra-Node Parallelism (hash-partitioned) |
| +- Checkpoint Coordinator |
+-----------------------+-----------------------------+
|
+-----------------------v-----------------------------+
| Async Runtime (Tokio) |
| +- Task-per-Stage Execution |
| +- Bounded Channels (Backpressure) |
| +- Graceful Shutdown |
+-----------------------+-----------------------------+
|
+-----------------------v-----------------------------+
| State Management (RocksDB) |
| +- Write-Behind Accumulator Cache |
| +- BatchStateBackend (multi_get / WriteBatch) |
| +- Per-Key State Isolation |
| +- Hardlink Checkpoints |
+-----------------------+-----------------------------+
|
+-----------------------v-----------------------------+
| Connectors |
| +- Kafka (exactly-once, feature-gated) |
| +- Cloud Blob Stores (S3, Azure, GCS) |
| +- Delta Lake, Iceberg (exactly-once sinks) |
| +- Memory (batched source, counting sink) |
+-----------------------------------------------------+
Performance
Benchmarked on a single node (12 CPUs, Apple Silicon):
| Configuration | Throughput |
|---|---|
| Single-row, parallelism=1 | 850K rec/s |
| Batch=1000, parallelism=4 | 8.7M rec/s |
| Sustained 10s (batch=1000, parallelism=4) | 9.4M rec/s |
Connectors
Sources
| Connector | Formats | Status |
|---|---|---|
| Kafka | JSON | Available (feature-gated) |
| AWS S3 + SQS | Parquet, JSON, CSV, Avro | Available |
| Azure Blob + Queue | Parquet, JSON, CSV, Avro | Available |
| GCS + Pub/Sub | Parquet, JSON, CSV, Avro | Available |
| Memory/Iterator | Arrow RecordBatch | Available |
Sinks
| Connector | Formats | Status |
|---|---|---|
| Kafka | JSON | Available (feature-gated, transactional) |
| Delta Lake | Parquet | Available (exactly-once) |
| Apache Iceberg | Parquet | Available (REST catalog) |
| Memory (collect) | Arrow RecordBatch | Available |
Next Steps
- Installation — set up your environment
- Quick Start — build your first pipeline in 5 minutes
- Architecture Overview — understand how Volley works
- API Reference — auto-generated rustdoc for all crates
Installation
Rust Toolchain
Volley requires a stable Rust toolchain. Install via rustup:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
rustup default stable
System Dependencies
Most crates build with a standard Rust toolchain. Some connectors need additional system packages:
| Dependency | Required by | macOS | Ubuntu |
|---|---|---|---|
| cmake | volley-connector-kafka (rdkafka) | brew install cmake | apt install cmake |
| protobuf-compiler | volley-scheduler | brew install protobuf | apt install protobuf-compiler |
Note: libcurl and OpenSSL are vendored and statically linked by rdkafka — no system packages needed.
Run volley doctor to check your environment for missing dependencies.
Adding Volley to a Project
Scaffold a new pipeline
volley new --template kafka-to-delta my-pipeline
cd my-pipeline
volley doctor
cargo build && cargo run
Available templates: kafka-to-delta, in-memory, s3-to-delta.
Add to an existing project
Add Volley crates to your Cargo.toml:
[dependencies]
volley-core = { git = "https://github.com/volley-streams/volley", features = ["state-rocksdb"] }
volley-connector-kafka = { git = "https://github.com/volley-streams/volley" }
Note: Volley currently uses git dependencies. crates.io publishing is planned — see the roadmap.
Feature Flags
Volley's crates use feature flags to control which optional components are compiled:
| Feature | Connectors Included |
|---|---|
| state-rocksdb | RocksDB state backend (default, ~1.5 GB C++ build) |
| blob-store | Cloud blob store abstraction |
| blob-store-aws | AWS S3 + SQS source |
| blob-store-azure | Azure Blob + Queue source |
| blob-store-gcs | GCS + Pub/Sub source |
| file-formats | Parquet, JSON, CSV, Avro decoders |
| table-formats | Delta Lake + Iceberg sinks |
Tip: If you don’t need the RocksDB state backend (e.g. you’re only developing a Kafka connector), use default-features = false and enable only the features you need to save ~1.5 GB in build artifacts.
Troubleshooting
cmake not found
error: failed to run custom build command for `rdkafka-sys`
--- stderr
CMake not found
Install cmake: brew install cmake (macOS) or apt install cmake (Ubuntu).
RocksDB compilation slow or failing
RocksDB is a C++ dependency that takes ~3–5 minutes to compile on first build. If you don’t need state management (e.g., stateless transformations only), disable it:
volley-core = { git = "...", default-features = false }
protobuf-compiler not found
Only needed if building volley-scheduler. Install: brew install protobuf (macOS) or apt install protobuf-compiler (Ubuntu).
volley doctor reports issues
volley doctor checks for system dependencies, Docker availability, and toolchain versions. Address each reported issue individually. Common fixes:
- Docker not running: Start Docker Desktop or the Docker daemon
- Rust toolchain outdated: rustup update stable
- Missing system packages: See the System Dependencies table above
Verify Your Setup
# Build the workspace
cargo build --workspace
# Run tests
cargo test --workspace --exclude volley-python
# Run a quick example
cargo run --example in_memory_pipeline -p volley-examples
Quick Start
Build your first Volley pipeline in 5 minutes.
Basic In-Memory Pipeline
This pipeline reads events from an iterator, filters them, groups by key, windows into 5-minute tumbling windows, and computes a sum aggregation:
use volley_core::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // `events` is assumed to be an iterator of Arrow RecordBatches;
    // see in_memory_pipeline in volley-examples for a complete setup.
    let results = StreamExecutionEnvironment::new()
        .from_iter(events)
        .filter_expr(col("amount").gt(lit(100)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(300)))
        .aggregate(AggregationType::Sum, "amount")
        .collect()
        .execute("aggregation-job")
        .await?;
    Ok(())
}
Run it:
cargo run --example in_memory_pipeline -p volley-examples
Expected output (records grouped by key and windowed):
Pipeline 'aggregation-job' started
Source: MemorySource (4 records)
Operators: filter_expr → key_by → window → aggregate
Sink: MemorySink
Processing complete: 4 records in, 2 aggregated results out
Kafka Source to Kafka Sink
A real-world pipeline reading from Kafka, aggregating, and writing back to Kafka with exactly-once semantics:
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<()> {
let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group");
let sink_config = KafkaSinkConfig::new("localhost:9092", "output");
StreamExecutionEnvironment::new()
.from_kafka(source_config).await?
.filter_expr(col("amount").gt(lit(0)))
.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(60)))
.aggregate(AggregationType::Sum, "amount")
.to_kafka(sink_config).await?
.execute("kafka-aggregation-job")
.await?;
Ok(())
}
Note: Kafka connectors require cmake installed on your system. See Installation for details.
What’s Happening
- StreamExecutionEnvironment::new() — Creates the pipeline builder
- .from_kafka() / .from_iter() — Attaches a source, producing a DataStream
- .filter_expr() / .map() — Applies operators to the stream
- .key_by() — Partitions the stream by key, producing a KeyedStream
- .window() — Assigns records to time windows, producing a WindowedKeyedStream
- .aggregate() — Computes an aggregation per window per key
- .to_kafka() / .collect() — Attaches a sink
- .execute() — Starts the pipeline as concurrent Tokio tasks
The type system enforces valid construction at compile time. You can only call .aggregate() on a WindowedKeyedStream, not on a raw DataStream.
Next Steps
- Architecture Overview — understand how Volley works under the hood
- Examples — browse all runnable examples
- Operators Guide — write custom operators
- Connectors Guide — write custom connectors
Architecture Overview
Volley is a dataflow engine. You declare a pipeline as a graph of stages (source -> operators -> sink), and Volley executes it as concurrent Tokio tasks connected by bounded channels.
Pipeline Builder (Compile-Time)
StreamExecutionEnvironment::new()
.from_source(...)
.filter(...)
.key_by(...)
.window(...)
.aggregate(...)
.to_sink(...)
.execute("job-name")
Runtime Execution (Tokio Tasks)
At runtime, each stage runs as an independent Tokio task. Stages communicate via bounded tokio::sync::mpsc channels. Backpressure propagates naturally: when a downstream channel is full, the upstream task blocks on send.
+--------+ channel +----------+ channel +------+
| Source | ---------> | Operator | ---------> | Sink |
| Task | RecordBatch | Task | RecordBatch | Task |
+--------+ +----------+ +------+
| | |
| CheckpointBarrier | on_checkpoint() | on_checkpoint()
| (in-band) | flush state | commit writes
v v v
+---------------------------------------------------------+
| Checkpoint Coordinator |
| Injects barriers -> collects acks -> snapshots state |
+---------------------------------------------------------+
Checkpoint barriers and watermarks flow in-band alongside data records through the same channels. This is how Volley achieves aligned checkpoints without pausing the pipeline.
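Backpressure here is nothing Volley-specific: it is the ordinary semantics of bounded Tokio channels. The following standalone sketch (plain Tokio, independent of Volley's API) shows a fast producer stalling once a capacity-4 channel fills behind a slow consumer:

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // Capacity 4: once four messages are in flight, `send` awaits
    // until the slow consumer frees a slot; that is backpressure.
    let (tx, mut rx) = mpsc::channel::<u64>(4);

    let producer = tokio::spawn(async move {
        for i in 0..16 {
            tx.send(i).await.expect("receiver alive");
            println!("produced {i}");
        }
    });

    while let Some(i) = rx.recv().await {
        sleep(Duration::from_millis(50)).await; // slow sink stage
        println!("consumed {i}");
    }
    producer.await.unwrap();
}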
Intra-Node Parallelism
After .key_by(), records are hash-partitioned across N parallel operator instances. Each instance has its own state namespace. Checkpoint barriers are aligned across all partitions before snapshot.
+-- Partition 0 --> [Operator Instance 0] --+
Source --> HashPartitioner -- Partition 1 --> [Operator Instance 1] --> Merge --> Sink
+-- Partition 2 --> [Operator Instance 2] --+
Parallelism is set via .with_parallelism(N) on the stream builder.
Distributed Execution (Horizontal Scaling)
For workloads that exceed a single node, Volley supports distributed execution across multiple K8s pods. Enable it by passing a ClusterConfig:
#![allow(unused)]
fn main() {
StreamExecutionEnvironment::new()
.with_cluster(ClusterConfig::new(worker_id, num_workers, headless_service, app_name))
.from_source(source)
// ... same pipeline API ...
.execute("job").await?;
}
In distributed mode, keyed operators use a two-level hashing scheme:
- Key group: hash(key) % max_key_groups (256 virtual partitions by default)
- Worker assignment: contiguous key group ranges assigned to each worker
Records destined for remote workers are shuffled via Arrow Flight DoExchange. Non-keyed operators (filter, map) run locally with no shuffle.
Worker 0 Worker 1
┌──────────────────────┐ ┌──────────────────────┐
│ Source │ │ Source │
│ ↓ │ │ ↓ │
│ ShuffleRouter ──────►│── Arrow Flight ──►│◄── ShuffleRouter │
│ ↓ │◄── Arrow Flight ──│ │
│ BarrierAligner │ │ BarrierAligner │
│ ↓ │ │ ↓ │
│ Sink │ │ Sink │
└──────────────────────┘ └──────────────────────┘
K8s primitives handle coordination:
- Leader election: K8s Lease API — one worker becomes coordinator
- Checkpoint metadata: stored in K8s ConfigMaps (no OwnerReference, survives redeployment)
- Checkpoint data: shared filesystem (EFS/Filestore/Azure Files) via ReadWriteMany PVC
- Worker discovery: StatefulSet headless service DNS
- Live rescaling: checkpoint + reassignment, no state migration needed (shared FS)
Single-node mode remains the default with zero overhead when ClusterConfig is absent.
Graceful Shutdown
Source stops producing, drains in-flight data through the pipeline, then each stage shuts down in order.
Source Task --> channel(capacity) --> Operator Task --> channel(capacity) --> Sink Task
^ ^
| |
backpressure backpressure
(bounded send) (bounded send)
Deep Dives
- Data Model — Arrow RecordBatch, StreamRecord, StreamElement
- Pipeline Builder — type-state pattern and compile-time safety
- Exactly-Once Semantics — checkpoint barriers and recovery
- State Management — write-behind cache, RocksDB, and checkpoints
Data Model
All data in Volley flows as Apache Arrow RecordBatch — a columnar, zero-copy, language-interoperable format. This means operators process batches of rows rather than individual records, enabling vectorized computation.
StreamRecord
The StreamRecord wrapper adds metadata to each batch:
StreamRecord {
batch: RecordBatch, // columnar Arrow data (1 or more rows)
event_time_ms: Option<i64>, // event timestamp (for windowing)
trace: Option<RecordTraceContext>, // per-record trace context (for distributed tracing)
}
- batch — the actual payload, a columnar Arrow RecordBatch
- event_time_ms — event timestamp used by window assignment and watermark tracking
- trace — optional trace context for per-record distributed tracing. Set by the runtime’s sampler or extracted from source message headers (e.g., Kafka traceparent). When present, the runtime creates OpenTelemetry child spans at each pipeline stage.
StreamElement
Records share channels with control signals. The StreamElement enum wraps all possible messages:
StreamElement = Data(StreamRecord)
| Barrier(CheckpointBarrier)
| Watermark(Watermark)
- Data — normal data records flowing through the pipeline
- Barrier — checkpoint coordination signals injected by the Checkpoint Coordinator
- Watermark — event-time progress markers for window triggering
Barriers and watermarks flow in-band alongside data records through the same bounded channels. This is what enables aligned checkpoints without pausing the pipeline.
Why Arrow?
- Columnar format — efficient for analytical operations (aggregations, filters)
- Zero-copy — data passes between pipeline stages without serialization
- Multi-row batching — operators process many rows at once, amortizing per-record overhead
- Ecosystem interop — Arrow is the standard for DataFusion, Parquet, Delta Lake, Iceberg, and most analytics tooling
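For orientation, building a small multi-row RecordBatch uses the standard arrow crate API; this is plain Arrow, not a Volley-specific type:

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    // Two columns, three rows: one allocation per column, not per row.
    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Utf8, false),
        Field::new("amount", DataType::Int64, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "b", "a"])),
        Arc::new(Int64Array::from(vec![120_i64, 80, 300])),
    ];
    let batch = RecordBatch::try_new(schema, columns)?;
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}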
Pipeline Builder
Volley uses Rust’s type system to enforce valid pipeline construction at compile time. Invalid pipelines (like calling .aggregate() on an unkeyed stream) are caught by the compiler, not at runtime.
Type Progression
StreamExecutionEnvironment --> DataStream --> KeyedStream --> WindowedKeyedStream
| | | |
from_source() filter() key_by() window()
from_iter() map() aggregate_expr()
from_kafka() flat_map()
filter_expr()
select_expr()
apply_operator()
Each builder method returns a different type:
| Type | Meaning | Available Operations |
|---|---|---|
| DataStream | Unkeyed stream | filter, filter_expr, map, flat_map, select_expr, apply_operator, key_by, to_sink, collect, with_tracing, with_observability |
| KeyedStream | Partitioned by key | window, aggregate_expr, to_sink, collect, with_tracing, with_observability |
| WindowedKeyedStream | Keyed + windowed | aggregate_expr |
You can call .aggregate_expr() on both KeyedStream (global aggregation) and WindowedKeyedStream (windowed aggregation). Closure-based operators (filter, map, flat_map) remain alongside expression-based operators.
Example
#![allow(unused)]
fn main() {
StreamExecutionEnvironment::new()
// Returns DataStream
.from_iter(events)
// Still DataStream — vectorized filter
.filter_expr(col("amount").gt(lit(100)))
// Returns KeyedStream
.key_by(col("user_id"))
// Returns WindowedKeyedStream
.window(TumblingWindows::of(Duration::from_secs(60)))
// Consumes WindowedKeyedStream, returns DataStream
.aggregate_expr(sum(col("amount")), state_backend)
// Attach sink
.collect()
// Execute the pipeline
.execute("my-job")
.await?;
}
Parallelism
Set the parallelism level on the environment. After .key_by(), records are hash-partitioned across N parallel operator instances:
#![allow(unused)]
fn main() {
StreamExecutionEnvironment::new()
.set_parallelism(4)
.from_source(...)
// ...
}
Each parallel instance has its own state namespace. Checkpoint barriers are aligned across all partitions.
Exactly-Once Semantics
Volley uses checkpoint barriers for exactly-once processing. The protocol ensures that every record is processed exactly once, even in the presence of failures.
Checkpoint Barrier Protocol
1. Coordinator injects CheckpointBarrier(epoch=N) into source
2. Barrier flows through pipeline in-band with data
3. Each operator receives barrier:
a. Calls on_checkpoint() hook (flush caches, prepare state)
b. Forwards barrier downstream
c. State backend snapshots via RocksDB hardlink
4. Sink receives barrier:
a. Commits buffered writes
b. Acknowledges barrier to coordinator
5. Coordinator marks epoch N complete when all sinks ack
The key insight: barriers flow in-band alongside data records. No global pause is needed. Each operator processes the barrier when it arrives, then continues processing data.
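In code terms, each stage's receive loop is a single match over the in-band element stream. A sketch of the pattern, assuming the StreamElement shape from the Data Model and Tokio mpsc channels (illustrative; not Volley's actual runtime loop):

// Per-stage loop: data, barriers, and watermarks share one channel,
// so no out-of-band coordination is needed.
async fn run_stage<O: Operator>(
    mut rx: tokio::sync::mpsc::Receiver<StreamElement>,
    tx: tokio::sync::mpsc::Sender<StreamElement>,
    mut op: O,
) {
    while let Some(element) = rx.recv().await {
        let forwarded = match element {
            // Normal data: transform and forward each output record.
            StreamElement::Data(record) => {
                let mut ok = true;
                for out in op.process(record) {
                    ok &= tx.send(StreamElement::Data(out)).await.is_ok();
                }
                ok
            }
            // Barrier: flush state *before* forwarding, so the snapshot
            // reflects everything seen up to this barrier.
            StreamElement::Barrier(barrier) => {
                op.on_checkpoint(barrier.epoch);
                tx.send(StreamElement::Barrier(barrier)).await.is_ok()
            }
            // Watermark: propagate event-time progress downstream.
            StreamElement::Watermark(wm) => {
                tx.send(StreamElement::Watermark(wm)).await.is_ok()
            }
        };
        if !forwarded {
            break; // downstream closed; shut this stage down
        }
    }
}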
Per-Sink Commit Strategies
Each sink type uses a different mechanism to achieve exactly-once:
| Sink | Commit Strategy | Recovery |
|---|---|---|
| Kafka | Transaction commit on barrier (+ send_offsets_to_transaction for Kafka↔Kafka EOS) | Abort + replay on recovery |
| Delta Lake | Epoch tag in commit metadata | Skip already-committed epochs |
| Iceberg | Snapshot commit via REST catalog | Epoch tracking in catalog |
Kafka↔Kafka End-to-End Exactly-Once
When both source and sink are Kafka, Volley supports the full Flink-style EOS protocol via librdkafka’s send_offsets_to_transaction:
#![allow(unused)]
fn main() {
let (stream, cgm) = StreamExecutionEnvironment::new()
.from_kafka_exactly_once(source_config).await?;
stream
.filter_expr(col("status").eq(lit("active")))
.to_kafka_exactly_once(sink_config, cgm).await?
.execute("eos-pipeline").await?;
}
How It Works
- Setup: from_kafka_exactly_once() returns a (DataStream, ConsumerGroupMetadata) tuple. The ConsumerGroupMetadata (CGM) is a handle to the Kafka consumer group that the source belongs to. Pass it to to_kafka_exactly_once() so the sink can commit consumer offsets atomically with produced records.
- Barrier injection: When the checkpoint coordinator injects a barrier, the source records its current Kafka partition offsets in barrier.source_offsets.
- Barrier alignment: If the pipeline has parallel branches, the BarrierAligner merges source offsets from all branches before forwarding the barrier to the sink. (The BarrierAligner is an internal component that ensures all parallel inputs to an operator have delivered the same barrier epoch before proceeding.)
- Atomic commit (sketched below): When the sink receives the barrier, it:
  - Calls producer.send_offsets_to_transaction(offsets, cgm) — this stages the consumer group offset advance inside the producer’s open transaction
  - Calls producer.commit_transaction() — this atomically commits both the produced output records and the consumer offset advance
- No auto-commit: In EOS mode, the sink disables batch-size and deadline-driven auto-commits. The only commit trigger is a checkpoint barrier. This means CheckpointConfig::interval must be shorter than the broker’s transaction.timeout.ms (default 60s), or the broker will fence the producer.
- Recovery: On restart, the consumer group’s last committed offset (written atomically with the transaction) determines where to resume. The runtime skips file-based offset restoration for transactional sources.
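The two commit calls in the atomic-commit step map directly onto rdkafka's transactional producer API. A condensed sketch of a sink-side barrier handler, with error handling simplified (the surrounding Volley wiring is not shown):

use std::time::Duration;
use rdkafka::consumer::ConsumerGroupMetadata;
use rdkafka::error::KafkaResult;
use rdkafka::producer::Producer;
use rdkafka::TopicPartitionList;

// Called when a checkpoint barrier reaches the sink. `offsets` are the
// source offsets carried by the barrier; `cgm` came from the source.
fn commit_epoch<P: Producer>(
    producer: &P,
    offsets: &TopicPartitionList,
    cgm: &ConsumerGroupMetadata,
) -> KafkaResult<()> {
    // Stage the consumer-group offset advance inside the open transaction...
    producer.send_offsets_to_transaction(offsets, cgm, Duration::from_secs(5))?;
    // ...then commit produced records and offsets atomically.
    producer.commit_transaction(Duration::from_secs(5))?;
    // A fresh transaction covers everything until the next barrier.
    producer.begin_transaction()?;
    Ok(())
}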
Recovery
When a failure occurs, records may be in various stages of processing:
Checkpoint N In-flight records Failure
| |
v v
[committed] ---[r1]---[r2]---[r3]---[r4]---[r5]--- X
^ ^
known good these are lost
The RecoveryCoordinator restores to the last known-good state:
- Find last complete checkpoint — the most recent epoch where all sinks acknowledged their commit
- Restore source offsets — rewind sources to the offsets recorded at that checkpoint (e.g., Kafka partition offsets, S3 file positions)
- Restore operator state — load RocksDB hardlink snapshots from the checkpoint directory, restoring window state, aggregation accumulators, and any custom operator state
- Resume processing — records between the checkpoint and the failure (r1–r5 above) are replayed from the source
Replayed records don’t produce duplicates because each sink type has an idempotent commit strategy:
- Kafka: The incomplete transaction from the failed run is aborted. Replayed records are written in a new transaction. Consumers with isolation.level=read_committed never see the aborted records.
- Delta Lake: The sink checks the Delta log for the current epoch’s pipeline_id. If an epoch was already committed, the write is skipped entirely.
- Iceberg: The sink reads the latest snapshot’s epoch from catalog metadata. Already-committed epochs are skipped.
Operator Lifecycle
The on_checkpoint() hook gives operators a chance to prepare for the snapshot:
#![allow(unused)]
fn main() {
fn on_checkpoint(&mut self, epoch: u64) {
// Flush any buffered state to RocksDB
// The state backend will snapshot after this returns
}
}
For stateful operators using the write-behind cache, on_checkpoint() flushes dirty keys from the in-memory HashMap to RocksDB via BatchStateBackend::write_batch().
State Management
Volley’s state management is designed for high throughput: most state operations hit an in-memory cache, with RocksDB used only during checkpoints.
Architecture
+----------------------------------+
| AggregateOperator |
| +----------------------------+ |
| | Write-Behind Cache (HashMap)| |
| | key -> Accumulator | |
| +----------------------------+ |
| | on_checkpoint() |
| v |
| +----------------------------+ |
| | BatchStateBackend (RocksDB)| |
| | WriteBatch for dirty keys | |
| | multi_get for bulk reads | |
| +----------------------------+ |
| | hardlink snapshot |
| v |
| +----------------------------+ |
| | Checkpoint Directory | |
| | /checkpoints/epoch-N/ | |
| +----------------------------+ |
+----------------------------------+
Write-Behind Cache
During normal processing, state reads and writes go to an in-memory HashMap cache. The cache holds accumulator values keyed by partition key. This eliminates all RocksDB I/O during processing, which is why Volley achieves 9M+ records/sec.
BatchStateBackend
When a checkpoint barrier arrives:
1. on_checkpoint() is called on the operator
2. Dirty keys from the cache are flushed to RocksDB via WriteBatch (a single atomic write)
3. RocksDB creates a hardlink snapshot to the checkpoint directory
4. The snapshot is nearly instant because hardlinks don’t copy data
Key operations:
- write_batch() — Atomic batch write of dirty keys
- multi_get() — Bulk read for cache misses (rare during steady-state)
- Hardlink checkpoints — O(1) snapshot via filesystem hardlinks to SST files
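The shape of the hot path and the flush can be sketched in a few lines, with a hypothetical BatchStateBackend trait standing in for the RocksDB-backed implementation (names illustrative):

use std::collections::{HashMap, HashSet};

// Hypothetical stand-in for the RocksDB-backed backend.
trait BatchStateBackend {
    fn write_batch(&mut self, entries: Vec<(Vec<u8>, Vec<u8>)>);
}

struct WriteBehindCache<B: BatchStateBackend> {
    cache: HashMap<Vec<u8>, Vec<u8>>, // key -> serialized accumulator
    dirty: HashSet<Vec<u8>>,          // keys mutated since last checkpoint
    backend: B,
}

impl<B: BatchStateBackend> WriteBehindCache<B> {
    // Hot path: memory only, no RocksDB I/O.
    fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.dirty.insert(key.clone());
        self.cache.insert(key, value);
    }

    // on_checkpoint(): flush only dirty keys, in one atomic WriteBatch.
    fn flush(&mut self) {
        let entries: Vec<(Vec<u8>, Vec<u8>)> = self
            .dirty
            .drain()
            .filter_map(|k| self.cache.get(&k).map(|v| (k.clone(), v.clone())))
            .collect();
        self.backend.write_batch(entries);
    }
}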
Per-Key State Isolation
After .key_by(), each parallel operator instance has its own state namespace. Keys are hash-partitioned across instances, so state never conflicts between parallel operators.
Checkpoint Storage
/checkpoints/
epoch-1/ # hardlink snapshot
epoch-2/ # hardlink snapshot
epoch-3/ # latest checkpoint
RocksDB checkpoints use hardlinks to SST files. Incremental growth is small when data doesn’t change between checkpoints. As data mutates, old SST files are retained for older checkpoints.
See the Capacity Planning guide for storage sizing recommendations.
Window State LRU
Window operators (WindowOperator, ExprWindowOperator) maintain an in-memory
index called pending_windows that tracks active (key, window) pairs. Unlike
the aggregate cache (which is a hot copy of data also in RocksDB), this index
is the only record of which windows exist and need to fire on watermark
advance.
To bound memory without data loss, the window LRU uses an evict-to-backend strategy:
+----------------------------------+
| WindowOperator |
| +----------------------------+ |
| | pending_windows (LRU) | |
| | key:window_start -> (key, | |
| | window) | |
| +----------------------------+ |
| | on eviction |
| v |
| +----------------------------+ |
| | State Backend (RocksDB) | |
| | pending_index/{pending_key}| |
| +----------------------------+ |
| | on watermark advance |
| v |
| Scan persisted entries for |
| windows ready to fire |
+----------------------------------+
- When the LRU is full, evicted entries are persisted to a pending_index/ namespace in the state backend
- On watermark advance, persisted entries are scanned (only when evictions have occurred, tracked by a has_evicted flag)
- Fired windows are deleted from both the LRU and the state backend
This ensures no window is orphaned — every pending window eventually fires or is flushed at end-of-stream, regardless of whether it was evicted from the in-memory cache.
Configure via with_max_pending_windows(n) on WindowedKeyedStream. When
unset, the index is unbounded (default, no behavior change from prior releases).
See the design document for full details on trade-offs and the evict-to-backend strategy.
Recovery
On pipeline restart from a checkpoint, the RecoveryCoordinator restores source offsets and opens the RocksDB backend from the checkpoint directory. Stateful operators then lazily recover their in-memory structures on the first process() call:
- Aggregate operators (ExprAggregateOperator, AggregateOperator) use StateBackend::scan_prefix() (backed by RocksDB’s prefix_iterator) to enumerate all keys with persisted accumulators. The cache and pending_keys maps are rebuilt from the scan results.
- Window operators (WindowOperator, ExprWindowOperator) persist a WindowOperatorMeta struct (containing pending_windows and current_watermark) to a well-known state key during on_checkpoint(). On recovery, this metadata is loaded to restore window tracking state and watermark progress.
Recovery is lazy rather than eager: fresh pipelines (no state in RocksDB) pay no cost, and recovery happens only once before the first record is processed.
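A sketch of the lazy pattern, with a hypothetical StateScan trait standing in for the scan_prefix() call (shapes illustrative):

use std::collections::HashMap;

// Hypothetical stand-in for StateBackend::scan_prefix().
trait StateScan {
    fn scan_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)>;
}

struct LazyAggregateState {
    recovered: bool,
    cache: HashMap<Vec<u8>, Vec<u8>>, // key -> serialized accumulator
}

impl LazyAggregateState {
    // Called at the top of process(); a no-op after the first record.
    fn ensure_recovered(&mut self, backend: &impl StateScan) {
        if self.recovered {
            return; // steady state pays nothing here
        }
        // Enumerate persisted accumulators and warm the in-memory cache;
        // fresh pipelines find nothing and pay only an empty scan.
        for (key, value) in backend.scan_prefix(b"acc/") {
            self.cache.insert(key, value);
        }
        self.recovered = true;
    }
}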
Horizontal Scaling
Volley supports distributed execution across multiple K8s pods. Single-node mode remains the default with zero overhead. Horizontal scaling is opt-in via ClusterConfig.
Design Principles
- Symmetric workers — every pod runs the same binary. No dedicated driver node.
- K8s-native — uses Lease API for leader election, ConfigMaps for checkpoint metadata, StatefulSet DNS for discovery. No ZooKeeper.
- Single-node is free — when ClusterConfig is absent, no distributed code paths execute.
- Shared filesystem — checkpoint data on EFS/Filestore/Azure Files. No state migration needed for rescaling.
Two-Level Key Hashing
Records are partitioned across workers using a two-level scheme:
hash(key) → key_group → worker
- Key group: hash(key) % max_key_groups (default 256). This is a virtual partition that never changes.
- Worker assignment: contiguous ranges of key groups are assigned to workers. Rescaling reassigns ranges without rehashing.
256 key groups, 3 workers:
Worker 0: key groups [0, 86) — 86 groups
Worker 1: key groups [86, 171) — 85 groups
Worker 2: key groups [171, 256) — 85 groups
This design, borrowed from Apache Flink’s KeyGroupRangeAssignment, minimizes data movement during rescaling. Scaling from 2 to 3 workers moves roughly 1/3 of key groups.
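The range arithmetic itself is tiny. This sketch (illustrative; not Volley's cluster::assignment code) reproduces the 86/85/85 split shown above using ceiling division:

// A key's group never changes; only group-to-worker ranges move.
fn key_group_for(key_hash: u64, max_key_groups: u64) -> u64 {
    key_hash % max_key_groups
}

// Contiguous [start, end) range of key groups owned by one worker.
fn key_group_range(worker: u64, num_workers: u64, max_key_groups: u64) -> (u64, u64) {
    let start = (worker * max_key_groups).div_ceil(num_workers);
    let end = ((worker + 1) * max_key_groups).div_ceil(num_workers);
    (start, end)
}

fn main() {
    assert_eq!(key_group_for(1234, 256), 1234 % 256);
    for w in 0..3 {
        let (start, end) = key_group_range(w, 3, 256);
        println!("Worker {w}: key groups [{start}, {end}), {} groups", end - start);
    }
}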
Arrow Flight Shuffle
When a keyed operator runs in distributed mode, records are shuffled between workers via Apache Arrow Flight DoExchange:
Worker 0 Worker 1
┌──────────────────────┐ ┌──────────────────────┐
│ Source │ │ Source │
│ ↓ │ │ ↓ │
│ ShuffleRouter │ Arrow Flight │ ShuffleRouter │
│ ├─ local partition ─┼─────────────────→ │ ├─ local partition │
│ └─ remote → Flight ─┼─────────────────→ │ └─ remote → Flight│
│ │ ←──────────────── │ │
│ BarrierAligner │ │ BarrierAligner │
│ (local + remote) │ │ (local + remote) │
│ ↓ │ │ ↓ │
│ Sink │ │ Sink │
└──────────────────────┘ └──────────────────────┘
- ShuffleRouter replaces PartitionRouter for keyed operators. Routes via key_group → worker → local mpsc or Flight sender.
- Non-keyed operators (filter, map, flat_map) run locally — no shuffle.
- BarrierAligner merges both local partition outputs and remote Flight inputs.
- Barriers and watermarks are broadcast to all local partitions AND all remote workers.
Wire Format
StreamElement variants are encoded as FlightData with a discriminator byte in app_metadata[0]:
| Byte | Variant | data_body | app_metadata[1..] |
|---|---|---|---|
| 0x01 | Data | Arrow IPC RecordBatch | JSON record metadata |
| 0x02 | Barrier | empty | JSON CheckpointBarrier |
| 0x03 | Watermark | empty | JSON Watermark |
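A sketch of framing a barrier under this layout, assuming serde_json for the metadata payload (the struct is an illustrative stand-in, not Volley's CheckpointBarrier):

use serde::Serialize;

// Illustrative stand-in for Volley's CheckpointBarrier.
#[derive(Serialize)]
struct CheckpointBarrier {
    epoch: u64,
}

// Frame a barrier per the table above: discriminator 0x02 in
// app_metadata[0], JSON in app_metadata[1..], and an empty data_body.
fn encode_barrier(barrier: &CheckpointBarrier) -> (Vec<u8>, Vec<u8>) {
    let mut app_metadata = vec![0x02u8];
    app_metadata.extend(serde_json::to_vec(barrier).expect("barrier serializes"));
    let data_body = Vec::new(); // barriers carry no Arrow payload
    (app_metadata, data_body)
}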
K8s-Native Coordination
Leader Election
One worker acquires a K8s Lease (coordination.k8s.io/v1) and becomes the coordinator. The coordinator handles checkpoint injection and key group assignment. It still processes data normally — coordination is lightweight.
If the leader dies, the lease expires and another worker takes over.
Checkpoint Metadata
Completed checkpoint metadata is stored in K8s ConfigMaps:
- {app}-ckpt-epoch-{N} — per-epoch metadata (workers, state paths, source offsets)
- {app}-ckpt-latest — pointer to the latest completed epoch
ConfigMaps have no OwnerReference, so they survive CRD deletion. This allows recovery after redeployment.
Shared Filesystem Checkpoints
Checkpoint data is written to a shared filesystem (AWS EFS, GCP Filestore, Azure Files) mounted as a ReadWriteMany PVC. All workers write to and read from the same mount.
/mnt/volley/checkpoints/
epoch-42/
worker-0/ ← RocksDB SST files
worker-1/
worker-2/
Recovery is near-instant: the pod reads the checkpoint directory directly. No download step.
Worker Discovery
Workers discover each other via StatefulSet headless service DNS:
{app}-{ordinal}.{app}-headless.{namespace}.svc:{flight_port}
Live Rescaling
Since all workers share the same filesystem, rescaling doesn’t require state migration:
- Coordinator detects replica count change
- Triggers a checkpoint — all workers flush in-memory state to shared FS
- Computes new KeyGroupAssignment and diffs against the old one
- Broadcasts RescaleEvent via watch channel
- Workers stop processing lost key groups, load state for gained ones from the shared mount
- Resume — no restart, no data transfer
Key Types
| Type | Location | Purpose |
|---|---|---|
| ClusterConfig | cluster::config | Worker ID, num workers, headless service, app name, flight port |
| KeyGroupAssignment | cluster::assignment | Maps key groups to workers with contiguous range assignment |
| KeyGroupRange | cluster::assignment | A [start, end) range of key groups owned by one worker |
| ShuffleRouter | cluster::shuffle | Routes records to local or remote destinations |
| ShuffleFlightService | cluster::flight::server | Arrow Flight DoExchange server |
| ShuffleFlightClient | cluster::flight::client | Arrow Flight DoExchange client with connection pooling |
| LeaseLeaderElector | cluster::coordinator::leader | K8s Lease-based leader election |
| DistributedCheckpointCoordinator | cluster::coordinator::checkpoint_coordinator | ConfigMap-based checkpoint metadata |
| RescaleCoordinator | cluster::rescale | Checkpoint + key group reassignment |
Writing Operators
Operators transform data as it flows through the pipeline. Volley provides built-in operators and lets you write custom ones.
Built-In Operators
| Method | Description |
|---|---|
| .filter(fn) | Keep records matching a predicate |
| .map(fn) | Transform each record |
| .flat_map(fn) | Transform each record into zero or more records |
| .apply_operator(op) | Apply a custom Operator implementation |
| .key_by(field) | Partition by a key field (transitions to KeyedStream) |
| .window(window_type) | Assign records to time windows (on KeyedStream) |
| .aggregate(agg, field) | Compute aggregation per window per key |
Custom Operators
Implement the Operator trait from volley-core:
#![allow(unused)]
fn main() {
use volley_core::prelude::*;
struct MyOperator;
impl Operator for MyOperator {
fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
// Transform the record
// Return zero or more output records
vec![record]
}
fn on_checkpoint(&mut self, epoch: u64) {
// Optional: flush any buffered state before checkpoint
}
}
}
Apply it to a stream:
#![allow(unused)]
fn main() {
env.from_source(source)
.apply_operator(MyOperator)
.to_sink(sink)
.execute("my-job")
.await?;
}
Multi-Row RecordBatch Processing
Operators receive StreamRecord which wraps an Arrow RecordBatch. A batch may contain one or more rows. Your operator should handle multi-row batches correctly:
#![allow(unused)]
fn main() {
fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
    let batch = &record.batch; // the Arrow payload (see Data Model)
let num_rows = batch.num_rows();
// Process all rows, not just row 0
// ...
}
}
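As a concrete sketch, here is an operator that folds over every row of an assumed i64 amount column using Arrow's typed arrays (the column name and the logging are illustrative):

use arrow::array::Int64Array;
use volley_core::prelude::*;

struct SumLogger;

impl Operator for SumLogger {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        let batch = &record.batch;
        // Downcast the `amount` column and fold over every row;
        // batches routinely carry thousands of rows, not just row 0.
        if let Some(col) = batch.column_by_name("amount") {
            if let Some(amounts) = col.as_any().downcast_ref::<Int64Array>() {
                let total: i64 = amounts.iter().flatten().sum();
                println!("{} rows, amount sum = {total}", batch.num_rows());
            }
        }
        vec![record]
    }

    fn on_checkpoint(&mut self, _epoch: u64) {} // stateless: nothing to flush
}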
Stateful Operators
For operators that maintain state across records, use the on_checkpoint() hook to flush state before a checkpoint snapshot:
#![allow(unused)]
fn main() {
struct CountingOperator {
counts: HashMap<String, u64>,
}
impl Operator for CountingOperator {
fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Update in-memory state. (`record.key` is assumed to be populated
        // upstream by .key_by(); it is omitted from the Data Model sketch.)
        let key = record.key.clone().unwrap_or_default();
*self.counts.entry(key).or_default() += 1;
vec![record]
}
fn on_checkpoint(&mut self, epoch: u64) {
// Flush state to durable storage here
// The state backend will snapshot after this returns
}
}
}
See State Management for details on the write-behind cache and RocksDB backend.
Type-State Safety
The Pipeline Builder enforces valid operator chains at compile time. You can only call .aggregate() on a WindowedKeyedStream, not on a plain DataStream.
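For example, here is the kind of mistake the compiler rejects (sketch; the error text is paraphrased):

// error[E0599]: no method named `aggregate` found for struct `DataStream`
//
// StreamExecutionEnvironment::new()
//     .from_iter(events)
//     .aggregate(AggregationType::Sum, "amount") // missing .key_by() + .window()
//     .execute("bad-job");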
Writing Connectors
Connectors are how Volley reads from and writes to external systems. Volley provides built-in connectors for Kafka, cloud blob stores, Delta Lake, and Iceberg. You can also write your own.
Source and Sink Traits
#![allow(unused)]
fn main() {
// Source: produces StreamRecords
trait Source: Send {
async fn poll(&mut self) -> Option<StreamRecord>;
fn connector_id(&self) -> &str;
}
// Sink: consumes StreamRecords
trait Sink: Send {
async fn write(&mut self, record: StreamRecord) -> Result<()>;
async fn flush(&mut self) -> Result<()>;
fn on_checkpoint(&mut self, epoch: u64);
fn connector_id(&self) -> &str;
}
}
Sources produce StreamRecord values via poll(). Sinks consume them via write() and flush(). Both support the checkpoint lifecycle via on_checkpoint().
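For illustration, here is a minimal in-process source under these traits: a counter emitting a fixed number of single-row batches. The Arrow construction is standard; the trait shape follows the sketch above, and the StreamRecord fields assume the Data Model:

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use volley_core::prelude::*;

struct CounterSource {
    next: i64,
    limit: i64,
    schema: Arc<Schema>,
}

impl CounterSource {
    fn new(limit: i64) -> Self {
        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, false)]));
        Self { next: 0, limit, schema }
    }
}

impl Source for CounterSource {
    async fn poll(&mut self) -> Option<StreamRecord> {
        if self.next >= self.limit {
            return None; // exhausted: signals end-of-stream
        }
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![self.next]))];
        let batch = RecordBatch::try_new(self.schema.clone(), columns).ok()?;
        self.next += 1;
        Some(StreamRecord { batch, event_time_ms: None, trace: None })
    }

    fn connector_id(&self) -> &str {
        "counter-source"
    }
}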
Cloud Blob Store Pattern
All cloud blob store connectors follow a shared architecture:
Cloud Queue/Topic Cloud Storage
(SQS / Azure Queue / (S3 / Azure Blob /
Pub/Sub) GCS)
| |
v v
NotificationSource BlobReader
| |
+--------+ +-----------+
| |
v v
BlobSource (volley-connector-blob-store)
|
v
Decoder (Parquet / JSON / CSV / Avro)
|
v
RecordBatch --> Pipeline
To add a new cloud backend, implement two traits:
- NotificationSource — polls a queue for new object notifications
- BlobReader — reads object data from cloud storage
The decoder layer (Parquet, JSON, CSV, Avro -> Arrow RecordBatch) is shared across all cloud backends.
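As a rough sketch, the two traits might look like this (illustrative shapes only; consult volley-connector-blob-store for the real signatures):

// Illustrative notification payload.
struct BlobNotification {
    bucket: String,
    key: String,
}

// Polls the cloud queue (SQS / Azure Queue / Pub/Sub) for new-object events.
trait NotificationSource: Send {
    async fn poll_notifications(&mut self) -> Result<Vec<BlobNotification>>;
    // Acknowledge a processed message so it is not redelivered.
    async fn ack(&mut self, notification: &BlobNotification) -> Result<()>;
}

// Fetches object bytes; the shared decoder layer turns them into RecordBatches.
trait BlobReader: Send {
    async fn read(&mut self, notification: &BlobNotification) -> Result<Vec<u8>>;
}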
Feature Flags
The volley-connectors umbrella crate uses feature flags:
| Feature | What it includes |
|---|---|
| kafka | KafkaSource, KafkaSink |
| blob-store | BlobSource, decoders |
| blob-store-aws | S3Source, SqsNotificationSource |
| blob-store-azure | AzureBlobSource, AzureQueueNotificationSource |
| blob-store-gcs | GcsSource, PubSubNotificationSource |
| file-formats | Parquet, JSON, CSV, Avro decoders |
| table-formats | Delta Lake + Iceberg sinks |
Existing Connectors
- Kafka — source and sink with exactly-once semantics
- Delta Lake — sink with epoch-tagged commits
- Iceberg — sink via REST catalog
- AWS S3 — source via SQS notifications
- Blob Store — Azure Blob and GCS sources
Windowing
Windowing groups records by time intervals for aggregation. Volley supports three window types with event-time semantics.
Window Types
Tumbling Windows
Fixed-size, non-overlapping windows:
#![allow(unused)]
fn main() {
.window(TumblingWindows::of(Duration::from_secs(60)))
}
Each record belongs to exactly one window.
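The assignment is simple modular arithmetic; a sketch, not Volley's internal code:

// For a tumbling window of `size_ms`, each timestamp maps to exactly
// one window: [start, start + size_ms).
fn tumbling_window_start(event_time_ms: i64, size_ms: i64) -> i64 {
    event_time_ms - event_time_ms.rem_euclid(size_ms)
}

fn main() {
    // With 60s windows, t=125_000 ms falls in [120_000, 180_000).
    assert_eq!(tumbling_window_start(125_000, 60_000), 120_000);
}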
Sliding Windows
Fixed-size, overlapping windows:
#![allow(unused)]
fn main() {
.window(SlidingWindows::of(
Duration::from_secs(60), // window size
Duration::from_secs(10), // slide interval
))
}
Each record may belong to multiple windows.
Session Windows
Dynamic windows that merge on activity:
#![allow(unused)]
fn main() {
.window(SessionWindows::with_gap(Duration::from_secs(30)))
}
A new session starts when the gap between records exceeds the threshold.
Event-Time Semantics
Event stream: --[e1 t=1]--[e2 t=3]--[e3 t=7]--[watermark t=10]--[e4 t=5]-->
TumblingWindows(size=5):
Window [0, 5): e1, e2 -- fires at watermark t=10 (window end <= 10)
Window [5, 10): e3 -- fires at watermark t=10
e4 (t=5): late data -- routed to side output if allowed_lateness passed
SessionWindows(gap=3):
Session 1: e1(t=1), e2(t=3) -- gap(3-1)=2 < 3, merged
Session 2: e3(t=7) -- gap(7-3)=4 >= 3, new session
Windows fire when a watermark advances past the window’s end time.
Watermarks
Watermarks track event-time progress. The BoundedOutOfOrdernessGenerator allows configurable out-of-orderness before advancing the watermark:
#![allow(unused)]
fn main() {
.with_watermark_generator(
BoundedOutOfOrdernessGenerator::new(Duration::from_secs(5))
)
}
This allows records to arrive up to 5 seconds late before the watermark advances past them.
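Conceptually the generator emits max_seen_timestamp - max_out_of_orderness; a sketch of the bookkeeping (illustrative, not the shipped type):

struct BoundedOutOfOrderness {
    max_ts_ms: i64,
    max_out_of_orderness_ms: i64,
}

impl BoundedOutOfOrderness {
    // Called per record: track the highest event time seen so far.
    fn on_event(&mut self, event_time_ms: i64) {
        self.max_ts_ms = self.max_ts_ms.max(event_time_ms);
    }

    // The watermark lags the max timestamp by the allowed lateness,
    // asserting (best effort) that no older event will arrive.
    fn current_watermark(&self) -> i64 {
        self.max_ts_ms - self.max_out_of_orderness_ms
    }
}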
Late Data
Records that arrive after the watermark has passed their window are considered late. Configure side_output_late_data() with a tag and attach a side output sink via with_late_data_sink() to route late events to a separate destination (e.g., a dead-letter queue):
#![allow(unused)]
fn main() {
let late_tag = OutputTag::new("late-events");
StreamExecutionEnvironment::new()
.from_source(source)
.with_watermarks(WatermarkConfig::new(Duration::from_secs(5)))
.with_late_data_sink(late_events_sink) // side output sink
.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(60)))
.allowed_lateness(Duration::from_secs(10))
.side_output_late_data(late_tag)
.aggregate_expr(sum(col("amount")), state_backend)
.to_sink(main_sink)
.execute("job")
.await?;
}
Late events are written to the side output sink as StreamRecords with the schema from late_event_schema(): event_data (JSON string), event_time, watermark, window_start, window_end, lateness_ms. The side output sink participates in checkpoint barriers for exactly-once semantics.
Without with_late_data_sink(), late events are logged and discarded.
Memory Management
The window operator tracks active (key, window) pairs in an in-memory index called pending_windows. With high-cardinality keys or wide/sliding windows, this index can grow large.
Bounding Window Memory with LRU
Use with_max_pending_windows() to cap the in-memory index size. Evicted entries are persisted to the state backend and scanned during watermark advancement, so no data is lost:
#![allow(unused)]
fn main() {
stream
.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(60)))
.with_max_pending_windows(100_000) // bound in-memory window index
.aggregate_expr(sum(col("amount")), state_backend)
}
When unset (the default), the index is unbounded — identical to previous releases.
Sizing guidance:
| Key cardinality | Windows per key | Recommended limit | Approx. memory |
|---|---|---|---|
| < 10K | Any | Unbounded (default) | < 10 MB |
| 10K–100K | 1 (tumbling) | 100,000 | ~10 MB |
| 100K–1M | 1 (tumbling) | 500,000 | ~50 MB |
| Any | 10+ (sliding) | Multiply by panes | Varies |
Each pending entry is ~100 bytes. Evicted entries add a small RocksDB I/O cost on watermark advance.
Note:
with_max_pending_windows()is not recommended with session windows. Session window merging requires scanning the pending index for overlapping sessions, and evicted entries may cause additional state backend I/O during merges.
Monitoring
Monitor the volley_window_pending_size gauge and volley_window_pending_evictions_total counter to tune the limit. A high eviction rate indicates the LRU is too small for the workload. See Observability for the full metrics reference.
Per-Record Tracing
Volley can trace individual records as they flow through a pipeline, creating OpenTelemetry spans at each processing stage. This gives you a complete picture of what happened to a record: which operators processed it, how long each step took, what the input and output looked like, and whether any errors occurred.
Quick Start
#![allow(unused)]
fn main() {
use volley_core::prelude::*;
use volley_core::observability::{TracingConfig, SamplingStrategy};
let results = StreamExecutionEnvironment::new()
.from_source(my_source)
.with_tracing(TracingConfig {
sampling: SamplingStrategy::Ratio(0.01),
..Default::default()
})
.map(|record| {
// Your transformation
Ok(record)
})
.to_sink(my_sink)
.execute("traced-pipeline")
.await?;
}
Open Jaeger or Grafana Tempo to see traces. Each sampled record appears as a trace with child spans for source, operators, and sink.
How It Works
- Sampling decision happens at the source. Based on your SamplingStrategy, the runtime decides whether to trace each record.
- Root span is created for sampled records (volley.record). The span’s context is attached to the StreamRecord.trace field.
- Child spans are created automatically at each pipeline stage (operators, sink). No code changes needed in your operators.
- Payload previews (optional) capture a JSON snapshot of the first row at each operator’s input and output, size-capped to max_payload_bytes.
- Cleanup happens at pipeline exit. Any root spans for records that were filtered or lost to errors are ended.
Sampling
At production throughput (thousands of records per second), tracing every record would overwhelm your tracing backend. Use Ratio sampling:
| Throughput | Suggested ratio | Traces/sec |
|---|---|---|
| 1K rec/s | 0.10 (10%) | ~100 |
| 10K rec/s | 0.01 (1%) | ~100 |
| 100K rec/s | 0.001 (0.1%) | ~100 |
Use Always only during development or debugging.
Window and Aggregate Traces
Window and aggregate operators accumulate input records into new output records. The output gets its own trace, linked back to the input traces via OpenTelemetry span links. This preserves lineage without creating misleading parent-child relationships.
The volley.window.input_trace_count attribute tells you how many sampled records contributed to each aggregation result.
Kafka Trace Propagation
When using Kafka connectors, trace context propagates automatically via W3C traceparent headers:
- Kafka source extracts traceparent from message headers
- Kafka sink injects traceparent into produced messages
A traced request in an upstream service continues its trace through Volley and into downstream consumers. The runtime respects upstream sampling decisions (parent-based sampling).
Configuration Reference
| Field | Type | Default | Description |
|---|---|---|---|
| sampling | SamplingStrategy | Never | Which records to trace |
| max_payload_bytes | usize | 1024 | Max JSON preview size per span event |
| capture_payload | bool | true | Whether to attach input/output previews |
Viewing Traces
Volley emits traces via the OTLP gRPC exporter configured in ObservabilityConfig. Point it at your collector:
#![allow(unused)]
fn main() {
.with_observability(ObservabilityConfig::new()
.with_node_id("node-1")
.with_otlp_endpoint("http://otel-collector:4317"))
.with_tracing(TracingConfig {
sampling: SamplingStrategy::Ratio(0.01),
..Default::default()
})
}
Compatible backends: Jaeger, Grafana Tempo, Datadog, Honeycomb, or any OTLP-compatible collector.
Error Handling
When a record fails during processing (e.g., a transformation returns an error), the record’s trace span is:
- Marked with otel.status_code = ERROR
- Annotated with the error message via span.record_error()
- Ended at the point of failure
Failed records are visible in your tracing backend by filtering on otel.status_code = ERROR. This makes tracing a useful debugging tool even when records are dropped or filtered.
Performance Impact
Tracing adds minimal overhead when sampling is configured appropriately:
| Sampling | Overhead | Notes |
|---|---|---|
| Never | ~0% | No spans created; trace context still propagated for Kafka |
| Ratio(0.001) | < 1% | Recommended for high-throughput production |
| Ratio(0.01) | ~1-2% | Good for staging or moderate-throughput production |
| Always | ~5-10% | Development and debugging only |
The overhead comes from span creation and OTLP export, not from sampling decisions (which are a single random number comparison).
Troubleshooting
No traces appearing in the backend
- Verify with_observability() is configured with the correct otlp_endpoint
- Check that the OTLP collector is reachable from your application
- Confirm sampling is not set to Never (the default)
- Check collector logs for rejected spans (often caused by resource limits)
Traces appear but are missing operators
- Ensure with_tracing() is called before operators are added to the pipeline
- Custom operators using apply_operator() create spans automatically; no manual instrumentation needed
Kafka traces not connecting to upstream services
- The upstream producer must inject W3C traceparent headers into Kafka message headers
- If the upstream doesn’t send traceparent, Volley creates a new root trace (no parent linkage)
ML Inference
The volley-ml crate adds machine learning inference operators to Volley pipelines. Run models embedded in-process or call external model servers — all as standard stream operators.
Backend Configuration
Configure the ML backend once and share it across operators:
#![allow(unused)]
fn main() {
use std::sync::Arc;
use volley_ml::prelude::*;
// CPU-only ONNX (simplest)
let ml = Arc::new(MlBackendConfig::onnx_cpu());
// ONNX on CUDA GPU with limits
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)
.with_num_threads(4)
.with_memory_limit(4 * 1024 * 1024 * 1024));
// Candle on CPU (pure Rust, no C++ toolchain)
let ml = Arc::new(MlBackendConfig::candle_cpu());
// Candle on Metal (macOS GPU)
let ml = Arc::new(MlBackendConfig::candle_metal());
// ONNX on macOS Metal / Neural Engine via CoreML
let ml = Arc::new(MlBackendConfig::onnx_coreml());
// CoreML with specific compute units and model format
let ml = Arc::new(MlBackendConfig::onnx_coreml()
.with_coreml_compute_units(CoreMLComputeUnits::CpuAndGpu)
.with_coreml_model_format(CoreMLModelFormat::MLProgram));
// Candle with explicit architecture and HF revision
let ml = Arc::new(MlBackendConfig::candle_cpu()
.with_architecture(CandleArchitecture::Bert)
.with_hf_revision("v1.0"));
}
Loading ONNX Models from HuggingFace Hub
Pass a HuggingFace repo ID as the model path — the backend downloads and caches the ONNX model and tokenizer automatically:
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cpu());
stream
.embed(EmbeddingConfig::new(
"sentence-transformers/all-MiniLM-L6-v2", // HF repo ID
"text",
ml.clone(),
))
.await?
}
Models are cached in ~/.cache/huggingface/hub/ and reused across runs.
Revision Pinning
Pin a specific revision with @:
#![allow(unused)]
fn main() {
EmbeddingConfig::new("user/model@v1.0", "text", ml.clone())
}
Or set it on the config:
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cpu()
.with_hf_revision("abc123"));
}
ONNX Model Variants
Many repos include optimized variants. Select one with with_onnx_model_file():
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cpu()
.with_onnx_model_file("onnx/model_O2.onnx")); // O2 optimized
}
Common variants:
- onnx/model.onnx — standard (default)
- onnx/model_O2.onnx — O2 optimized
- onnx/model_qint8_arm64.onnx — quantized for ARM64
Operators
Embedding (.embed())
Generate vector embeddings from a text column:
#![allow(unused)]
fn main() {
stream
.embed(EmbeddingConfig::new("models/e5-small.onnx", "text", ml.clone())
.with_output_column("embedding")
.with_embedding_dim(384))
.await?
}
Appends a FixedSizeList<Float32> column to the output.
Classification (.classify())
Classify records and get label + confidence score:
#![allow(unused)]
fn main() {
stream
.classify(ClassifyConfig::new("models/fraud.onnx", "features", ml.clone())
.with_label_column("is_fraud")
.with_score_column("fraud_score")
.with_labels(vec!["legitimate", "fraud"]))
.await?
}
Appends Utf8 label and Float32 score columns.
Generic Inference (.infer())
Run any ONNX model on selected columns:
#![allow(unused)]
fn main() {
stream
.infer(InferenceConfig::new("models/custom.onnx", ml.clone())
.with_input_columns(vec!["feature_1", "feature_2"])
.with_output_columns(vec![
("prediction", DataType::Float32),
]))
.await?
}
External Model Server (.infer_remote())
Call OpenAI-compatible APIs (vLLM, TGI, LiteLLM):
#![allow(unused)]
fn main() {
stream
.infer_remote(RemoteInferenceConfig::new(
"http://localhost:8000/v1/embeddings",
ApiFormat::OpenAI,
)
.with_input_columns(vec!["text"])
.with_output_columns(vec![("embedding", DataType::Float32)])
.with_max_concurrent(32)
.with_timeout(Duration::from_secs(10)))
.await?
}
Candle Backend
The Candle backend provides pure-Rust inference using HuggingFace’s Candle library. No C++ toolchain required.
Loading from HuggingFace Hub
Pass a HuggingFace repo ID as the model path — the backend downloads and caches model weights, config, and tokenizer automatically:
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cpu());
stream
.embed(EmbeddingConfig::new(
"sentence-transformers/all-MiniLM-L6-v2", // HF repo ID
"text",
ml.clone(),
))
.await?
}
Pin a specific revision with @:
#![allow(unused)]
fn main() {
EmbeddingConfig::new("user/model@v1.0", "text", ml.clone())
}
Or set it on the config:
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cpu()
.with_hf_revision("abc123"));
}
Loading from Local Files
Point to a local directory containing model.safetensors, config.json, and optionally tokenizer.json:
#![allow(unused)]
fn main() {
stream
.embed(EmbeddingConfig::new("/models/my-bert", "text", ml.clone()))
.await?
}
Supported Architectures
The backend auto-detects the architecture from the model’s config.json (model_type field):
| model_type | Architecture | Use Cases |
|---|---|---|
| bert, distilbert, roberta | BERT | Embeddings, classification |
Override auto-detection with with_architecture():
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cpu()
.with_architecture(CandleArchitecture::Bert));
}
Tokenizer Integration
When the tokenizers feature is enabled and the model has a tokenizer.json, text columns are automatically tokenized before the forward pass. No manual tokenization needed.
volley-ml = { version = "0.8.0", features = ["candle", "tokenizers"] }
GPU Acceleration
GPU inference is 10–100× faster for transformer models. Volley supports CUDA (NVIDIA) and Metal (macOS) GPUs.
CUDA (NVIDIA GPUs)
ONNX Runtime: CUDA is handled at runtime via execution providers — no
compile-time feature flag needed. Use MlBackendConfig::onnx_cuda(device_id):
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)); // GPU 0
}
If CUDA is unavailable, the ONNX Runtime silently falls back to CPU.
Candle: Requires the cuda compile-time feature (pulls in cudarc CUDA
bindings):
volley-ml = { version = "0.8.2", features = ["candle", "tokenizers", "cuda"] }
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cuda(0));
}
Prerequisites: NVIDIA driver and CUDA toolkit must be installed. Verify
with nvidia-smi or volley doctor.
Metal (macOS GPUs)
Requires the metal compile-time feature:
volley-ml = { version = "0.8.2", features = ["candle", "tokenizers", "metal"] }
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_metal());
}
CoreML (ONNX on macOS)
The CoreML execution provider accelerates ONNX models on macOS using Metal GPU
and/or the Apple Neural Engine. Requires the coreml compile-time feature:
volley-ml = { version = "0.8.3", features = ["onnx", "coreml"] }
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_coreml());
}
If CoreML is unavailable (e.g., on Linux), the ONNX Runtime silently falls back to CPU.
Partial operator coverage: CoreML does not support every ONNX operation. Unsupported ops automatically fall back to CPU within the same session. For example, DistilBERT has 387 graph nodes but only 17 are CoreML-eligible. This means most computation still runs on CPU, and the CoreML compilation overhead can make the “GPU” path slower than pure CPU for small models or small batch sizes. Use
GPU=cpu to benchmark both paths. Models with convolution-heavy architectures (vision models) tend to have better CoreML coverage than attention-heavy transformers.
Compute units control which hardware CoreML dispatches to:
| Value | Hardware |
|---|---|
| All (default) | CPU + GPU + Neural Engine |
| CpuAndGpu | CPU + Metal GPU only |
| CpuAndNeuralEngine | CPU + Neural Engine only |
| CpuOnly | CPU only (debugging) |
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_coreml()
.with_coreml_compute_units(CoreMLComputeUnits::CpuAndGpu));
}
Model format controls the internal CoreML representation:
| Format | Requirement | Notes |
|---|---|---|
| NeuralNetwork (default) | macOS 10.15+ | Broadest compatibility |
| MLProgram | macOS 12+ | More operators, potentially faster |
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_coreml()
.with_coreml_model_format(CoreMLModelFormat::MLProgram));
}
GPU Detection
Check GPU availability at runtime without loading a model:
#![allow(unused)]
fn main() {
let info = volley_ml::gpu::detect_gpu();
println!("{}", info.summary);
// "CUDA: disabled (enable `cuda` feature); Metal: available; CoreML: available"
if info.cuda_available {
println!("Using GPU with {} CUDA device(s)", info.cuda_device_count);
}
if info.coreml_available {
println!("CoreML available for ONNX Metal acceleration");
}
}
The volley doctor CLI command also reports GPU status.
Feature Flags
Add volley-ml to your Cargo.toml with the features you need:
[dependencies]
# ONNX Runtime backend (CPU)
volley-ml = { version = "0.8.2", features = ["onnx"] }
# ONNX on CUDA GPU (runtime EP registration, no compile-time flag)
volley-ml = { version = "0.8.2", features = ["onnx"] }
# Candle backend with tokenizers (pure Rust, CPU)
volley-ml = { version = "0.8.2", features = ["candle", "tokenizers"] }
# Candle on CUDA GPU
volley-ml = { version = "0.8.2", features = ["candle", "tokenizers", "cuda"] }
# Candle on macOS Metal GPU
volley-ml = { version = "0.8.2", features = ["candle", "tokenizers", "metal"] }
# ONNX on macOS CoreML (Metal GPU + Neural Engine)
volley-ml = { version = "0.8.3", features = ["onnx", "coreml"] }
# External model servers
volley-ml = { version = "0.8.2", features = ["remote-http"] }
Backend Comparison
| | ONNX Runtime (onnx) | Candle (candle) |
|---|---|---|
| Language | C++ via FFI | Pure Rust |
| Model format | ONNX (universal) | SafeTensors / HF Hub |
| Model loading | Local file or HuggingFace Hub | Local file or HuggingFace Hub |
| Tokenizer | External (user handles) | Built-in (tokenizers feature) |
| GPU | CUDA, TensorRT, CoreML (coreml feature) | CUDA, Metal |
| Best for | Any ONNX-exportable model | HF transformer models |
| Build | Needs C++ toolchain (auto-downloaded) | Pure cargo build |
Performance Tuning
Session Pooling (ONNX)
By default, a single ONNX session is shared across all inference calls. For concurrent pipelines, pool multiple sessions to eliminate lock contention:
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::onnx_cuda(0)
.with_session_pool_size(4)); // 4 sessions, round-robin
}
Each session is an independent model copy. A pool of 2–4 is usually sufficient.
Embedding Cache
Enable an LRU cache to skip inference for repeated text:
#![allow(unused)]
fn main() {
stream
.embed(EmbeddingConfig::new("model.onnx", "text", ml.clone())
.with_cache(10_000)) // Cache up to 10,000 unique embeddings
.await?
}
Identical input strings are served from cache. This can eliminate 50–90% of model calls in pipelines with repeated product names, log messages, etc.
Half-Precision Inference (FP16 / BF16)
Load Candle model weights in half precision for 2× memory savings and faster GPU inference on hardware with tensor cores:
#![allow(unused)]
fn main() {
let ml = Arc::new(MlBackendConfig::candle_cuda(0)
.with_dtype(ModelDType::F16));
}
On CPU, F16/BF16 are automatically promoted to F32 since most CPUs lack native half-precision arithmetic.
Remote Batch Accumulation
When upstream batches are small, buffer them before sending to the model server:
#![allow(unused)]
fn main() {
RemoteInferenceConfig::new("http://model-server:8000/v1/embeddings", ApiFormat::OpenAI)
.with_batch_accumulation(64, Duration::from_millis(50))
.with_max_concurrent(16)
}
This sends a single HTTP request once 64 rows accumulate or 50ms elapse, reducing round trips.
Running the Example
The ML inference example works out of the box with zero configuration. It auto-downloads a sentiment classification model from HuggingFace Hub and auto-detects Metal (CoreML) on macOS:
# Just works — downloads model from Hub, auto-detects Metal on macOS
cargo run --example ml_inference_pipeline -p volley-examples --features ml
Override defaults with environment variables:
- MODEL_PATH=./my-model.onnx — use a local ONNX model instead of downloading from Hub
- GPU=cpu — force CPU backend (disables Metal/CoreML auto-detection)
Recent Bug Fixes
- Hub tokenizer resolution — tokenizer download now checks sibling paths (e.g., onnx/tokenizer.json) when tokenizer.json is not at the repo root.
- Softmax normalization — classification scores now use softmax normalization, producing proper [0, 1] probabilities instead of raw logits.
CLI Template
Scaffold an ML pipeline project with one command:
volley new --template ml-pipeline my-classifier
cd my-classifier
This generates a Kafka-to-Kafka pipeline with ONNX classification:
- src/main.rs — Reads text from Kafka, classifies with .classify(), writes enriched records to the output topic
- Cargo.toml — Includes volley-ml with onnx and tokenizers features
- .env.example — Kafka connection and MODEL_PATH configuration
Edit src/main.rs to customize:
- Change the model: Update MODEL_PATH and the label list in .classify()
- Swap operators: Replace .classify() with .embed() for embeddings or .infer() for generic inference
- Enable GPU: Change MlBackendConfig::onnx_cpu() to MlBackendConfig::onnx_cuda(0)
- Use Candle: Switch to MlBackendConfig::candle_cpu() for pure-Rust inference
- Add remote inference: Chain .infer_remote(RemoteInferenceConfig::new(...))
Integration Testing
Volley provides the volley-test-harness crate — a shared test framework inspired by Apache Flink’s testing patterns. It gives you containers, assertion harnesses, and a standardized connector test suite so you can validate pipelines end-to-end with real infrastructure.
Architecture
The test harness is layered — each layer builds on the one below it:
┌──────────────────────────────────────────────────┐
│ End-to-End Tests (tests/) │
│ Full pipelines with real connectors │
├──────────────────────────────────────────────────┤
│ Connector Framework (connector/) │
│ ConnectorTestContext + source/sink suites │
├──────────────────────────────────────────────────┤
│ Assertion Harnesses (assertions/) │
│ MetricsSnapshot, TraceCapture, ThroughputRecorder│
├──────────────────────────────────────────────────┤
│ TestCluster (cluster.rs) │
│ In-process pipeline runner + observability │
├──────────────────────────────────────────────────┤
│ Container Management (containers/) │
│ Kafka, MinIO with retry logic + ARM64 images │
└──────────────────────────────────────────────────┘
Quick Start
Add volley-test-harness as a dev-dependency in your connector crate:
[dev-dependencies]
volley-test-harness = { path = "../volley-test-harness" }
async-trait = { workspace = true }
arrow = { workspace = true }
anyhow = { workspace = true }
Container Management
Shared testcontainers with retry logic and one-container-per-binary lifecycle via OnceCell. Containers start lazily on first use and are cleaned up when the test process exits.
Kafka
#![allow(unused)]
fn main() {
use volley_test_harness::containers::KafkaContainer;
// Starts a native ARM64 Kafka container on first call.
let servers = KafkaContainer::bootstrap_servers().await;
// Each test gets an isolated topic.
let topic = KafkaContainer::unique_topic("my-test");
KafkaContainer::create_topic(&topic, 1).await;
// Seed test data.
KafkaContainer::produce_json(&topic, &[
serde_json::json!({"name": "alice", "score": 42}),
]).await;
// Verify output.
let messages = KafkaContainer::consume_json(&topic, 1, 10).await;
assert_eq!(messages[0]["name"], "alice");
}
MinIO (S3-compatible)
#![allow(unused)]
fn main() {
use volley_test_harness::containers::{MinioContainer, MINIO_ACCESS_KEY, MINIO_SECRET_KEY};
let endpoint = MinioContainer::endpoint().await;
let bucket = MinioContainer::unique_bucket("test");
MinioContainer::create_bucket(&bucket).await;
}
Polaris (Apache Iceberg REST catalog)
#![allow(unused)]
fn main() {
use volley_test_harness::containers::PolarisContainer;
// Starts a Polaris container backed by MinIO. MinIO must be started first.
let catalog_url = PolarisContainer::catalog_url().await;
// Each test gets an isolated catalog name.
let catalog = PolarisContainer::unique_catalog("my-test");
PolarisContainer::create_catalog(&catalog).await;
}
PolarisContainer depends on MinioContainer for the underlying storage — start MinioContainer before calling any PolarisContainer method.
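Putting the two together, a minimal ordering sketch using only the container methods shown above (the test name and the bucket/catalog prefixes are illustrative):

```rust
use volley_test_harness::containers::{MinioContainer, PolarisContainer};

#[tokio::test]
async fn iceberg_catalog_setup() {
    // 1. Start (or reuse) MinIO first: Polaris stores table data in it.
    let _s3_endpoint = MinioContainer::endpoint().await;
    let bucket = MinioContainer::unique_bucket("iceberg");
    MinioContainer::create_bucket(&bucket).await;

    // 2. Only now is it safe to touch Polaris.
    let _catalog_url = PolarisContainer::catalog_url().await;
    let catalog = PolarisContainer::unique_catalog("iceberg");
    PolarisContainer::create_catalog(&catalog).await;
}
```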
Connector Test Framework
Like Flink’s Connector Testing Framework — implement ConnectorTestContext for your connector and get a standardized source/sink test suite for free.
Implementing ConnectorTestContext
#![allow(unused)]
fn main() {
use volley_test_harness::connector::ConnectorTestContext;
struct MyConnectorTestContext { /* ... */ }
#[async_trait]
impl ConnectorTestContext for MyConnectorTestContext {
fn name(&self) -> &str { "MyConnector" }
fn test_schema(&self) -> Arc<Schema> { /* ... */ }
async fn seed_test_data(&self, records: &[RecordBatch]) -> anyhow::Result<()> {
// Write records into the external system.
}
async fn create_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
// Create a source pointing at the seeded data.
}
async fn create_sink(&self) -> anyhow::Result<Option<Box<dyn DynSink>>> {
// Create a sink writing to a test destination.
}
async fn read_sink_results(&self) -> anyhow::Result<Vec<RecordBatch>> {
// Read back what the sink wrote for verification.
}
async fn create_empty_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
// Optional: source pointing at an empty data set.
Ok(None) // Return None to skip the empty-source test.
}
}
}
Running the Suites
#![allow(unused)]
fn main() {
#[tokio::test]
async fn my_connector_source_suite() {
let ctx = MyConnectorTestContext::new().await;
volley_test_harness::connector::run_source_suite(&ctx).await;
}
#[tokio::test]
async fn my_connector_sink_suite() {
let ctx = MyConnectorTestContext::new().await;
volley_test_harness::connector::run_sink_suite(&ctx).await;
}
}
The source suite runs three tests:
- test_basic_read — seeds 5 records, verifies the source returns data
- test_read_all_records — seeds 10 records, reads until all consumed
- test_empty_source — verifies graceful handling of empty data sets
The sink suite runs three tests:
- test_basic_write — writes records, flushes, verifies they’re readable
- test_write_and_flush — multiple writes followed by flush
- test_checkpoint_triggers_flush — verifies on_checkpoint() commits data
See volley-connector-kafka/tests/connector_suite.rs for the reference implementation.
TestCluster
An in-process pipeline execution wrapper with built-in observability capture. Registers an InMemorySpanExporter as the global OpenTelemetry tracer so spans created during pipeline execution are captured for assertions.
#![allow(unused)]
fn main() {
use volley_test_harness::cluster::TestCluster;
let cluster = TestCluster::new("my-test");
let pipeline = StreamExecutionEnvironment::new()
.from_source(source)
.with_tracing(cluster.tracing_config()) // 100% sampling, in-memory export
.with_operator(my_operator)
.with_operator_id("my-op")
.to_sink(sink);
pipeline.execute("my-pipeline").await?;
// Inspect captured spans.
let traces = cluster.traces();
println!("Captured {} spans", traces.span_count());
traces.print_timing_summary();
}
Assertion Harnesses
TraceCapture
Assert on OpenTelemetry spans captured by the TestCluster:
#![allow(unused)]
fn main() {
let traces = cluster.traces();
// Verify span counts.
traces.assert_span_count("volley.operator", 10);
// Verify trace structure.
traces.assert_valid_tree(); // No orphan spans.
// Print per-operator timing breakdown.
traces.print_timing_summary();
}
The timing summary shows per-span-name breakdown with operator identity:
Span Count Total ms Avg ms Max ms
--------------------------------------------------------------------------------
volley.record 10 559.2 55.9 100.3
volley.sink 8 72.0 9.0 11.0
volley.operator (ml-embed) 8 56.8 7.1 25.9
volley.operator (status-filter) 10 1.1 0.1 0.8
volley.operator (proto-decode) 10 0.8 0.1 0.5
volley.source 10 0.4 0.0 0.3
Use .with_operator_id("name") on your pipeline to assign human-readable names.
MetricsSnapshot
Scrape and assert on Prometheus metrics:
#![allow(unused)]
fn main() {
let metrics = MetricsSnapshot::scrape(prometheus_port).await?;
metrics.assert_counter_exists("volley_source_records_polled");
metrics.assert_counter_gte("volley_records_processed_total", 100.0);
metrics.assert_gauge_in_range("volley_pipeline_health", 1.0, 1.0);
}
ThroughputRecorder
Measure and assert on pipeline throughput:
#![allow(unused)]
fn main() {
let mut recorder = ThroughputRecorder::new();
recorder.start();
pipeline.execute("bench").await?;
recorder.stop(record_count);
recorder.assert_throughput_gte(100.0); // min records/sec
recorder.assert_elapsed_under_secs(30.0); // max wall time
println!("{:.1} records/sec", recorder.records_per_second().unwrap());
}
Test Data Generators
The data module provides test data factories:
#![allow(unused)]
fn main() {
use volley_test_harness::data::*;
// Protobuf messages (serialized bytes).
let protos = generate_crawl_results(50);
// Every 5th record is a failure (HTTP 503), rest are HTTP 200.
// Arrow RecordBatch (for non-protobuf tests).
let batch = generate_crawl_batch(50);
// The schema used by both generators.
let schema = crawl_result_schema();
}
The CrawlResult type implements ToArrow via impl_to_arrow!, so it works with auto_decode_operator::<CrawlResult>("payload").
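A sketch of assembling the generated protos into the Binary "payload" batch the decode operator consumes. The Arrow assembly is standard arrow-rs; the assumption that generate_crawl_results yields one serialized message per Vec<u8> is flagged in the comments:

```rust
use std::sync::Arc;
use arrow::array::BinaryArray;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use volley_test_harness::data::generate_crawl_results;

// Assumption: each element is one serialized CrawlResult protobuf.
let protos: Vec<Vec<u8>> = generate_crawl_results(50);

// Pack the bytes into the single Binary "payload" column that
// auto_decode_operator::<CrawlResult>("payload") expects downstream.
let schema = Arc::new(Schema::new(vec![Field::new("payload", DataType::Binary, false)]));
let payload = BinaryArray::from_iter_values(protos.iter().map(|p| p.as_slice()));
let batch = RecordBatch::try_new(schema, vec![Arc::new(payload)]).unwrap();
```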
Running Tests
All integration tests require Docker for testcontainers.
# Kafka connector suite (6 tests, ~45s)
cargo test -p volley-connector-kafka --test connector_suite -- --nocapture
# Iceberg connector suite (Polaris + MinIO, ~60s)
cargo test -p volley-connector-iceberg --test connector_suite -- --nocapture
# E2E golden path — full pipeline with windowing, metrics, traces, and ML (1 test, ~30s)
cargo test -p volley-test-harness --test e2e_golden_path -- --nocapture
# With Metal GPU disabled (CPU fallback)
cargo test -p volley-test-harness --test e2e_golden_path --no-default-features -- --nocapture
Where Tests Live
Connector-specific integration tests belong in their connector crate, importing shared infrastructure from volley-test-harness:
| Crate | Tests | Purpose |
|---|---|---|
| volley-connector-kafka | tests/connector_suite.rs | Kafka source/sink through ConnectorTestContext |
| volley-connector-iceberg | tests/connector_suite.rs | Iceberg source/sink through ConnectorTestContext (Polaris + MinIO) |
| volley-test-harness | tests/e2e_golden_path.rs | Cross-connector end-to-end pipeline test |
The pattern: volley-test-harness provides the framework, each connector crate owns its own integration tests.
Golden Path Test Coverage
The E2E golden path test (e2e_golden_path.rs) exercises the full pipeline:
- Correctness — Kafka protobuf → decode → filter → ML embed → Kafka sink, asserting output records match expectations
- Windowed aggregation — tumbling window over the event stream with watermark advancement
- Throughput — ThroughputRecorder asserts sustained records/sec
- Metrics — MetricsSnapshot scrapes the embedded Prometheus endpoint and asserts counters (volley_records_processed_total, volley_source_records_polled, pipeline health gauge)
- Traces — TraceCapture from TestCluster asserts span count, valid parent–child tree, and per-operator timing via print_timing_summary()
Horizontal Scaling Guide
This guide covers how to run a Volley pipeline across multiple K8s pods for workloads that exceed a single node.
Prerequisites
- Kubernetes cluster (v1.26+)
- A ReadWriteMany storage class (AWS EFS, GCP Filestore, Azure Files)
- The Volley K8s operator deployed (see Kubernetes Operator)
- Your pipeline built with the distributed feature enabled
When to Use Horizontal Scaling
Single-node mode handles most workloads — Volley sustains 9M+ records/sec on one node. Consider horizontal scaling when:
- Your key space is too large for one node’s memory (state doesn’t fit in RAM + RocksDB)
- You need fault tolerance across nodes (pod failure shouldn’t stop the pipeline)
- Your source has more partitions than one node can consume (e.g., 100+ Kafka partitions)
If your bottleneck is CPU on stateless operators (filter, map), scale vertically first. Horizontal scaling adds network overhead for the key shuffle.
Step 1: Enable the Distributed Feature
Add the distributed feature to your pipeline’s Cargo.toml:
[dependencies]
volley-core = { version = "0.8", features = ["distributed"] }
Step 2: Configure ClusterConfig
In your pipeline code, add with_cluster() to the environment:
#![allow(unused)]
fn main() {
use volley_core::prelude::*;
// In production, use ClusterConfig::from_env() to auto-detect from K8s env vars
let cluster = ClusterConfig::from_env();
let mut env = StreamExecutionEnvironment::new();
if let Some(config) = cluster {
env = env.with_cluster(config);
}
env.from_source(source)
.key_by(col("user_id"))
.aggregate_expr(sum(col("amount")), state_backend)
.to_stream()
.to_sink(sink)
.execute("my-pipeline")
.await?;
}
The pipeline API is identical — with_cluster() is the only addition. When ClusterConfig is absent (e.g., running locally), the pipeline runs in single-node mode with zero overhead.
Step 3: Create a Shared Checkpoint PVC
All workers need access to the same checkpoint directory. Create a ReadWriteMany PVC:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: volley-shared-ckpt
namespace: my-pipeline
spec:
accessModes: [ReadWriteMany]
storageClassName: efs # AWS EFS, or your cloud's RWX storage class
resources:
requests:
storage: 50Gi
kubectl apply -f shared-pvc.yaml
Step 4: Deploy with the K8s Operator
Create a VolleyApplication with the cluster section:
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
name: my-pipeline
namespace: my-pipeline
spec:
image: registry.example.com/my-pipeline:v1.0
replicas: 3
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
cluster:
maxKeyGroups: 256 # default, power of 2
flightPort: 50051 # default
sharedCheckpoints:
claimName: volley-shared-ckpt # the PVC from step 3
state:
size: 10Gi # per-worker local RocksDB
checkpoints:
size: 5Gi # per-worker local checkpoint staging
The operator automatically:
- Injects VOLLEY_* environment variables (VOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME)
- Adds the Arrow Flight port (50051) to the container and headless service
- Mounts the shared checkpoint PVC at /mnt/volley/checkpoints
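For local testing you can emulate the operator by exporting the same variables before the pipeline starts. A sketch, assuming ClusterConfig::from_env() reads exactly the variables listed above (the required set may differ per release):

```rust
use volley_core::prelude::*;

// Assumption: ClusterConfig::from_env() reads the VOLLEY_* variables
// the operator would normally inject into each pod.
std::env::set_var("VOLLEY_NUM_WORKERS", "3");
std::env::set_var("VOLLEY_HEADLESS_SERVICE", "my-pipeline-headless.my-pipeline.svc");
std::env::set_var("VOLLEY_MAX_KEY_GROUPS", "256");
std::env::set_var("VOLLEY_FLIGHT_PORT", "50051");
std::env::set_var("VOLLEY_APP_NAME", "my-pipeline");

let mut env = StreamExecutionEnvironment::new();
if let Some(config) = ClusterConfig::from_env() {
    env = env.with_cluster(config);
}
```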
Step 5: Verify
# Check pods are running
kubectl get pods -n my-pipeline
# NAME READY STATUS RESTARTS AGE
# my-pipeline-0 1/1 Running 0 2m
# my-pipeline-1 1/1 Running 0 1m
# my-pipeline-2 1/1 Running 0 30s
# Check env vars
kubectl exec my-pipeline-0 -n my-pipeline -- env | grep VOLLEY
# VOLLEY_NUM_WORKERS=3
# VOLLEY_HEADLESS_SERVICE=my-pipeline-headless.my-pipeline.svc
# VOLLEY_MAX_KEY_GROUPS=256
# VOLLEY_FLIGHT_PORT=50051
# VOLLEY_APP_NAME=my-pipeline
# Check Flight port on headless service
kubectl get svc my-pipeline-headless -n my-pipeline -o jsonpath='{.spec.ports[*].name}'
# health flight
# Check shared checkpoint mount
kubectl exec my-pipeline-0 -n my-pipeline -- ls /mnt/volley/checkpoints
Scaling
Change the replica count to scale:
kubectl patch vapp my-pipeline -n my-pipeline --type merge -p '{"spec":{"replicas":5}}'
The coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
Monitoring
Key metrics to watch in distributed mode:
| Metric | What it tells you |
|---|---|
| volley_shuffle_local_records_total | Records staying on this worker |
| volley_shuffle_remote_records_total{target_worker="N"} | Records sent to each remote worker |
| volley_flight_send_errors_total | Transport errors (indicates worker connectivity issues) |
| volley_cluster_worker_id | This worker’s ordinal |
| volley_cluster_num_workers | Total cluster size |
| volley_cluster_key_groups_assigned | Key groups owned by this worker |
| volley_barrier_alignment_wait_ms | Time to align barriers across all inputs |
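These metrics pair naturally with the MetricsSnapshot harness from the Integration Testing chapter. A sketch, assuming a Prometheus endpoint on prometheus_port and that volley_cluster_num_workers is exported as a gauge:

```rust
// Assert distributed-mode health in an integration test.
let metrics = MetricsSnapshot::scrape(prometheus_port).await?;
metrics.assert_counter_exists("volley_shuffle_local_records_total");
metrics.assert_counter_exists("volley_flight_send_errors_total");
// Assumption: exported as a gauge; a 3-replica cluster should report 3.
metrics.assert_gauge_in_range("volley_cluster_num_workers", 3.0, 3.0);
```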
Troubleshooting
Pipeline stalls after a pod restart: Check if the BarrierAligner is waiting for a dead input. The Flight server should clean up stale connections via TCP keepalive (10s). If the stall persists, check volley_flight_send_errors_total for rising error counts.
Uneven record distribution: Check volley_shuffle_remote_records_total per target worker. The key group assignment is contiguous — if your key distribution is skewed, some workers will handle more traffic. Consider increasing maxKeyGroups for finer granularity.
Checkpoint failures: Verify the shared PVC is mounted and writable on all pods:
kubectl exec my-pipeline-0 -- touch /mnt/volley/checkpoints/test && echo OK
DNS resolution failures at startup: Workers discover each other via StatefulSet DNS. If pods start before DNS propagates, the runtime retries with exponential backoff (100ms to 5s). Check logs for “Waiting for remote worker” messages.
Kafka
Volley provides a Kafka source and sink via the volley-connector-kafka crate. The sink defaults to non-transactional idempotent mode for maximum throughput, with opt-in exactly-once transactional semantics.
Setup
Add the Kafka connector to your Cargo.toml:
[dependencies]
volley-connector-kafka = { git = "https://github.com/volley-streams/volley" }
System dependencies: cmake is required for building rdkafka. OpenSSL and libcurl are vendored and statically linked — no additional system packages needed. See Installation.
Authentication
Use KafkaSecurityConfig to configure authentication for both sources and sinks. The same config struct works with .with_security() on either KafkaSourceConfig or KafkaSinkConfig.
Confluent Cloud / SASL_SSL
The most common setup for managed Kafka (Confluent Cloud, AWS MSK Serverless, Aiven):
#![allow(unused)]
fn main() {
use volley_connector_kafka::{KafkaSourceConfig, KafkaSinkConfig, KafkaSecurityConfig};
let security = KafkaSecurityConfig::sasl_ssl("API_KEY", "API_SECRET");
let source_config = KafkaSourceConfig::new("pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "events", "my-group")
.with_security(security.clone());
let sink_config = KafkaSinkConfig::new("pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "output")
.with_security(security);
}
This sets security.protocol=SASL_SSL and sasl.mechanism=PLAIN, which is the correct combination for Confluent Cloud and most SASL-based providers.
SCRAM Authentication
For brokers that use SCRAM (e.g., AWS MSK with IAM disabled, self-hosted clusters):
#![allow(unused)]
fn main() {
let security = KafkaSecurityConfig::sasl_ssl("username", "password")
.with_sasl_mechanism("SCRAM-SHA-256");
}
Supported SASL mechanisms: PLAIN (default), SCRAM-SHA-256, SCRAM-SHA-512.
mTLS (Mutual TLS)
For clusters that authenticate clients via X.509 certificates:
#![allow(unused)]
fn main() {
let security = KafkaSecurityConfig::ssl()
.with_ssl_ca("/etc/kafka/ca.pem")
.with_ssl_cert("/etc/kafka/client.pem")
.with_ssl_key("/etc/kafka/client.key");
let source_config = KafkaSourceConfig::new("broker:9093", "events", "my-group")
.with_security(security);
}
Key Types
| Type | Description |
|---|---|
| KafkaSourceConfig | Configuration for the Kafka consumer (brokers, topic, group, format, batch size) |
| KafkaSinkConfig | Configuration for the Kafka producer (brokers, topic, format, delivery mode) |
| KafkaSecurityConfig | Shared authentication/TLS config for source and sink |
| KafkaSourceFormat | Message format enum: Json (default), Protobuf, Avro |
| KafkaSinkFormat | Serialization format enum: Json (default), Protobuf, Avro |
| KafkaSource | Source that polls Kafka for records |
| KafkaSink | Sink that writes records to Kafka |
| KafkaEnvExt | Extension trait for StreamExecutionEnvironment |
| KafkaStreamExt | Extension trait for DataStream |
| SchemaRegistryClient | Confluent Schema Registry HTTP client with caching |
| env.from_kafka_protobuf::<M>() | Trait method: Kafka source + ProtobufDecodeOperator |
| env.from_kafka_avro() | Trait method: Kafka source + AvroDecodeOperator + Schema Registry |
Example
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<()> {
let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group");
let sink_config = KafkaSinkConfig::new("localhost:9092", "output");
StreamExecutionEnvironment::new()
.from_kafka(source_config).await?
.filter(|record| true)
.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(60)))
.aggregate(AggregationType::Sum, "amount")
.to_kafka(sink_config).await?
.execute("kafka-aggregation-job")
.await?;
Ok(())
}
Protobuf Format
For Protobuf-encoded Kafka messages, use KafkaSourceConfig::new_protobuf() and the from_kafka_protobuf trait method on KafkaEnvExt:
#![allow(unused)]
fn main() {
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};
mod proto {
include!(concat!(env!("OUT_DIR"), "/events.rs"));
}
let config = KafkaSourceConfig::new_protobuf("localhost:9092", "events", "my-group")
.with_batch_size(100);
let stream = StreamExecutionEnvironment::new()
.from_kafka_protobuf::<proto::Event>(config).await?;
}
The source emits raw binary payloads in a Binary “payload” column. The ProtobufDecodeOperator (wired automatically by from_kafka_protobuf) decodes them into typed Arrow columns using the compiled prost type. The type M must implement ToArrow, typically via the impl_to_arrow! proc-macro from volley-derive.
Scaffolding: volley new with Kafka source and protobuf format generates build.rs, proto/events.proto, and prost dependencies.
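If you wire a project up by hand instead, the build.rs is a standard prost_build invocation. A sketch; the scaffolded file may differ in detail:

```rust
// build.rs — compile proto/events.proto into OUT_DIR/events.rs, which the
// `mod proto { include!(...) }` block above pulls in.
// (Requires prost-build in [build-dependencies].)
fn main() -> std::io::Result<()> {
    prost_build::compile_protos(&["proto/events.proto"], &["proto/"])
}
```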
Avro Format (Schema Registry)
For Avro-encoded Kafka messages with Confluent Schema Registry, use KafkaSourceConfig::new_avro() and the from_kafka_avro trait method:
#![allow(unused)]
fn main() {
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};
let config = KafkaSourceConfig::new_avro(
"localhost:9092",
"events",
"my-group",
"http://localhost:8081", // Schema Registry URL
)
.with_schema_registry_auth("user", "password") // optional basic auth
.with_batch_size(100);
let stream = StreamExecutionEnvironment::new()
.from_kafka_avro(config).await?;
}
The source emits raw binary payloads. The AvroDecodeOperator (wired automatically by from_kafka_avro):
- Strips the Confluent 5-byte wire format header ([0x00][4-byte schema ID][payload])
- Fetches the Avro schema from the Schema Registry by ID (cached after first fetch)
- Decodes the Avro binary into apache_avro::types::Value
- Converts directly to Arrow arrays using typed ArrayBuilders (no JSON intermediary)
The Arrow schema is inferred from the Avro schema. Supported type mappings:
| Avro Type | Arrow Type |
|---|---|
| boolean | Boolean |
| int | Int32 |
| long | Int64 |
| float | Float32 |
| double | Float64 |
| string | Utf8 |
| bytes | Binary |
| fixed | Binary |
| enum | Utf8 |
| union(null, T) | T (nullable) |
Plain Avro (No Schema Registry)
For Avro messages without the Confluent wire format, use AvroDecodeOperator directly with a static schema:
#![allow(unused)]
fn main() {
use volley_core::operators::avro::AvroDecodeOperator;
let avro_schema = apache_avro::Schema::parse_str(r#"{ ... }"#).unwrap();
let stream = env.from_kafka(config).await?
.with_operator(AvroDecodeOperator::with_static_schema("payload", avro_schema));
}
Batch Reads
By default, KafkaSource reads one message per poll_next() call (single-row RecordBatch). For high-throughput pipelines, enable batch reads:
#![allow(unused)]
fn main() {
let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group")
.with_batch_size(100) // accumulate up to 100 messages
.with_batch_timeout(Duration::from_millis(50)); // wait 50ms for next message
}
The source accumulates up to batch_size messages, concatenating them into a single multi-row RecordBatch. All downstream operators handle multi-row batches correctly. Partial batches are returned on timeout (no blocking).
Batch metadata: event_time_ms is set to the max timestamp in the batch (correct for watermark advancement). For per-row event times in windowed operations, configure timestamp_column on your WindowConfig.
Consumer Tuning
Fine-tune the Kafka consumer’s fetch behavior for throughput vs. latency:
#![allow(unused)]
fn main() {
let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group")
.with_batch_size(500)
.with_fetch_min_bytes(32_768) // wait for 32KB of data (default: 1)
.with_fetch_max_wait_ms(100) // broker waits up to 100ms to fill (default: 500)
.with_max_partition_fetch_bytes(2_097_152); // 2MB per partition (default: 1MB)
}
| Method | Default | Effect |
|---|---|---|
| with_fetch_min_bytes(bytes) | 1 | Minimum bytes the broker accumulates before responding. Higher values reduce fetch requests at the cost of latency. |
| with_fetch_max_wait_ms(ms) | 500 | Max time the broker waits to fill fetch_min_bytes. Acts as an upper bound on added latency. |
| with_max_partition_fetch_bytes(bytes) | 1,048,576 (1MB) | Max data returned per partition per fetch. Increase for partitions with large messages. |
| with_property(key, value) | — | Escape hatch for any rdkafka consumer setting not covered above. |
Tip: For high-throughput pipelines, combine with_batch_size() with with_fetch_min_bytes() so the broker pre-aggregates data and the source fills batches faster.
Retry on Transient Errors
Both the source and sink support optional retry with exponential backoff for transient broker errors (e.g., broker unavailable, request timeouts, leader changes, network exceptions). Configure via with_retry():
#![allow(unused)]
fn main() {
use std::time::Duration;
use volley_core::resilience::RetryConfig;
// Source with retry
let source_config = KafkaSourceConfig::new("localhost:9092", "topic", "group")
.with_retry(
RetryConfig::new()
.with_max_retries(5)
.with_initial_backoff(Duration::from_millis(200))
.with_max_backoff(Duration::from_secs(30)),
);
// Sink with retry
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "txn-1")
.with_retry(
RetryConfig::new()
.with_max_retries(3)
.with_initial_backoff(Duration::from_millis(100)),
);
}
When retry is not configured (the default), errors propagate immediately — existing behavior is unchanged.
Source behavior: Both single-message and batch accumulation paths retry recv() calls. Subsequent messages within a batch (after the first) are not retried — a partial batch is returned instead.
Sink behavior: Individual send() calls are retried. Only the send is retried, not the surrounding transaction. If retries are exhausted, the transaction is aborted (existing behavior).
Metrics: volley_retry_attempts_total counter with component label (kafka_source or kafka_sink).
Sink Delivery Modes
The Kafka sink supports three delivery modes, from lowest overhead to strongest guarantees:
Fire-and-Forget (acks=0)
No acknowledgment from the broker. Highest throughput, but messages can be lost:
#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
.with_acks("0");
}
When acks is "0", idempotence is automatically disabled since the broker never confirms delivery.
Idempotent (default)
The default mode. The producer uses Kafka’s idempotent delivery (enable.idempotence=true) to guarantee no duplicates from producer retries, without the overhead of transactions:
#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new("localhost:9092", "output");
}
This is the right choice for most pipelines. It guarantees at-least-once delivery with no duplicate writes from the producer side, and has no per-batch transaction commit overhead.
Transactional (exactly-once)
Opt-in via new_transactional(). Wraps writes in Kafka transactions for exactly-once semantics:
#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");
}
See Exactly-Once Semantics below for details.
Breaking change: The old 3-argument KafkaSinkConfig::new(servers, topic, txn_id) has been renamed to new_transactional(). The 2-argument new(servers, topic) now creates a non-transactional idempotent sink.
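A migration sketch for code written against the old constructor:

```rust
// Before: KafkaSinkConfig::new("localhost:9092", "output", "my-txn-id")
//         (3 args, always transactional)

// After: pick the delivery mode explicitly.
let idempotent = KafkaSinkConfig::new("localhost:9092", "output");
let transactional =
    KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");
```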
Producer Tuning
Fine-tune the Kafka producer for throughput vs. latency:
#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
.with_linger_ms(20) // batch for 20ms before sending (default: 5)
.with_compression("lz4") // compress batches (default: "none")
.with_acks("1"); // leader-only ack (default: "all")
}
| Method | Default | Effect |
|---|---|---|
| with_acks(acks) | "all" | "all" = full ISR ack, "1" = leader only, "0" = fire-and-forget. |
| with_linger_ms(ms) | 5 | How long rdkafka waits to batch messages before sending. Higher values increase batching and throughput at the cost of latency. |
| with_compression(codec) | "none" | Compression codec: "none", "gzip", "snappy", "lz4", "zstd". LZ4 and Zstd offer the best throughput-to-ratio tradeoff. |
| with_property(key, value) | — | Escape hatch for any rdkafka producer setting not covered above. |
Tip: For high-throughput sinks, with_linger_ms(20) + with_compression("lz4") is a good starting point. This lets rdkafka batch more messages per broker request and compress them in a single pass.
Exactly-Once Semantics
The Kafka sink supports exactly-once delivery when configured with new_transactional():
#![allow(unused)]
fn main() {
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");
}
How it works:
- Records are buffered during processing
- On checkpoint barrier, the sink commits the Kafka transaction
- On failure recovery, uncommitted transactions are aborted
- Processing resumes from the last checkpoint, replaying records
The consumer group offset is committed as part of the transaction, ensuring source and sink are in sync.
Note: Transactional mode adds a commit round-trip per checkpoint. For pipelines where at-least-once is acceptable, the default idempotent mode (KafkaSinkConfig::new()) avoids this overhead entirely.
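Putting the pieces together, a minimal end-to-end sketch built from the calls shown in this chapter; checkpoint interval configuration is omitted and assumed to use defaults:

```rust
// Exactly-once Kafka-to-Kafka copy: the consumer offsets and the output
// records commit atomically on each checkpoint barrier.
let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group");
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "copy-txn");

StreamExecutionEnvironment::new()
    .from_kafka(source_config).await?
    .to_kafka(sink_config).await?
    .execute("exactly-once-copy")
    .await?;
```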
Trace Context Propagation
The Kafka source and sink automatically propagate W3C Trace Context (traceparent header) for distributed tracing:
- Source: If a consumed message has a traceparent header, the record inherits that trace context. The runtime’s sampler respects upstream sampling decisions (parent-based sampling).
- Sink: If a record being written has trace context, a traceparent header is injected into the produced Kafka message.
This works automatically when per-record tracing is enabled via with_tracing(). No additional configuration is needed for Kafka trace propagation.
Consumer Lag Monitoring
The volley.kafka.consumer_lag metric tracks consumer group lag. Use it for KEDA autoscaling:
sum(volley_kafka_consumer_lag) by (job_name)
See Capacity Planning for KEDA trigger configuration.
Delta Lake
The volley-connector-delta crate provides a Delta Lake sink with exactly-once semantics via epoch-tagged commits.
Key Types
| Type | Description |
|---|---|
| DeltaSink | Sink that writes Parquet data to a Delta table |
| DeltaSinkConfig | Configuration (table URI, partitioning, buffer size, file size) |
| PartitionSpec | Specification for Hive-style partitioned writes |
Usage
#![allow(unused)]
fn main() {
use volley_connector_delta::{DeltaSink, DeltaSinkConfig};
let config = DeltaSinkConfig::new("s3://my-bucket/output-table");
let sink = DeltaSink::new(config).await?;
env.from_source(source)
.filter_expr(col("status").eq(lit("active")))
.to_sink(sink)
.execute("delta-writer")
.await?;
}
Configuration Reference
#![allow(unused)]
fn main() {
let config = DeltaSinkConfig::new("s3://my-bucket/output-table")
.with_pipeline_id("my-pipeline") // idempotent commit tag (default: "volley-default")
.with_max_records(50_000) // records buffered before flush (default: 10,000)
.with_target_file_size(256 * 1024 * 1024) // target Parquet file size (default: 128 MB)
.with_partition_spec(partition_spec); // Hive-style partitioning (default: none)
}
| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Identifies the pipeline in commit metadata for idempotent writes. Use a unique ID per pipeline writing to the same table |
| with_max_records() | 10,000 | Number of records buffered in memory before writing a Parquet file. Higher values produce fewer, larger files |
| with_target_file_size() | 128 MB | Target size for individual Parquet files. The sink flushes when buffered data approaches this size |
| with_partition_spec() | None | Enables Hive-style partitioned writes — see Partitioned Writes |
Partitioned Writes
DeltaSink supports Hive-style partitioned writes for efficient query pruning:
#![allow(unused)]
fn main() {
use volley_connector_delta::PartitionSpec;
let config = DeltaSinkConfig::new("s3://my-bucket/events")
.with_partition_spec(PartitionSpec::columns(vec![
"date".to_string(),
"region".to_string(),
]));
}
This produces a directory layout like:
s3://my-bucket/events/
date=2026-03-31/region=us-east-1/part-0001.parquet
date=2026-03-31/region=eu-west-1/part-0002.parquet
Partition column guidelines:
- Use low-cardinality columns (date, region, status) to avoid excessive small files
- Partition columns must exist in the record schema
- Order matters: put the most selective column first for query engines
Exactly-Once Semantics
Delta commits include an epoch tag in the commit metadata. On recovery:
- The sink reads the Delta log to find the latest committed epoch for this pipeline_id
- If the current epoch was already committed, the write is skipped
- Replayed records after a failure never produce duplicate data
The pipeline_id scopes idempotency — multiple pipelines can write to the same table without conflicts as long as each uses a unique ID.
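For example, a live ingest job and a backfill job can safely target the same table as long as their IDs differ (names illustrative):

```rust
// Live ingest and backfill share the table but not the idempotency scope,
// so each pipeline's epoch deduplication is tracked independently.
let live = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_pipeline_id("events-ingest");
let backfill = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_pipeline_id("events-backfill");
```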
Storage Backends
Delta tables can be stored on any Delta-compatible storage:
| Storage | URI Format | Credentials |
|---|---|---|
| AWS S3 | s3://bucket/path | Default credential chain (env, ~/.aws/credentials, instance profile) |
| Azure Blob | az://container/path | DefaultAzureCredential chain |
| Local filesystem | /path/to/table | File permissions |
Performance Tuning
| Scenario | Recommendation |
|---|---|
| High throughput (>100K rec/s) | Increase with_max_records(50_000) to reduce commit frequency |
| Large records (>1 KB each) | Decrease with_max_records() or increase with_target_file_size() |
| Many partitions | Monitor file count; consider periodic compaction |
| Low latency requirements | Decrease with_max_records() for more frequent flushes |
Table Compaction
With many partitions and frequent checkpoints, Delta tables accumulate small files that degrade read performance. Run compaction periodically:
-- In a query engine (Spark, DataFusion, etc.)
OPTIMIZE 's3://my-bucket/events';
Automated compaction within the Volley sink is planned for a future release.
Troubleshooting
“Table not found” or empty table
- Delta tables are created automatically on first write if they don’t exist
- Verify the storage URI is correct and credentials have write access
Duplicate data after recovery
- Ensure each pipeline uses a unique pipeline_id
- If two pipelines share the same ID, epoch deduplication may skip valid writes
Small files accumulating
- Increase with_max_records() to buffer more records per file
- Run OPTIMIZE periodically from a query engine
- Consider adjusting the checkpoint interval to reduce commit frequency
Slow writes to S3
- S3 has higher latency per PUT than local disk; increase with_target_file_size() to reduce PUT count
- Ensure the application runs in the same AWS region as the S3 bucket
Apache Iceberg
The volley-connector-iceberg crate provides an Iceberg sink via REST catalog with exactly-once semantics.
Key Types
| Type | Description |
|---|---|
| IcebergSink | Sink that writes Parquet data files to an Iceberg table |
| IcebergSinkConfig | Configuration (catalog URI, warehouse, namespace, table, storage) |
| IcebergStorage | Storage backend enum (S3, Azure, GCS) |
Usage
#![allow(unused)]
fn main() {
use volley_connector_iceberg::{IcebergSink, IcebergSinkConfig, IcebergStorage};
let config = IcebergSinkConfig::new(
"http://polaris:8181", // REST catalog URI
"my-warehouse", // catalog warehouse
vec!["db".into()], // namespace
"events", // table name
IcebergStorage::S3 {
region: "us-east-1".into(),
bucket: "my-bucket".into(),
endpoint: None,
},
);
let sink = IcebergSink::new(config).await?;
env.from_source(source)
.filter_expr(col("status").eq(lit("active")))
.to_sink(sink)
.execute("iceberg-writer")
.await?;
}
Configuration Reference
Required Parameters
| Parameter | Description |
|---|---|
| catalog_uri | REST catalog endpoint (e.g., http://polaris:8181) |
| catalog_warehouse | Warehouse identifier in the catalog |
| namespace | Iceberg namespace as a list of strings (e.g., ["db", "schema"]) |
| table_name | Target table name |
| storage | Storage backend — see Storage Backends |
Optional Parameters
#![allow(unused)]
fn main() {
let config = IcebergSinkConfig::new(catalog_uri, warehouse, namespace, table, storage)
.with_credential("client-id:client-secret") // OAuth2 credential for Polaris
.with_max_records(10_000) // buffer size before flush (default: 10,000)
.with_flush_interval(Duration::from_secs(30)); // time-based flush trigger
}
| Method | Default | Description |
|---|---|---|
| with_credential() | None | OAuth2 credential for authenticated catalogs (Polaris, Tabular) |
| with_max_records() | 10,000 | Number of records buffered before writing a Parquet data file |
| with_flush_interval() | None | Time-based flush trigger; writes a file even if max_records hasn’t been reached |
Storage Backends
S3
#![allow(unused)]
fn main() {
IcebergStorage::S3 {
region: "us-east-1".into(),
bucket: "my-iceberg-data".into(),
endpoint: None, // use None for AWS, Some("http://localhost:9000") for MinIO
}
}
AWS credentials are resolved via the default credential chain (environment variables, ~/.aws/credentials, instance profile).
For local development with MinIO:
#![allow(unused)]
fn main() {
IcebergStorage::S3 {
region: "us-east-1".into(),
bucket: "test-bucket".into(),
endpoint: Some("http://localhost:9000".into()),
}
}
Azure Blob Storage
#![allow(unused)]
fn main() {
IcebergStorage::Azure {
storage_account: "mystorageaccount".into(),
container: "iceberg-data".into(),
access_key: None, // uses DefaultAzureCredential when None
}
}
When access_key is None, authentication uses Azure’s DefaultAzureCredential chain (environment variables, managed identity, Azure CLI).
Google Cloud Storage
#![allow(unused)]
fn main() {
IcebergStorage::Gcs {
project_id: "my-gcp-project".into(),
bucket: "iceberg-data".into(),
}
}
GCS credentials are resolved via Application Default Credentials (GOOGLE_APPLICATION_CREDENTIALS or metadata server).
Exactly-Once Semantics
The Iceberg sink achieves exactly-once via snapshot commits with epoch tracking:
- Records are buffered in memory and written as Parquet data files
- On checkpoint barrier, a new Iceberg snapshot is committed via the REST catalog
- The checkpoint epoch is stored in the snapshot’s summary metadata
- On recovery, the sink reads the latest snapshot’s epoch and skips already-committed epochs
This means replayed records after a failure never produce duplicate data in the table.
REST Catalog Compatibility
Volley uses the Iceberg REST catalog spec for table management. Compatible catalogs:
| Catalog | Notes |
|---|---|
| Apache Polaris | Recommended. Use with_credential() for OAuth2 auth |
| Tabular | Hosted REST catalog. Use with_credential() |
| AWS Glue (via REST adapter) | Requires a REST adapter in front of Glue |
| Gravitino | REST-compatible catalog |
Troubleshooting
“Connection refused” to catalog URI
- Verify the catalog is running and reachable from your application
- Check firewall rules and network policies (especially in Kubernetes)
“Unauthorized” or “Forbidden” from catalog
- Ensure with_credential() is set with a valid OAuth2 client credential
- For Polaris: verify the principal has TABLE_WRITE_DATA privilege on the target table
“Table not found”
- Iceberg tables must be created in the catalog before the sink writes to them
- Verify namespace and table_name match the catalog’s table identifier exactly
Small files accumulating
- Increase with_max_records() to buffer more records per file
- Set with_flush_interval() to a longer duration
- Consider running Iceberg’s rewrite_data_files procedure periodically
Lance
The volley-connector-lance crate provides a sink that writes Arrow
RecordBatches to a Lance columnar dataset with
exactly-once semantics. Lance is a modern columnar format optimized for ML,
vector search, and random-access workloads; it is the on-disk format behind
LanceDB and is written in Rust from the ground up.
Key Types
| Type | Description |
|---|---|
| LanceSink | Sink that writes Arrow records to a Lance dataset |
| LanceSinkConfig | Builder-style configuration (URI, pipeline id, file sizing, storage, recovery limit) |
| LanceStorage | Enum configuring the object store backend (Local, S3, Gcs, Azure) |
| LanceStreamExt | Umbrella extension trait adding DataStream::to_lance(config) |
Quick Start
Attach the sink to a pipeline via the LanceStreamExt trait re-exported
from volley_connectors:
#![allow(unused)]
fn main() {
use volley_connectors::blob_store::lance::{LanceStreamExt, LanceSinkConfig, LanceStorage};
use volley_core::prelude::*;
let config = LanceSinkConfig::new("s3://my-bucket/datasets/events")
.with_pipeline_id("events-v1")
.with_storage(LanceStorage::S3 {
region: "us-east-1".into(),
bucket: "my-bucket".into(),
endpoint: None,
access_key_id: None,
secret_access_key: None,
});
StreamExecutionEnvironment::new()
.from_kafka(kafka_cfg).await?
.filter_expr(col("status").eq(lit("active")))
.to_lance(config).await?
.execute("kafka-to-lance")
.await?;
}
Enable the umbrella feature in your Cargo.toml:
volley-connectors = { version = "0.8", features = ["lance-sink"] }
Or depend on the standalone crate directly:
volley-connector-lance = "0.8"
Configuration Reference
#![allow(unused)]
fn main() {
let config = LanceSinkConfig::new("s3://bucket/path")
.with_pipeline_id("my-pipeline") // default: "volley-default"
.with_storage(LanceStorage::Local) // default: Local
.with_max_rows_per_file(1_048_576) // default: 1_048_576
.with_max_rows_per_group(1024) // default: 1024
.with_max_bytes_per_file(90 * 1024 * 1024 * 1024) // default: 90 GB
.with_max_records_buffer(100_000) // default: 100_000
.with_recovery_history_limit(256); // default: 256
}
| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Stable identifier embedded in commit metadata. Used for idempotent recovery — use a unique value per pipeline writing to the same dataset |
| with_storage() | LanceStorage::Local | Object store backend; see Storage Backends |
| with_max_rows_per_file() | 1,048,576 | Maximum rows per Lance data file. Larger values produce fewer, bigger files |
| with_max_rows_per_group() | 1,024 | Maximum rows per row group (columnar encoding unit) |
| with_max_bytes_per_file() | 90 GB | Soft per-file size ceiling. Lance has a hard 100 GB limit on S3 |
| with_max_records_buffer() | 100,000 | Soft in-memory row budget between checkpoints. Exceeding it logs a warning but does not force an untagged flush — all commits wait for the next checkpoint barrier so the epoch tag is always present |
| with_recovery_history_limit() | 256 | Maximum number of historical Lance transactions scanned on startup to recover last_committed_epoch. Increase if you retain a long uncompacted history |
Exactly-Once Semantics
LanceSink uses a two-phase commit built on Lance’s native transaction
API:
- Stage. Buffered batches are materialized through InsertBuilder::execute_uncommitted_stream, producing an uncommitted Transaction that represents the staged data files without making them visible.
- Tag. The sink sets the transaction’s transaction_properties field to {"volley.pipeline_id": id, "volley.epoch": N} before committing. These key-value pairs are persisted to the Lance transaction file on disk.
- Commit. CommitBuilder::execute(transaction) atomically publishes the new dataset version.
On startup, LanceSink::new calls Dataset::get_transactions to fetch up
to recovery_history_limit recent transactions, scans each one for a
matching volley.pipeline_id, and takes the maximum volley.epoch as
last_committed_epoch. Any checkpoint barrier at or below that epoch is
dropped as a duplicate on the next restart.
Note: Lance 4.0’s CommitBuilder::with_transaction_properties stores the map on the builder but does not forward it to the staged Transaction during execute. To work around this, the sink writes the properties directly to Transaction.transaction_properties — which is a public field on the struct — before calling execute. This is a tracked upstream issue; when Lance fixes it, the workaround can be replaced with the documented builder method.
Storage Backends
Lance delegates I/O to object_store under the hood, and LanceStorage
maps cleanly onto its configuration keys.
| Variant | URI | Credentials |
|---|---|---|
| LanceStorage::Local | filesystem path or file://… | none |
| LanceStorage::S3 { region, bucket, endpoint, access_key_id, secret_access_key } | s3://bucket/path | inline fields, or AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars, or IAM instance role |
| LanceStorage::Gcs { project_id, bucket, service_account_json } | gs://bucket/path | inline JSON, or GOOGLE_APPLICATION_CREDENTIALS |
| LanceStorage::Azure { storage_account, container, access_key } | az://container/path | inline key, or AZURE_STORAGE_ACCOUNT_KEY / DefaultAzureCredential |
Setting endpoint = Some("http://…") on S3 also emits allow_http=true
so MinIO / localstack work without extra plumbing.
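For example, a local MinIO setup; the credentials shown are MinIO's well-known defaults and are for illustration only:

```rust
// Local development against MinIO: a custom endpoint also enables HTTP.
let config = LanceSinkConfig::new("s3://test-bucket/datasets/events")
    .with_pipeline_id("local-dev")
    .with_storage(LanceStorage::S3 {
        region: "us-east-1".into(),
        bucket: "test-bucket".into(),
        endpoint: Some("http://localhost:9000".into()),
        access_key_id: Some("minioadmin".into()),       // MinIO default, illustrative
        secret_access_key: Some("minioadmin".into()),   // MinIO default, illustrative
    });
```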
Limitations
- No Hive-style partitioning in v1. Lance uses fragments internally rather than filesystem-visible partitions, and the partition story is deferred.
- No schema evolution within a single pipeline_id — the first batch’s schema is assumed to hold for every subsequent batch.
- Single writer per (dataset_uri, pipeline_id) — concurrent writers conflict on commit and return an error rather than silently serializing.
recovery_history_limit. If a pipeline accumulates more uncompacted versions than the limit without a successful recovery, older commits may be invisible to the scan and epochs can be re-executed. Compact regularly or raise the limit.
Troubleshooting
Recovery does not pick up the last committed epoch
— Ensure the new sink uses the same pipeline_id as the writer, and
that recovery_history_limit covers the relevant version range. A
warn! log is emitted if a transaction’s volley.epoch property is
malformed.
Build fails with “Could not find protoc”
— Lance’s build scripts compile protobuf definitions at build time.
Install the protoc binary (apt-get install protobuf-compiler on
Debian/Ubuntu, brew install protobuf on macOS) before building.
Example
See volley-examples/examples/memory_to_lance.rs for a complete
end-to-end example that runs a MemorySource → LanceSink pipeline,
writes 5 trade records to a local Lance dataset via the full Volley
runtime (including the final shutdown barrier), then reopens the
dataset and prints all rows.
cargo run --example memory_to_lance -p volley-examples --release
AWS S3
The volley-connector-aws-s3 crate provides an S3 source that reads objects triggered by SQS notifications.
Architecture
SQS Queue S3 Bucket
(object notifications) (data files)
| |
v v
SqsNotificationSource S3BlobReader
| |
+----------+ +----------+
| |
v v
BlobSource
|
v
Decoder (Parquet / JSON / CSV / Avro)
|
v
RecordBatch --> Pipeline
- S3 sends object-created notifications to an SQS queue
- SqsNotificationSource polls the queue for new notifications
- S3BlobReader reads the object data from S3
- The shared decoder layer converts the data to Arrow RecordBatch
Key Types
| Type | Description |
|---|---|
| S3Source | Configured S3 source (combines SQS + S3 reader) |
| S3BlobReader | Reads object data from S3 |
| SqsNotificationSource | Polls SQS for S3 event notifications |
Supported Formats
| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |
Format detection is automatic based on file extension, or can be configured explicitly.
Usage
#![allow(unused)]
fn main() {
use volley_connector_aws_s3::S3Source;
let source = S3Source::builder()
.sqs_queue_url("https://sqs.us-east-1.amazonaws.com/123456789/my-queue")
.build()
.await?;
env.from_source(source)
.map(transform)
.to_sink(sink)
.execute("s3-ingest")
.await?;
}
AWS credentials are resolved via the standard AWS credential chain (environment variables, IAM role, credential file).
Blob Store Connectors
The volley-connector-blob-store crate provides a cloud-agnostic abstraction for reading from object storage. Azure Blob Storage and Google Cloud Storage are built on this shared foundation.
Shared Architecture
All cloud blob store connectors follow the same pattern:
Cloud Queue/Topic Cloud Storage
(SQS / Azure Queue / (S3 / Azure Blob /
Pub/Sub) GCS)
| |
v v
NotificationSource BlobReader
| |
+--------+ +-----------+
| |
v v
BlobSource
|
v
Decoder (Parquet / JSON / CSV / Avro)
|
v
RecordBatch --> Pipeline
To add a new cloud backend, implement NotificationSource and BlobReader. The decoder layer is shared.
Azure Blob Storage
From volley-connector-azure-blob:
| Type | Description |
|---|---|
| AzureBlobSourceBuilder | Builder with automatic credential discovery |
| AzureBlobSource | Configured Azure Blob source |
| AzureBlobReader | Reads blobs from Azure Blob Storage |
| AzureQueueNotificationSource | Polls Azure Queue Storage for blob notifications |
Enable with feature flag blob-store-azure on volley-connectors.
Authentication
By default, Azure connectors use DefaultAzureCredential which discovers credentials automatically. The credential chain tries, in order:
- Environment variables — AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET
- Workload Identity — Kubernetes pods with Azure Workload Identity federation
- Managed Identity — Azure VM, App Service, or Container Instance identity
- Azure CLI — Cached az login credentials (local development)
To override with an explicit storage account key, use with_access_key():
#![allow(unused)]
fn main() {
use volley_connector_azure_blob::AzureBlobSourceBuilder;
// DefaultAzureCredential (recommended for production)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
.build(config).await?;
// Explicit access key (override)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
.with_access_key("MYACCESSKEY...")
.build(config).await?;
}
Google Cloud Storage
From volley-connector-gcp-gcs:
| Type | Description |
|---|---|
| GcsSource | Configured GCS source |
| GcsBlobReader | Reads objects from GCS |
| PubSubNotificationSource | Polls Pub/Sub for GCS object notifications |
Enable with feature flag blob-store-gcs on volley-connectors.
Authentication
GCS connectors use Application Default Credentials, which discovers credentials automatically. The credential chain tries, in order:
- Environment variable — GOOGLE_APPLICATION_CREDENTIALS pointing to a service account key file
- Workload Identity — Kubernetes pods with GKE Workload Identity federation
- Compute Engine metadata — GCE VM, Cloud Run, or GKE node identity
- gcloud CLI — Cached gcloud auth application-default login credentials (local development)
Usage
#![allow(unused)]
fn main() {
use volley_connector_gcp_gcs::GcsSource;
let source = GcsSource::new("my-gcs-bucket", "my-pubsub-subscription")
.build(config).await?;
}
Note: The GCS Pub/Sub notification integration is currently experimental. The PubSubNotificationSource depends on the google-cloud-pubsub crate’s streaming pull API, which is not yet fully stabilized. The blob reader and event parser are functional — the gap is in notification polling reliability under sustained load.
Blob Store Sinks
The volley-connectors crate also provides buffered blob store sinks for writing output files to S3 or Azure Blob Storage. These sinks batch records and flush them as files based on record count or time interval.
Encoders
| Encoder | Output Format | File Extension |
|---|---|---|
| ParquetEncoder | Apache Parquet | .parquet |
| JsonLinesEncoder | JSON Lines (newline-delimited JSON) | .jsonl |
| CsvEncoder | CSV with optional header | .csv |
Usage
#![allow(unused)]
fn main() {
use std::time::Duration;
use volley_connectors::blob_store::{BufferedBlobSink, BufferedBlobSinkConfig};
use volley_connectors::blob_store::encoders::ParquetEncoder;
use volley_connectors::blob_store::aws::S3BlobWriter;
let encoder = Box::new(ParquetEncoder::new());
let config = BufferedBlobSinkConfig::new("my-bucket", "output/")
.with_max_records(100_000)
.with_flush_interval(Duration::from_secs(60));
let sink = BufferedBlobSink::new(
Box::new(S3BlobWriter::new(s3_client)),
encoder,
config,
);
}
For Azure Blob Storage, use AzureBlobWriter instead of S3BlobWriter:
#![allow(unused)]
fn main() {
use volley_connectors::blob_store::azure::AzureBlobWriter;
let sink = BufferedBlobSink::new(
Box::new(AzureBlobWriter::new(blob_service_client, "my-container")),
encoder,
config,
);
}
Enable with feature flags blob-store-aws or blob-store-azure on volley-connectors.
Supported Source Formats
All blob store sources share the same decoder layer:
| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |
Feature Flags
[dependencies]
volley-connectors = { git = "https://github.com/volley-streams/volley", features = ["blob-store-azure", "blob-store-gcs"] }
See also: AWS S3 for the S3-specific connector.
ADBC (Arrow Database Connectivity)
Volley provides ADBC-based database connectivity via two crates:
- volley-connector-adbc – A sink that writes Arrow RecordBatches to any database with an ADBC driver (PostgreSQL, BigQuery, Snowflake, ClickHouse, Databricks, MySQL, Redshift, Exasol, and more).
- volley-enrichment-adbc – A real-time enrichment operator that queries external databases to enrich streaming records with lookup data, backed by an LRU+TTL cache.
ADBC uses a C-API driver model (dlopen/dlsym), so adding support for a new database only requires installing the corresponding ADBC driver shared library – no connector-specific code needed.
Setup
Add crate dependencies
[dependencies]
# Sink (write to a database)
volley-connector-adbc = { git = "https://github.com/volley-streams/volley" }
# Enrichment operator (query a database to enrich records)
volley-enrichment-adbc = { git = "https://github.com/volley-streams/volley" }
Install the ADBC driver shared library
ADBC drivers are platform-specific shared libraries. Install the driver for your target database:
PostgreSQL:
# macOS (Homebrew)
brew install apache-arrow-adbc-driver-postgresql
# Ubuntu/Debian
apt-get install libadbc-driver-postgresql
# Or download from https://github.com/apache/arrow-adbc/releases
BigQuery:
# Download the adbc_driver_bigquery shared library from
# https://github.com/apache/arrow-adbc/releases
The driver name (e.g., "adbc_driver_postgresql") is resolved via platform library search paths (LD_LIBRARY_PATH on Linux, DYLD_LIBRARY_PATH on macOS). Alternatively, pass a full file path to the shared library.
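Both resolution modes go through the same constructor. A sketch; the .dylib path is illustrative:

```rust
use volley_connector_adbc::AdbcConfig;

// Resolve by driver name via the platform library search path...
let by_name = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

// ...or bypass the search path with an explicit shared-library path.
let by_path = AdbcConfig::new(
    "/usr/local/lib/libadbc_driver_postgresql.dylib",
    "postgresql://localhost/mydb",
);
```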
ADBC Sink
The sink writes Arrow RecordBatches to a database table. It supports insert (bulk ingest) and upsert (ON CONFLICT) modes with write-behind buffering.
Basic insert
#![allow(unused)]
fn main() {
use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig, AdbcStreamExt};
// Connect through the PostgreSQL ADBC driver with a pool of 4 connections.
let adbc = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb")
    .with_pool_size(4);
// Buffer up to 5000 rows per flush; create the table from the first batch's schema.
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_batch_size(5000)
    .with_create_table(true);
// `env`, `source_config`, and the `col`/`lit` expression helpers are assumed in scope.
env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_adbc(sink_config).await?
    .execute("kafka-to-postgres").await?;
}
Upsert mode
Set with_upsert_key() to enable upsert semantics. The sink auto-generates INSERT ... ON CONFLICT (key) DO UPDATE SET SQL:
#![allow(unused)]
fn main() {
let sink_config = AdbcSinkConfig::new(adbc, "events")
.with_upsert_key("event_id")
.with_batch_size(5000);
}
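With the configuration above, the auto-generated statement has roughly this shape (illustrative; the actual column list is derived from the batch's Arrow schema):
INSERT INTO events (event_id, name, value) VALUES ($1, $2, $3)
ON CONFLICT (event_id) DO UPDATE SET name = EXCLUDED.name, value = EXCLUDED.value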
For databases with non-standard upsert syntax, provide custom SQL:
#![allow(unused)]
fn main() {
let sink_config = AdbcSinkConfig::new(adbc, "events")
.with_upsert_query(
"INSERT INTO events (id, name, value) VALUES ($1, $2, $3) \
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, value = EXCLUDED.value"
);
}
Sink configuration reference
| Method | Default | Description |
|---|---|---|
| `AdbcSinkConfig::new(adbc, table)` | – | Required: ADBC config and target table name |
| `.with_upsert_key(column)` | None | Column for upsert (ON CONFLICT) semantics. None = insert-only |
| `.with_upsert_query(sql)` | None | Custom SQL for upsert. Overrides auto-generated SQL |
| `.with_batch_size(n)` | 1000 | Number of buffered rows before auto-flush |
| `.with_create_table(bool)` | false | Auto-create the table from the first batch’s Arrow schema |
Delivery guarantees
- At-least-once (default) – `on_checkpoint()` flushes buffered batches. If the pipeline crashes after flushing but before the checkpoint completes, some records may be written twice on recovery.
- Effectively exactly-once via upsert – When `upsert_key` is set, duplicate writes are idempotent. Choose a natural key that makes re-delivery safe.
ADBC Enrichment Operator
The enrichment operator queries an external database to enrich streaming records. It extracts lookup keys from each batch, issues a single micro-batch query, and appends enrichment columns to the output. An LRU+TTL cache avoids redundant queries.
Key-column lookup
The simplest mode: specify a stream column and a remote table. The operator generates SELECT [columns] FROM table WHERE target_key IN (k1, k2, ...).
#![allow(unused)]
fn main() {
use volley_enrichment_adbc::{AdbcEnrichConfig, AdbcEnrichStreamExt, EnrichMissStrategy};
use volley_connector_adbc::AdbcConfig;
use std::time::Duration;
let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");
env.from_kafka(source_config).await?
.enrich_with_adbc(
AdbcEnrichConfig::new(pg, "customers", "customer_id")
.with_select_columns(vec!["tier", "region", "lifetime_value"])
.with_prefix("customer_")
.with_cache_ttl(Duration::from_secs(300))
.with_on_miss(EnrichMissStrategy::NullFill)
)
.filter_expr(col("customer_tier").eq(lit("premium")))
.to_kafka(sink_config).await?
.execute("enrich-orders").await?;
}
Parameterized query
For complex lookups (joins, filters, computed columns), provide a SQL template with IN $1:
#![allow(unused)]
fn main() {
use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig};
use volley_enrichment_adbc::{AdbcEnrichConfig, AdbcEnrichStreamExt};
let bq = AdbcConfig::new("adbc_driver_bigquery", "bigquery://project-id");
// `pg` is the PostgreSQL AdbcConfig from the earlier sink example.
env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new_with_query(
            bq,
            "SELECT category, risk_score FROM products WHERE sku IN $1 AND active = true",
            "product_sku"
        )
    )
    .to_adbc(AdbcSinkConfig::new(pg, "enriched_events")).await?
    .execute("enrich-and-store").await?;
}
Miss strategies
When a lookup key has no match in the database:
| Strategy | Behavior |
|---|---|
| `NullFill` (default) | Add enrichment columns filled with nulls |
| `DropRecord` | Drop the record entirely |
| `PassThrough` | Keep the original record without enrichment columns |
Enrichment configuration reference
| Method | Default | Description |
|---|---|---|
| `AdbcEnrichConfig::new(adbc, table, key)` | – | Key-column lookup mode |
| `AdbcEnrichConfig::new_with_query(adbc, sql, key)` | – | Parameterized query mode |
| `.with_target_key(column)` | Same as lookup_key | Remote column to match against (if different from the stream column) |
| `.with_select_columns(vec)` | All columns | Which columns to fetch from the remote table |
| `.with_prefix(prefix)` | None | Prefix for enriched columns (e.g., "customer_") |
| `.with_cache_ttl(duration)` | 5 minutes | Time-to-live for cached entries |
| `.with_cache_max_entries(n)` | 10,000 | Maximum entries in the LRU cache |
| `.with_on_miss(strategy)` | NullFill | Strategy for unmatched records |
Cache behavior
The enrichment operator uses an LRU cache with per-entry TTL:
- Cache hits return the stored enrichment row without querying the database.
- Cache misses (key not found) are cached as `None` to avoid repeated lookups for non-existent keys.
- TTL expiry evicts stale entries so the operator picks up changes in the remote database.
- On recovery, the cache starts cold and repopulates organically. Enrichment data is external and authoritative, so there is no state to recover.
Connection Configuration Reference
AdbcConfig is shared between the sink and enrichment operator:
| Method | Default | Description |
|---|---|---|
| `AdbcConfig::new(driver, uri)` | – | Required: driver name/path and connection URI |
| `.with_option(key, value)` | {} | Add a driver-specific key-value option |
| `.with_pool_size(n)` | 4 | Connection pool size (round-robin) |
Driver name resolution: If the driver name contains / or \, it is treated as a file path. Otherwise, it is resolved via platform library search paths (e.g., LD_LIBRARY_PATH, DYLD_LIBRARY_PATH).
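For example, both resolution modes with `AdbcConfig` (the URI and library path below are placeholders):
#![allow(unused)]
fn main() {
use volley_connector_adbc::AdbcConfig;
// Bare driver name: resolved via LD_LIBRARY_PATH / DYLD_LIBRARY_PATH.
let by_name = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");
// Contains a path separator, so it is treated as a file path.
let by_path = AdbcConfig::new(
    "/usr/local/lib/libadbc_driver_postgresql.so",
    "postgresql://localhost/mydb",
);
}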
Common driver names:
| Database | Driver Name |
|---|---|
| PostgreSQL | adbc_driver_postgresql |
| BigQuery | adbc_driver_bigquery |
| Snowflake | adbc_driver_snowflake |
| SQLite | adbc_driver_sqlite |
| Flight SQL | adbc_driver_flightsql |
Observability
| Metric | Type | Description |
|---|---|---|
| `adbc_sink_rows_written_total` | counter | Rows written, by table |
| `adbc_sink_flush_duration_seconds` | histogram | Flush latency |
| `adbc_enrich_cache_hit_total` | counter | Cache hits |
| `adbc_enrich_cache_miss_total` | counter | Cache misses |
| `adbc_enrich_cache_size` | gauge | Current cache entry count |
| `adbc_enrich_query_duration_seconds` | histogram | Micro-batch query latency |
| `adbc_enrich_query_errors_total` | counter | Failed enrichment queries |
Integration Testing
The ADBC integration tests require Docker (for PostgreSQL via testcontainers) and the adbc_driver_postgresql shared library.
Set the ADBC_POSTGRESQL_DRIVER environment variable if the driver is not on the default library search path:
# Point to the driver if not on LD_LIBRARY_PATH
export ADBC_POSTGRESQL_DRIVER=/usr/local/lib/libadbc_driver_postgresql.so
# Run the sink integration tests
cargo test -p volley-connector-adbc --test sink_integration -- --ignored
# Run the enrichment integration tests
cargo test -p volley-enrichment-adbc --test enrich_integration -- --ignored
Tests use volley-test-harness::containers::PostgresContainer to spin up a PostgreSQL instance automatically.
Troubleshooting
“Failed to load ADBC driver”
- Verify the driver shared library is installed and on the library search path.
- On macOS: check `DYLD_LIBRARY_PATH`. On Linux: check `LD_LIBRARY_PATH`.
- Try passing the full file path instead of just the driver name.
“Failed to create ADBC connection”
- Verify the connection URI is correct and the database is reachable.
- Check that driver-specific options (credentials, project ID, etc.) are set via `with_option()`.
Slow enrichment queries
- Increase `cache_ttl` and `cache_max_entries` to reduce query frequency.
- Add an index on the lookup column in the remote database.
- Use `with_select_columns()` to fetch only the columns you need.
Duplicate rows after recovery
- Use `with_upsert_key()` to make writes idempotent on a natural key.
- Without upsert, the sink provides at-least-once delivery.
Kubernetes Operator
Volley includes a Kubernetes operator that automates deployment and management of Volley stream processing applications. Define a VolleyApplication custom resource and the operator handles StatefulSets, persistent storage, health checks, and Prometheus integration.
Prerequisites
- Kubernetes cluster (v1.26+)
- prometheus-operator (optional, for automatic metrics scraping)
- KEDA (optional, for autoscaling)
Installing the Operator
Build and deploy the operator:
# Build the operator binary
cargo build --release -p volley-k8s-operator
# Generate the CRD manifest
./target/release/volley-k8s-operator generate-crd > volley-crd.json
# Apply the CRD to your cluster
kubectl apply -f volley-crd.json
# Run the operator (in-cluster or locally for development)
./target/release/volley-k8s-operator
Deploying a Volley Application
Create a VolleyApplication resource:
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
name: my-pipeline
namespace: default
spec:
# Container image (required)
image: registry.example.com/my-volley-app:v1.0
# Resource requirements (required)
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
# RocksDB state storage
state:
storageClassName: fast-ssd
size: 10Gi
# Checkpoint storage
checkpoints:
storageClassName: fast-ssd
size: 20Gi
# Environment variables
env:
- name: RUST_LOG
value: "info"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka.default.svc:9092"
kubectl apply -f my-pipeline.yaml
The operator creates the following resources automatically:
| Resource | Purpose |
|---|---|
| StatefulSet | Runs the app with stable pod identity and persistent volumes |
| PVC (state) | Persistent storage for RocksDB state (/var/lib/volley/state) |
| PVC (checkpoints) | Persistent storage for checkpoints (/var/lib/volley/checkpoints) |
| Service (headless) | StatefulSet DNS for stable network identity |
| Service (metrics) | Exposes the Prometheus metrics port |
| ServiceMonitor | Prometheus-operator auto-discovery for metrics scraping |
| PodDisruptionBudget | Prevents involuntary eviction during checkpointing |
Health Checks
The operator automatically configures liveness and readiness probes backed by Volley’s built-in HealthReporter:
| Probe | Path | Behaviour |
|---|---|---|
| Liveness | /health (port 8080) | Returns healthy when pipeline is Running or Starting |
| Readiness | /ready (port 8080) | Returns ready only when pipeline is Running |
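To spot-check the endpoints manually, port-forward to a pod and query them (the pod name below is an example):
# Forward the health port from the first pod of the StatefulSet
kubectl port-forward pod/my-pipeline-0 8080:8080 &
# Liveness: healthy while the pipeline is Running or Starting
curl -s http://localhost:8080/health
# Readiness: ready only while the pipeline is Running
curl -s http://localhost:8080/ready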
Customize the health check configuration:
spec:
health:
port: 8080
livenessPath: /health
readinessPath: /ready
initialDelaySeconds: 30
periodSeconds: 10
Observability
Metrics are exposed on port 9090 by default. When prometheus-operator is installed, the operator creates a ServiceMonitor for automatic scraping:
spec:
observability:
metricsPort: 9090
metricsPath: /metrics
serviceMonitor:
enabled: true
interval: 15s
labels:
release: prometheus # match your Prometheus selector
otlpEndpoint: http://otel-collector.monitoring:4317 # optional tracing
Key Volley metrics available for dashboards and alerting:
| Metric | Type | Description |
|---|---|---|
| `volley.source.records_polled` | Counter | Records read from source |
| `volley.records.processed` | Counter | Records processed per operator |
| `volley.stage.latency_ms` | Histogram | Per-operator processing latency |
| `volley.checkpoint.duration_ms` | Histogram | Checkpoint completion time |
| `volley.channel.utilization` | Gauge | Backpressure indicator (0.0-1.0) |
| `volley.kafka.consumer_lag` | Gauge | Kafka consumer group lag |
| `volley.pipeline.uptime_seconds` | Gauge | Pipeline uptime |
Node Isolation
Following Flink best practice, each Volley application should run on dedicated nodes. The operator supports node selectors, tolerations, and pod anti-affinity:
spec:
isolation:
# Schedule only on nodes labelled for stream processing
nodeSelector:
volley.io/pool: stream-processing
# Tolerate the dedicated taint
tolerations:
- key: volley.io/dedicated
operator: Equal
value: "true"
effect: NoSchedule
# One pod per node (enabled by default)
podAntiAffinity: true
Distributed Execution (Horizontal Scaling)
To run a pipeline across multiple pods, add a `cluster` section to your spec.
First, create a ReadWriteMany PVC backed by your cloud provider’s managed NFS (AWS EFS, GCP Filestore, Azure Files):
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: volley-shared-ckpt
spec:
accessModes: [ReadWriteMany]
storageClassName: efs # your RWX storage class
resources:
requests:
storage: 50Gi
Then reference it in your VolleyApplication:
spec:
replicas: 3
cluster:
maxKeyGroups: 256 # virtual partitions (default, power of 2)
flightPort: 50051 # Arrow Flight port (default)
sharedCheckpoints:
claimName: volley-shared-ckpt # the PVC you created above
When cluster is present, the operator automatically:
| Action | Detail |
|---|---|
| Injects `VOLLEY_*` env vars | `VOLLEY_NUM_WORKERS`, `VOLLEY_HEADLESS_SERVICE`, `VOLLEY_MAX_KEY_GROUPS`, `VOLLEY_FLIGHT_PORT`, `VOLLEY_APP_NAME`, `VOLLEY_SHARED_CHECKPOINT_DIR` |
| Adds Flight port | Container port 50051 + headless service port for inter-pod communication |
| Mounts shared PVC | The existing ReadWriteMany PVC is mounted into all pods at /mnt/volley/checkpoints |
The Volley runtime reads these env vars via ClusterConfig::from_env() and automatically enters distributed mode.
Shared checkpoint storage uses a pre-provisioned ReadWriteMany PVC (managed NFS from your cloud provider). All workers write checkpoints to the same shared mount, so recovery and rescaling are near-instant — no data transfer needed.
Rescaling is live: change spec.replicas and the coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
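For example, one way to trigger a rescale from the command line (the resource name is an example):
# Bump the pipeline from 3 to 5 workers; the coordinator handles the rest
kubectl patch vapp my-pipeline --type merge -p '{"spec":{"replicas":5}}'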
Autoscaling with KEDA
The operator has built-in support for KEDA autoscaling. When enabled, it creates a ScaledObject that can scale based on Kafka consumer lag, Prometheus metrics, or CPU/memory:
spec:
autoscaling:
enabled: true
minReplicas: 1
maxReplicas: 10
triggers:
# Scale on Kafka consumer lag
- type: kafka
metadata:
bootstrapServers: kafka.default.svc:9092
consumerGroup: my-pipeline-group
topic: input-events
lagThreshold: "1000"
# Scale on backpressure (channel utilization)
- type: prometheus
metadata:
serverAddress: http://prometheus.monitoring:9090
query: avg(volley_channel_utilization{job="my-pipeline"})
threshold: "0.8"
# Scale on CPU
- type: cpu
metadata:
value: "70"
Checking Application Status
# List all Volley applications
kubectl get volleyapplications
# or use the short name:
kubectl get vapp
# Check status
kubectl describe vapp my-pipeline
# View operator logs
kubectl logs deployment/volley-k8s-operator
The status subresource reports the current phase (Pending, Running, Degraded, Failed), replica counts, and a human-readable message.
Full Spec Reference
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
name: my-pipeline
namespace: default
spec:
image: registry.example.com/my-app:v1.0 # required
imagePullPolicy: IfNotPresent # default: IfNotPresent
imagePullSecrets:
- name: registry-secret
resources: # required
requests: { cpu: "2", memory: "4Gi" }
limits: { cpu: "4", memory: "8Gi" }
replicas: 1 # default: 1
args: ["--config", "/etc/volley/config.yaml"]
env:
- name: RUST_LOG
value: "info"
- name: DB_PASSWORD
valueFrom:
secretKeyRef: { name: db-creds, key: password }
state:
storageClassName: fast-ssd
size: 10Gi
mountPath: /var/lib/volley/state # default
checkpoints:
storageClassName: fast-ssd
size: 20Gi
mountPath: /var/lib/volley/checkpoints # default
observability:
metricsPort: 9090 # default: 9090
metricsPath: /metrics # default: /metrics
serviceMonitor:
enabled: true # default: true
interval: 15s # default: 15s
labels: { release: prometheus }
otlpEndpoint: http://otel-collector:4317
health:
port: 8080 # default: 8080
livenessPath: /health # default: /health
readinessPath: /ready # default: /ready
initialDelaySeconds: 30 # default: 30
periodSeconds: 10 # default: 10
isolation:
nodeSelector: { volley.io/pool: stream-processing }
tolerations:
- key: volley.io/dedicated
operator: Equal
value: "true"
effect: NoSchedule
podAntiAffinity: true # default: true
kafka:
bootstrapServers: kafka:9092
consumerGroup: my-group
topics: [input-events]
autoscaling:
enabled: false # default: false
minReplicas: 1
maxReplicas: 10
triggers:
- type: kafka
metadata:
bootstrapServers: kafka:9092
consumerGroup: my-group
topic: input-events
lagThreshold: "1000"
cluster: # distributed execution (optional)
maxKeyGroups: 256 # default: 256 (power of 2)
flightPort: 50051 # default: 50051
sharedCheckpoints:
claimName: volley-shared-ckpt # existing ReadWriteMany PVC
mountPath: /mnt/volley/checkpoints # default
labels: {} # additional labels on all resources
annotations: {} # additional annotations on all resources
Volley Capacity Planning Guide
This guide helps pipeline authors size CPU, memory, and storage for Volley deployments, and provides PromQL queries for ongoing capacity monitoring.
Throughput Reference
Benchmark data from volley-examples/examples/benchmark_sustained.rs on a single node:
| Configuration | Throughput | Notes |
|---|---|---|
| Single record, parallelism=1 | ~850K rec/s | Baseline, no batching |
| Batch=1000, parallelism=1 | ~3.5M rec/s | Batching only |
| Batch=1000, parallelism=4 | ~8.7M rec/s | Batching + parallelism |
| Sustained 10s, batch=1000, parallelism=4 | ~9.4M rec/s | Steady-state throughput |
These numbers reflect in-memory processing with simple operators. Real-world throughput will be lower depending on:
- Operator complexity (aggregations, joins, windowing)
- State backend I/O (RocksDB reads/writes)
- Connector latency (Kafka, blob storage)
- Checkpoint frequency
CPU Sizing
| Throughput target | CPU requests | CPU limits | Notes |
|---|---|---|---|
| < 100K rec/s | 500m | 1 | Light pipelines, simple transforms |
| 100K–1M rec/s | 1 | 2 | Moderate load, stateful operators |
| 1M–5M rec/s | 2 | 4 | High throughput, batched processing |
| > 5M rec/s | 4 | 8 | Maximum throughput, parallelism=4+ |
Volley uses tokio async runtime. CPU is primarily consumed by:
- Record deserialization/serialization
- Operator logic (map, filter, aggregate, window)
- RocksDB compaction (background threads)
- Kafka consumer polling and producer flushing
Recommendation: Start with 1 CPU request and monitor container_cpu_usage_seconds_total. Scale up if sustained utilization exceeds 70%.
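A query along these lines tracks utilization against requests (assumes cAdvisor and kube-state-metrics are scraped; the pod regex is a placeholder):
# Sustained CPU usage relative to requests (scale up above ~0.7)
sum(rate(container_cpu_usage_seconds_total{pod=~"my-pipeline-.*"}[5m]))
/
sum(kube_pod_container_resource_requests{resource="cpu", pod=~"my-pipeline-.*"})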
Memory Sizing
| Workload | Memory requests | Memory limits | Notes |
|---|---|---|---|
| Stateless transforms | 256Mi | 512Mi | No state backend |
| Light stateful (< 100K keys) | 512Mi | 1Gi | Small RocksDB footprint |
| Medium stateful (100K–1M keys) | 1Gi | 2Gi | RocksDB block cache |
| Heavy stateful (> 1M keys) | 2Gi | 4Gi | Large working set |
Memory is consumed by:
- Channel buffers: In-flight records between pipeline stages
- RocksDB block cache: Cached SST blocks for read performance (defaults to ~50% of available memory)
- Arrow RecordBatch buffers: Columnar data in transit
- Kafka consumer buffers: Pre-fetched messages
- Window pending index: Active `(key, window)` pairs tracked by window operators (~100 bytes each)
- Aggregate LRU cache: In-memory accumulator cache for keyed aggregations
Recommendation: Set memory limits to 2x requests. Monitor container_memory_working_set_bytes and watch for OOM kills.
Bounding Stateful Operator Memory
For high-cardinality keyed workloads, use the built-in LRU mechanisms to cap in-memory state:
- Aggregate operators: `keyed_stream.with_max_cache_entries(n)` — bounds the in-memory accumulator cache. Evicted entries are flushed to RocksDB.
- Window operators: `windowed_stream.with_max_pending_windows(n)` — bounds the in-memory window index. Evicted entries are persisted to the state backend and scanned during watermark advance.
As a rule of thumb, each aggregate cache entry is ~200-500 bytes (depends on accumulator type), and each pending window entry is ~100 bytes. Size the limits based on your available memory budget:
aggregate_memory ≈ max_cache_entries × 300 bytes
window_memory ≈ max_pending_windows × 100 bytes
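For example, a minimal sketch of both knobs on the fluent builder, sized for a ~350 MB in-memory budget under the rule of thumb above (assumes the chaining shown here and the source config from earlier examples):
#![allow(unused)]
fn main() {
// ~1M cache entries × ~300 B ≈ 300 MB; ~500K pending windows × ~100 B ≈ 50 MB
env.from_kafka(source_config).await?
    .key_by(col("user_id"))
    .with_max_cache_entries(1_000_000)       // bound the accumulator cache
    .window(TumblingWindows::of(Duration::from_secs(300)))
    .with_max_pending_windows(500_000)       // bound the pending window index
    .aggregate(AggregationType::Sum, "amount")
    .collect()
    .execute("bounded-aggregation")
    .await?;
}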
Monitor volley_state_cache_evictions_total and volley_window_pending_evictions_total — a sustained high eviction rate means the LRU is undersized and incurring excessive RocksDB I/O.
RocksDB Storage Sizing
Growth Formula
state_size = key_count × avg_value_size × amplification_factor
Where:
- `key_count`: Number of unique keys in your keyed state
- `avg_value_size`: Average serialized value size (JSON encoding adds ~30% overhead)
- `amplification_factor`: RocksDB space amplification, typically 1.1–1.5x depending on compaction
Checkpoint Storage
checkpoint_storage = state_size × checkpoint_retention_count
RocksDB checkpoints use hardlinks to SST files, so incremental growth is small when data doesn’t change between checkpoints. However, as data mutates, old SST files are retained for older checkpoints.
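As a worked example (all inputs assumed): 10 million keys, ~200-byte serialized values, 1.3x amplification, and 3 retained checkpoints:
state_size ≈ 10,000,000 × 200 B × 1.3 ≈ 2.6 GB
checkpoint_storage ≈ 2.6 GB × 3 ≈ 7.8 GB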
PVC Sizing Recommendations
| State size | State PVC | Checkpoint PVC | Retention |
|---|---|---|---|
| < 1 GB | 5Gi | 5Gi | 3 checkpoints |
| 1–10 GB | 20Gi | 20Gi | 3 checkpoints |
| 10–50 GB | 100Gi | 100Gi | 2 checkpoints |
| > 50 GB | 2× state size | 2× state size | 2 checkpoints |
Recommendation: Provision at least 2× your expected state size for headroom. Monitor actual usage with the PromQL queries below and adjust.
Helm Values Example
state:
enabled: true
size: "20Gi"
storageClassName: gp3 # Use provisioned IOPS for write-heavy workloads
checkpoints:
enabled: true
size: "20Gi"
Capacity Monitoring PromQL Queries
Throughput
# Records ingested per second by pipeline
sum(rate(volley_source_records_polled_total[5m])) by (job_name)
# Records processed per second by pipeline
sum(rate(volley_records_processed_total[5m])) by (job_name)
# Processing ratio (should be close to 1.0)
sum(rate(volley_records_processed_total[5m])) by (job_name)
/
sum(rate(volley_source_records_polled_total[5m])) by (job_name)
Backpressure
# Channel utilization by stage (>0.8 = backpressure)
avg(volley_channel_utilization) by (job_name, stage)
# Stages with sustained backpressure
avg_over_time(volley_channel_utilization[15m]) > 0.8
Kafka Consumer Lag
# Total consumer lag by pipeline
sum(volley_kafka_consumer_lag) by (job_name)
# Lag growth rate (positive = falling behind)
deriv(sum(volley_kafka_consumer_lag) by (job_name)[15m:1m])
Latency
# p99 stage latency
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
# p50 stage latency
histogram_quantile(0.5, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
# Checkpoint duration p99
histogram_quantile(0.99, sum(rate(volley_checkpoint_duration_ms_bucket[5m])) by (le, job_name))
Error Rate
# Error rate by type
sum(rate(volley_errors_total[5m])) by (job_name, error_type)
# Source errors by connector
sum(rate(volley_source_errors_total[5m])) by (job_name, source_id)
# Sink errors by connector
sum(rate(volley_sink_errors_total[5m])) by (job_name, sink_id)
# Error ratio (for SLO tracking)
sum(rate(volley_errors_total[5m])) by (job_name)
/
clamp_min(sum(rate(volley_records_processed_total[5m])) by (job_name), 1)
Resource Utilization
# Watermark lag (event-time vs. wall-clock)
max(volley_watermark_lag_ms) by (job_name)
# Sink flush duration p99
histogram_quantile(0.99, sum(rate(volley_sink_flush_duration_ms_bucket[5m])) by (le, job_name))
KEDA Autoscaling Reference
Example KEDA trigger configurations for the Helm chart:
Scale on Kafka Consumer Lag
autoscaling:
enabled: true
minReplicas: 1
maxReplicas: 8
triggers:
- type: kafka
metadata:
bootstrapServers: "kafka:9092"
consumerGroup: "my-pipeline"
topic: "input-topic"
lagThreshold: "10000" # Scale up when lag exceeds 10K messages
Scale on CPU
autoscaling:
enabled: true
minReplicas: 1
maxReplicas: 4
triggers:
- type: cpu
metadata:
type: Utilization
value: "70" # Scale up at 70% CPU utilization
Scale on Custom Prometheus Metric
autoscaling:
enabled: true
minReplicas: 1
maxReplicas: 8
triggers:
- type: prometheus
metadata:
serverAddress: "http://prometheus:9090"
query: "sum(volley_channel_utilization{job_name='my-pipeline'}) / count(volley_channel_utilization{job_name='my-pipeline'})"
threshold: "0.8" # Scale up when average saturation > 80%
Capacity Planning Checklist
- Estimate throughput: What is your expected input rate (records/sec)?
- Size CPU/memory: Use the tables above as a starting point
- Estimate state size:
key_count × avg_value_size × 1.5 - Size PVCs: 2× expected state size for both state and checkpoint volumes
- Configure autoscaling: Set KEDA triggers based on lag or saturation
- Deploy and monitor: Use the PromQL queries above to validate sizing
- Iterate: Adjust resources based on actual utilization patterns
Observability
Volley provides built-in observability via the volley_core::observability module: Prometheus metrics, OpenTelemetry tracing, structured logging, and health reporting. Observability auto-initializes at pipeline startup with zero-config defaults.
Metrics
Volley exposes 15+ metrics via Prometheus exposition format. All metrics include node_id and job_name labels.
| Metric | Type | Labels | Description |
|---|---|---|---|
| `volley.source.records_polled` | Counter | source_id | Records read from source |
| `volley.records.processed` | Counter | operator_id | Records processed per operator |
| `volley.stage.latency_ms` | Histogram | stage | Per-operator processing latency |
| `volley.checkpoint.duration_ms` | Histogram | — | Checkpoint completion time |
| `volley.checkpoint.epoch` | Gauge | — | Current checkpoint epoch |
| `volley.checkpoint.failures` | Counter | — | Checkpoint failure count |
| `volley.channel.utilization` | Gauge | stage | Backpressure indicator (0.0-1.0) |
| `volley.watermark.lag_ms` | Gauge | — | Event-time lag |
| `volley.sink.records_written` | Counter | sink_id | Records written per sink |
| `volley.sink.flush.duration_ms` | Histogram | sink_id | Sink flush latency |
| `volley.pipeline.uptime_seconds` | Gauge | — | Pipeline uptime |
| `volley.pipeline.health` | Gauge | — | Health state (0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown) |
| `volley.errors` | Counter | error_type | Errors by type |
| `volley.source.errors` | Counter | source_id | Per-connector source errors |
| `volley.sink.errors` | Counter | sink_id | Per-connector sink errors |
| `volley.kafka.consumer_lag` | Gauge | — | Kafka consumer group lag |
| `volley.kafka.commit.duration_ms` | Histogram | — | Kafka commit latency |
| `volley.kafka.commit.failures` | Counter | — | Kafka commit failures |
| `volley.blob.objects_read` | Counter | — | Blob source throughput |
| `volley.blob.bytes_read` | Counter | — | Blob I/O volume |
| `volley.state.cache_evictions` | Counter | operator_id | Aggregate LRU cache evictions |
| `volley.window.pending_evictions` | Counter | — | Window pending index LRU evictions |
| `volley.window.pending_size` | Gauge | — | In-memory pending window entry count |
| `volley.checkpoint.gc_cleaned` | Counter | — | Old checkpoints cleaned by GC |
ML Inference Metrics
When using volley-ml operators (.infer(), .embed(), .classify(), .infer_remote()), additional inference-specific metrics are emitted:
| Metric | Type | Labels | Description |
|---|---|---|---|
| `volley.ml.inference.duration_ms` | Histogram | backend, model_name | Per-batch inference latency (backend call only) |
| `volley.ml.inference.batch_rows` | Histogram | backend, model_name | Rows per inference batch (batch utilization) |
| `volley.ml.model_load.duration_ms` | Histogram | backend, model_name | Model load time (emitted once at startup) |
| `volley.ml.inference.errors` | Counter | backend, model_name, error_type | Inference errors by backend and type |
| `volley.ml.http.request.duration_ms` | Histogram | endpoint | HTTP round-trip latency for remote model servers |
| `volley.ml.http.queue_depth` | Gauge | endpoint | Available concurrency permits (inverse of queue depth) |
| `volley.ml.http.errors` | Counter | endpoint, status_code | HTTP errors from remote model servers |
The backend label is "onnx" or "candle". The model_name label is the model path or HuggingFace repo ID passed to the operator config.
Golden Signals PromQL
# Latency (p99)
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
# Traffic (records/sec)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)
# Errors (rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)
# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)
# Saturation (channel utilization)
avg(volley_channel_utilization) by (job_name, stage)
# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)
ML Inference PromQL
# Inference latency (p99) by backend
histogram_quantile(0.99, sum(rate(volley_ml_inference_duration_ms_bucket[5m])) by (le, backend))
# Inference throughput (rows/sec)
sum(rate(volley_ml_inference_batch_rows_sum[5m])) by (backend)
# Inference error rate
sum(rate(volley_ml_inference_errors_total[5m])) by (backend, error_type)
# HTTP model server latency (p95)
histogram_quantile(0.95, sum(rate(volley_ml_http_request_duration_ms_bucket[5m])) by (le, endpoint))
State & Window PromQL
# Aggregate cache eviction rate (high = LRU undersized)
rate(volley_state_cache_evictions_total[5m])
# Window pending index eviction rate
rate(volley_window_pending_evictions_total[5m])
# In-memory pending window count
volley_window_pending_size
# Checkpoint GC cleanup rate
rate(volley_checkpoint_gc_cleaned_total[5m])
Tracing
OpenTelemetry integration via OTLP gRPC export with tracing-opentelemetry bridge:
- Resource attributes include `service.name` and `node.id`
- Graceful fallback if OTLP endpoint is unavailable
- Trace-log correlation: `trace_id` and `span_id` injected into JSON log lines when a span is active
Configure the OTLP endpoint:
# In VolleyApplication CRD
spec:
observability:
otlpEndpoint: http://otel-collector.monitoring:4317
Per-Record Tracing
Volley supports per-record trace propagation with configurable sampling. When enabled, individual records flowing through the pipeline carry OpenTelemetry trace context. Each operator creates a child span showing processing time, input/output row counts, and optional payload previews.
Configuration
#![allow(unused)]
fn main() {
use volley_core::observability::{TracingConfig, SamplingStrategy};
StreamExecutionEnvironment::new()
.from_source(source)
.with_tracing(TracingConfig {
sampling: SamplingStrategy::Ratio(0.01), // trace 1% of records
max_payload_bytes: 1024, // max JSON preview size
capture_payload: true, // enable input/output previews
})
.filter(|r| true)
.map(|r| Ok(r))
.to_sink(sink)
.execute("my-job")
.await?;
}
Sampling Strategies
| Strategy | Description | Use case |
|---|---|---|
| `Always` | Trace every record | Development, debugging |
| `Never` | Disable tracing | Production default |
| `Ratio(f64)` | Trace a random fraction | Production with sampling (e.g., 0.01 = 1%) |
Non-sampled records pay near-zero overhead (a single branch check).
Span Hierarchy
For a pipeline source -> map -> filter -> sink, a sampled record produces:
volley.record (root span)
├── volley.source — source output metadata
├── volley.operator [0] — map operator (input/output rows, timing, preview)
├── volley.operator [1] — filter operator
└── volley.sink — sink write (timing, errors)
Each span includes attributes:
| Attribute | Description |
|---|---|
| `volley.operator.name` | Auto-generated operator name (e.g., operator-0) |
| `volley.operator.index` | Position in the operator chain |
| `volley.record.event_time_ms` | Record’s event timestamp |
| `volley.record.input_rows` | Input batch row count |
| `volley.record.output_rows` | Output batch row count |
When capture_payload is enabled, input and output JSON previews are attached as span events.
Window and Aggregate Outputs
Window and aggregate operators accumulate multiple input records and emit new result records. These outputs get a new trace with span links back to the input traces:
volley.window.output (new root, links to input traces)
├── volley.operator [next]
└── volley.sink
The volley.window.input_trace_count attribute shows how many sampled input records contributed to the aggregation.
Kafka Trace Context Propagation
When using the Kafka source, Volley automatically extracts W3C traceparent headers from consumed messages. If an upstream service traced a request that produced a Kafka message, the trace continues through Volley.
Similarly, the Kafka sink injects traceparent headers into produced messages, propagating trace context to downstream consumers.
This enables full end-to-end distributed tracing across service boundaries:
upstream service → Kafka → Volley pipeline → Kafka → downstream service
trace-id=abc trace-id=abc trace-id=abc
Structured Logging
ObservabilityConfig supports a log_format field with Text (default) and Json modes:
#![allow(unused)]
fn main() {
ObservabilityConfig::new()
.with_json_logging()
.init();
}
JSON mode produces structured output compatible with Loki, Elasticsearch, and CloudWatch Logs. When OpenTelemetry is active, each log line includes trace_id and span_id fields for cross-referencing with distributed traces.
Useful RUST_LOG filters
# Default (info): lifecycle events, checkpoint completions, source/sink creation
RUST_LOG=info
# Debug state backend, window, and watermark behavior
RUST_LOG=info,volley_core::state=debug,volley_core::operators::window=debug
# Debug file decoding (Parquet, JSON Lines, CSV)
RUST_LOG=info,volley_connectors::blob_store::decoders=debug
# Deep debugging: per-record RocksDB get/put/delete
RUST_LOG=info,volley_core::state::rocks=trace
# Debug protobuf decode with batch context
RUST_LOG=info,volley_core::operators::protobuf=debug
Health Reporting
The HealthReporter tracks pipeline state with atomic transitions:
| State | Liveness | Readiness | Description |
|---|---|---|---|
| `Starting` | healthy | not ready | Pipeline initializing |
| `Running` | healthy | ready | Normal operation |
| `Degraded` | healthy | not ready | Partial failure (with reason) |
| `Failed` | unhealthy | not ready | Complete failure (with reason) |
| `ShuttingDown` | unhealthy | not ready | Graceful shutdown in progress |
The volley.pipeline.health gauge emits the state as a numeric value on every transition, enabling Prometheus alerting.
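A minimal sketch of driving the reporter from pipeline code (the constructor and transition method names are assumptions; consult the `volley_core::observability` API):
#![allow(unused)]
fn main() {
use volley_core::observability::HealthReporter; // module path per the docs above

// Hypothetical transition calls; names are illustrative, not the confirmed API.
let health = HealthReporter::new();              // begins in Starting
health.set_running();                            // liveness: healthy, readiness: ready
health.set_degraded("checkpoint failures");      // reason string surfaces in probes
health.set_failed("state backend unavailable");  // liveness now unhealthy
}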
SLO Recording Rules
Prometheus recording rules are provided at deploy/prometheus/recording-rules.yaml:
- `volley:availability:ratio_rate5m` — availability SLI ratio
- `volley:latency_sli:ratio_rate5m` — latency SLI (fraction under 100ms)
- `volley:error_budget:remaining` — remaining error budget against 99.9% target
Alerting Rules
Burn-rate alerting rules at deploy/prometheus/alerting-rules.yaml:
- `VolleyHighErrorBurnRate` (critical) — 14.4x burn over 1h
- `VolleySlowErrorBurnRate` (warning) — 1x burn over 3d
- `VolleyPipelineFailed` (critical) — health==3 for 1m
- `VolleyPipelineDegraded` (warning) — health==2 for 5m
Kubernetes Integration
The K8s operator automatically creates a ServiceMonitor for Prometheus scraping. See Kubernetes Operator for ServiceMonitor configuration.
Volley SRE Gap Analysis
Date: 2026-03-30
Reference framework: msitarzewski/agency-agents engineering-sre.md
Important Context: Framework vs. Service
Volley is a stream processing framework (library + Kubernetes operator), not a deployed service. This analysis evaluates:
- Framework enablement — How well does Volley equip pipeline authors to follow SRE practices?
- Framework development practices — How well does Volley’s own CI/testing/release process follow SRE principles?
Org-level concerns (on-call rotations, blameless culture documentation, MTTR tracking) are out of scope — those belong to teams operating Volley-based pipelines, not the framework itself.
Executive Summary
| Pillar | Score | Verdict |
|---|---|---|
| SLOs & Error Budgets | C+ (55%) | SLI-ready metrics + Prometheus recording/alerting rule templates + SLO definition template; no runtime error budget gauge yet |
| Observability | A- (88%) | Strong metrics + tracing + structured JSON logging + alerting rule templates + trace-log correlation + per-connector error counters; missing dashboards |
| Toil Reduction | B- (65%) | CI + K8s operator + Dockerfile + Helm chart; no CD pipeline or benchmark regression tracking |
| Chaos Engineering | C- (40%) | Checkpoint recovery unit-tested + fault injection integration tests for checkpoint/RocksDB failures; no Kafka fault injection |
| Capacity Planning | B- (65%) | KEDA autoscaling scaffolded, benchmarks exist, capacity planning guide with sizing tables and PromQL queries |
| Progressive Rollouts | F (5%) | Operator uses default StatefulSet RollingUpdate; no canary or automated rollback |
| Incident Response Primitives | C+ (55%) | PipelineHealth enum + K8s probes + volley.pipeline.health gauge + alerting rules; missing severity-to-SLO mapping docs |
Overall: Volley has a strong observability foundation, solid Kubernetes integration, SLO recording rules, alerting templates, structured logging with trace-log correlation, a Dockerfile, Helm chart, SLO definition template, fault injection tests, and capacity planning guide. Remaining gaps are dashboard templates, CD pipeline, progressive rollout support, and Kafka fault injection.
Scoring Methodology
| Grade | Range | Meaning |
|---|---|---|
| A | 90-100% | Production-ready, matches SRE framework recommendations |
| B | 70-89% | Strong foundation, minor gaps |
| C | 50-69% | Partial coverage, significant gaps |
| D | 25-49% | Minimal coverage, major gaps |
| F | 0-24% | Absent or negligible |
Criteria per pillar: API completeness, built-in tooling, documentation for pipeline authors, integration with ecosystem tooling (Prometheus, Grafana, K8s).
Pillar 1: SLOs & Error Budgets — C+ (55%)
Current State
Volley exposes metrics that serve as natural SLI candidates (volley-observability/src/metrics.rs):
| Metric | Type | SLI Use |
|---|---|---|
| `volley.records.processed` | Counter | Availability — total processed records |
| `volley.errors` | Counter | Availability — error count for SLI ratio |
| `volley.stage.latency_ms` | Histogram | Latency — processing duration distribution |
| `volley.pipeline.uptime_seconds` | Gauge | Availability — pipeline uptime |
| `volley.checkpoint.failures` | Counter | Reliability — checkpoint failure rate |
The PipelineHealth enum (volley-observability/src/health.rs) provides binary health states (Starting, Running, Degraded, Failed, ShuttingDown) but these are probe-level signals, not SLO-aligned measurements.
What Exists
Prometheus recording rules are provided at deploy/prometheus/recording-rules.yaml computing:
- `volley:availability:ratio_rate5m` and `volley:availability:ratio_rate30d` — availability SLI ratios
- `volley:latency_sli:ratio_rate5m` — latency SLI (fraction under 100ms)
- `volley:error_budget:remaining` — remaining error budget against 99.9% target
- `volley:throughput:rate5m` and `volley:saturation:avg` — capacity signals
Multi-window burn rate alerting rules are provided at deploy/prometheus/alerting-rules.yaml:
- `VolleyHighErrorBurnRate` (critical) — 14.4x burn over 1h confirmed by 5m window
- `VolleySlowErrorBurnRate` (warning) — 1x burn over 3d confirmed by 6h window
Remaining Gaps
- No SLO definition template or configuration format for pipeline authors to declare per-pipeline targets
- No runtime error budget gauge metric in `volley-observability` (the recording rule computes it in Prometheus, but no native Rust gauge)
- No documentation connecting error budget exhaustion to operational decisions (e.g., deployment freeze policy)
Recommendations
(P1) Add an error_budget_remaining gauge to volley-observability that pipeline authors can enable to expose real-time budget consumption natively from the runtime, complementing the Prometheus recording rule.
(P1) Provide an SLO definition template format (YAML) that pipeline authors can use to declare per-pipeline availability and latency targets, which can be consumed by the Prometheus rules.
(P2) Document an error budget policy template: when budget < 25%, freeze non-critical deployments; when budget < 10%, enter incident response mode.
Pillar 2: Observability — A- (88%)
Current State
Metrics (volley-observability/src/metrics.rs): 15+ named metric constants covering sources, operators, checkpoints, channels, watermarks, sinks, pipeline health, Kafka, and blob storage. All use node_id/job_name labels. Three helper functions (increment_counter, set_gauge, record_histogram) enforce consistent labeling.
Tracing (volley-observability/src/tracing_init.rs): OpenTelemetry integration via OTLP gRPC export with tracing-opentelemetry bridge. Resource attributes include service.name and node.id. Graceful fallback if OTLP endpoint unavailable.
Health (volley-observability/src/health.rs): HealthReporter with atomic state tracking, liveness/readiness semantics. K8s probes configured in operator’s StatefulSet builder.
ServiceMonitor (volley-operator/src/crd.rs): Auto-created by operator with configurable scrape interval and labels.
What Exists
- Structured JSON logging: `ObservabilityConfig` supports a `log_format` field with `Text` (default) and `Json` modes. When set to `Json`, `tracing_subscriber::fmt::json()` is used for structured output compatible with log aggregation systems (Loki, Elasticsearch, CloudWatch Logs). Pipeline authors enable it via `.with_json_logging()`.
- Alerting rule templates: Prometheus alerting rules are provided at `deploy/prometheus/alerting-rules.yaml` covering error burn rate (fast + slow), pipeline health state, checkpoint failures, channel saturation, Kafka consumer lag, and p99 latency.
What Was Added
- Trace-log correlation: `TraceContextFormat` in `volley-core/src/observability/trace_context.rs` safely injects `trace_id` and `span_id` fields into JSON log output via a `serde_json` round-trip when an OpenTelemetry span is active, enabling cross-referencing between logs and distributed traces.
- Per-connector error counters: `volley.source.errors` and `volley.sink.errors` metrics defined in `volley-observability/src/metrics.rs` with `source_id`/`sink_id` labels, and incremented in `volley-core/src/builder/runtime.rs` on poll/write/flush failures. A `connector_id()` method was added to the `Source`, `Sink`, `DynSource`, and `DynSink` traits, implemented across Kafka, blob store, and Iceberg connectors.
Remaining Gaps
- No dashboard templates: No Grafana dashboard JSON mapping the four golden signals to Volley metrics.
Recommendations
(P0) Provide a golden signals Grafana dashboard template using Volley’s actual metrics:
| Golden Signal | Volley Metric | PromQL |
|---|---|---|
| Latency | volley.stage.latency_ms | histogram_quantile(0.99, rate(volley_stage_latency_ms_bucket[5m])) |
| Traffic | volley.source.records_polled | sum(rate(volley_source_records_polled_total[5m])) by (job_name) |
| Errors | volley.errors | sum(rate(volley_errors_total[5m])) by (job_name, error_type) |
| Saturation | volley.channel.utilization | avg(volley_channel_utilization) by (job_name, stage) |
(P1) Trace-log correlation and (P2) per-connector error counters were recommended by the original analysis and have since been implemented; see What Was Added above.
Pillar 3: Toil Reduction — B- (65%)
Current State
CI pipeline (.github/workflows/ci.yml): Format checking (cargo fmt), linting (cargo clippy with default + kafka features), testing (default + kafka variants), benchmark compilation check. Uses Rust cache for faster builds.
K8s operator (volley-operator/): Automates StatefulSet, headless Service, metrics Service, ServiceMonitor, PodDisruptionBudget, and KEDA ScaledObject creation from a single VolleyApplication CRD. Server-side apply with idempotent reconciliation.
Checkpoint cleanup: RecoveryCoordinator handles automated old checkpoint cleanup.
What Exists
- Dockerfile: A multi-stage Dockerfile is provided at `deploy/docker/Dockerfile` with a Rust builder stage and minimal distroless runtime stage. Pipeline authors can extend or customize it for their specific binaries.
- Helm chart: A Helm chart exists at `deploy/helm/volley/` that templates the `VolleyApplication` CRD with `values.yaml` covering all CRD fields (resources, storage, observability, health, autoscaling, Kafka, isolation).
Remaining Gaps
- Dockerfile not referenced in documentation: The README and operator docs do not mention the Dockerfile or describe how pipeline authors should customize it for their applications.
- Helm chart has no production-hardening overlays: No example values files for common deployment topologies (small/medium/large, multi-AZ).
- No CD pipeline: CI exists but no automated release workflow (crate publishing, image building).
- No benchmark regression tracking: Benchmarks compile-checked but results not tracked over time.
Recommendations
(P1) Add a CD workflow (.github/workflows/release.yml) triggered on git tags that builds container images and publishes to a registry.
(P1) Add production-hardened example values files to the Helm chart (e.g., values-production.yaml with resource limits, anti-affinity, KEDA autoscaling).
(P2) Add benchmark regression detection in CI using criterion with stored baselines.
Pillar 4: Chaos Engineering — C- (40%)
Current State
Checkpoint recovery is tested via unit tests — the HealthReporter transitions through Starting -> Running -> Degraded -> Failed states and these are verified. The operator handles reconciliation errors with retry backoff (15s requeue on failure).
Gaps
- No fault injection tests simulating Kafka broker disconnect, disk full, or OOM conditions.
- No integration tests exercising the Degraded -> Running recovery path under realistic failure.
- No Chaos Mesh or LitmusChaos experiment definitions for K8s-deployed pipelines.
Recommendations
(P1) Add integration tests for framework resilience:
- Checkpoint directory becomes read-only mid-pipeline
- Kafka broker disconnect during exactly-once transaction commit
- RocksDB write failure during state flush
(P1) Provide Chaos Mesh experiment templates for K8s-deployed Volley applications:
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
name: volley-pod-kill
spec:
action: pod-kill
mode: one
selector:
labelSelectors:
app.kubernetes.io/managed-by: volley-operator
scheduler:
cron: "@every 30m"
(P2) Add toxiproxy integration to Kafka integration tests for network degradation simulation.
Pillar 5: Capacity Planning — B- (65%)
Current State
Autoscaling: KEDA support fully scaffolded in operator (AutoscalingSpec in CRD) with Kafka lag, Prometheus metric, and CPU-based triggers. ScaledObject is created during reconciliation.
Benchmarks: benchmark_throughput and benchmark_sustained examples demonstrate 9.4M rec/s sustained on a single node.
Resource management: CRD requires explicit resources.requests and resources.limits for CPU/memory.
Gaps
- No sizing guide mapping resource allocation to expected throughput.
- No documentation on RocksDB storage growth patterns.
- No tested KEDA trigger threshold examples.
- No resource utilization dashboards.
Recommendations
(P1) Create a capacity planning guide covering:
- CPU/memory requirements per records/sec (based on benchmark data)
- RocksDB storage growth formula: `key_count * avg_value_size * checkpoint_retention_count`
- PVC sizing guidance based on state size and checkpoint interval
- Key PromQL queries for capacity monitoring:
  - Throughput: `rate(volley_source_records_polled_total[5m])`
  - Backpressure: `volley_channel_utilization > 0.8`
  - Input backlog: `volley_kafka_consumer_lag`
(P2) Document tested KEDA trigger configurations with proven thresholds (e.g., scale at consumer lag > 10000).
Pillar 6: Progressive Rollouts — F (5%)
Current State
The operator creates StatefulSets with a configurable replicas count but uses the default RollingUpdate strategy. No canary, blue-green, or partition-based rollout mechanism exists. imagePullPolicy is configurable but no image tag pinning is enforced.
Gaps
- No `updateStrategy` field in the CRD spec.
- No partition-based canary rollout support.
- No automated rollback on health check failure post-update.
- No Argo Rollouts or Flagger integration.
Recommendations
(P1) Add an updateStrategy field to VolleyApplicationSpec CRD with partition-based canary support:
#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct UpdateStrategySpec {
/// Partition value for canary rollouts. Pods with ordinal >= partition
/// get the new version. Set to replicas-1 to canary a single pod.
#[serde(default, rename = "partition")]
pub partition: Option<i32>,
}
}
(P2) Provide an Argo Rollouts AnalysisTemplate example that checks volley_errors_total rate and volley_stage_latency_ms p99 after each rollout step.
Pillar 7: Incident Response Primitives — C+ (55%)
Current State
Health states: PipelineHealth enum provides 5 states — Starting, Running, Degraded (with reason string), Failed (with reason string), ShuttingDown. These map to K8s liveness (healthy = Starting|Running) and readiness (ready = Running only).
Health gauge: volley.pipeline.health gauge is emitted on every state transition via HealthReporter, enabling Prometheus alerting on health state changes. Values: 0=Starting, 1=Running, 2=Degraded, 3=Failed, 4=ShuttingDown.
Operator status: CRD status tracks phases (Pending, Running, Degraded, Failed) with readyReplicas count and human-readable message.
Probes: Liveness at /health (port 8080), readiness at /ready, with configurable initialDelaySeconds and periodSeconds.
Alerting rules: deploy/prometheus/alerting-rules.yaml includes VolleyPipelineFailed (critical, health==3 for 1m) and VolleyPipelineDegraded (warning, health==2 for 5m).
Remaining Gaps
- No documentation mapping severity levels to PipelineHealth states and SLO impact.
- No runbook links in alert annotations (placeholder URLs only).
Recommendations
(P1) Document severity-to-health mapping for pipeline authors:
| Severity | PipelineHealth | SLO Impact | Response |
|---|---|---|---|
| SEV1 | Failed | SLO breached, pipeline stopped | Immediate: investigate root cause, restore from checkpoint |
| SEV2 | Degraded | Error budget burning above normal | Urgent: identify degradation source (e.g., checkpoint failures) |
| SEV3 | Running with elevated latency | Within budget but trending | Investigate: check channel.utilization, watermark.lag_ms |
(P1) Add runbook URLs to the existing alerting rules in deploy/prometheus/alerting-rules.yaml, linking to operational documentation for each failure mode.
Prioritized Implementation Roadmap
P0 — Completed
- SLO recording rule templates — `deploy/prometheus/recording-rules.yaml`
- Burn rate alerting rule templates — `deploy/prometheus/alerting-rules.yaml`
- Structured JSON logging — `ObservabilityConfig.log_format` with `LogFormat::Json`
- Multi-stage Dockerfile — `deploy/docker/Dockerfile`
- Helm chart — `deploy/helm/volley/`
- Pipeline health gauge — `volley.pipeline.health` emitted by `HealthReporter`
- Health state alerting rules — `VolleyPipelineFailed`, `VolleyPipelineDegraded` in alerting rules
P0 — Remaining
- Golden signals dashboard template (Grafana JSON)
- Severity-to-PipelineHealth mapping documentation
P1 — Framework maturity
- Runtime error budget gauge metric in `volley-observability`
- SLO definition template format for per-pipeline targets — `deploy/prometheus/slo-template.yaml`
- Trace-log correlation (trace_id in structured log output) — `volley-core/src/observability/trace_context.rs` with `TraceContextFormat`
- Fault injection integration tests (checkpoint, RocksDB failures) — `volley-core/tests/fault_injection_test.rs` (9 tests)
- Chaos Mesh experiment templates
- Capacity planning guide (sizing, storage growth, key PromQL) — `docs/src/operations/capacity-planning.md`
- `updateStrategy` CRD field for partition-based canary rollouts
- Runbook URLs in alerting rule annotations
- CD workflow for framework releases
- Production-hardened Helm values examples
P2 — Excellence
- Per-connector error counters (`volley.source.errors`, `volley.sink.errors`) — constants in `volley-observability/src/metrics.rs`, wired up in `volley-core/src/builder/runtime.rs` with `connector_id()` on Source/Sink traits
- Toxiproxy integration for Kafka fault injection
- Argo Rollouts AnalysisTemplate example
- Tested KEDA trigger threshold documentation
- Error budget policy template documentation
Appendix A: Full Metric Inventory
All metrics defined in volley-observability/src/metrics.rs:
| Constant | Metric Name | Type | Labels | SRE Use |
|---|---|---|---|---|
| `SOURCE_RECORDS_POLLED` | `volley.source.records_polled` | Counter | node_id, job_name, source_id | Traffic golden signal |
| `RECORDS_PROCESSED` | `volley.records.processed` | Counter | node_id, job_name, operator_id | Throughput, SLI denominator |
| `STAGE_LATENCY_MS` | `volley.stage.latency_ms` | Histogram | node_id, job_name, stage | Latency golden signal, SLI |
| `CHECKPOINT_DURATION_MS` | `volley.checkpoint.duration_ms` | Histogram | node_id, job_name | Checkpoint health |
| `CHECKPOINT_EPOCH` | `volley.checkpoint.epoch` | Gauge | node_id, job_name | Progress tracking |
| `CHECKPOINT_FAILURES` | `volley.checkpoint.failures` | Counter | node_id, job_name | Reliability, alerting |
| `CHANNEL_UTILIZATION` | `volley.channel.utilization` | Gauge | node_id, job_name, stage | Saturation golden signal |
| `WATERMARK_LAG_MS` | `volley.watermark.lag_ms` | Gauge | node_id, job_name | Event-time lag |
| `SINK_RECORDS_WRITTEN` | `volley.sink.records_written` | Counter | node_id, job_name, sink_id | Output throughput |
| `SINK_FLUSH_DURATION_MS` | `volley.sink.flush.duration_ms` | Histogram | node_id, job_name, sink_id | Sink latency |
| `PIPELINE_UPTIME_SECONDS` | `volley.pipeline.uptime_seconds` | Gauge | node_id, job_name | Availability |
| `PIPELINE_HEALTH` | `volley.pipeline.health` | Gauge | node_id, job_name | Health state (0-4), alerting on Failed/Degraded |
| `ERRORS_TOTAL` | `volley.errors` | Counter | node_id, job_name, error_type | Error golden signal, SLI numerator |
| `KAFKA_CONSUMER_LAG` | `volley.kafka.consumer_lag` | Gauge | node_id, job_name | Backlog, autoscaling trigger |
| `KAFKA_COMMIT_DURATION_MS` | `volley.kafka.commit.duration_ms` | Histogram | node_id, job_name | Kafka commit latency |
| `KAFKA_COMMIT_FAILURES` | `volley.kafka.commit.failures` | Counter | node_id, job_name | Exactly-once reliability |
| `BLOB_OBJECTS_READ` | `volley.blob.objects_read` | Counter | node_id, job_name | Blob source throughput |
| `BLOB_BYTES_READ` | `volley.blob.bytes_read` | Counter | node_id, job_name | Blob I/O volume |
Label constants: LABEL_NODE_ID, LABEL_JOB_NAME, LABEL_STAGE, LABEL_OPERATOR_ID, LABEL_SOURCE_ID, LABEL_SINK_ID, LABEL_ERROR_TYPE
Appendix B: Reference Prometheus Rules
See Pillar 1 for SLO recording rules and burn rate alerts, and Pillar 7 for health state alerting rules.
Appendix C: Golden Signals PromQL Reference
# Latency (p50, p99)
histogram_quantile(0.5, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
histogram_quantile(0.99, sum(rate(volley_stage_latency_ms_bucket[5m])) by (le, job_name))
# Traffic (records/sec by pipeline)
sum(rate(volley_source_records_polled_total[5m])) by (job_name)
# Errors (error rate by type)
sum(rate(volley_errors_total[5m])) by (job_name, error_type)
# Error ratio
sum(rate(volley_errors_total[5m])) by (job_name)
/
sum(rate(volley_records_processed_total[5m])) by (job_name)
# Saturation (channel utilization, higher = more backpressure)
avg(volley_channel_utilization) by (job_name, stage)
# Saturation (Kafka consumer lag)
sum(volley_kafka_consumer_lag) by (job_name)
Examples
Volley includes runnable examples in the volley-examples crate. Each demonstrates a different feature or pipeline pattern.
Running Examples
# Run any example
cargo run --example <name> -p volley-examples
# Run benchmarks in release mode
cargo run --example <name> -p volley-examples --release
Available Examples
| Example | Description |
|---|---|
| `in_memory_pipeline` | Basic pipeline with filter, key_by, window, and aggregate using in-memory source |
| `kafka_pipeline` | Kafka source to Kafka sink with windowed aggregation |
| `windowed_pipeline` | Event-time windowing with watermarks and late data handling |
| `observable_pipeline` | Pipeline with observability (Prometheus metrics, OpenTelemetry tracing) |
| `protobuf_to_delta` | Protobuf decode -> Delta Lake sink with partitioned writes |
| `nested_proto_partitioned` | Nested protobuf types with Hive-style partitioned writes |
| `benchmark_throughput` | Single-shot throughput benchmark across batch sizes and parallelism levels |
| `benchmark_sustained` | Sustained throughput benchmark (10s run, batch=1000, parallelism=4) |
Prerequisites
Most examples work out of the box. Some have additional requirements:
- `kafka_pipeline` — requires a running Kafka broker at `localhost:9092` and `cmake` installed
- `protobuf_to_delta`, `nested_proto_partitioned` — require a writable path for Delta table output
- Benchmarks — run with `--release` for meaningful numbers
Walkthrough
See Your First Pipeline for a step-by-step walkthrough of the in_memory_pipeline example.
Your First Pipeline
This walkthrough covers the in_memory_pipeline example, which demonstrates the core Volley API.
What It Does
The pipeline:
- Reads events from an in-memory iterator
- Filters records
- Partitions by a key field
- Groups into 5-minute tumbling windows
- Computes a sum aggregation
- Collects results in memory
Key Concepts
Creating the Environment
let env = StreamExecutionEnvironment::new();
This creates the pipeline builder. All pipeline construction starts here.
Adding a Source
env.from_iter(events)
from_iter() creates a DataStream from a Rust iterator of StreamRecord values. For production use, you’d use from_kafka() or a blob store source.
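Swapping in Kafka is a one-line change at the source. A sketch, assuming KafkaSourceConfig exposes broker, topic, and group fields (the field names below are illustrative; see volley-connector-kafka for the actual config shape):
// Illustrative sketch -- KafkaSourceConfig is a real type, but these
// field names are assumptions about its shape.
let config = KafkaSourceConfig {
    brokers: "localhost:9092".into(),
    topic: "events".into(),
    group_id: "aggregation-job".into(),
    ..Default::default() // assumes the config implements Default
};
let stream = env.from_kafka(config);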
Applying Operators
.filter_expr(col("amount").gt(lit(100)))
Operators on a DataStream: filter_expr, map, flat_map, apply_operator.
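These compose as a fluent chain. For example, filtering and then projecting with the expression engine might look like the sketch below; col and lit come from DataFusion's prelude, and the Vec<Expr> argument to select_expr is an assumption about its signature:
use datafusion::prelude::{col, lit};

// Keep only large transactions, then project two columns.
// `stream` is a DataStream from a source such as env.from_iter(...).
// select_expr exists in the expression engine; its exact argument
// shape here is an assumption.
let stream = stream
    .filter_expr(col("amount").gt(lit(100)))
    .select_expr(vec![col("user_id"), col("amount")]);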
Keying and Windowing
.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(300)))
.aggregate(AggregationType::Sum, "amount")
.key_by() transitions to a KeyedStream (hash-partitioned by key). .window() transitions to WindowedKeyedStream. .aggregate() computes the result per window per key.
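Sliding and session windows plug into the same .window() call. A sketch of a sliding variant, assuming SlidingWindows mirrors the TumblingWindows::of constructor (the name and signature are assumptions, not confirmed API):
// Hypothetical sliding-window variant: 5-minute windows that
// advance every minute. Constructor modeled on TumblingWindows::of.
.window(SlidingWindows::of(
    Duration::from_secs(300), // window size
    Duration::from_secs(60),  // slide interval
))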
Executing
.collect()
.execute("aggregation-job")
.await?;
.collect() attaches an in-memory sink. .execute() starts the pipeline as concurrent Tokio tasks and returns when all data is processed.
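Put together, the whole pipeline is one fluent chain. The condensed sketch below mirrors the snippets above; the import paths are assumptions about crate layout, and building the events iterator is elided (the volley-examples source is authoritative):
use std::time::Duration;
// Import paths below are assumptions about crate layout.
use volley_core::{AggregationType, StreamExecutionEnvironment, TumblingWindows};
use datafusion::prelude::{col, lit};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let env = StreamExecutionEnvironment::new();
    env.from_iter(events) // `events`: an iterator of StreamRecord values (construction elided)
        .filter_expr(col("amount").gt(lit(100)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(300)))
        .aggregate(AggregationType::Sum, "amount")
        .collect()
        .execute("aggregation-job")
        .await?;
    Ok(())
}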
Run It
cargo run --example in_memory_pipeline -p volley-examples
Expected output:
Pipeline 'aggregation-job' started
Source: MemorySource (4 records)
Operators: filter_expr → key_by → window → aggregate
Sink: MemorySink
Processing complete: 4 records in, 2 aggregated results out
Next Steps
- Kafka Pipeline — connect to real data
- Operators Guide — write custom operators
- Windowing — deep dive into window types
- All Examples — browse the full examples list
Contributing
Contributions to Volley are welcome! This page summarizes the development workflow. See the full Contributing Guide for complete details.
Development Setup
git clone https://github.com/volley-streams/volley.git
cd volley
cargo build --workspace
cargo test --workspace --exclude volley-python
System Dependencies
| Dependency | Required by | macOS | Ubuntu |
|---|---|---|---|
| cmake | volley-connector-kafka | brew install cmake | apt install cmake |
| libcurl4-openssl-dev | volley-connector-kafka | (included) | apt install libcurl4-openssl-dev |
| protobuf-compiler | volley-scheduler | brew install protobuf | apt install protobuf-compiler |
Run volley doctor to check your environment.
Code Style
- Formatting: cargo fmt --all
- Linting: cargo clippy --workspace -- -D warnings
- Tests: cargo test --workspace --exclude volley-python
All warnings are errors in CI (RUSTFLAGS: -Dwarnings).
Submitting Changes
- Fork the repository and create a feature branch
- Make your changes with clear, focused commits
- Ensure cargo fmt, cargo clippy, and cargo test all pass
- Open a pull request against main
Project Structure
See Crate Map for a per-crate breakdown of the 15-crate workspace.
Crate Map
Volley is a 15-crate Cargo workspace. See the full Crate Map for detailed per-crate documentation.
Status: stable = production-ready | experimental = API may change | placeholder = stub/future work
Core
| Crate | Description | Key Types | Status |
|---|---|---|---|
| volley-core | Stream processing engine with built-in observability and optional RocksDB state | StreamExecutionEnvironment, DataStream, KeyedStream, Operator, StreamRecord, ObservabilityConfig, RocksDbBackend | stable |
| volley-derive | Procedural macros | impl_to_arrow! | stable |
Connectors
| Crate | Description | Key Types | Status |
|---|---|---|---|
| volley-connectors | Umbrella crate (memory + feature-gated re-exports) | MemorySource, MemorySink | stable |
| volley-connector-kafka | Kafka source + sink (exactly-once) | KafkaSource, KafkaSink, KafkaSourceConfig | stable |
| volley-connector-blob-store | Cloud blob store abstraction + decoders | NotificationSource, BlobReader, BlobWriter | stable |
| volley-connector-aws-s3 | AWS S3 + SQS source | S3Source, S3BlobReader, SqsNotificationSource | stable |
| volley-connector-azure-blob | Azure Blob + Queue source | AzureBlobSource, AzureBlobReader | stable |
| volley-connector-gcp-gcs | GCS + Pub/Sub source | GcsSource, GcsBlobReader, PubSubNotificationSource | stable |
| volley-connector-delta | Delta Lake sink (exactly-once) | DeltaSink, DeltaSinkConfig | stable |
| volley-connector-iceberg | Apache Iceberg sink (REST catalog) | IcebergSink, IcebergSinkConfig | stable |
Infrastructure
| Crate | Description | Status |
|---|---|---|
| volley-k8s-operator | Kubernetes operator for VolleyApplication CRD | stable |
| volley-cli | CLI tool (volley new, volley doctor) | stable |
| volley-scheduler | Distributed execution (Ballista integration) | experimental |
| volley-python | PyO3 Python bindings | placeholder |
Other
| Crate | Description | Status |
|---|---|---|
| volley-examples | Runnable examples and benchmarks | stable |