Pipeline Builder

You build a Volley pipeline with a fluent chain: source first, then operators, then a sink. The builder enforces two ordering rules at compile time: a stream must be partitioned with .key_by() before it is windowed, and a sink must be attached before .execute() is called. If you try to window an unkeyed stream or execute a pipeline that has no sink, the code won’t compile.

This compile-time safety is built on Rust’s type system (a pattern called “typestate” — see the details below if you’re curious). Most of the time you don’t need to think about it; you just follow the chain.

The chain

StreamExecutionEnvironment  -->  DataStream<NoSink>  -->  KeyedStream  -->  WindowedKeyedStream
        |                           |                        |                      |
    from_source()              filter_expr()            key_by()             window()
    from_kafka()               select_expr()                                aggregate_expr()
                               filter()
                               map()
                               flat_map()
                               apply_operator()

DataStream<NoSink>  -->  DataStream<HasSink>  -->  execute()
        |                       |
    to_sink()               execute()
    to_kafka()

Each step returns a different type, and each type exposes only the operations that make sense at that point:

| Type | What you have | What you can do next |
|------|---------------|----------------------|
| DataStream<NoSink> | An unkeyed stream | filter_expr, select_expr, filter, map, flat_map, apply_operator, key_by, to_sink, to_kafka, with_tracing, with_observability |
| KeyedStream | A stream partitioned by a key | window, aggregate_expr, to_sink, to_kafka, with_tracing, with_observability |
| WindowedKeyedStream | Keyed and windowed | aggregate_expr |
| DataStream<HasSink> | A sink is attached; ready to run | execute |

aggregate_expr() works on both KeyedStream (global aggregation across all records) and WindowedKeyedStream (per-window aggregation). The closure-based operators (filter, map, flat_map) still live on DataStream<NoSink> for cases where you need per-record logic that doesn’t fit a DataFusion expression.
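
To make the difference between the two aggregation modes concrete, here is a minimal sketch in plain Rust that does not use Volley at all. It assumes that a keyed aggregation keeps one running result per key over all records, and that a windowed aggregation keeps one result per key and tumbling window. The `Record` shape and both helper functions are hypothetical names chosen for illustration, and `window_secs` is assumed to be greater than zero.

```rust
use std::collections::BTreeMap;

// Hypothetical record shape: (key, event time in seconds, amount).
type Record = (&'static str, u64, i64);

// Keyed aggregation without a window: one running sum per key.
fn sum_by_key(records: &[Record]) -> BTreeMap<&'static str, i64> {
    let mut sums = BTreeMap::new();
    for &(key, _, amount) in records {
        *sums.entry(key).or_insert(0) += amount;
    }
    sums
}

// Keyed aggregation with tumbling windows: one sum per (key, window start).
// Assumes `window_secs > 0`.
fn sum_by_key_and_window(
    records: &[Record],
    window_secs: u64,
) -> BTreeMap<(&'static str, u64), i64> {
    let mut sums = BTreeMap::new();
    for &(key, ts, amount) in records {
        // A tumbling window starts at the largest multiple of the size <= ts.
        let window_start = ts - ts % window_secs;
        *sums.entry((key, window_start)).or_insert(0) += amount;
    }
    sums
}

fn main() {
    let records: [Record; 4] = [
        ("alice", 5, 100),
        ("alice", 70, 50),
        ("bob", 10, 20),
        ("alice", 59, 1),
    ];
    println!("{:?}", sum_by_key(&records));
    println!("{:?}", sum_by_key_and_window(&records, 60));
}
```

With 60-second windows, the records for "alice" at 5 s and 59 s fall into the window starting at 0, while the record at 70 s falls into the window starting at 60, so the windowed result splits what the keyed result adds together.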

Example

StreamExecutionEnvironment::new()
    // Returns DataStream<NoSink>
    .from_source(source)
    // Still DataStream<NoSink> — vectorized filter
    .filter_expr(col("amount").gt(lit(100)))
    // Returns KeyedStream
    .key_by(col("user_id"))
    // Returns WindowedKeyedStream
    .window(TumblingWindows::of(Duration::from_secs(60)))
    // `aggregate_expr` takes a Vec of aggregates + a `BatchStateBackend`
    // (`RocksDbBackend` implements the trait directly — no wrapper needed).
    // Consumes WindowedKeyedStream, returns DataStream<NoSink>.
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    // Returns DataStream<HasSink>
    .to_sink(sink)
    // `execute` returns `ExecutionReport { records_written, epochs_committed }`.
    .execute("my-job")
    .await?;

Parallelism

Set the parallelism level on the environment. After .key_by(), records are hash-partitioned across N parallel operator instances, where N is the configured parallelism:

StreamExecutionEnvironment::new()
    .set_parallelism(4)
    .from_source(...)
    // ...

Each parallel instance has its own state namespace. Checkpoint barriers are aligned across all partitions.
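
As a rough illustration of what hash partitioning means, the sketch below maps a key to one of N instances by hashing it and taking the result modulo N. It uses only the Rust standard library; `partition_for` is a hypothetical helper, and the hash function Volley actually uses is not specified here, so treat the routing as an assumption rather than a description of the real implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical helper: pick one of `parallelism` instances for a key.
// Every record with the same key lands on the same instance.
fn partition_for<K: Hash>(key: &K, parallelism: usize) -> usize {
    assert!(parallelism > 0, "parallelism must be at least 1");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % parallelism as u64) as usize
}

fn main() {
    let parallelism = 4;
    for user_id in ["alice", "bob", "carol", "alice"] {
        let instance = partition_for(&user_id, parallelism);
        println!("{user_id} -> instance {instance}");
    }
}
```

The property that matters for keyed state is that the mapping is deterministic: both "alice" records are routed to the same instance, so that instance can hold all state for that key.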

Typestate mechanics

The type progression above is an application of the “typestate” pattern: each builder method consumes self and returns a new type that only exposes the next set of valid operations. DataStream<NoSink> and DataStream<HasSink> share the same struct with a marker type parameter; .to_sink() flips the marker from NoSink to HasSink, which is the only variant that has an .execute() method. You can read the compile errors as a guide — if the compiler says a method doesn’t exist on your stream, you probably need to add (or remove) a step earlier in the chain.
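
The marker-parameter idea can be sketched in a few lines of standalone Rust. This is not Volley's source: the `Stream` struct, its `steps` field, and the method bodies are hypothetical and exist only to show how a marker type can gate which methods are available. Only the `NoSink` and `HasSink` names are borrowed from the text above.

```rust
use std::marker::PhantomData;

// Marker types: zero-sized, used only at the type level.
struct NoSink;
struct HasSink;

// Hypothetical stand-in for a stream builder with a marker parameter.
struct Stream<S> {
    steps: Vec<String>,
    _sink_state: PhantomData<S>,
}

impl Stream<NoSink> {
    fn new() -> Self {
        Stream { steps: Vec::new(), _sink_state: PhantomData }
    }

    // Operators consume `self` and stay in the NoSink state.
    fn filter(mut self, description: &str) -> Self {
        self.steps.push(format!("filter({description})"));
        self
    }

    // Attaching a sink consumes the builder and flips the marker.
    fn to_sink(mut self, name: &str) -> Stream<HasSink> {
        self.steps.push(format!("sink({name})"));
        Stream { steps: self.steps, _sink_state: PhantomData }
    }
}

impl Stream<HasSink> {
    // `execute` exists only in the HasSink state.
    fn execute(self) -> String {
        self.steps.join(" -> ")
    }
}

fn main() {
    let plan = Stream::<NoSink>::new()
        .filter("amount > 100")
        .to_sink("stdout")
        .execute();
    println!("{plan}");

    // Uncommenting the next line fails to compile, because
    // `execute` is not defined for Stream<NoSink>:
    // Stream::<NoSink>::new().execute();
}
```

Because each method takes `self` by value, the earlier state cannot be reused after a transition, and because `execute` is defined only in the `impl Stream<HasSink>` block, calling it too early is a type error rather than a runtime check.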