
Quick Start

Build your first Volley pipeline in 5 minutes.

Basic In-Memory Pipeline

This pipeline reads sample trade data from memory, filters high-value trades, and collects results:

use std::sync::Arc;

use arrow::array::{Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use volley_connectors::memory::{MemorySink, MemorySource};
use volley_core::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("symbol", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
        Field::new("quantity", DataType::Int64, false),
    ]));

    let records = vec![
        StreamRecord::new(RecordBatch::try_new(schema.clone(), vec![
            Arc::new(StringArray::from(vec!["AAPL"])),
            Arc::new(Float64Array::from(vec![150.0])),
            Arc::new(Int64Array::from(vec![100])),
        ]).unwrap()),
        StreamRecord::new(RecordBatch::try_new(schema.clone(), vec![
            Arc::new(StringArray::from(vec!["GOOG"])),
            Arc::new(Float64Array::from(vec![280.0])),
            Arc::new(Int64Array::from(vec![50])),
        ]).unwrap()),
    ];

    let sink = MemorySink::new();
    // `handle()` returns a shared reference to the sink's buffer that
    // stays live after `to_sink()` consumes the sink itself.
    let output = sink.handle();

    let report = StreamExecutionEnvironment::new()
        .from_source(MemorySource::new(records))
        .filter_expr(col("price").gt(lit(200.0)))
        .to_sink(sink)
        .execute("filter-example")
        .await?;

    // `execute` returns an `ExecutionReport { records_written, epochs_committed }`.
    // The filtered rows come back via the sink handle.
    println!(
        "{} rows, {} epochs",
        report.records_written, report.epochs_committed
    );
    println!("kept {} records", output.lock().unwrap().len());
    Ok(())
}
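The `handle()` comment in the example above relies on a common Rust ownership pattern. The sink is moved into the pipeline, but the buffer it writes to is reference-counted, so a second handle can still read it afterwards. The sketch below models that pattern with plain `std` types. It assumes the buffer sits behind an `Arc<Mutex<Vec<_>>>`: the `.lock().unwrap()` call suggests a mutex, but Volley's actual type is not shown here. `ToySink` and `run_pipeline` are hypothetical names.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a memory sink: the buffer lives behind an
/// `Arc<Mutex<..>>`, so a cloned handle can outlive the sink itself.
struct ToySink {
    buf: Arc<Mutex<Vec<String>>>,
}

impl ToySink {
    fn new() -> Self {
        ToySink { buf: Arc::new(Mutex::new(Vec::new())) }
    }

    /// Clones the `Arc`, not the data: both point at the same buffer.
    fn handle(&self) -> Arc<Mutex<Vec<String>>> {
        Arc::clone(&self.buf)
    }

    fn write(&self, row: &str) {
        self.buf.lock().unwrap().push(row.to_string());
    }
}

/// Takes the sink by value, the way a builder's `to_sink(sink)` would.
fn run_pipeline(sink: ToySink, rows: &[&str]) {
    for r in rows {
        sink.write(r);
    }
    // `sink` is dropped here; the buffer survives through the other `Arc`.
}

fn main() {
    let sink = ToySink::new();
    let output = sink.handle();
    run_pipeline(sink, &["GOOG", "MSFT"]);
    println!("kept {} records", output.lock().unwrap().len());
}
```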

The repository contains a fuller version of this example that starts from five trades instead of two. Run it with:

cargo run --example in_memory_pipeline -p volley-examples

Expected output (abridged):

=== Volley in-memory DAG pipeline ===

Input: 5 trades

Execution report: 0 records written, 1 epochs committed

Filtered output (3 records):
  1. GOOG @ $280 x 50
  2. MSFT @ $310 x 75
  3. GOOG @ $290 x 150

Windowed Aggregation

Add keyed windowing and expression-based aggregation. This requires the state-rocksdb feature; the example also uses the tempfile crate to create a scratch directory for RocksDB state:

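use std::sync::Arc;
use std::time::Duration;

use arrow::array::{Float64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use volley_connectors::memory::{MemorySink, MemorySource};
use volley_core::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Sample input: one batch of (user_id, amount) rows.
    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Utf8, false),
        Field::new("amount", DataType::Float64, false),
    ]));
    let records = vec![StreamRecord::new(RecordBatch::try_new(schema, vec![
        Arc::new(StringArray::from(vec!["alice", "bob", "alice"])),
        Arc::new(Float64Array::from(vec![250.0, 80.0, 120.0])),
    ]).unwrap())];

    // `RocksDbBackend` implements `BatchStateBackend` directly, so no
    // `KeyedStateBackend` wrapper is needed at the builder API.
    let tmp = tempfile::tempdir().unwrap();

    StreamExecutionEnvironment::new()
        .from_source(MemorySource::new(records))
        // `amount` is Float64, so compare against a float literal.
        .filter_expr(col("amount").gt(lit(100.0)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(300)))
        .aggregate_expr(
            vec![sum(col("amount"))],
            RocksDbBackend::open(tmp.path().join("state")).unwrap(),
        )
        .to_sink(MemorySink::new())
        .execute("aggregation-job")
        .await?;

    Ok(())
}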
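Conceptually, the windowed aggregation above filters rows, groups them by key, assigns each row to a fixed-size, non-overlapping window, and sums per (window, key). The following plain-Rust sketch models those semantics on in-memory tuples; it is not Volley's implementation. It assumes event timestamps in milliseconds and windows aligned to the Unix epoch, which may differ from how `TumblingWindows` actually aligns windows. `window_start` and `keyed_tumbling_sum` are hypothetical helpers.

```rust
use std::collections::BTreeMap;

/// Start of the tumbling window containing `ts_ms`, for windows of `size_ms`.
/// Assumes epoch-aligned windows: [0, size), [size, 2 * size), ...
fn window_start(ts_ms: u64, size_ms: u64) -> u64 {
    ts_ms - ts_ms % size_ms
}

/// Filters rows, keys them by user, assigns each to a tumbling window,
/// and sums `amount` per (window start, user).
fn keyed_tumbling_sum(
    events: &[(&str, u64, f64)], // (user_id, event time in ms, amount)
    size_ms: u64,
    min_amount: f64,
) -> BTreeMap<(u64, String), f64> {
    let mut acc = BTreeMap::new();
    for &(user, ts, amount) in events {
        if amount <= min_amount {
            continue; // keep only rows where amount > min_amount
        }
        let key = (window_start(ts, size_ms), user.to_string());
        *acc.entry(key).or_insert(0.0) += amount;
    }
    acc
}

fn main() {
    let five_min: u64 = 300_000; // 300 s, as in the builder example
    let events: [(&str, u64, f64); 4] = [
        ("alice", 10_000, 250.0),
        ("bob", 20_000, 80.0),     // dropped by the filter
        ("alice", 299_999, 120.0), // same window as the first alice row
        ("alice", 300_000, 500.0), // starts the next window
    ];
    for ((start, user), total) in keyed_tumbling_sum(&events, five_min, 100.0) {
        println!("window@{start}ms {user}: {total}");
    }
}
```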

Kafka Source to Kafka Sink

A real-world pipeline reading from Kafka, aggregating, and writing back to Kafka:

use std::sync::Arc;
use std::time::Duration;
use arrow::datatypes::{DataType, Field, Schema};
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Utf8, false),
        Field::new("amount", DataType::Float64, false),
    ]));

    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group", schema);
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");

    let tmp = tempfile::tempdir().unwrap();

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter_expr(col("amount").gt(lit(0.0)))
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate_expr(
            vec![sum(col("amount"))],
            RocksDbBackend::open(tmp.path().join("state")).unwrap(),
        )
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;

    Ok(())
}

For exactly-once semantics (EOS), where source offsets are committed inside the sink’s Kafka transaction, swap in the exactly-once source/sink pair:

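let env = StreamExecutionEnvironment::new();
// `cgm` is handed to the sink so the consumed source offsets are committed
// inside the same Kafka transaction as the output records.
let (stream, cgm) = env.from_kafka_exactly_once(source_config).await?;
stream
    // ... operators ...
    .to_kafka_exactly_once(KafkaSinkConfig::new_transactional(
        "localhost:9092", "output", "my-pipeline-txn",
    ), cgm)
    .await?
    .execute("eos-job")
    .await?;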
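To see why committing source offsets inside the sink's transaction matters, here is a toy model in plain Rust with no Kafka involved. Each batch's output and its new input offset become durable together or not at all, so a crash before commit replays the batch without duplicating output. `Durable`, `run`, and the doubling transform are hypothetical; real Kafka transactions add producer fencing and consumer isolation levels that this sketch ignores.

```rust
/// State that survives a crash: committed output plus the next input offset.
#[derive(Debug, Default)]
struct Durable {
    output: Vec<u64>,
    committed_offset: usize,
}

/// Processes `input` from the committed offset in batches (`batch` must be
/// non-zero). `crash_after` simulates a failure once that many records have
/// been processed in this run, before the current batch commits.
fn run(input: &[u64], durable: &mut Durable, batch: usize, crash_after: Option<usize>) {
    let mut processed = 0;
    while durable.committed_offset < input.len() {
        let start = durable.committed_offset;
        let end = (start + batch).min(input.len());
        // "Begin transaction": output is buffered, not yet visible.
        let pending: Vec<u64> = input[start..end].iter().map(|x| x * 2).collect();
        processed += pending.len();
        if crash_after.map_or(false, |n| processed >= n) {
            return; // simulated crash: pending output and offset are both lost
        }
        // "Commit": output and the new offset become durable together.
        durable.output.extend(pending);
        durable.committed_offset = end;
    }
}

fn main() {
    let input: [u64; 5] = [1, 2, 3, 4, 5];
    let mut durable = Durable::default();
    run(&input, &mut durable, 2, Some(3)); // crashes during the second batch
    run(&input, &mut durable, 2, None); // restarts from the committed offset
    println!("{:?}", durable.output); // prints [2, 4, 6, 8, 10]
}
```

If the offset were committed in a separate step from the output, a crash between the two would either re-emit records already written or skip records that were never written.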

Note: Kafka connectors require CMake to be installed on your system. See Installation for details.

What’s Happening

  1. StreamExecutionEnvironment::new() — Creates the pipeline builder
  2. .from_source() / .from_kafka() — Attaches a source, producing a DataStream
  3. .filter_expr() / .select_expr() — Applies vectorised operators to the stream
  4. .key_by() — Partitions the stream by key, producing a KeyedStream
  5. .window() — Assigns records to time windows, producing a WindowedKeyedStream
  6. .aggregate_expr() — Computes an aggregation per window per key
  7. .to_kafka() / .to_sink() — Attaches a sink (transitions to DataStream<HasSink>)
  8. .execute() — Starts the pipeline as concurrent Tokio tasks

The type system enforces valid construction at compile time. You can only call .aggregate_expr() on a WindowedKeyedStream, not on a raw DataStream. You can only call .execute() on a DataStream<HasSink>.
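These compile-time guarantees follow the typestate pattern: each builder stage is a distinct type, and a method exists only on the types where it is valid. A minimal sketch of the idea, assuming nothing about Volley's real signatures: the stage names mirror the list above, but `NoSink`, the string-based steps, and `aggregate` returning `DataStream<NoSink>` are simplifications for illustration.

```rust
use std::marker::PhantomData;

// Hypothetical marker types recording whether a sink is attached.
struct NoSink;
struct HasSink;

// In this sketch a plan is just a list of step descriptions.
struct DataStream<S> {
    steps: Vec<String>,
    _state: PhantomData<S>,
}
struct KeyedStream {
    steps: Vec<String>,
}
struct WindowedKeyedStream {
    steps: Vec<String>,
}

impl DataStream<NoSink> {
    fn from_source(name: &str) -> Self {
        DataStream { steps: vec![format!("source:{name}")], _state: PhantomData }
    }
    fn filter(mut self, pred: &str) -> Self {
        self.steps.push(format!("filter:{pred}"));
        self
    }
    fn key_by(mut self, key: &str) -> KeyedStream {
        self.steps.push(format!("key_by:{key}"));
        KeyedStream { steps: self.steps }
    }
    fn to_sink(mut self, name: &str) -> DataStream<HasSink> {
        self.steps.push(format!("sink:{name}"));
        DataStream { steps: self.steps, _state: PhantomData }
    }
}

impl KeyedStream {
    fn window(mut self, spec: &str) -> WindowedKeyedStream {
        self.steps.push(format!("window:{spec}"));
        WindowedKeyedStream { steps: self.steps }
    }
}

impl WindowedKeyedStream {
    // Aggregation is defined only here, not on a raw DataStream.
    fn aggregate(mut self, agg: &str) -> DataStream<NoSink> {
        self.steps.push(format!("aggregate:{agg}"));
        DataStream { steps: self.steps, _state: PhantomData }
    }
}

impl DataStream<HasSink> {
    // `execute` is defined only once a sink is attached.
    fn execute(self) -> Vec<String> {
        self.steps
    }
}

fn main() {
    let plan = DataStream::from_source("memory")
        .filter("amount > 100")
        .key_by("user_id")
        .window("tumbling 300s")
        .aggregate("sum(amount)")
        .to_sink("memory")
        .execute();
    println!("{plan:?}");
    // Neither line below compiles:
    // DataStream::from_source("memory").execute();
    // DataStream::from_source("memory").aggregate("sum(amount)");
}
```

Uncommenting either of the last two lines in `main` produces a "no method named ... found" compile error (E0599) rather than a runtime failure.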

Next Steps