
Your first pipeline

This is the shortest useful Volley pipeline: read records from memory, drop the cheap ones, and collect the survivors. It mirrors the in_memory_pipeline example in the repo, so you can run it end-to-end without setting up Kafka or cloud storage.

What it does

  1. Builds five sample trades as Arrow RecordBatches.
  2. Filters to price > 200 using a vectorised DataFusion expression.
  3. Collects the output via a MemorySink.
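To make the three steps concrete, here is a plain-Rust sketch of what the pipeline computes, with no Volley, Arrow, or DataFusion types involved. The Trade struct is a hypothetical stand-in for a row of the RecordBatch. The three trades above 200 match the expected output shown under "Run it"; the two cheaper trades (AAPL, AMZN) are hypothetical illustration data, because the real example's inputs are not listed here.

```rust
// Plain-Rust model of the pipeline's logic; this is NOT the Volley API.
#[derive(Debug, Clone, PartialEq)]
struct Trade {
    symbol: &'static str,
    price: f64,
    quantity: u32,
}

/// Step 1: five sample trades. The two priced below 200 are hypothetical.
fn sample_trades() -> Vec<Trade> {
    vec![
        Trade { symbol: "AAPL", price: 150.0, quantity: 100 }, // hypothetical
        Trade { symbol: "GOOG", price: 280.0, quantity: 50 },
        Trade { symbol: "MSFT", price: 310.0, quantity: 75 },
        Trade { symbol: "AMZN", price: 120.0, quantity: 200 }, // hypothetical
        Trade { symbol: "GOOG", price: 290.0, quantity: 150 },
    ]
}

/// Steps 2 and 3: keep trades with price > 200 and collect them in order.
fn filter_expensive(trades: &[Trade]) -> Vec<Trade> {
    trades.iter().filter(|t| t.price > 200.0).cloned().collect()
}

fn main() {
    let output = filter_expensive(&sample_trades());
    println!("Filtered output ({} records):", output.len());
    for (i, t) in output.iter().enumerate() {
        // f64 Display prints 280.0 as "280", matching the expected output.
        println!("  {}. {} @ ${} x {}", i + 1, t.symbol, t.price, t.quantity);
    }
}
```

In the real pipeline the comparison runs as a DataFusion expression over whole columns rather than as a per-struct closure, but the surviving rows are the same.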

Key concepts

Create the environment

let env = StreamExecutionEnvironment::new();

All pipeline construction starts from an environment. The same environment can own multiple branches and sinks — see the fanout_multisink example for multi-sink DAGs.
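Conceptually, a multi-sink DAG means every record from a shared upstream node is offered to each downstream branch independently. The sketch below models that with plain Rust; the Branch struct and the run_fan_out function are hypothetical and say nothing about how Volley actually schedules branches.

```rust
/// One branch of a hypothetical DAG: a predicate feeding its own sink.
struct Branch {
    name: &'static str,
    keep: Box<dyn Fn(f64) -> bool>,
    sink: Vec<f64>,
}

/// Offer every source record to every branch; each branch decides on its own.
fn run_fan_out(source: &[f64], branches: &mut [Branch]) {
    for &price in source {
        for branch in branches.iter_mut() {
            if (branch.keep)(price) {
                branch.sink.push(price);
            }
        }
    }
}

fn main() {
    let source = [150.0, 280.0, 310.0, 120.0, 290.0]; // illustrative prices
    let mut branches = vec![
        Branch { name: "expensive", keep: Box::new(|p: f64| p > 200.0), sink: Vec::new() },
        Branch { name: "everything", keep: Box::new(|_: f64| true), sink: Vec::new() },
    ];
    run_fan_out(&source, &mut branches);
    for b in &branches {
        println!("{}: {:?}", b.name, b.sink);
    }
}
```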

Attach a source

env.from_source(MemorySource::new(records))

from_source registers the source node and returns a DataStream<NoSink>. For production, use from_kafka, from_kafka_exactly_once, or one of the blob-store source builders.
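The NoSink type parameter suggests a typestate design, where the compiler tracks whether a sink has been attached. The sketch below shows how such a pattern can work in plain Rust, under that assumption. The WithSink marker, the stages field, and the signatures of filter, to_sink, and execute are hypothetical and are not taken from Volley.

```rust
use std::marker::PhantomData;

// Marker types for the typestate; WithSink is hypothetical.
struct NoSink;
struct WithSink;

struct DataStream<S> {
    stages: Vec<String>, // names of the nodes attached so far
    _state: PhantomData<S>,
}

impl DataStream<NoSink> {
    fn from_source(name: &str) -> Self {
        DataStream { stages: vec![format!("source:{name}")], _state: PhantomData }
    }

    fn filter(mut self, label: &str) -> Self {
        self.stages.push(format!("filter:{label}"));
        self
    }

    // Attaching a sink changes the type parameter, unlocking execute().
    fn to_sink(mut self, name: &str) -> DataStream<WithSink> {
        self.stages.push(format!("sink:{name}"));
        DataStream { stages: self.stages, _state: PhantomData }
    }
}

impl DataStream<WithSink> {
    // Only defined once a sink exists, so a sink-less job cannot be executed.
    fn execute(self) -> Vec<String> {
        self.stages
    }
}

fn main() {
    let plan = DataStream::from_source("memory")
        .filter("price > 200")
        .to_sink("memory")
        .execute();
    println!("{:?}", plan);
    // DataStream::from_source("memory").execute(); // compile error: no sink attached
}
```

The benefit of this design is that forgetting a sink becomes a compile-time error rather than a runtime failure.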

Apply operators

Expression operators run through DataFusion’s vectorised kernels and are the default choice:

.filter_expr(col("price").gt(lit(200.0)))
.select_expr(vec![col("symbol"), col("price")])
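Expression operators work column-at-a-time rather than record-at-a-time. The sketch below imitates that in plain Rust: it evaluates the predicate over the whole price column into a boolean mask, then applies the mask only to the projected columns. The Batch and Selected structs are hypothetical stand-ins for Arrow RecordBatches, and the rows are illustrative; DataFusion itself evaluates expressions with Arrow compute kernels, not this code.

```rust
// Hypothetical columnar batch: one Vec per column, all the same length.
struct Batch {
    symbol: Vec<&'static str>,
    price: Vec<f64>,
    quantity: Vec<u32>,
}

// What filter_expr + select_expr(symbol, price) produce in this model.
#[derive(Debug, PartialEq)]
struct Selected {
    symbol: Vec<&'static str>,
    price: Vec<f64>,
}

// Keep the entries of one column whose mask bit is true.
fn take<T: Copy>(column: &[T], mask: &[bool]) -> Vec<T> {
    column
        .iter()
        .zip(mask.iter())
        .filter(|pair| *pair.1)
        .map(|pair| *pair.0)
        .collect()
}

fn filter_and_select(batch: &Batch, threshold: f64) -> Selected {
    // 1. Evaluate `price > threshold` over the whole column in one pass.
    let mask: Vec<bool> = batch.price.iter().map(|&p| p > threshold).collect();
    // 2. Apply the mask to the selected columns only; `quantity` is projected away.
    Selected {
        symbol: take(&batch.symbol, &mask),
        price: take(&batch.price, &mask),
    }
}

fn main() {
    let batch = Batch {
        symbol: vec!["AAPL", "GOOG", "MSFT"], // illustrative rows
        price: vec![150.0, 280.0, 310.0],
        quantity: vec![100, 50, 75],
    };
    println!("input rows: {}", batch.quantity.len());
    println!("{:?}", filter_and_select(&batch, 200.0));
}
```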

Closure operators (filter, map, flat_map) remain available for logic that can’t be expressed with columnar expressions — for example, inspecting StreamRecord metadata like event_time_ms or attaching custom per-record state. Prefer the expression form when you can.
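A closure operator sees one record at a time, including its metadata, and can capture surrounding context. The sketch below filters on event time with a captured cutoff, something a plain column expression over price cannot do. The StreamRecord struct here is a hypothetical stand-in with only two fields, and filter_records is a hypothetical helper, not Volley's filter.

```rust
// Hypothetical record shape; Volley's real StreamRecord is richer.
#[derive(Debug, Clone, PartialEq)]
struct StreamRecord {
    event_time_ms: i64,
    price: f64,
}

// Record-at-a-time filtering: the closure is called once per record.
fn filter_records<F>(records: Vec<StreamRecord>, keep: F) -> Vec<StreamRecord>
where
    F: Fn(&StreamRecord) -> bool,
{
    records.into_iter().filter(|r| keep(r)).collect()
}

fn main() {
    let records = vec![
        StreamRecord { event_time_ms: 1_000, price: 250.0 },
        StreamRecord { event_time_ms: 9_000, price: 300.0 },
    ];
    let cutoff_ms = 5_000; // hypothetical: drop records older than this
    let recent = filter_records(records, |r| r.event_time_ms >= cutoff_ms);
    println!("{:?}", recent);
}
```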

Key, window, aggregate

.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(300)))
.aggregate_expr(
    vec![sum(col("amount"))],
    RocksDbBackend::open(tmp.path().join("state")).unwrap(), // tmp: a temp dir, e.g. from tempfile::tempdir()
)

key_by returns a KeyedStream (hash-partitioned by key), and window turns it into a WindowedKeyedStream. aggregate_expr consumes the windowed stream and emits one record per window per key. The second argument is a BatchStateBackend; RocksDbBackend implements it directly, so you don't need the old KeyedStateBackend wrapper.
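The key, window, aggregate chain follows a familiar pattern, sketched below in plain Rust: route each key to a partition by hashing it, assign each event to the tumbling window [start, start + size) that contains its timestamp, and sum per (key, window). This sketch keeps its state in an in-memory BTreeMap instead of a BatchStateBackend, and partition_for, window_start, tumbling_sum, and the event tuples are hypothetical; Volley's actual partitioner and window assignment may differ.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

// Which of `n` parallel partitions a key is routed to (hash partitioning).
fn partition_for(key: &str, n: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % n
}

// Start of the tumbling window that contains `ts_ms`.
fn window_start(ts_ms: u64, size_ms: u64) -> u64 {
    ts_ms - ts_ms % size_ms
}

// Sum `amount` per (key, window start): one output row per window per key.
fn tumbling_sum(events: &[(&str, u64, f64)], size_ms: u64) -> BTreeMap<(String, u64), f64> {
    let mut acc = BTreeMap::new();
    for &(key, ts_ms, amount) in events {
        *acc.entry((key.to_string(), window_start(ts_ms, size_ms))).or_insert(0.0) += amount;
    }
    acc
}

fn main() {
    let size_ms: u64 = 300_000; // 300 s, matching TumblingWindows::of above
    let events: [(&str, u64, f64); 4] = [
        ("alice", 10_000, 5.0),
        ("alice", 250_000, 7.0),
        ("alice", 310_000, 1.0), // falls into the second window
        ("bob", 20_000, 2.0),
    ];
    for ((user, start), total) in tumbling_sum(&events, size_ms) {
        println!("user={user} window_start_ms={start} partition={} sum={total}", partition_for(&user, 4));
    }
}
```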

Attach a sink, execute, read the output

let sink = MemorySink::new();
let output = sink.handle();               // take the shared handle before `sink` is moved

let report = stream.to_sink(sink).execute("my-job").await?;

println!("committed epochs: {}", report.epochs_committed);
for r in output.lock().unwrap().iter() { /* ... */ }

execute returns ExecutionReport { records_written, epochs_committed }, not the output records. The sink’s handle() returns an Arc<Mutex<Vec<StreamRecord>>> that points at the sink’s internal buffer — lock it after execution to inspect the rows.
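The handle-before-move pattern works because the handle and the sink share ownership of one buffer through an Arc. The sketch below reproduces it with std types only; this MemorySink, its write method, and the thread-based execute function are hypothetical stand-ins, not Volley's implementations.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for MemorySink: the buffer lives behind an Arc.
struct MemorySink {
    buffer: Arc<Mutex<Vec<String>>>,
}

impl MemorySink {
    fn new() -> Self {
        MemorySink { buffer: Arc::new(Mutex::new(Vec::new())) }
    }

    // Cloning the Arc gives the caller a second owner of the same buffer.
    fn handle(&self) -> Arc<Mutex<Vec<String>>> {
        Arc::clone(&self.buffer)
    }

    fn write(&self, record: String) {
        self.buffer.lock().unwrap().push(record);
    }
}

// Hypothetical execute(): takes ownership of the sink, writes on another thread.
fn execute(sink: MemorySink, records: Vec<String>) -> usize {
    let worker = thread::spawn(move || {
        let n = records.len();
        for r in records {
            sink.write(r);
        }
        n // `sink` is dropped when this closure finishes
    });
    worker.join().unwrap()
}

fn main() {
    let sink = MemorySink::new();
    let output = sink.handle(); // taken before `sink` is moved
    let written = execute(sink, vec!["GOOG".into(), "MSFT".into()]);
    // The sink is gone, but the handle still points at its buffer.
    let rows = output.lock().unwrap();
    println!("written={written} rows={:?}", *rows);
}
```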

Run it

cargo run --example in_memory_pipeline -p volley-examples

Expected output (abridged):

=== Volley in-memory DAG pipeline ===

Input: 5 trades
Execution report: 0 records written, 1 epochs committed

Filtered output (3 records):
  1. GOOG @ $280 x 50
  2. MSFT @ $310 x 75
  3. GOOG @ $290 x 150

Next Steps