Pipeline Builder
You build a Volley pipeline with a fluent chain: source first, then operators, then a sink. The builder enforces the ordering rules at compile time: you must partition a stream with .key_by() before windowing it, and you must attach a sink before calling .execute(). If you try to window an unkeyed stream or execute a pipeline that still has no sink, the code won’t compile.
This compile-time safety is built on Rust’s type system (a pattern called “typestate” — see the details below if you’re curious). Most of the time you don’t need to think about it; you just follow the chain.
The chain
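- StreamExecutionEnvironment --> DataStream<NoSink>: from_source(), from_kafka()
- DataStream<NoSink> --> DataStream<NoSink>: filter_expr(), select_expr(), filter(), map(), flat_map(), apply_operator()
- DataStream<NoSink> --> KeyedStream: key_by()
- KeyedStream --> WindowedKeyedStream: window()
- WindowedKeyedStream --> DataStream<NoSink>: aggregate_expr() (also available directly on KeyedStream)
- DataStream<NoSink> --> DataStream<HasSink>: to_sink(), to_kafka()
- DataStream<HasSink> --> running job: execute()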
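To make the shape of the chain concrete, here is a toy model in plain Rust. The type names (ToyEnv, ToyStream, ToyKeyed, ToyWindowed) and the into_plan() helper are hypothetical stand-ins, not Volley's API, and each method only appends a step name to a plan instead of building real operators. It covers just the keyed/windowed part of the chain: because window() is defined only on ToyKeyed, windowing an unkeyed stream is rejected by the compiler.

```rust
// Hypothetical stand-ins for the stream types. Each method records a step
// name instead of building a real operator.
struct ToyEnv;
struct ToyStream { plan: Vec<String> }
struct ToyKeyed { plan: Vec<String> }
struct ToyWindowed { plan: Vec<String> }

impl ToyEnv {
    fn from_source(self, name: &str) -> ToyStream {
        ToyStream { plan: vec![format!("source:{name}")] }
    }
}

impl ToyStream {
    // Filtering keeps the stream unkeyed, so it returns the same type.
    fn filter_expr(mut self, expr: &str) -> ToyStream {
        self.plan.push(format!("filter:{expr}"));
        self
    }
    // key_by() is the only way to obtain a ToyKeyed.
    fn key_by(mut self, key: &str) -> ToyKeyed {
        self.plan.push(format!("key_by:{key}"));
        ToyKeyed { plan: self.plan }
    }
    // Inspection helper for this sketch only.
    fn into_plan(self) -> Vec<String> {
        self.plan
    }
}

impl ToyKeyed {
    // window() exists only on the keyed type.
    fn window(mut self, secs: u64) -> ToyWindowed {
        self.plan.push(format!("window:{secs}s"));
        ToyWindowed { plan: self.plan }
    }
}

impl ToyWindowed {
    // Aggregating consumes the windowed stream and returns an unkeyed one.
    fn aggregate_expr(mut self, agg: &str) -> ToyStream {
        self.plan.push(format!("aggregate:{agg}"));
        ToyStream { plan: self.plan }
    }
}

fn build_plan() -> Vec<String> {
    ToyEnv
        .from_source("orders")
        .filter_expr("amount > 100")
        .key_by("user_id")
        .window(60)
        .aggregate_expr("sum(amount)")
        .into_plan()
}

fn main() {
    println!("{:?}", build_plan());
    // ToyEnv.from_source("orders").window(60);
    // ^ rejected at compile time: ToyStream has no `window` method.
}
```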
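Some methods keep the current type (filter_expr, for example, returns another DataStream<NoSink>), while others move the stream to a new type. Each type exposes only the operations that make sense at that point: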
| Type | What you have | What you can do next |
|---|---|---|
| DataStream<NoSink> | An unkeyed stream | filter_expr, select_expr, filter, map, flat_map, apply_operator, key_by, to_sink, to_kafka, with_tracing, with_observability |
| KeyedStream | A stream partitioned by a key | window, aggregate_expr, to_sink, to_kafka, with_tracing, with_observability |
| WindowedKeyedStream | Keyed and windowed | aggregate_expr |
| DataStream<HasSink> | A sink is attached, ready to run | execute |
aggregate_expr() works on both KeyedStream (global aggregation across all records) and WindowedKeyedStream (per-window aggregation). The closure-based operators (filter, map, flat_map) still live on DataStream<NoSink> for cases where you need per-record logic that doesn’t fit a DataFusion expression.
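The difference between an expression operator such as filter_expr and a closure operator such as filter is roughly the difference between evaluating a predicate over a whole column at once and calling arbitrary code once per record. The sketch below illustrates that contrast in plain Rust, using Vecs as a stand-in for Arrow columns to stay dependency-free. Record, Batch, take, filter_batch and filter_records are hypothetical names, and this is not how Volley or DataFusion actually evaluate expressions.

```rust
// Row-oriented record, as a closure-based operator would see it.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Record {
    user_id: u32,
    amount: i64,
}

// Column-oriented batch: one Vec per field, all the same length.
struct Batch {
    user_id: Vec<u32>,
    amount: Vec<i64>,
}

// Keep the entries of `col` whose mask bit is set.
fn take<T: Copy>(col: &[T], mask: &[bool]) -> Vec<T> {
    col.iter()
        .zip(mask)
        .filter_map(|(v, keep)| if *keep { Some(*v) } else { None })
        .collect()
}

// Vectorized style: evaluate the predicate over the whole `amount` column
// into a boolean mask, then apply that mask to every column.
fn filter_batch(batch: &Batch, threshold: i64) -> Batch {
    let mask: Vec<bool> = batch.amount.iter().map(|a| *a > threshold).collect();
    Batch {
        user_id: take(&batch.user_id, &mask),
        amount: take(&batch.amount, &mask),
    }
}

// Closure style: run arbitrary Rust logic once per record.
fn filter_records<F: Fn(&Record) -> bool>(records: &[Record], keep: F) -> Vec<Record> {
    records.iter().filter(|r| keep(*r)).copied().collect()
}

fn main() {
    let records = vec![
        Record { user_id: 1, amount: 150 },
        Record { user_id: 2, amount: 80 },
        Record { user_id: 3, amount: 420 },
    ];
    let batch = Batch {
        user_id: records.iter().map(|r| r.user_id).collect(),
        amount: records.iter().map(|r| r.amount).collect(),
    };

    let vectorized = filter_batch(&batch, 100);
    let per_record = filter_records(&records, |r| r.amount > 100);
    // Both styles keep users 1 and 3.
    println!("{:?} {:?}", vectorized.user_id, per_record);
}
```

The closure version can express any per-record logic, which is why the closure operators remain available; the mask-based version only handles what a predicate over columns can express, but processes a whole batch per call.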
Example
StreamExecutionEnvironment::new()
// Returns DataStream<NoSink>
.from_source(source)
// Still DataStream<NoSink> — vectorized filter
.filter_expr(col("amount").gt(lit(100)))
// Returns KeyedStream
.key_by(col("user_id"))
// Returns WindowedKeyedStream
.window(TumblingWindows::of(Duration::from_secs(60)))
// `aggregate_expr` takes a Vec of aggregates + a `BatchStateBackend`
// (`RocksDbBackend` implements the trait directly — no wrapper needed).
// Consumes WindowedKeyedStream, returns DataStream<NoSink>.
.aggregate_expr(vec![sum(col("amount"))], state_backend)
// Returns DataStream<HasSink>
.to_sink(sink)
// `execute` returns `ExecutionReport { records_written, epochs_committed }`.
.execute("my-job")
.await?;
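To see what the example pipeline computes, the sketch below applies the same logic to a finite, in-memory list in plain Rust: drop records with amount <= 100, group by user_id, assign each record to a 60-second tumbling window, and sum amount per (user, window). The Order struct, its ts_secs timestamp field, and the assumption that windows are aligned to multiples of the window size are illustrative choices, not Volley's documented behavior; the real pipeline runs incrementally over an unbounded stream and keeps this state in the state backend.

```rust
use std::collections::BTreeMap;

// Hypothetical record shape; a real pipeline's schema comes from its source.
struct Order {
    user_id: u32,
    amount: i64,
    ts_secs: u64, // assumed event timestamp, in seconds
}

// Sum of `amount` per (user_id, window_start) over tumbling windows of
// `window_secs`, keeping only orders with amount > 100.
fn windowed_sums(orders: &[Order], window_secs: u64) -> BTreeMap<(u32, u64), i64> {
    let mut sums = BTreeMap::new();
    for o in orders.iter().filter(|o| o.amount > 100) {
        // Tumbling windows don't overlap: each timestamp belongs to exactly one
        // window, which starts at the timestamp rounded down to a multiple of the size.
        let window_start = o.ts_secs - o.ts_secs % window_secs;
        *sums.entry((o.user_id, window_start)).or_insert(0) += o.amount;
    }
    sums
}

fn main() {
    let orders = vec![
        Order { user_id: 1, amount: 150, ts_secs: 5 },
        Order { user_id: 1, amount: 50, ts_secs: 10 },  // dropped by the filter
        Order { user_id: 1, amount: 200, ts_secs: 59 },
        Order { user_id: 1, amount: 300, ts_secs: 60 }, // falls in the next window
        Order { user_id: 2, amount: 120, ts_secs: 30 },
    ];
    for ((user, start), total) in windowed_sums(&orders, 60) {
        let end = start + 60;
        println!("user {user}, window [{start}, {end}): {total}");
    }
}
```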
Parallelism
Set the parallelism level on the environment with .set_parallelism(). After .key_by(), records are hash-partitioned across that many parallel operator instances:
StreamExecutionEnvironment::new()
.set_parallelism(4)
.from_source(...)
// ...
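Conceptually, hash partitioning routes each record to instance hash(key) % parallelism, so every record with the same key reaches the same instance. The sketch below uses Rust's standard DefaultHasher purely for illustration; partition_for is a hypothetical helper, and Volley's actual hash function is not documented here and may differ.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Index (0..parallelism) of the parallel instance that owns `key`.
fn partition_for<K: Hash + ?Sized>(key: &K, parallelism: usize) -> usize {
    assert!(parallelism > 0, "parallelism must be at least 1");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % parallelism as u64) as usize
}

fn main() {
    let parallelism = 4;
    for user_id in ["alice", "bob", "carol", "alice"] {
        let instance = partition_for(user_id, parallelism);
        // Both "alice" records get the same instance index.
        println!("{user_id} -> instance {instance}");
    }
}
```

DefaultHasher is deterministic within a single build, but its output is not guaranteed to stay the same across Rust releases, so a system whose key-to-instance mapping must survive upgrades and state restores would want a hash function with a fixed specification.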
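Each parallel instance has its own state namespace. Because hash partitioning routes every record with a given key to the same instance, all state for that key lives in a single instance. Checkpoint barriers are aligned across all partitions.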
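Barrier alignment, in the standard aligned-checkpoint scheme, means an operator with several input channels waits until the barrier for a checkpoint has arrived on every channel before it snapshots its state, buffering records from channels that are already past the barrier. The sketch below shows that bookkeeping for one operator and one checkpoint at a time. Event, Action and Aligner are hypothetical names, not part of Volley's API, and Volley's actual implementation may differ (for example, it must also forward the barrier downstream, which this sketch omits).

```rust
use std::collections::HashSet;

// An event arriving on one input channel of an operator.
enum Event {
    Record(i64),
    Barrier(u64), // checkpoint id
}

// What the operator does in response to an event.
#[derive(Debug, PartialEq)]
enum Action {
    Process(i64),
    Buffered,
    Wait,
    Snapshot { checkpoint: u64, then_process: Vec<i64> },
}

// Tracks barrier alignment for one checkpoint at a time (a simplification).
struct Aligner {
    num_channels: usize,
    blocked: HashSet<usize>, // channels that already delivered the barrier
    buffer: Vec<i64>,        // records held back from blocked channels
}

impl Aligner {
    fn new(num_channels: usize) -> Self {
        Aligner { num_channels, blocked: HashSet::new(), buffer: Vec::new() }
    }

    fn on_event(&mut self, channel: usize, event: Event) -> Action {
        match event {
            // Records from a channel that is past the barrier belong after the
            // checkpoint, so they must not touch state yet.
            Event::Record(v) if self.blocked.contains(&channel) => {
                self.buffer.push(v);
                Action::Buffered
            }
            Event::Record(v) => Action::Process(v),
            Event::Barrier(id) => {
                self.blocked.insert(channel);
                if self.blocked.len() < self.num_channels {
                    return Action::Wait;
                }
                // Barrier seen on every channel: snapshot, then release the buffer.
                self.blocked.clear();
                Action::Snapshot { checkpoint: id, then_process: std::mem::take(&mut self.buffer) }
            }
        }
    }
}

fn main() {
    let mut op = Aligner::new(2);
    println!("{:?}", op.on_event(0, Event::Record(1)));  // Process(1)
    println!("{:?}", op.on_event(0, Event::Barrier(7))); // Wait
    println!("{:?}", op.on_event(0, Event::Record(2)));  // Buffered
    println!("{:?}", op.on_event(1, Event::Record(3)));  // Process(3)
    println!("{:?}", op.on_event(1, Event::Barrier(7))); // Snapshot { checkpoint: 7, then_process: [2] }
}
```

The snapshot therefore reflects records 1 and 3 (everything before the barrier on both channels) but not record 2, which is processed only after the snapshot is taken.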
Typestate mechanics
The type progression above is an application of the “typestate” pattern: each builder method consumes self and returns a value whose type exposes only the next set of valid operations. DataStream<NoSink> and DataStream<HasSink> are the same struct with a different marker type parameter; .to_sink() flips the marker from NoSink to HasSink, and only DataStream<HasSink> has an .execute() method. You can read the compile errors as a guide: if the compiler says a method doesn’t exist on your stream, you probably need to add (or remove) a step earlier in the chain.
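A minimal, self-contained sketch of the marker-parameter technique follows. It is a simplified model written for illustration, not Volley's actual definition of DataStream: the steps field, the filter() stand-in (which takes no predicate) and the usize returned by execute() are assumptions made to keep it small.

```rust
use std::marker::PhantomData;

// Uninhabited marker types: they carry no data, only compile-time state.
enum NoSink {}
enum HasSink {}

// One struct for both states; `S` records whether a sink is attached.
struct DataStream<S> {
    steps: Vec<&'static str>,
    _state: PhantomData<S>,
}

impl DataStream<NoSink> {
    fn new() -> Self {
        DataStream { steps: vec!["source"], _state: PhantomData }
    }

    // Stands in for any NoSink -> NoSink operator.
    fn filter(mut self) -> Self {
        self.steps.push("filter");
        self
    }

    // Consumes the NoSink stream and returns the same data with the marker flipped.
    fn to_sink(mut self) -> DataStream<HasSink> {
        self.steps.push("sink");
        DataStream { steps: self.steps, _state: PhantomData }
    }
}

impl DataStream<HasSink> {
    // Only this impl block defines execute().
    fn execute(self) -> usize {
        self.steps.len()
    }
}

fn main() {
    let n = DataStream::new().filter().to_sink().execute();
    println!("executed {n} steps");

    // Uncommenting the next line fails to compile, because execute() is not
    // defined for DataStream<NoSink>:
    // DataStream::new().execute();
}
```

PhantomData lets the struct mention S without storing a value of it, so the marker costs nothing at runtime; the only effect is which impl block's methods are visible.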