
Quick Start

Build your first Volley pipeline in 5 minutes.

Basic In-Memory Pipeline

This pipeline reads events from an iterator, filters them, groups by key, windows into 5-minute tumbling windows, and computes a sum aggregation:

use volley_core::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // `events` is any iterator of input records; the full `in_memory_pipeline`
    // example in volley-examples constructs four sample records.
    let results = StreamExecutionEnvironment::new()
        .from_iter(events)
        .filter_expr(col("amount").gt(lit(100)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(300)))
        .aggregate(AggregationType::Sum, "amount")
        .collect()
        .execute("aggregation-job")
        .await?;

    Ok(())
}

Run it:

cargo run --example in_memory_pipeline -p volley-examples

Expected output (records grouped by key and windowed):

Pipeline 'aggregation-job' started
  Source: MemorySource (4 records)
  Operators: filter_expr → key_by → window → aggregate
  Sink: MemorySink
Processing complete: 4 records in, 2 aggregated results out
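Conceptually, the window-and-aggregate stages compute a sum per (key, window) pair, where each record's timestamp maps to a window start by integer division. A plain-Rust sketch of that idea (illustrative only; names and record shapes here are assumptions, not Volley's actual internals):

```rust
use std::collections::BTreeMap;

// Assign each (user_id, timestamp_secs, amount) record to a tumbling window
// of `window_secs` and sum `amount` per (user_id, window_start) pair.
fn tumbling_sum(
    records: &[(&str, u64, i64)],
    window_secs: u64,
) -> BTreeMap<(String, u64), i64> {
    let mut sums = BTreeMap::new();
    for &(user, ts, amount) in records {
        // Tumbling-window assignment: round the timestamp down to a window start.
        let window_start = (ts / window_secs) * window_secs;
        *sums.entry((user.to_string(), window_start)).or_insert(0) += amount;
    }
    sums
}

fn main() {
    let records = [
        ("alice", 10, 150),  // window [0, 300)
        ("alice", 200, 120), // same window as the record above
        ("bob", 350, 500),   // window [300, 600)
    ];
    for ((user, win), sum) in tumbling_sum(&records, 300) {
        println!("{user} @ {win}: {sum}"); // alice @ 0: 270, bob @ 300: 500
    }
}
```

A real engine assigns windows incrementally as records arrive and emits results when a window closes, but the grouping arithmetic is the same.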

Kafka Source to Kafka Sink

A real-world pipeline reading from Kafka, aggregating, and writing back to Kafka with exactly-once semantics:

use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group");
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter_expr(col("amount").gt(lit(0)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate(AggregationType::Sum, "amount")
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;

    Ok(())
}

Note: the Kafka connectors require cmake to be installed on your system. See Installation for details.
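Running this example also requires a Kafka broker with the input and output topics created. Assuming a standard Apache Kafka distribution listening on localhost:9092 (the broker address and topic names match the configs above):

```shell
# Create the topics the example reads from and writes to.
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic events
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic output
```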

What’s Happening

  1. StreamExecutionEnvironment::new() — Creates the pipeline builder
  2. .from_kafka() / .from_iter() — Attaches a source, producing a DataStream
  3. .filter_expr() / .map() — Applies operators to the stream
  4. .key_by() — Partitions the stream by key, producing a KeyedStream
  5. .window() — Assigns records to time windows, producing a WindowedKeyedStream
  6. .aggregate() — Computes an aggregation per window per key
  7. .to_kafka() / .collect() — Attaches a sink
  8. .execute() — Starts the pipeline as concurrent Tokio tasks

The type system enforces valid construction at compile time. You can only call .aggregate() on a WindowedKeyedStream, not on a raw DataStream.
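This kind of compile-time enforcement is commonly done with a type-state builder: each stage returns a distinct type, and each method is defined only on the type where it is legal. A minimal self-contained sketch of the pattern (toy types, not Volley's actual API):

```rust
use std::collections::BTreeMap;

// Each pipeline stage is its own type, so invalid call orders don't compile.
struct DataStream(Vec<(String, i64)>); // (key, amount) records
struct KeyedStream(Vec<(String, i64)>);
struct WindowedKeyedStream(Vec<(String, i64)>);

impl DataStream {
    fn key_by(self) -> KeyedStream {
        KeyedStream(self.0)
    }
    // No `aggregate` here: `DataStream::aggregate` is a compile error.
}

impl KeyedStream {
    fn window(self) -> WindowedKeyedStream {
        WindowedKeyedStream(self.0)
    }
}

impl WindowedKeyedStream {
    // Only the keyed, windowed stage exposes aggregation.
    fn aggregate_sum(self) -> Vec<(String, i64)> {
        let mut sums: BTreeMap<String, i64> = BTreeMap::new();
        for (key, amount) in self.0 {
            *sums.entry(key).or_insert(0) += amount;
        }
        sums.into_iter().collect()
    }
}

fn main() {
    let stream = DataStream(vec![
        ("alice".into(), 150),
        ("bob".into(), 200),
        ("alice".into(), 120),
    ]);
    // Compiles only in this order: key_by, then window, then aggregate.
    let results = stream.key_by().window().aggregate_sum();
    println!("{results:?}"); // [("alice", 270), ("bob", 200)]
}
```

Because each method consumes `self` and returns the next stage's type, the builder chain itself documents the legal pipeline shapes.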

Next Steps