Writing Operators

Operators transform data as it flows through the pipeline. Volley provides built-in operators and lets you write custom ones.

Built-In Operators

Method                    Description
.filter(fn)               Keep records matching a predicate
.map(fn)                  Transform each record
.flat_map(fn)             Transform each record into zero or more records
.apply_operator(op)       Apply a custom Operator implementation
.key_by(field)            Partition by a key field (transitions to KeyedStream)
.window(window_type)      Assign records to time windows (on KeyedStream)
.aggregate(agg, field)    Compute aggregation per window per key
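
To make the difference between filter, map, and flat_map concrete, here is a minimal sketch that uses Rust's standard iterator adapters of the same names on plain vectors. It only illustrates the semantics: the closure signatures Volley expects are not shown on this page, so nothing below should be read as the Volley API, and the helper names (keep_even, double, split_words) are hypothetical.

```rust
// Plain slices stand in for a stream. This is NOT the Volley API; it only
// mirrors the semantics of the three stateless operators using std iterators.

/// filter: keep only the records that match a predicate.
fn keep_even(input: &[i64]) -> Vec<i64> {
    input.iter().copied().filter(|x| x % 2 == 0).collect()
}

/// map: exactly one output record per input record.
fn double(input: &[i64]) -> Vec<i64> {
    input.iter().map(|x| x * 2).collect()
}

/// flat_map: zero or more output records per input record.
fn split_words(input: &[&str]) -> Vec<String> {
    input
        .iter()
        .flat_map(|line| line.split_whitespace())
        .map(|word| word.to_string())
        .collect()
}
```

An empty line passed to split_words contributes no output at all, which is the case that map cannot express.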

Custom Operators

Implement the Operator trait from volley-core:

use volley_core::prelude::*;

struct MyOperator;

impl Operator for MyOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Transform the record
        // Return zero or more output records
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Optional: flush any buffered state before checkpoint
    }
}
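
Because process returns a Vec, one operator can drop, pass through, or multiply records. The sketch below shows all three outcomes. It is self-contained so it can be compiled without volley-core: the StreamRecord struct and Operator trait here are simplified stand-ins, the value field is a hypothetical payload, and the default no-op on_checkpoint is an assumption based on the hook being described as optional.

```rust
// Stand-in types for illustration only; the real ones come from volley_core.
#[derive(Debug, Clone, PartialEq)]
struct StreamRecord {
    value: i64, // hypothetical payload; the real record wraps an Arrow RecordBatch
}

trait Operator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord>;

    // Assumed default: operators without buffered state do nothing here.
    fn on_checkpoint(&mut self, _epoch: u64) {}
}

/// Drops non-positive values and emits `times` copies of everything else.
struct RepeatPositive {
    times: usize,
}

impl Operator for RepeatPositive {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        if record.value <= 0 {
            // Zero outputs: the record is filtered out.
            return Vec::new();
        }
        // One or more outputs, depending on `times`.
        vec![record; self.times]
    }
}
```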

Apply it to a stream:

env.from_source(source)
    .apply_operator(MyOperator)
    .to_sink(sink)
    .execute("my-job")
    .await?;

Multi-Row RecordBatch Processing

Operators receive a StreamRecord, which wraps an Arrow RecordBatch. A batch may contain one or more rows, so your operator must process every row rather than assuming a single-row batch:

fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
    let batch = &record.data;
    // Process all rows, not just row 0
    for row in 0..batch.num_rows() {
        // ...
    }
    // Pass the record through unchanged in this sketch
    vec![record]
}
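
As a fuller illustration of handling every row, the sketch below filters a whole batch and emits at most one output record holding the surviving rows. To stay compilable without the Arrow crate, Batch is a hypothetical single-column stand-in backed by a Vec<i64>; with a real RecordBatch you would read typed columns instead. KeepAbove and its threshold field are likewise illustrative names.

```rust
// Hypothetical single-column stand-in for an Arrow RecordBatch.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    values: Vec<i64>,
}

impl Batch {
    fn num_rows(&self) -> usize {
        self.values.len()
    }
}

// Stand-in record; `data` mirrors the field name used in the snippet above.
#[derive(Debug, Clone, PartialEq)]
struct StreamRecord {
    data: Batch,
}

/// Keeps only the rows whose value is strictly greater than `threshold`.
struct KeepAbove {
    threshold: i64,
}

impl KeepAbove {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        let batch = &record.data;
        let mut kept = Vec::with_capacity(batch.num_rows());
        // Visit every row in the batch, not just row 0.
        for row in 0..batch.num_rows() {
            let value = batch.values[row];
            if value > self.threshold {
                kept.push(value);
            }
        }
        if kept.is_empty() {
            // No surviving rows: emit nothing rather than an empty batch.
            return Vec::new();
        }
        vec![StreamRecord { data: Batch { values: kept } }]
    }
}
```

Emitting nothing for an empty result is a design choice in this sketch; whether downstream operators tolerate empty batches is not stated here.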

Stateful Operators

For operators that maintain state across records, use the on_checkpoint() hook to flush state before a checkpoint snapshot:

use std::collections::HashMap;

use volley_core::prelude::*;

struct CountingOperator {
    counts: HashMap<String, u64>,
}

impl Operator for CountingOperator {
    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Update in-memory state; records without a key share the empty-string key
        let key = record.key.clone().unwrap_or_default();
        *self.counts.entry(key).or_default() += 1;
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Flush state to durable storage here
        // The state backend will snapshot after this returns
    }
}

See State Management for details on the write-behind cache and RocksDB backend.
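
The flush step can be sketched as follows. This is a self-contained illustration, not the Volley state API: the durable map is a hypothetical stand-in for whatever storage the state backend snapshots, pending holds increments that have not been flushed yet, and last_flushed_epoch is added only so the effect of the hook is observable.

```rust
use std::collections::HashMap;

// Stand-in record with only the field this sketch needs.
struct StreamRecord {
    key: Option<String>,
}

struct CountingOperator {
    pending: HashMap<String, u64>, // in-memory increments, not yet flushed
    durable: HashMap<String, u64>, // hypothetical stand-in for durable state
    last_flushed_epoch: Option<u64>,
}

impl CountingOperator {
    fn new() -> Self {
        CountingOperator {
            pending: HashMap::new(),
            durable: HashMap::new(),
            last_flushed_epoch: None,
        }
    }

    fn process(&mut self, record: StreamRecord) -> Vec<StreamRecord> {
        // Records without a key are counted under the empty string.
        let key = record.key.clone().unwrap_or_default();
        *self.pending.entry(key).or_default() += 1;
        vec![record]
    }

    fn on_checkpoint(&mut self, epoch: u64) {
        // Move every buffered increment into the durable map, leaving
        // `pending` empty so nothing is counted twice after the checkpoint.
        for (key, n) in self.pending.drain() {
            *self.durable.entry(key).or_default() += n;
        }
        self.last_flushed_epoch = Some(epoch);
    }
}
```

Draining the buffer, rather than copying it, keeps the flush idempotent across consecutive checkpoints: a second call with no new records changes nothing in the durable map.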

Type-State Safety

The Pipeline Builder enforces valid operator chains at compile time. You can only call .aggregate() on a WindowedKeyedStream, not on a plain DataStream.
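
The type-state idea can be illustrated with a minimal sketch: each stage is its own type, each transition consumes the previous stage, and aggregate exists only on the last one. The three type names follow the text above, but the fields, the argument-free method signatures, and the sum aggregation are simplifications assumed for illustration and do not reflect Volley's actual builder.

```rust
use std::collections::BTreeMap;

// Illustrative stand-ins: each stage carries the same rows but is a
// distinct type, so the compiler tracks how far the chain has progressed.
struct DataStream {
    rows: Vec<(String, i64)>,
}

struct KeyedStream {
    rows: Vec<(String, i64)>,
}

struct WindowedKeyedStream {
    rows: Vec<(String, i64)>,
}

impl DataStream {
    /// Consumes the plain stream; the only way to obtain a KeyedStream.
    fn key_by(self) -> KeyedStream {
        KeyedStream { rows: self.rows }
    }
}

impl KeyedStream {
    /// Consumes the keyed stream; the only way to obtain a WindowedKeyedStream.
    fn window(self) -> WindowedKeyedStream {
        WindowedKeyedStream { rows: self.rows }
    }
}

impl WindowedKeyedStream {
    /// Defined only on this type, so calling it on a DataStream or
    /// KeyedStream is a compile error rather than a runtime failure.
    fn aggregate(self) -> BTreeMap<String, i64> {
        let mut sums = BTreeMap::new();
        for (key, value) in self.rows {
            *sums.entry(key).or_insert(0) += value;
        }
        sums
    }
}
```

With these definitions, stream.key_by().window().aggregate() compiles, while stream.aggregate() on a DataStream is rejected because no such method exists on that type.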