Writing Operators

Operators are the steps in the middle of your pipeline: they filter, reshape, and aggregate records as they flow from source to sink. Most of what you need is already built in. This guide covers the built-in operators to reach for first, and how to drop down to closures or your own implementation of the Operator trait when the built-ins can't express what you need.

Expression Operators

Expression operators use DataFusion expressions for vectorized, multi-row processing. They are the default choice because they operate on whole Arrow RecordBatches at once rather than one row at a time.

| Method | Description |
|---|---|
| `.filter_expr(expr)` | Keep records matching an expression (e.g., `col("price").gt(lit(100))`) |
| `.select_expr(exprs)` | Project/rename columns (e.g., `vec![col("user_id"), col("total").alias("sum")]`) |
| `.key_by(expr)` | Partition by a key column (e.g., `col("user_id")`); transitions to `KeyedStream` |
| `.window(window_type)` | Assign records to time windows (on `KeyedStream`); transitions to `WindowedKeyedStream` |
| `.aggregate_expr(aggs, state_backend)` | Windowed aggregation. The first argument is a `Vec<AggregateExprDef>`; the second is any `BatchStateBackend` (e.g., `RocksDbBackend` directly, with no wrapper needed). |
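Here is what "vectorized" means in concrete terms. An expression such as col("price").gt(lit(100)) is evaluated once over the entire price column and produces a boolean mask. That mask is then applied to every column of the batch. The sketch below mimics this with plain Rust vectors. ToyBatch, gt_mask, and filter_batch are hypothetical names for illustration, not Volley or DataFusion APIs; the real operators run Arrow compute kernels over Arrow arrays.

```rust
/// A toy columnar batch: one Vec per column, all with the same length.
/// Hypothetical stand-in for an Arrow RecordBatch.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyBatch {
    pub user_id: Vec<String>,
    pub price: Vec<i64>,
}

/// Evaluate `price > threshold` over the whole column in one pass,
/// producing a selection mask (the analogue of `col("price").gt(lit(100))`).
pub fn gt_mask(column: &[i64], threshold: i64) -> Vec<bool> {
    column.iter().map(|&v| v > threshold).collect()
}

/// Apply one selection mask to every column, keeping rows where the mask is true.
pub fn filter_batch(batch: &ToyBatch, mask: &[bool]) -> ToyBatch {
    fn take<T: Clone>(column: &[T], mask: &[bool]) -> Vec<T> {
        column
            .iter()
            .zip(mask)
            .filter(|&(_, &keep)| keep)
            .map(|(v, _)| v.clone())
            .collect()
    }
    ToyBatch {
        user_id: take(&batch.user_id, mask),
        price: take(&batch.price, mask),
    }
}
```

The mask is computed once and reused for every column. This is why adding columns to a batch does not multiply the cost of evaluating the predicate.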

Example

```rust
use std::time::Duration;
use volley_core::prelude::*;

// source: impl Source, sink: impl DynSink (see the Quick Start for full examples).
// Run this inside an async fn that returns a Result, since it uses `.await?`.

// Temporary directory for RocksDB state; `tmp` must stay alive until the job finishes.
let tmp = tempfile::tempdir().unwrap();

StreamExecutionEnvironment::new()
    .from_source(source)
    .filter_expr(col("price").gt(lit(100)))
    .select_expr(vec![col("user_id"), col("price"), col("quantity")])
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .aggregate_expr(
        vec![sum(col("price"))],
        RocksDbBackend::open(tmp.path().join("state")).unwrap(),
    )
    .to_sink(sink)
    .execute("expr-pipeline")
    .await?;
```
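The pipeline above filters records, keys them by user_id, buckets them into 60-second tumbling windows, and sums price for each (key, window) pair. The sketch below shows that computation in plain Rust. It assumes windows are aligned to the Unix epoch and takes event times as given. window_start and windowed_sums are hypothetical helpers. The sketch does not model watermarks, late data, or when results are emitted.

```rust
use std::collections::BTreeMap;

/// Start of the tumbling window containing `event_time_ms`, assuming
/// epoch-aligned windows of `size_ms` and non-negative timestamps.
pub fn window_start(event_time_ms: u64, size_ms: u64) -> u64 {
    event_time_ms - event_time_ms % size_ms
}

/// Filter price > 100, key by user, assign tumbling windows, and sum price.
/// Input tuples are (user_id, price, event_time_ms).
pub fn windowed_sums(records: &[(&str, i64, u64)], size_ms: u64) -> BTreeMap<(String, u64), i64> {
    let mut sums = BTreeMap::new();
    for &(user, price, ts) in records {
        if price <= 100 {
            continue; // the filter_expr step
        }
        // key_by + window: the state key is (user, window start)
        let key = (user.to_string(), window_start(ts, size_ms));
        *sums.entry(key).or_insert(0) += price; // the sum aggregate
    }
    sums
}
```

For example, with 60 000 ms windows, events at 1 000 ms and 59 999 ms fall in the window starting at 0. An event at 60 000 ms opens the next window.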

Available Aggregation Functions

| Function | Description |
|---|---|
| `sum(expr)` | Sum of values |
| `count(expr)` | Count of values |
| `avg(expr)` | Average of values |
| `min(expr)` | Minimum value |
| `max(expr)` | Maximum value |

All are imported via volley_core::prelude::*. Expression helpers col() and lit() are also in the prelude.
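The five functions need only a small amount of state per group. The sketch below keeps one running state that serves all of them. It assumes SQL-style NULL handling, which is how DataFusion's aggregates behave: count(expr) counts only non-null values, and sum or avg over no non-null values is NULL. ToyAggState is a hypothetical type, not DataFusion's Accumulator trait. avg is derived from the sum and the count rather than stored separately.

```rust
/// Running state for sum/count/avg/min/max over one group (toy type).
/// `None` inputs model SQL NULLs and are skipped.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ToyAggState {
    total: f64,
    pub count: u64,
    pub min: Option<f64>,
    pub max: Option<f64>,
}

impl ToyAggState {
    /// Fold one input value into the state.
    pub fn update(&mut self, value: Option<f64>) {
        if let Some(v) = value {
            self.total += v;
            self.count += 1;
            self.min = Some(self.min.map_or(v, |m| m.min(v)));
            self.max = Some(self.max.map_or(v, |m| m.max(v)));
        }
    }

    /// Sum of non-null values; None (SQL NULL) if there were none.
    pub fn sum(&self) -> Option<f64> {
        (self.count > 0).then_some(self.total)
    }

    /// Average computed from sum and count; None for an empty group.
    pub fn avg(&self) -> Option<f64> {
        (self.count > 0).then(|| self.total / self.count as f64)
    }
}
```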

Closure-Based Operators

Use closure operators when you need access to StreamRecord metadata (like event_time_ms or the per-record trace context) or when the logic doesn’t fit a DataFusion expression. They receive one StreamRecord at a time:

| Method | Description |
|---|---|
| `.filter(fn)` | Keep records matching a predicate |
| `.map(fn)` | Transform each record |
| `.flat_map(fn)` | Transform each record into zero or more records |
| `.apply_operator(op)` | Apply a custom Operator implementation |
```rust
// Drop records whose event time is at or before `cutoff` (records with no
// event time are dropped as well). A closure fits here because
// `event_time_ms` lives on StreamRecord, not in the batch.
stream.filter(move |r| r.event_time_ms.is_some_and(|t| t > cutoff))
```

Note: Closure operators process one StreamRecord at a time. For bulk column operations on the record payload, expression operators (filter_expr, select_expr) are more efficient because they use DataFusion’s vectorized kernels on the full RecordBatch.
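The filter and flat_map semantics in the table match the standard library's iterator adapters of the same names. So that it runs without Volley, the sketch below uses Rec, a hypothetical stand-in for StreamRecord that has only event_time_ms and a text payload. The real closures receive StreamRecord values.

```rust
/// Hypothetical stand-in for StreamRecord with only the fields used here.
#[derive(Debug, Clone, PartialEq)]
pub struct Rec {
    pub event_time_ms: Option<i64>,
    pub payload: String,
}

/// filter: keep records with an event time strictly after `cutoff`;
/// records without an event time are dropped.
pub fn keep_recent(records: Vec<Rec>, cutoff: i64) -> Vec<Rec> {
    records
        .into_iter()
        .filter(move |r| r.event_time_ms.is_some_and(|t| t > cutoff))
        .collect()
}

/// flat_map: each input yields zero or more outputs, here one per
/// whitespace-separated word, each inheriting the input's event time.
pub fn split_words(records: Vec<Rec>) -> Vec<Rec> {
    records
        .into_iter()
        .flat_map(|r| {
            let t = r.event_time_ms;
            let out: Vec<Rec> = r
                .payload
                .split_whitespace()
                .map(|w| Rec { event_time_ms: t, payload: w.to_string() })
                .collect();
            out
        })
        .collect()
}
```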

Custom Operators

Implement the Operator trait from volley-core:

```rust
use volley_core::prelude::*;

struct MyOperator;

impl Operator for MyOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Transform the record
        // Return zero or more output records
        vec![record]
    }

    fn on_checkpoint(&mut self, _epoch: u64) {
        // Optional: flush any buffered state before checkpoint
    }
}
```

Apply it to a stream:

```rust
StreamExecutionEnvironment::new()
    .from_source(source)
    .apply_operator(MyOperator)
    .to_sink(sink)
    .execute("my-job")
    .await?;
```
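Because process returns a Vec, a single operator can drop, pass through, or fan out records. The sketch below models that contract with a hypothetical ToyOperator trait over plain i64 records. It gives on_checkpoint a default no-op body. The "Optional" comment above suggests Volley's trait works the same way, but that is an assumption here. The run helper approximates, rather than reproduces, how a runtime feeds records through an operator.

```rust
/// Hypothetical mirror of the Operator shape, over i64 records.
pub trait ToyOperator {
    fn process(&mut self, record: i64) -> Vec<i64>;

    /// Default no-op so stateless operators can skip it.
    fn on_checkpoint(&mut self, _epoch: u64) {}
}

/// Drops negative values (empty Vec) and emits each other value twice (fan-out).
pub struct DropNegativesAndDuplicate;

impl ToyOperator for DropNegativesAndDuplicate {
    fn process(&mut self, record: i64) -> Vec<i64> {
        if record < 0 {
            vec![]
        } else {
            vec![record, record]
        }
    }
}

/// Feed each input through the operator and concatenate all outputs in order.
pub fn run<O: ToyOperator>(op: &mut O, inputs: &[i64]) -> Vec<i64> {
    inputs.iter().flat_map(|&r| op.process(r)).collect()
}
```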

Multi-Row RecordBatch Processing

Operators receive a StreamRecord, which wraps an Arrow RecordBatch. A batch may contain one or more rows, so your operator must handle multi-row batches correctly:

```rust
// Inside `impl Operator for MyOperator { ... }`
fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
    let batch = &record.batch;
    // Process every row, not just row 0.
    for _row in 0..batch.num_rows() {
        // ... per-row logic ...
    }
    vec![record]
}
```
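One reliable way to handle multi-row batches is to work on whole columns instead of indexing row 0. In the sketch below, plain slices are hypothetical stand-ins for two columns of the same batch. The function derives a revenue column whose length always equals the batch's row count. With real Arrow data, you would apply the same idea to the arrays returned by column_by_name.

```rust
/// Derive `revenue = price * quantity` for every row of a batch.
/// Columns from one batch share a row count, so the output does too.
pub fn revenue_column(price: &[i64], quantity: &[i64]) -> Vec<i64> {
    assert_eq!(price.len(), quantity.len(), "columns in one batch share a row count");
    price.iter().zip(quantity).map(|(p, q)| p * q).collect()
}
```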

Stateful Operators

For operators that maintain state across records, use the on_checkpoint() hook to flush state before a checkpoint snapshot:

```rust
use std::collections::HashMap;

// Assumes `arrow` is a direct dependency, at the same version Volley uses.
use arrow::array::{Array, StringArray};
use volley_core::prelude::*;

#[derive(Default)]
struct CountingOperator {
    counts: HashMap<String, u64>,
}

impl Operator for CountingOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Count the key of every row (assumes a Utf8 "key" column; null keys are skipped)
        if let Some(keys) = record
            .batch
            .column_by_name("key")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
        {
            for key in keys.iter().flatten() {
                *self.counts.entry(key.to_string()).or_default() += 1;
            }
        }
        vec![record]
    }

    fn on_checkpoint(&mut self, _epoch: u64) {
        // Flush state to durable storage here
        // The state backend will snapshot after this returns
    }
}
```
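One way to structure the flush in on_checkpoint is to buffer updates in memory and move them into durable storage when a checkpoint arrives. The snapshot that follows then covers everything processed so far. The sketch below is a toy. ToyDurableStore and BufferedCounter are hypothetical, and the store is passed explicitly to on_checkpoint, whereas the real hook shown above receives only the epoch.

```rust
use std::collections::HashMap;

/// Hypothetical durable store holding cumulative counts.
#[derive(Debug, Default)]
pub struct ToyDurableStore {
    pub totals: HashMap<String, u64>,
    pub last_epoch: Option<u64>,
}

/// Buffers per-key counts in memory between checkpoints.
#[derive(Debug, Default)]
pub struct BufferedCounter {
    pending: HashMap<String, u64>,
}

impl BufferedCounter {
    pub fn process_keys(&mut self, keys: &[&str]) {
        for k in keys {
            *self.pending.entry((*k).to_string()).or_default() += 1;
        }
    }

    /// Merge buffered counts into the store and record the epoch,
    /// leaving the in-memory buffer empty.
    pub fn on_checkpoint(&mut self, epoch: u64, store: &mut ToyDurableStore) {
        for (key, n) in self.pending.drain() {
            *store.totals.entry(key).or_default() += n;
        }
        store.last_epoch = Some(epoch);
    }

    pub fn pending_len(&self) -> usize {
        self.pending.len()
    }
}
```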

See State Management for details on the write-behind cache and RocksDB backend.

Type-State Safety

The Pipeline Builder enforces valid operator chains at compile time. .aggregate_expr() exists only on a WindowedKeyedStream, not on a plain DataStream, and .execute() exists only on a DataStream<HasSink>. Calling either in the wrong place is a compile error rather than a runtime failure.
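This kind of compile-time guarantee is usually built with the type-state pattern. The builder carries a marker type parameter, and methods such as execute are defined only for the marker that represents a valid state. The minimal standalone sketch below uses hypothetical names: Pipeline and NoSink are invented for illustration, and HasSink only mirrors the marker named above. It is not Volley's builder.

```rust
use std::marker::PhantomData;

/// Marker types for the builder's state.
pub struct NoSink;
pub struct HasSink;

pub struct Pipeline<S> {
    steps: Vec<String>,
    _state: PhantomData<S>,
}

impl Pipeline<NoSink> {
    pub fn new() -> Self {
        Pipeline { steps: Vec::new(), _state: PhantomData }
    }

    pub fn filter(mut self, desc: &str) -> Self {
        self.steps.push(format!("filter({desc})"));
        self
    }

    /// Attaching a sink changes the type parameter, which unlocks `execute`.
    pub fn to_sink(mut self, name: &str) -> Pipeline<HasSink> {
        self.steps.push(format!("sink({name})"));
        Pipeline { steps: self.steps, _state: PhantomData }
    }
}

impl Pipeline<HasSink> {
    /// Defined only for Pipeline<HasSink>; calling it earlier does not compile.
    pub fn execute(self) -> Vec<String> {
        self.steps
    }
}
```

For example, Pipeline::new().filter("x").execute() fails to compile because Pipeline<NoSink> has no execute method.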