Your first pipeline
This is the shortest useful Volley pipeline: read records from memory, drop the cheap ones, and collect the survivors. It mirrors the in_memory_pipeline example in the repo, so you can run it end-to-end without setting up Kafka or cloud storage.
What it does
- Builds five sample trades as Arrow RecordBatches.
- Filters to price > 200 using a vectorised DataFusion expression.
- Collects the output via a MemorySink.
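The following plain-Rust model shows what the example computes. It does not use Volley. The `Trade` struct, `filter_expensive`, and the sample values are hypothetical stand-ins for the Arrow RecordBatches and the DataFusion predicate. The repo's actual five trades may differ.

```rust
// Plain-Rust model of the pipeline's logic; NOT Volley's API.
// `Trade` and `filter_expensive` are hypothetical stand-ins for Arrow RecordBatches
// and the `price > 200` DataFusion expression.
#[derive(Debug, Clone, PartialEq)]
struct Trade {
    symbol: &'static str,
    price: f64,
    quantity: u32,
}

/// Keeps only trades whose price is strictly greater than `threshold`.
fn filter_expensive(trades: &[Trade], threshold: f64) -> Vec<Trade> {
    trades.iter().filter(|t| t.price > threshold).cloned().collect()
}

fn main() {
    // Hypothetical sample data (assumption): the repo's actual trades may differ.
    let trades = vec![
        Trade { symbol: "AAPL", price: 150.0, quantity: 100 },
        Trade { symbol: "GOOG", price: 280.0, quantity: 50 },
        Trade { symbol: "MSFT", price: 310.0, quantity: 75 },
        Trade { symbol: "AAPL", price: 155.0, quantity: 200 },
        Trade { symbol: "GOOG", price: 290.0, quantity: 150 },
    ];
    let survivors = filter_expensive(&trades, 200.0);
    println!("Filtered output ({} records):", survivors.len());
    for (i, t) in survivors.iter().enumerate() {
        println!("  {}. {} @ ${} x {}", i + 1, t.symbol, t.price, t.quantity);
    }
}
```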
Key concepts
Create the environment
let env = StreamExecutionEnvironment::new();
All pipeline construction starts from an environment. The same
environment can own multiple branches and sinks — see the
fanout_multisink example for multi-sink DAGs.
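Here is a rough mental model of a multi-sink DAG. The sketch broadcasts one source to two branches, and each branch has its own predicate and its own buffer acting as a sink. `Branch` and `fan_out` are hypothetical names. The assumption that every branch sees every record is only illustrative, so check the fanout_multisink example for Volley's actual semantics.

```rust
// Hypothetical model of one source feeding two branches, each ending in its own sink.
// Assumption: every branch receives every record (broadcast); NOT Volley's API.
struct Branch {
    name: &'static str,
    predicate: fn(i64) -> bool,
    sink: Vec<i64>,
}

/// Pushes each source record into every branch whose predicate accepts it.
fn fan_out(source: &[i64], branches: &mut [Branch]) {
    for &record in source {
        for b in branches.iter_mut() {
            if (b.predicate)(record) {
                b.sink.push(record);
            }
        }
    }
}

fn is_even(x: i64) -> bool {
    x % 2 == 0
}

fn is_large(x: i64) -> bool {
    x > 3
}

fn main() {
    let source = [1, 2, 3, 4, 5];
    let mut branches = [
        Branch { name: "even", predicate: is_even, sink: Vec::new() },
        Branch { name: "large", predicate: is_large, sink: Vec::new() },
    ];
    fan_out(&source, &mut branches);
    for b in &branches {
        println!("{}: {:?}", b.name, b.sink);
    }
}
```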
Attach a source
env.from_source(MemorySource::new(records))
from_source registers the source node and returns a DataStream<NoSink>.
For production, use from_kafka, from_kafka_exactly_once, or one of
the blob-store source builders.
Apply operators
Expression operators run through DataFusion’s vectorised kernels and are the default choice:
.filter_expr(col("price").gt(lit(200.0)))
.select_expr(vec![col("symbol"), col("price")])
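Expression operators work on columns rather than rows. The sketch below models that columnar idea in plain Rust:

1. It evaluates the predicate over the whole `price` column to build a boolean mask.
2. It applies that mask to each selected column, roughly what filter_expr followed by select_expr describes.

`Batch`, `gt_mask`, `take`, and `filter_and_select` are hypothetical. This is not DataFusion's API, and it makes no claim about how DataFusion's kernels are implemented internally.

```rust
// Toy column-oriented batch: each field is its own Vec, as in Arrow's layout.
// Hypothetical types and functions; NOT DataFusion's API.
#[derive(Debug)]
struct Batch {
    symbol: Vec<String>,
    price: Vec<f64>,
    quantity: Vec<u32>,
}

/// Evaluates `value > threshold` over an entire column, producing a row mask.
fn gt_mask(column: &[f64], threshold: f64) -> Vec<bool> {
    column.iter().map(|&v| v > threshold).collect()
}

/// Keeps the entries of `values` whose mask bit is true.
fn take<T: Clone>(values: &[T], mask: &[bool]) -> Vec<T> {
    values
        .iter()
        .zip(mask.iter())
        .filter_map(|(v, &keep)| if keep { Some(v.clone()) } else { None })
        .collect()
}

/// Filter by price, then project to (symbol, price); `quantity` is dropped.
fn filter_and_select(batch: &Batch, threshold: f64) -> (Vec<String>, Vec<f64>) {
    let mask = gt_mask(&batch.price, threshold);
    (take(&batch.symbol, &mask), take(&batch.price, &mask))
}

fn main() {
    // Hypothetical input batch.
    let batch = Batch {
        symbol: vec!["AAPL".into(), "GOOG".into(), "MSFT".into()],
        price: vec![150.0, 280.0, 310.0],
        quantity: vec![100, 50, 75],
    };
    let (symbols, prices) = filter_and_select(&batch, 200.0);
    println!("{} input rows -> {:?} {:?}", batch.quantity.len(), symbols, prices);
}
```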
Closure operators (filter, map, flat_map) remain available for
logic that can’t be expressed with columnar expressions — for example,
inspecting StreamRecord metadata like event_time_ms or attaching
custom per-record state. Prefer the expression form when you can.
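A closure operator, by contrast, sees one record at a time, including its metadata. The sketch below covers both cases from the paragraph above:

- a filter on record metadata, using a hypothetical `Record` struct with an `event_time_ms` field;
- a map that carries mutable state across records.

It uses std iterators, not Volley's filter and map signatures.

```rust
// Hypothetical row-wise record carrying metadata next to the payload; NOT Volley's StreamRecord.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    event_time_ms: u64,
    symbol: String,
}

/// Filter-style closure: keeps records at or after `cutoff_ms` (reads metadata).
/// Map-style closure: tags each survivor with a sequence number held in captured state.
fn late_records_with_seq(records: Vec<Record>, cutoff_ms: u64) -> Vec<(u32, Record)> {
    let mut seq = 0u32;
    let tagged: Vec<(u32, Record)> = records
        .into_iter()
        .filter(|r| r.event_time_ms >= cutoff_ms)
        .map(|r| {
            seq += 1;
            (seq, r)
        })
        .collect();
    tagged
}

fn main() {
    // Hypothetical input records.
    let records = vec![
        Record { event_time_ms: 1_000, symbol: "AAPL".into() },
        Record { event_time_ms: 5_000, symbol: "GOOG".into() },
        Record { event_time_ms: 9_000, symbol: "MSFT".into() },
    ];
    for (seq, r) in late_records_with_seq(records, 4_000) {
        println!("#{seq} {} at {} ms", r.symbol, r.event_time_ms);
    }
}
```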
Key, window, aggregate
.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(300)))
.aggregate_expr(
vec![sum(col("amount"))],
RocksDbBackend::open(tmp.path().join("state")).unwrap(),
)
key_by returns a KeyedStream, hash-partitioned by key. window turns it
into a WindowedKeyedStream. aggregate_expr consumes the windowed stream
and emits one record per window per key. The second argument is a
BatchStateBackend; RocksDbBackend implements it directly, so you
don’t need the old KeyedStateBackend wrapper.
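The arithmetic behind a tumbling window is simple: a record with timestamp t belongs to the window that starts at t - (t mod size). The sketch below groups hypothetical (user, timestamp, amount) events into 5-minute windows and sums per key per window. An in-memory BTreeMap stands in for the state backend.

This is not Volley's implementation. A real streaming engine must also decide when a window is complete (for example, via watermarks), which this batch-style model ignores.

```rust
use std::collections::BTreeMap;

/// Start of the tumbling window that contains `ts_ms`.
fn window_start(ts_ms: u64, size_ms: u64) -> u64 {
    ts_ms - ts_ms % size_ms
}

/// Sums `amount` per (key, window start); one output entry per window per key.
/// The BTreeMap is an in-memory stand-in for a state backend (assumption, not RocksDB).
fn tumbling_sum(events: &[(&str, u64, f64)], size_ms: u64) -> BTreeMap<(String, u64), f64> {
    let mut state: BTreeMap<(String, u64), f64> = BTreeMap::new();
    for &(key, ts_ms, amount) in events {
        *state
            .entry((key.to_string(), window_start(ts_ms, size_ms)))
            .or_insert(0.0) += amount;
    }
    state
}

fn main() {
    let size_ms = 300_000; // 5 minutes, matching Duration::from_secs(300)
    // Hypothetical events: (user_id, event time in ms, amount).
    let events = [
        ("alice", 10_000, 5.0),
        ("bob", 20_000, 7.0),
        ("alice", 290_000, 3.0),
        ("alice", 310_000, 4.0),
    ];
    for ((user, start), total) in tumbling_sum(&events, size_ms) {
        println!("user={user} window_start_ms={start} sum={total}");
    }
}
```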
Attach a sink, execute, read the output
let sink = MemorySink::new();
let output = sink.handle(); // shared handle survives the move
let report = stream.to_sink(sink).execute("my-job").await?;
println!("committed epochs: {}", report.epochs_committed);
for r in output.lock().unwrap().iter() { /* ... */ }
execute returns ExecutionReport { records_written, epochs_committed },
not the output records. The sink’s handle() returns an
Arc<Mutex<Vec<StreamRecord>>> that points at the sink’s internal
buffer — lock it after execution to inspect the rows.
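The handle pattern works because handle() hands out a clone of the Arc, not the buffer itself. The hypothetical `MemorySinkModel` below reproduces that ownership shape using std types only:

1. The caller takes a handle from the sink.
2. The sink is moved into a job running on another thread.
3. After the job finishes, the caller reads the rows through its own Arc.

The names and the `run_job` return value are illustrative, not Volley's API.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a memory sink: writes go into a shared, locked buffer.
struct MemorySinkModel {
    buffer: Arc<Mutex<Vec<String>>>,
}

impl MemorySinkModel {
    fn new() -> Self {
        Self { buffer: Arc::new(Mutex::new(Vec::new())) }
    }

    /// Returns a clone of the Arc, so the caller keeps access after the sink is moved.
    fn handle(&self) -> Arc<Mutex<Vec<String>>> {
        Arc::clone(&self.buffer)
    }

    fn write(&self, record: String) {
        self.buffer.lock().unwrap().push(record);
    }
}

/// Takes ownership of the sink and returns only a count (a summary, not the records).
fn run_job(sink: MemorySinkModel, input: &[&str]) -> usize {
    for r in input {
        sink.write(r.to_string());
    }
    input.len()
}

fn main() {
    let sink = MemorySinkModel::new();
    let output = sink.handle(); // taken before the sink is moved
    let job = thread::spawn(move || run_job(sink, &["GOOG", "MSFT"]));
    let written = job.join().unwrap();
    println!("summary: {written} records");
    for r in output.lock().unwrap().iter() {
        println!("{r}");
    }
}
```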
Run it
cargo run --example in_memory_pipeline -p volley-examples
Expected output (abridged):
=== Volley in-memory DAG pipeline ===
Input: 5 trades
Execution report: 0 records written, 1 epochs committed
Filtered output (3 records):
1. GOOG @ $280 x 50
2. MSFT @ $310 x 75
3. GOOG @ $290 x 150
Next steps
- Kafka Pipeline — connect to real data
- Operators Guide — write custom operators
- Windowing — deep dive into window types
- All Examples — browse the full examples list