Writing Operators
Operators transform data as it flows through the pipeline. Volley provides built-in operators and lets you write custom ones.
Built-In Operators
| Method | Description |
|---|---|
| .filter(fn) | Keep records matching a predicate |
| .map(fn) | Transform each record |
| .flat_map(fn) | Transform each record into zero or more records |
| .apply_operator(op) | Apply a custom Operator implementation |
| .key_by(field) | Partition by a key field (transitions to KeyedStream) |
| .window(window_type) | Assign records to time windows (on KeyedStream) |
| .aggregate(agg, field) | Compute aggregation per window per key |
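The first three methods behave much like Rust's standard iterator adapters of the same names. The sketch below is only an analogy built on `std` iterators: the `Rec` alias and the three helper functions are hypothetical and are not part of Volley's API.

```rust
/// Hypothetical stand-in for a record: a (key, value) pair.
type Rec = (String, i64);

/// Mirrors `.filter(fn)`: keep only records whose value is non-negative.
fn keep_non_negative(input: Vec<Rec>) -> Vec<Rec> {
    input.into_iter().filter(|(_, v)| *v >= 0).collect()
}

/// Mirrors `.map(fn)`: exactly one output record per input record.
fn double_values(input: Vec<Rec>) -> Vec<Rec> {
    input.into_iter().map(|(k, v)| (k, v * 2)).collect()
}

/// Mirrors `.flat_map(fn)`: zero or more output records per input record.
/// Each record is repeated `value` times, so values <= 0 emit nothing.
fn explode(input: Vec<Rec>) -> Vec<Rec> {
    input
        .into_iter()
        .flat_map(|(k, v)| (0..v).map(move |_| (k.clone(), 1)))
        .collect()
}
```

The difference between `map` and `flat_map` is the output cardinality: `map` always preserves the record count, while `flat_map` may shrink or grow it.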
Custom Operators
Implement the Operator trait from volley-core:
use volley_core::prelude::*;

struct MyOperator;

impl Operator for MyOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Transform the record
        // Return zero or more output records
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Optional: flush any buffered state before checkpoint
    }
}
Apply it to a stream:
env.from_source(source)
    .apply_operator(MyOperator)
    .to_sink(sink)
    .execute("my-job")
    .await?;
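To illustrate the "zero or more output records" contract of process(), here is a minimal sketch that compiles without volley-core. The `StreamRecord` struct and `Operator` trait below are simplified stand-ins (the real record wraps an Arrow RecordBatch, and the default no-op `on_checkpoint` is an assumption based on the hook being described as optional). `SplitWords` and `run_operator` are hypothetical names.

```rust
// Stand-ins so this sketch compiles on its own; not the real volley-core types.
#[derive(Debug, Clone, PartialEq)]
struct StreamRecord {
    data: String,
}

trait Operator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord>;
    // Assumed default: operators without state can skip this hook.
    fn on_checkpoint(&mut self, _epoch: u64) {}
}

/// Hypothetical operator: emits one record per whitespace-separated word.
/// A blank input therefore produces zero output records.
struct SplitWords;

impl Operator for SplitWords {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        record
            .data
            .split_whitespace()
            .map(|w| StreamRecord { data: w.to_string() })
            .collect()
    }
}

/// Feeds each input through the operator and concatenates the outputs,
/// roughly what a runtime would do for a single operator stage.
fn run_operator(op: &mut impl Operator, inputs: Vec<StreamRecord>) -> Vec<StreamRecord> {
    inputs.into_iter().flat_map(|r| op.process(r)).collect()
}
```

Returning an empty Vec drops the input, returning one element behaves like a map, and returning several behaves like a flat map, so a single trait method covers all three cases.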
Multi-Row RecordBatch Processing
Operators receive a StreamRecord, which wraps an Arrow RecordBatch. A batch may contain one or more rows, so your operator must process every row rather than assuming a single-row batch:
fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
    let batch = &record.data;
    let num_rows = batch.num_rows();
    // Process all rows, not just row 0
    for _row in 0..num_rows {
        // ...
    }
    // Placeholder: pass the input through so the function returns a value
    vec![record]
}
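The following sketch shows the row loop in a form that compiles without Arrow. `Batch` is a hypothetical single-column stand-in for RecordBatch, and `double_all_rows` is an illustrative name. With the real Arrow types you would downcast a column to a typed array first, which is omitted here.

```rust
/// Hypothetical stand-in for an Arrow RecordBatch: one integer column.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    values: Vec<i64>,
}

impl Batch {
    fn num_rows(&self) -> usize {
        self.values.len()
    }
}

/// Stand-in for StreamRecord, wrapping the batch in a `data` field.
#[derive(Debug, Clone, PartialEq)]
struct StreamRecord {
    data: Batch,
}

/// Doubles every row and emits one output batch with the same row count.
fn double_all_rows(record: StreamRecord) -> Vec<StreamRecord> {
    let batch = &record.data;
    let mut out = Vec::with_capacity(batch.num_rows());
    // Visit every row index, not only index 0.
    for row in 0..batch.num_rows() {
        out.push(batch.values[row] * 2);
    }
    vec![StreamRecord { data: Batch { values: out } }]
}
```

An operator that read only `values[0]` would appear correct with single-row test data and silently lose rows once upstream batching changes.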
Stateful Operators
For operators that maintain state across records, use the on_checkpoint() hook to flush state before a checkpoint snapshot:
use std::collections::HashMap;
use volley_core::prelude::*;

struct CountingOperator {
    counts: HashMap<String, u64>,
}

impl Operator for CountingOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Update in-memory state
        let key = record.key.clone().unwrap_or_default();
        *self.counts.entry(key).or_default() += 1;
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Flush state to durable storage here
        // The state backend will snapshot after this returns
    }
}
See State Management for details on the write-behind cache and RocksDB backend.
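As a rough illustration of the flush step, the sketch below copies the in-memory counts into a `MockStore` when on_checkpoint() runs. `MockStore` is a hypothetical in-memory stand-in for durable storage, and the `StreamRecord` and `Operator` definitions are simplified stand-ins so the example compiles without volley-core. It does not show the real state backend API.

```rust
use std::collections::HashMap;

// Stand-ins for the volley-core types; only the `key` field is modeled.
#[derive(Debug, Clone, PartialEq)]
struct StreamRecord {
    key: Option<String>,
}

trait Operator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord>;
    fn on_checkpoint(&mut self, _epoch: u64) {}
}

/// Hypothetical stand-in for durable storage.
#[derive(Default)]
struct MockStore {
    last_epoch: Option<u64>,
    counts: HashMap<String, u64>,
}

#[derive(Default)]
struct CountingOperator {
    counts: HashMap<String, u64>,
    store: MockStore,
}

impl Operator for CountingOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Only the in-memory map is touched on the hot path.
        let key = record.key.clone().unwrap_or_default();
        *self.counts.entry(key).or_default() += 1;
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Flush: make the store reflect everything processed so far.
        self.store.counts = self.counts.clone();
        self.store.last_epoch = Some(epoch);
    }
}
```

Until on_checkpoint() is called the store stays empty, which is why state that is never flushed would be missing from a snapshot taken afterwards.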
Type-State Safety
The Pipeline Builder enforces valid operator chains at compile time. You can only call .aggregate() on a WindowedKeyedStream, not on a plain DataStream.
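The type-state pattern behind this can be sketched in a few lines. The three structs below borrow the stream type names used on this page, but their fields and method signatures are hypothetical simplifications (no key field argument, no window type, a fixed sum aggregation), not Volley's actual builder.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins: each stage of the builder is its own type.
struct DataStream {
    rows: Vec<(String, i64)>,
}
struct KeyedStream {
    rows: Vec<(String, i64)>,
}
struct WindowedKeyedStream {
    rows: Vec<(String, i64)>,
}

impl DataStream {
    /// Consumes the plain stream and returns the keyed stage.
    fn key_by(self) -> KeyedStream {
        KeyedStream { rows: self.rows }
    }
}

impl KeyedStream {
    /// Only a keyed stream can be windowed.
    fn window(self) -> WindowedKeyedStream {
        WindowedKeyedStream { rows: self.rows }
    }
}

impl WindowedKeyedStream {
    /// Only the windowed, keyed stage exposes an aggregation.
    fn aggregate_sum(self) -> BTreeMap<String, i64> {
        let mut sums = BTreeMap::new();
        for (key, value) in self.rows {
            *sums.entry(key).or_insert(0) += value;
        }
        sums
    }
}

/// Valid chain: key_by, then window, then aggregate.
fn sum_per_key(rows: Vec<(String, i64)>) -> BTreeMap<String, i64> {
    DataStream { rows }.key_by().window().aggregate_sum()
    // `DataStream { rows }.aggregate_sum()` would be rejected by the compiler,
    // because DataStream has no such method.
}
```

Because each method consumes `self` and returns the next stage's type, an invalid ordering is a type error rather than a runtime failure.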