Writing Operators
Operators are the steps in the middle of your pipeline — they filter, reshape, and aggregate records as they flow from source to sink. Most of what you need is already built in. This guide shows you the built-in operators to reach for first, and how to drop down to closures or a custom Operator trait when the built-ins can’t express what you need.
Expression-Based Operators (Recommended)
Expression operators use DataFusion expressions for vectorized, multi-row processing. These are the default choice — they operate on whole Arrow RecordBatches at once instead of one row at a time:
| Method | Description |
|---|---|
| `.filter_expr(expr)` | Keep records matching an expression (e.g., `col("price").gt(lit(100))`) |
| `.select_expr(exprs)` | Project/rename columns (e.g., `vec![col("user_id"), col("total").alias("sum")]`) |
| `.key_by(expr)` | Partition by a key column (e.g., `col("user_id")`); transitions to `KeyedStream` |
| `.window(window_type)` | Assign records to time windows (on `KeyedStream`); transitions to `WindowedKeyedStream` |
| `.aggregate_expr(aggs, state_backend)` | Windowed aggregation. The first argument is a `Vec<AggregateExprDef>`; the second is any `BatchStateBackend` (e.g., `RocksDbBackend` directly, no wrapper needed). |
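To make "vectorized" concrete, here is a minimal sketch of what a predicate like `col("price").gt(lit(100))` does conceptually: evaluate the comparison over the entire column into a boolean mask, then apply that mask to every column in one pass. It uses plain `Vec`s instead of Arrow arrays, and `Batch`, `gt_mask`, and `filter_batch` are hypothetical names for illustration, not Volley or DataFusion APIs.

```rust
/// A toy columnar batch: one Vec per column, all of equal length.
/// Hypothetical stand-in for an Arrow RecordBatch.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub user_id: Vec<String>,
    pub price: Vec<i64>,
}

/// Step 1: evaluate `price > threshold` over the whole column at once,
/// producing a boolean mask (the job a vectorized comparison kernel does).
pub fn gt_mask(column: &[i64], threshold: i64) -> Vec<bool> {
    column.iter().map(|&v| v > threshold).collect()
}

/// Step 2: apply the mask to every column, keeping rows where it is true.
pub fn filter_batch(batch: &Batch, mask: &[bool]) -> Batch {
    // Select the values of one column whose mask entry is true.
    fn take<T: Clone>(col: &[T], mask: &[bool]) -> Vec<T> {
        col.iter()
            .zip(mask)
            .filter(|(_, keep)| **keep)
            .map(|(v, _)| v.clone())
            .collect()
    }
    Batch {
        user_id: take(&batch.user_id, mask),
        price: take(&batch.price, mask),
    }
}
```

The real kernels operate on Arrow buffers, but the shape is the same: one predicate evaluation per column, not one function call per row.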
Example
```rust
use std::time::Duration;
use volley_core::prelude::*;

// source: impl Source, sink: impl DynSink (see the Quick Start for full examples)
// This runs inside an async function that returns a Result, because of `.await?`.
let tmp = tempfile::tempdir().unwrap();
StreamExecutionEnvironment::new()
    .from_source(source)
    .filter_expr(col("price").gt(lit(100)))
    .select_expr(vec![col("user_id"), col("price"), col("quantity")])
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .aggregate_expr(
        vec![sum(col("price"))],
        RocksDbBackend::open(tmp.path().join("state")).unwrap(),
    )
    .to_sink(sink)
    .execute("expr-pipeline")
    .await?;
```
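Conceptually, the pipeline above keeps rows with `price > 100`, groups them by `user_id`, assigns each row to a 60-second tumbling window, and sums `price` per key and window. The std-only sketch below models that result with in-memory tuples. It assumes epoch-aligned windows keyed by event time and ignores watermarks and emission timing, which Volley's runtime handles and which may differ from this model; `window_start` and `tumbling_sum` are hypothetical helpers.

```rust
use std::collections::BTreeMap;

/// Start of the tumbling window that contains `ts_ms`.
/// Assumes windows aligned to the epoch: [0, size), [size, 2*size), ...
pub fn window_start(ts_ms: u64, size_ms: u64) -> u64 {
    ts_ms - ts_ms % size_ms
}

/// Sum `price` per (user_id, window_start), mirroring
/// filter_expr -> key_by(user_id) -> tumbling window -> sum(price).
pub fn tumbling_sum(
    records: &[(&str, u64, i64)], // (user_id, event_time_ms, price)
    size_ms: u64,
    min_price: i64,
) -> BTreeMap<(String, u64), i64> {
    let mut sums = BTreeMap::new();
    for &(user, ts, price) in records {
        // The filter step: keep only rows with price > min_price.
        if price <= min_price {
            continue;
        }
        let key = (user.to_string(), window_start(ts, size_ms));
        *sums.entry(key).or_insert(0) += price;
    }
    sums
}
```

For example, with 60-second windows, events at 1,000 ms and 59,999 ms land in the window starting at 0, while an event at 60,000 ms starts the next window.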
Available Aggregation Functions
| Function | Description |
|---|---|
| `sum(expr)` | Sum of values |
| `count(expr)` | Count of values |
| `avg(expr)` | Average of values |
| `min(expr)` | Minimum value |
| `max(expr)` | Maximum value |

All are imported via `volley_core::prelude::*`. The expression helpers `col()` and `lit()` are also in the prelude.
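A useful way to think about these functions is as a small running accumulator per key and window. The sketch below is an illustration, not Volley's or DataFusion's internal state layout: `Acc` is a hypothetical type, `avg` is derived from `sum` and `count` rather than stored, and null handling (which `count(expr)` typically respects) is not modeled.

```rust
/// Running state for the five built-in aggregates over one key/window.
#[derive(Debug, Default)]
pub struct Acc {
    pub sum: f64,
    pub count: u64,
    pub min: Option<f64>, // None until the first value arrives
    pub max: Option<f64>,
}

impl Acc {
    /// Fold one value into every aggregate.
    pub fn update(&mut self, v: f64) {
        self.sum += v;
        self.count += 1;
        self.min = Some(self.min.map_or(v, |m| m.min(v)));
        self.max = Some(self.max.map_or(v, |m| m.max(v)));
    }

    /// Average derived from sum and count; None for an empty window.
    pub fn avg(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}
```

Deriving `avg` from `sum` and `count` keeps the state mergeable: two partial accumulators combine by adding their sums and counts, which would not work with two stored averages.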
Closure-Based Operators
Use closure operators when you need access to StreamRecord metadata (like event_time_ms or the per-record trace context) or when the logic doesn’t fit a DataFusion expression. They receive one StreamRecord at a time:
| Method | Description |
|---|---|
| `.filter(fn)` | Keep records matching a predicate |
| `.map(fn)` | Transform each record |
| `.flat_map(fn)` | Transform each record into zero or more records |
| `.apply_operator(op)` | Apply a custom `Operator` implementation |
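One way to read this table: `flat_map` is the general case, and `filter` and `map` are the special cases that emit at most one and exactly one record per input. The sketch below shows that relationship with std-only functions over a hypothetical `Rec` type; these are not Volley's method signatures, which operate on `StreamRecord` inside the pipeline builder.

```rust
/// Hypothetical stand-in for a record; only the field this sketch needs.
#[derive(Debug, Clone, PartialEq)]
pub struct Rec {
    pub value: i64,
}

/// flat_map: each input yields zero or more outputs.
pub fn flat_map<F>(input: Vec<Rec>, f: F) -> Vec<Rec>
where
    F: Fn(Rec) -> Vec<Rec>,
{
    input.into_iter().flat_map(f).collect()
}

/// filter: a flat_map that emits 0 or 1 records per input.
pub fn filter<P>(input: Vec<Rec>, pred: P) -> Vec<Rec>
where
    P: Fn(&Rec) -> bool,
{
    flat_map(input, |r| if pred(&r) { vec![r] } else { vec![] })
}

/// map: a flat_map that emits exactly 1 record per input.
pub fn map<M>(input: Vec<Rec>, m: M) -> Vec<Rec>
where
    M: Fn(Rec) -> Rec,
{
    flat_map(input, |r| vec![m(r)])
}
```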
```rust
// Keep only records whose event time is after `cutoff`; records without an
// event time are dropped too. This is a good fit for a closure because
// `event_time_ms` lives on StreamRecord, not on the batch.
stream.filter(move |r| r.event_time_ms.map_or(false, |t| t > cutoff))
```
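The closure above keeps a record only if its event time is present and strictly after `cutoff`; a record with no event time is dropped because `map_or(false, ...)` returns `false` for `None`. The standalone sketch below checks that behavior on a hypothetical `Record` type, assuming `event_time_ms` is an `Option<i64>` as the `map_or` call implies (the real field type may differ).

```rust
/// Hypothetical record carrying only the metadata this sketch needs.
/// Assumption: event_time_ms is optional, as the `map_or` above implies.
pub struct Record {
    pub event_time_ms: Option<i64>,
}

/// Same predicate as the closure in the example, applied to a Vec.
pub fn keep_recent(records: Vec<Record>, cutoff: i64) -> Vec<Record> {
    records
        .into_iter()
        .filter(move |r| r.event_time_ms.map_or(false, |t| t > cutoff))
        .collect()
}
```

To keep records that have no event time instead, use `map_or(true, |t| t > cutoff)`.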
Note: Closure operators process one `StreamRecord` at a time. For bulk column operations on the record payload, expression operators (`filter_expr`, `select_expr`) are more efficient because they use DataFusion's vectorized kernels on the full `RecordBatch`.
Custom Operators
Implement the Operator trait from volley-core:
```rust
use volley_core::prelude::*;

struct MyOperator;

impl Operator for MyOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Transform the record
        // Return zero or more output records
        vec![record]
    }

    fn on_checkpoint(&mut self, _epoch: u64) {
        // Optional: flush any buffered state before checkpoint
    }
}
```
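To see the zero-or-more contract in a form that compiles on its own, the sketch below mirrors the trait's shape locally, using `String` in place of `StreamRecord`. The local `Operator` trait and `SplitWords` are hypothetical; the real trait comes from `volley-core`, and whether its `on_checkpoint` has a default body is an assumption here.

```rust
/// Local mirror of the Operator shape shown above (hypothetical), so this
/// sketch compiles without volley-core. Records are plain Strings here.
pub trait Operator {
    fn process(&mut self, record: String) -> Vec<String>;
    // Assumed default: do nothing at checkpoint time.
    fn on_checkpoint(&mut self, _epoch: u64) {}
}

/// Splits each line into words: one input yields zero or more outputs.
pub struct SplitWords;

impl Operator for SplitWords {
    fn process(&mut self, record: String) -> Vec<String> {
        record.split_whitespace().map(String::from).collect()
    }
}
```

A blank line produces an empty `Vec`, which is how an operator drops a record.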
Apply it to a stream:
```rust
StreamExecutionEnvironment::new()
    .from_source(source)
    .apply_operator(MyOperator)
    .to_sink(sink)
    .execute("my-job")
    .await?;
```
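Once applied, the runtime calls your operator for each record and invokes `on_checkpoint()` before a checkpoint snapshot. The sketch below models that contract with a hypothetical driver loop in which checkpoints arrive as barrier events interleaved with records; the `Event` enum, the `drive` function, and barrier-based triggering are assumptions for illustration, not a description of Volley's scheduler.

```rust
/// Hypothetical operator shape over i64 records.
pub trait Operator {
    fn process(&mut self, record: i64) -> Vec<i64>;
    fn on_checkpoint(&mut self, _epoch: u64) {}
}

/// Input to the driver: either a record or a checkpoint barrier.
pub enum Event {
    Record(i64),
    Barrier(u64), // checkpoint epoch
}

/// Feed records through `process`; call `on_checkpoint` at each barrier.
pub fn drive<O: Operator>(op: &mut O, events: Vec<Event>) -> Vec<i64> {
    let mut out = Vec::new();
    for ev in events {
        match ev {
            Event::Record(r) => out.extend(op.process(r)),
            Event::Barrier(epoch) => op.on_checkpoint(epoch),
        }
    }
    out
}

/// Example operator: drops negatives and records which epochs it saw.
pub struct DropNegatives {
    pub seen_epochs: Vec<u64>,
}

impl Operator for DropNegatives {
    fn process(&mut self, record: i64) -> Vec<i64> {
        if record >= 0 { vec![record] } else { vec![] }
    }
    fn on_checkpoint(&mut self, epoch: u64) {
        self.seen_epochs.push(epoch);
    }
}
```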
Multi-Row RecordBatch Processing
Operators receive a `StreamRecord`, which wraps an Arrow `RecordBatch`. A batch may contain one or more rows, so your operator must process every row rather than assume the batch holds a single record:
```rust
fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
    let batch = &record.batch;
    let num_rows = batch.num_rows();
    // Process all rows, not just row 0
    for row in 0..num_rows {
        // ... read column values at index `row` ...
    }
    vec![record]
}
```
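The bug this warns about is easy to write by accident. The sketch below contrasts reading only the first row with visiting every row, using a hypothetical single-column `Batch` in place of an Arrow `RecordBatch`.

```rust
/// Hypothetical single-column batch standing in for a RecordBatch.
pub struct Batch {
    pub amounts: Vec<i64>,
}

impl Batch {
    pub fn num_rows(&self) -> usize {
        self.amounts.len()
    }
}

/// Buggy: only looks at row 0 and silently ignores the rest.
pub fn total_first_row_only(batch: &Batch) -> i64 {
    batch.amounts.first().copied().unwrap_or(0)
}

/// Correct: visits every row in the batch.
pub fn total_all_rows(batch: &Batch) -> i64 {
    (0..batch.num_rows()).map(|i| batch.amounts[i]).sum()
}
```

With a three-row batch of `[5, 7, 11]`, the first version reports 5 while the correct total is 23.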
Stateful Operators
For operators that maintain state across records, use the on_checkpoint() hook to flush state before a checkpoint snapshot:
```rust
use std::collections::HashMap;
// Assumes the `arrow` crate is a dependency; volley_core's prelude may
// already re-export these types.
use arrow::array::{Array, StringArray};
use volley_core::prelude::*;

struct CountingOperator {
    counts: HashMap<String, u64>,
}

impl Operator for CountingOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Look up the "key" column (assumed to be a Utf8 column) and count
        // every row in the batch, not just row 0. Null keys are skipped.
        if let Some(keys) = record
            .batch
            .column_by_name("key")
            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        {
            for key in keys.iter().flatten() {
                *self.counts.entry(key.to_string()).or_default() += 1;
            }
        }
        vec![record]
    }

    fn on_checkpoint(&mut self, _epoch: u64) {
        // Flush state to durable storage here
        // The state backend will snapshot after this returns
    }
}
```
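One common way to implement the flush is a write-behind cache: update counts in memory on every record, remember which keys changed, and write only those keys when a checkpoint is requested. The sketch below illustrates that pattern with std types only. `Store` is a hypothetical stand-in for a state backend, and here `on_checkpoint` takes the store as an explicit argument for illustration, which differs from the trait signature shown above.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical durable store standing in for a state backend.
#[derive(Default)]
pub struct Store {
    pub data: HashMap<String, u64>,
    pub writes: usize, // total number of key writes performed
}

/// In-memory counts plus the set of keys changed since the last flush.
#[derive(Default)]
pub struct CountingOperator {
    counts: HashMap<String, u64>,
    dirty: HashSet<String>,
}

impl CountingOperator {
    /// Called per row: update the cache and mark the key dirty.
    pub fn observe(&mut self, key: &str) {
        *self.counts.entry(key.to_string()).or_default() += 1;
        self.dirty.insert(key.to_string());
    }

    /// Write only dirty keys, so the snapshot taken afterwards is complete.
    pub fn on_checkpoint(&mut self, store: &mut Store) {
        for key in self.dirty.drain() {
            let count = self.counts[&key];
            store.data.insert(key, count);
            store.writes += 1;
        }
    }
}
```

Writing only dirty keys keeps checkpoint cost proportional to what changed since the last checkpoint rather than to the total number of keys.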
See State Management for details on the write-behind cache and RocksDB backend.
Type-State Safety
The Pipeline Builder enforces valid operator chains at compile time. You can only call .aggregate_expr() on a WindowedKeyedStream, not on a plain DataStream. You can only call .execute() on a DataStream<HasSink>.
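The type-state pattern behind this is plain Rust generics: the stream carries a marker type parameter, and methods such as `execute` are only implemented for the marker that represents a valid state. The sketch below is a hypothetical, heavily simplified builder (`NoSink`, `HasSink`, and the string-based stages are invented for illustration) that shows why calling `execute` before `to_sink` fails to compile.

```rust
use std::marker::PhantomData;

/// Marker types for the builder's state (hypothetical names).
pub struct NoSink;
pub struct HasSink;

/// A stream that records its stages as strings, tagged with a state marker.
pub struct DataStream<S> {
    stages: Vec<String>,
    _state: PhantomData<S>,
}

impl DataStream<NoSink> {
    pub fn from_source(name: &str) -> Self {
        DataStream { stages: vec![format!("source:{name}")], _state: PhantomData }
    }

    pub fn filter(mut self, desc: &str) -> Self {
        self.stages.push(format!("filter:{desc}"));
        self
    }

    /// Attaching a sink changes the type parameter to HasSink.
    pub fn to_sink(mut self, name: &str) -> DataStream<HasSink> {
        self.stages.push(format!("sink:{name}"));
        DataStream { stages: self.stages, _state: PhantomData }
    }
}

impl DataStream<HasSink> {
    /// Only defined for DataStream<HasSink>; returns the planned stages.
    pub fn execute(self) -> Vec<String> {
        self.stages
    }
}
```

`DataStream::<NoSink>::from_source("kafka").execute()` is rejected at compile time, because no `execute` method exists for `DataStream<NoSink>`.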