Quick Start
Build your first Volley pipeline in 5 minutes.
Basic In-Memory Pipeline
This pipeline reads sample trade data from memory, filters high-value trades, and collects results:
use std::sync::Arc;
use arrow::array::{Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use volley_connectors::memory::{MemorySink, MemorySource};
use volley_core::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
    // Column layout shared by every sample trade; all three columns are non-nullable.
    let fields = vec![
        Field::new("symbol", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
        Field::new("quantity", DataType::Int64, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    // Packs one trade into a single-row `RecordBatch` and wraps it in a `StreamRecord`.
    let single_trade = |symbol: &str, price: f64, quantity: i64| {
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec![symbol])),
                Arc::new(Float64Array::from(vec![price])),
                Arc::new(Int64Array::from(vec![quantity])),
            ],
        )
        .unwrap();
        StreamRecord::new(batch)
    };
    let records = vec![
        single_trade("AAPL", 150.0, 100),
        single_trade("GOOG", 280.0, 50),
    ];

    let sink = MemorySink::new();
    // Grab the handle before `to_sink()` takes ownership of the sink: the
    // handle is a shared reference to the sink's buffer and remains usable
    // afterwards.
    let collected = sink.handle();

    // Source -> price filter (> 200.0) -> in-memory sink.
    let pipeline = StreamExecutionEnvironment::new()
        .from_source(MemorySource::new(records))
        .filter_expr(col("price").gt(lit(200.0)))
        .to_sink(sink);

    // The returned `ExecutionReport` carries `records_written` and
    // `epochs_committed`; the filtered rows themselves are read back through
    // the sink handle.
    let report = pipeline.execute("filter-example").await?;
    println!(
        "{} rows, {} epochs",
        report.records_written, report.epochs_committed
    );

    let kept = collected.lock().unwrap().len();
    println!("kept {} records", kept);
    Ok(())
}
Run the full in-memory example from the repo:
cargo run --example in_memory_pipeline -p volley-examples
Expected output (abridged):
=== Volley in-memory DAG pipeline ===
Input: 5 trades
Execution report: 0 records written, 1 epochs committed
Filtered output (3 records):
1. GOOG @ $280 x 50
2. MSFT @ $310 x 75
3. GOOG @ $290 x 150
Windowed Aggregation
Add keyed windowing and expression-based aggregation (requires the state-rocksdb feature):
use std::sync::Arc;
use std::time::Duration;
use volley_core::prelude::*;
use volley_connectors::memory::{MemorySink, MemorySource};
// Keyed, windowed aggregation: filter rows on `amount`, group them by
// `user_id`, and sum `amount` per 300-second tumbling window.
//
// NOTE: this snippet is abridged. `records` is not defined here and must be
// built by the caller (the basic in-memory example passes a `Vec` of
// `StreamRecord`s to `MemorySource::new`). NOTE(review): the input batches
// presumably need `amount` and `user_id` columns — confirm the expected schema.
#[tokio::main]
async fn main() -> Result<()> {
// `RocksDbBackend` implements `BatchStateBackend` directly — no
// `KeyedStateBackend` wrapper needed at the builder API.
// The backend's files live under a temporary directory; `tempfile` removes
// that directory when `tmp` is dropped, so `tmp` is kept alive until `main`
// returns.
let tmp = tempfile::tempdir().unwrap();
StreamExecutionEnvironment::new()
.from_source(MemorySource::new(records))
// Keep only rows whose `amount` is greater than 100.
.filter_expr(col("amount").gt(lit(100)))
// Group the remaining rows by `user_id`.
.key_by(col("user_id"))
// Fixed 300-second (5-minute) tumbling windows.
.window(TumblingWindows::of(Duration::from_secs(300)))
.aggregate_expr(
// One aggregate per window per key: the sum of `amount`.
vec![sum(col("amount"))],
// Window state is kept in a RocksDB instance opened at `<tmp>/state`.
RocksDbBackend::open(tmp.path().join("state")).unwrap(),
)
// The sink is created inline and no `handle()` is taken first, so this
// example cannot read the aggregated rows back after execution.
.to_sink(MemorySink::new())
.execute("aggregation-job")
.await?;
Ok(())
}
Kafka Source to Kafka Sink
A real-world pipeline reading from Kafka, aggregating, and writing back to Kafka:
use std::sync::Arc;
use std::time::Duration;
use arrow::datatypes::{DataType, Field, Schema};
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};
// Kafka-to-Kafka aggregation: read records from one topic, sum `amount` per
// `user_id` over 60-second tumbling windows, and write the results to another
// topic.
#[tokio::main]
async fn main() -> Result<()> {
// Schema handed to the Kafka source: two non-nullable columns, `user_id`
// (UTF-8 string) and `amount` (64-bit float).
let schema = Arc::new(Schema::new(vec![
Field::new("user_id", DataType::Utf8, false),
Field::new("amount", DataType::Float64, false),
]));
// NOTE(review): the string arguments look like (bootstrap servers, topic,
// consumer group) for the source and (bootstrap servers, topic) for the
// sink — confirm against the connector's constructor signatures.
let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema);
let sink_config = KafkaSinkConfig::new("localhost:9092", "output");
// Temporary directory for the RocksDB state; `tempfile` removes it when `tmp`
// is dropped, so `tmp` is kept alive until `main` returns.
let tmp = tempfile::tempdir().unwrap();
StreamExecutionEnvironment::new()
// Attaching the Kafka source is async and fallible, hence `.await?`.
.from_kafka(source_config).await?
// Keep only rows with a strictly positive `amount`.
.filter_expr(col("amount").gt(lit(0.0)))
.key_by(col("user_id"))
// Fixed 60-second tumbling windows.
.window(TumblingWindows::of(Duration::from_secs(60)))
.aggregate_expr(
// One aggregate per window per key: the sum of `amount`.
vec![sum(col("amount"))],
// Window state is kept in a RocksDB instance opened at `<tmp>/state`.
RocksDbBackend::open(tmp.path().join("state")).unwrap(),
)
// Attaching the Kafka sink is likewise async and fallible.
.to_kafka(sink_config).await?
.execute("kafka-aggregation-job")
.await?;
Ok(())
}
For exactly-once Kafka EOS (source offsets committed inside the sink’s Kafka transaction), swap the source/sink pair:
// Fragment, not a complete program: `env` and `source_config` are defined
// elsewhere. NOTE(review): `env` is presumably a `StreamExecutionEnvironment`
// — confirm.
//
// The exactly-once source returns the stream together with a second value
// (`cgm`), which must be passed on to the exactly-once sink below.
let (stream, cgm) = env.from_kafka_exactly_once(source_config).await?;
stream
// ... operators ...
// The sink uses a transactional config; NOTE(review): the third argument
// ("my-pipeline-txn") looks like the Kafka transactional id — confirm.
.to_kafka_exactly_once(KafkaSinkConfig::new_transactional(
"localhost:9092", "output", "my-pipeline-txn",
), cgm)
.await?
.execute("eos-job")
.await?;
Note: Kafka connectors require cmake installed on your system. See Installation for details.
What’s Happening
- StreamExecutionEnvironment::new() — Creates the pipeline builder.
- .from_source() / .from_kafka() — Attaches a source, producing a DataStream.
- .filter_expr() / .select_expr() — Applies vectorised operators to the stream.
- .key_by() — Partitions the stream by key, producing a KeyedStream.
- .window() — Assigns records to time windows, producing a WindowedKeyedStream.
- .aggregate_expr() — Computes an aggregation per window per key.
- .to_kafka() / .to_sink() — Attaches a sink (transitions to DataStream<HasSink>).
- .execute() — Starts the pipeline as concurrent Tokio tasks.
The type system enforces valid construction at compile time. You can only call .aggregate_expr() on a WindowedKeyedStream, not on a raw DataStream. You can only call .execute() on a DataStream<HasSink>.
Next Steps
- Architecture Overview — understand how Volley works under the hood
- Examples — browse all runnable examples
- Operators Guide — write custom operators
- Connectors Guide — write custom connectors