# Quick Start

Build your first Volley pipeline in 5 minutes.
## Basic In-Memory Pipeline
This pipeline reads events from an iterator, filters them, groups by key, windows into 5-minute tumbling windows, and computes a sum aggregation:
```rust
use volley_core::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // `events`: any iterator of records, e.g. an in-memory Vec (omitted here).
    let results = StreamExecutionEnvironment::new()
        .from_iter(events)
        .filter_expr(col("amount").gt(lit(100)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(300)))
        .aggregate(AggregationType::Sum, "amount")
        .collect()
        .execute("aggregation-job")
        .await?;
    Ok(())
}
```
Run it:

```sh
cargo run --example in_memory_pipeline -p volley-examples
```
Expected output (records grouped by key and windowed):

```text
Pipeline 'aggregation-job' started
Source: MemorySource (4 records)
Operators: filter_expr → key_by → window → aggregate
Sink: MemorySink
Processing complete: 4 records in, 2 aggregated results out
```
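To make the dataflow concrete, here is the same filter → key-by → window → sum logic as a plain-Rust sketch. The `Event` shape, sample values, and helper names are assumptions for illustration, not Volley's actual record type or API:

```rust
use std::collections::HashMap;

// Hypothetical record shape; the quick start's `events` iterator is assumed
// to yield something similar.
struct Event {
    user_id: u32,
    amount: i64,
    ts_secs: u64, // event-time timestamp, in seconds
}

fn sample_events() -> Vec<Event> {
    vec![
        Event { user_id: 1, amount: 150, ts_secs: 10 },
        Event { user_id: 1, amount: 200, ts_secs: 20 },
        Event { user_id: 2, amount: 300, ts_secs: 40 },
        Event { user_id: 2, amount: 50, ts_secs: 60 }, // dropped by the filter
    ]
}

// filter -> key_by -> window -> aggregate, collapsed into one pass:
// a record lands in the tumbling window starting at ts - ts % window_size.
fn aggregate(events: &[Event]) -> HashMap<(u32, u64), i64> {
    let window_size = 300; // 5 minutes, matching the pipeline above
    let mut sums = HashMap::new();
    for e in events.iter().filter(|e| e.amount > 100) {
        let window_start = e.ts_secs - e.ts_secs % window_size;
        *sums.entry((e.user_id, window_start)).or_insert(0) += e.amount;
    }
    sums
}

fn main() {
    // 4 records in, 2 aggregated results out: one per (user_id, window) pair.
    println!("{:?}", aggregate(&sample_events()));
}
```

Each record is assigned to exactly one tumbling window by flooring its timestamp to a multiple of the window size, so windows never overlap.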
## Kafka Source to Kafka Sink
A real-world pipeline reading from Kafka, aggregating, and writing back to Kafka with exactly-once semantics:
```rust
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group");
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter_expr(col("amount").gt(lit(0)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate(AggregationType::Sum, "amount")
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;
    Ok(())
}
```
Note: Kafka connectors require `cmake` to be installed on your system. See Installation for details.
## What’s Happening
- `StreamExecutionEnvironment::new()` — Creates the pipeline builder.
- `.from_kafka()` / `.from_iter()` — Attaches a source, producing a `DataStream`.
- `.filter_expr()` / `.map()` — Applies operators to the stream.
- `.key_by()` — Partitions the stream by key, producing a `KeyedStream`.
- `.window()` — Assigns records to time windows, producing a `WindowedKeyedStream`.
- `.aggregate()` — Computes an aggregation per window per key.
- `.to_kafka()` / `.collect()` — Attaches a sink.
- `.execute()` — Starts the pipeline as concurrent Tokio tasks.
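The "concurrent tasks" shape in the last step can be pictured as stages connected by channels. A minimal sketch using std threads and `mpsc` channels in place of Tokio tasks (the stage layout here is an illustration of the general pattern, not Volley's internals):

```rust
use std::sync::mpsc;
use std::thread;

// Source -> operator -> sink, each stage running concurrently,
// handing records downstream over channels.
fn run_pipeline() -> Vec<i64> {
    let (src_tx, src_rx) = mpsc::channel::<i64>();
    let (op_tx, op_rx) = mpsc::channel::<i64>();

    // Source stage: emit records, then drop the sender to signal end-of-stream.
    let source = thread::spawn(move || {
        for amount in [150, 50, 300, 200] {
            src_tx.send(amount).unwrap();
        }
    });

    // Operator stage: a filter, forwarding only amounts > 100.
    let filter = thread::spawn(move || {
        for amount in src_rx {
            if amount > 100 {
                op_tx.send(amount).unwrap();
            }
        }
    });

    // Sink stage: collect until the upstream channel closes.
    let out: Vec<i64> = op_rx.iter().collect();
    source.join().unwrap();
    filter.join().unwrap();
    out
}

fn main() {
    println!("{:?}", run_pipeline()); // [150, 300, 200]
}
```

Because each stage owns its sender, dropping it when the stage finishes propagates end-of-stream to the next stage without any extra signalling.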
The type system enforces valid construction at compile time. You can only call `.aggregate()` on a `WindowedKeyedStream`, not on a raw `DataStream`.
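This kind of compile-time guarantee comes from a typestate-style builder: each step returns a distinct type that only exposes the legal next steps. A minimal sketch of the pattern (the types and method names here are illustrative, not Volley's real API):

```rust
use std::collections::HashMap;

// Typestate sketch: each builder step returns a different type, so an
// invalid call order is a compile error rather than a runtime failure.
struct DataStream {
    items: Vec<(u32, i64)>, // (user_id, amount)
}

struct KeyedStream {
    items: Vec<(u32, i64)>,
}

impl DataStream {
    // The only way to reach aggregation is through key_by().
    fn key_by(self) -> KeyedStream {
        KeyedStream { items: self.items }
    }
    // No aggregate method here: calling .aggregate_sum() on a
    // DataStream simply does not compile.
}

impl KeyedStream {
    fn aggregate_sum(self) -> HashMap<u32, i64> {
        let mut out = HashMap::new();
        for (key, amount) in self.items {
            *out.entry(key).or_insert(0) += amount;
        }
        out
    }
}

fn main() {
    let totals = DataStream { items: vec![(1, 150), (2, 300), (1, 200)] }
        .key_by()
        .aggregate_sum();
    println!("{totals:?}");
}
```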
## Next Steps
- Architecture Overview — understand how Volley works under the hood
- Examples — browse all runnable examples
- Operators Guide — write custom operators
- Connectors Guide — write custom connectors