Your First Pipeline

This walkthrough covers the in_memory_pipeline example, which demonstrates the core Volley API.

What It Does

The pipeline:

  1. Reads events from an in-memory iterator
  2. Filters records
  3. Partitions by a key field
  4. Groups into 5-minute tumbling windows
  5. Computes a sum aggregation
  6. Collects results in memory
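The semantics of these six steps can be sketched in plain std-only Rust. This is an illustration, not the Volley API: the `Event` struct, its field names, and the sample values are hypothetical stand-ins, while the filter threshold and the 300-second window size come from the snippets later in this walkthrough.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a stream record.
struct Event {
    user_id: u64,
    amount: i64,
    timestamp_secs: u64,
}

// Steps 2-5 in one pass: filter, key by user_id, assign a
// 5-minute tumbling window, and sum `amount` per (key, window).
fn aggregate(events: &[Event]) -> HashMap<(u64, u64), i64> {
    let mut sums = HashMap::new();
    for e in events {
        if e.amount <= 100 {
            continue; // step 2: filter
        }
        // Step 4: 300 s tumbling windows partition the timeline
        // into [0, 300), [300, 600), ...
        let window_start = e.timestamp_secs / 300 * 300;
        // Steps 3 & 5: group by key and sum per window.
        *sums.entry((e.user_id, window_start)).or_insert(0) += e.amount;
    }
    sums
}

fn main() {
    let events = [
        Event { user_id: 1, amount: 150, timestamp_secs: 10 },
        Event { user_id: 1, amount: 50, timestamp_secs: 20 }, // filtered out
        Event { user_id: 2, amount: 300, timestamp_secs: 40 },
        Event { user_id: 1, amount: 200, timestamp_secs: 400 }, // second window
    ];
    // Step 6: collect the per-window, per-key sums.
    for ((user, win), sum) in aggregate(&events) {
        println!("user={user} window_start={win} sum={sum}");
    }
}
```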

Key Concepts

Creating the Environment

let env = StreamExecutionEnvironment::new();

This creates the pipeline builder. All pipeline construction starts here.

Adding a Source

env.from_iter(events)

from_iter() creates a DataStream from a Rust iterator of StreamRecord values. For production use, you’d use from_kafka() or a blob store source.

Applying Operators

.filter_expr(col("amount").gt(lit(100)))

A DataStream supports several operators: filter_expr, map, flat_map, and apply_operator.
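If you know Rust's standard iterator adapters, the first three behave analogously. A rough std-only analogy (this is plain Rust, not the Volley API; the doubling and duplication closures are arbitrary examples):

```rust
fn main() {
    let amounts = vec![50i64, 150, 300];

    let result: Vec<i64> = amounts
        .into_iter()
        .filter(|a| *a > 100)     // like .filter_expr(col("amount").gt(lit(100)))
        .map(|a| a * 2)           // like .map(...): exactly one output per input
        .flat_map(|a| vec![a, a]) // like .flat_map(...): zero or more outputs per input
        .collect();

    println!("{result:?}"); // [300, 300, 600, 600]
}
```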

Keying and Windowing

.key_by(col("user_id"))
.window(TumblingWindows::of(Duration::from_secs(300)))
.aggregate(AggregationType::Sum, "amount")

.key_by() transitions to a KeyedStream (hash-partitioned by key). .window() transitions to WindowedKeyedStream. .aggregate() computes the result per window per key.
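The "hash-partitioned" part of .key_by() can be pictured with std's hasher: a record's key is hashed and mapped to one of N parallel partitions, so all records sharing a key land on the same task. A minimal std-only sketch (the function name and the partition count of 4 are arbitrary choices here, not part of the Volley API):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a key to one of `partitions` tasks. Within one process run,
/// equal keys always map to the same partition.
fn partition_for<K: Hash>(key: &K, partitions: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % partitions
}

fn main() {
    // The same user_id always routes to the same partition.
    let p1 = partition_for(&"user_42", 4);
    let p2 = partition_for(&"user_42", 4);
    assert_eq!(p1, p2);
    println!("user_42 -> partition {p1}");
}
```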

Executing

.collect()
.execute("aggregation-job")
.await?;

.collect() attaches an in-memory sink. .execute() starts the pipeline as concurrent Tokio tasks and returns when all data is processed.

Run It

cargo run --example in_memory_pipeline -p volley-examples

Expected output:

Pipeline 'aggregation-job' started
  Source: MemorySource (4 records)
  Operators: filter_expr → key_by → window → aggregate
  Sink: MemorySink
Processing complete: 4 records in, 2 aggregated results out

Next Steps