Pipeline Builder

Volley uses Rust’s type system to enforce valid pipeline construction at compile time. Invalid pipelines, such as calling .aggregate_expr() on an unkeyed stream, are rejected by the compiler instead of failing at runtime.

Type Progression

StreamExecutionEnvironment  -->  DataStream  -->  KeyedStream  -->  WindowedKeyedStream
        |                           |                |                      |
    from_source()              filter()          key_by()             window()
    from_iter()                map()                                  aggregate_expr()
    from_kafka()               flat_map()
                               filter_expr()
                               select_expr()
                               apply_operator()

Each builder method returns the stream type that determines which operations can follow:

| Type | Meaning | Available Operations |
|------|---------|----------------------|
| DataStream | Unkeyed stream | filter, filter_expr, map, flat_map, select_expr, apply_operator, key_by, to_sink, collect, with_tracing, with_observability |
| KeyedStream | Partitioned by key | window, aggregate_expr, to_sink, collect, with_tracing, with_observability |
| WindowedKeyedStream | Keyed + windowed | aggregate_expr |

You can call .aggregate_expr() on either a KeyedStream (global aggregation) or a WindowedKeyedStream (windowed aggregation). The closure-based operators (filter, map, flat_map) remain available alongside the expression-based ones (filter_expr, select_expr, aggregate_expr).
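The compile-time enforcement described above is commonly achieved with the typestate pattern: each stage is its own type, and a method exists only on the types where it is valid. The following is a minimal sketch of that idea, not Volley's implementation. Every name in it (Event, from_events, key_by_user, sum_amounts, totals_over_100) is hypothetical, and these DataStream and KeyedStream structs are simplified stand-ins for the real ones.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for illustration; these are not Volley's real types.
pub type Event = (String, i64); // (user_id, amount)

/// Unkeyed stream: offers `filter` and `key_by_user`, but no aggregation.
pub struct DataStream {
    events: Vec<Event>,
}

/// Keyed stream: the only type that offers `sum_amounts`.
pub struct KeyedStream {
    groups: BTreeMap<String, Vec<i64>>,
}

impl DataStream {
    pub fn from_events(events: Vec<Event>) -> Self {
        DataStream { events }
    }

    /// Stays a `DataStream`, so further filters or `key_by_user` can follow.
    pub fn filter(self, keep: impl Fn(&Event) -> bool) -> DataStream {
        DataStream {
            events: self.events.into_iter().filter(|e| keep(e)).collect(),
        }
    }

    /// Consumes the unkeyed stream and returns a different type.
    pub fn key_by_user(self) -> KeyedStream {
        let mut groups: BTreeMap<String, Vec<i64>> = BTreeMap::new();
        for (user, amount) in self.events {
            groups.entry(user).or_default().push(amount);
        }
        KeyedStream { groups }
    }
}

impl KeyedStream {
    /// Aggregation exists only here; `DataStream` has no such method.
    pub fn sum_amounts(self) -> BTreeMap<String, i64> {
        self.groups
            .into_iter()
            .map(|(user, amounts)| (user, amounts.iter().sum::<i64>()))
            .collect()
    }
}

/// Builds a small pipeline: filter, key, then aggregate.
pub fn totals_over_100(events: Vec<Event>) -> BTreeMap<String, i64> {
    DataStream::from_events(events)
        .filter(|(_, amount)| *amount > 100)
        .key_by_user()
        .sum_amounts()
    // Calling `.sum_amounts()` before `.key_by_user()` would not compile,
    // because `DataStream` does not define that method.
}
```

Because each method takes self by value, an earlier stage cannot be reused after it has been transformed, which is one reason builders of this kind tend to consume the stream at every step.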

Example

StreamExecutionEnvironment::new()
    // Returns DataStream
    .from_iter(events)
    // Still DataStream (vectorized filter)
    .filter_expr(col("amount").gt(lit(100)))
    // Returns KeyedStream
    .key_by(col("user_id"))
    // Returns WindowedKeyedStream
    .window(TumblingWindows::of(Duration::from_secs(60)))
    // Consumes WindowedKeyedStream, returns DataStream
    .aggregate_expr(sum(col("amount")), state_backend)
    // Attach sink
    .collect()
    // Execute the pipeline; .await? requires an async function that returns a Result
    .execute("my-job")
    .await?;
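To make the 60-second tumbling window in the example concrete, the sketch below shows one common way to assign a timestamp to a fixed-size, non-overlapping window. It assumes windows are aligned to timestamp 0 and that timestamps are in milliseconds. Both are assumptions, and tumbling_window_bounds is a hypothetical helper, not part of Volley's API.

```rust
use std::time::Duration;

/// Returns the [start, end) bounds, in milliseconds, of the tumbling window
/// that contains `timestamp_ms`.
/// Assumption: windows are aligned to timestamp 0; Volley's alignment may differ.
pub fn tumbling_window_bounds(timestamp_ms: u64, size: Duration) -> (u64, u64) {
    let size_ms = size.as_millis() as u64;
    assert!(size_ms > 0, "window size must be non-zero");
    // Round the timestamp down to the nearest multiple of the window size.
    let start = timestamp_ms - timestamp_ms % size_ms;
    (start, start + size_ms)
}
```

Under these assumptions, every timestamp falls into exactly one window, so an aggregation over tumbling windows counts each record once.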

Parallelism

Set the parallelism level on the environment. After .key_by(), records are hash-partitioned by key across N parallel operator instances, where N is the configured parallelism:

StreamExecutionEnvironment::new()
    // Use 4 parallel operator instances
    .set_parallelism(4)
    .from_source(...)
    // ...

Each parallel instance has its own state namespace. Checkpoint barriers are aligned across all partitions.
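Hash partitioning of the kind described above can be sketched with the standard library alone. The partition_for function below is a hypothetical helper, and the use of DefaultHasher is an assumption for illustration; Volley's actual hash function and routing logic may differ.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a key to one of `parallelism` partitions by hashing it.
/// Hypothetical helper; not Volley's actual routing code.
pub fn partition_for<K: Hash + ?Sized>(key: &K, parallelism: usize) -> usize {
    assert!(parallelism > 0, "parallelism must be at least 1");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    // The modulo keeps the result in the range 0..parallelism.
    (hasher.finish() % parallelism as u64) as usize
}
```

Because the partition depends only on the key, every record with the same key reaches the same instance, which is what allows each instance to keep its own state for its keys. Note that the output of DefaultHasher is not guaranteed to be stable across Rust releases, so a system that persists partition assignments would typically use a hash with a fixed, documented algorithm.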