Pipeline Builder
Volley uses Rust’s type system to enforce valid pipeline construction at compile time. Invalid pipelines (such as calling .aggregate_expr() on an unkeyed stream) are rejected by the compiler instead of failing at runtime.
Type Progression
StreamExecutionEnvironment --> DataStream --> KeyedStream --> WindowedKeyedStream
            |                      |               |                  |
      from_source()            filter()         key_by()          window()
      from_iter()              map()                              aggregate_expr()
      from_kafka()             flat_map()
                               filter_expr()
                               select_expr()
                               apply_operator()
Builder methods that change the shape of the stream return a new type, and each type exposes only the operations that are valid at that stage:
| Type | Meaning | Available Operations |
|---|---|---|
| DataStream | Unkeyed stream | filter, filter_expr, map, flat_map, select_expr, apply_operator, key_by, to_sink, collect, with_tracing, with_observability |
| KeyedStream | Partitioned by key | window, aggregate_expr, to_sink, collect, with_tracing, with_observability |
| WindowedKeyedStream | Keyed + windowed | aggregate_expr |
You can call .aggregate_expr() on both KeyedStream (global aggregation) and WindowedKeyedStream (windowed aggregation). Closure-based operators (filter, map, flat_map) remain available alongside the expression-based operators (filter_expr, select_expr, aggregate_expr).
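To illustrate how a type progression like this can be enforced by the compiler, here is a minimal typestate sketch. It is not Volley's implementation: `Stream`, the marker types `Unkeyed`, `Keyed`, and `Windowed`, and the simplified `aggregate` method are hypothetical stand-ins for DataStream, KeyedStream, WindowedKeyedStream, and aggregate_expr. The return type of the global aggregation is also an assumption.

```rust
use std::marker::PhantomData;

// Hypothetical marker types, one per pipeline stage (not Volley's real types).
struct Unkeyed;
struct Keyed;
struct Windowed;

// A toy stream that records operation names and tracks its stage in `S`.
struct Stream<S> {
    ops: Vec<&'static str>,
    _stage: PhantomData<S>,
}

impl<S> Stream<S> {
    // Record an operation and move to stage `T` (which may equal `S`).
    fn step<T>(mut self, op: &'static str) -> Stream<T> {
        self.ops.push(op);
        Stream { ops: self.ops, _stage: PhantomData }
    }
}

// Operations that exist only on an unkeyed stream.
impl Stream<Unkeyed> {
    fn new() -> Self {
        Stream { ops: Vec::new(), _stage: PhantomData }
    }
    fn filter(self) -> Stream<Unkeyed> {
        self.step("filter")
    }
    fn key_by(self) -> Stream<Keyed> {
        self.step("key_by")
    }
}

// Operations that exist only on a keyed stream.
impl Stream<Keyed> {
    fn window(self) -> Stream<Windowed> {
        self.step("window")
    }
    // Assumption: global aggregation yields an unkeyed stream again.
    fn aggregate(self) -> Stream<Unkeyed> {
        self.step("aggregate(global)")
    }
}

// Operations that exist only on a keyed, windowed stream.
impl Stream<Windowed> {
    fn aggregate(self) -> Stream<Unkeyed> {
        self.step("aggregate(windowed)")
    }
}

// Builds a valid chain and returns the recorded operation names.
fn build_valid_pipeline() -> Vec<&'static str> {
    // Uncommenting the next line fails to compile, because
    // `Stream<Unkeyed>` has no `aggregate` method:
    // Stream::<Unkeyed>::new().aggregate();
    Stream::<Unkeyed>::new()
        .filter()
        .key_by()
        .window()
        .aggregate()
        .ops
}
```

Because each stage is a distinct type, an out-of-order call is an ordinary "method not found" compile error, and no runtime check is needed.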
Example
// Runs inside an async function that returns a Result (required by `.await?`).
StreamExecutionEnvironment::new()
    // Returns DataStream
    .from_iter(events)
    // Still DataStream (vectorized filter)
    .filter_expr(col("amount").gt(lit(100)))
    // Returns KeyedStream
    .key_by(col("user_id"))
    // Returns WindowedKeyedStream
    .window(TumblingWindows::of(Duration::from_secs(60)))
    // Consumes WindowedKeyedStream, returns DataStream
    .aggregate_expr(sum(col("amount")), state_backend)
    // Attach sink
    .collect()
    // Execute the pipeline
    .execute("my-job")
    .await?;
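The example groups records into 60-second tumbling windows. As a rough illustration of what that grouping means, the sketch below maps a timestamp to a fixed-size, non-overlapping window. The helper `tumbling_window_bounds` and the epoch-aligned boundaries are assumptions made for illustration; Volley's actual window assignment may differ.

```rust
use std::time::Duration;

// Hypothetical helper: returns the half-open interval [start, end), in
// milliseconds, of the tumbling window that contains `ts_ms`.
// Assumes windows are aligned to timestamp 0.
fn tumbling_window_bounds(ts_ms: u64, size: Duration) -> (u64, u64) {
    let size_ms = size.as_millis() as u64;
    assert!(size_ms > 0, "window size must be at least 1 ms");
    // Round the timestamp down to the nearest multiple of the window size.
    let start = ts_ms - ts_ms % size_ms;
    (start, start + size_ms)
}
```

Under these assumptions, a record with timestamp 125 000 ms falls into the window [120 000, 180 000), and every timestamp belongs to exactly one window.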
Parallelism
Set the parallelism level on the environment with .set_parallelism(N). After .key_by(), records are hash-partitioned by key across the N parallel operator instances:
StreamExecutionEnvironment::new()
    .set_parallelism(4)
    .from_source(...)
    // ...
Each parallel instance has its own state namespace. Checkpoint barriers are aligned across all partitions.
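Hash partitioning in general can be sketched as follows. This is an illustration of the idea only, not Volley's partitioner: the helpers `partition_for` and `partition_all` are hypothetical, and the use of the standard library's `DefaultHasher` with a modulo is an assumption.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical helper: map a key to one of `parallelism` operator instances.
fn partition_for<K: Hash + ?Sized>(key: &K, parallelism: usize) -> usize {
    assert!(parallelism > 0, "parallelism must be at least 1");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    // Reduce the 64-bit hash to an instance index in 0..parallelism.
    (hasher.finish() % parallelism as u64) as usize
}

// Hypothetical helper: distribute keys into per-instance buckets.
fn partition_all<'a>(keys: &[&'a str], parallelism: usize) -> Vec<Vec<&'a str>> {
    let mut partitions = vec![Vec::new(); parallelism];
    for &key in keys {
        partitions[partition_for(key, parallelism)].push(key);
    }
    partitions
}
```

The property that matters is that equal keys always land on the same instance, which is what allows each instance to keep the state for its keys in its own namespace.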