Windowing

Streams never end, so “the average price” only makes sense over a time range: the last minute, the last hour, this user’s current session. Windowing is how you slice an unbounded stream into finite chunks that aggregations can actually compute over.

Volley gives you three shapes of window — tumbling, sliding, and session — all driven by event time (the time the record happened) rather than wall-clock time (when it arrived). This means late or out-of-order data still lands in the right window.

The shortest path: .key_by(col("user_id")).window(TumblingWindows::of(Duration::from_secs(60))).aggregate_expr(vec![sum(col("amount"))], state_backend). Read on for how to pick the window shape and tune watermarks and late data.

Window Types

Tumbling Windows

Fixed-size, non-overlapping windows:

.window(TumblingWindows::of(Duration::from_secs(60)))

Each record belongs to exactly one window.
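To see why each record lands in exactly one tumbling window, note that the window start can be found by rounding the event time down to a multiple of the window size. The following is a minimal standalone sketch, not Volley's implementation; `tumbling_window` and the millisecond timestamps are assumptions made for the example.

```rust
/// Hypothetical helper (not part of Volley): returns the half-open
/// window [start, end) that contains `event_time_ms`.
fn tumbling_window(event_time_ms: i64, size_ms: i64) -> (i64, i64) {
    assert!(size_ms > 0, "window size must be positive");
    // rem_euclid keeps the remainder non-negative, so timestamps before
    // the epoch still round down to the correct window start.
    let start = event_time_ms - event_time_ms.rem_euclid(size_ms);
    (start, start + size_ms)
}
```

For a 60-second window, an event at 61,000 ms maps to the window starting at 60,000 ms and to no other.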

Sliding Windows

Fixed-size, overlapping windows:

.window(SlidingWindows::of(
    Duration::from_secs(60),  // window size
    Duration::from_secs(10),  // slide interval
))

Each record may belong to multiple windows.
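The overlap can be made concrete by listing every window that covers a given event time. This sketch assumes windows start at multiples of the slide interval; `sliding_windows` is a hypothetical helper for illustration and is not Volley's assigner.

```rust
/// Hypothetical helper (not part of Volley): returns every half-open
/// window [start, end) of length `size_ms`, starting at a multiple of
/// `slide_ms`, that contains `event_time_ms`. Oldest window first.
fn sliding_windows(event_time_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<(i64, i64)> {
    assert!(size_ms > 0 && slide_ms > 0, "size and slide must be positive");
    // Latest window start at or before the event time.
    let last_start = event_time_ms - event_time_ms.rem_euclid(slide_ms);
    let mut windows = Vec::new();
    let mut start = last_start;
    // Walk back one slide at a time while the window still covers the event.
    while start + size_ms > event_time_ms {
        windows.push((start, start + size_ms));
        start -= slide_ms;
    }
    windows.reverse();
    windows
}
```

With a 60-second size and a 10-second slide, one record falls into six windows (size divided by slide).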

Session Windows

Dynamic windows that merge on activity:

.window(SessionWindows::with_gap(Duration::from_secs(30)))

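A session stays open while records for the same key keep arriving within the gap of one another. Once the gap between consecutive records reaches the threshold, the next record starts a new session.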
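The gap rule can be sketched for a single key whose event times are already sorted. This is an illustrative model only: `sessionize` is a hypothetical function, and the boundary rule (a gap strictly below the threshold extends the session) is an assumption. It also ignores out-of-order arrivals, which in a real operator can merge two existing sessions.

```rust
/// Hypothetical helper (not part of Volley): splits one key's event
/// times into sessions. Assumes `times` is sorted ascending.
fn sessionize(times: &[i64], gap: i64) -> Vec<Vec<i64>> {
    let mut sessions: Vec<Vec<i64>> = Vec::new();
    for &t in times {
        // Assumed boundary rule: a gap strictly below the threshold
        // extends the current session; anything else starts a new one.
        let extends = sessions
            .last()
            .and_then(|s| s.last())
            .map_or(false, |&prev| t - prev < gap);
        if extends {
            sessions.last_mut().unwrap().push(t);
        } else {
            sessions.push(vec![t]);
        }
    }
    sessions
}
```

With a gap of 3, event times 1, 3, and 7 produce two sessions: one holding 1 and 3, and one holding 7.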

Event-Time Semantics

Event stream:  --[e1 t=1]--[e2 t=3]--[e3 t=7]--[watermark t=10]--[e4 t=5]-->

TumblingWindows(size=5):
  Window [0, 5):  e1, e2           -- fires at watermark t=10 (window end <= 10)
  Window [5, 10): e3               -- fires at watermark t=10
  e4 (t=5):       late for [5, 10) -- routed to side output once allowed_lateness has passed

SessionWindows(gap=3):
  Session 1: e1(t=1), e2(t=3)     -- gap(3-1)=2 < 3, merged
  Session 2: e3(t=7)              -- gap(7-3)=4 >= 3, new session

A window fires once the watermark reaches or passes its end time (window end <= watermark).
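The tumbling-window part of the diagram can be replayed with a small model that buffers events and releases every window whose end is at or before the watermark. `TumblingBuffer` is a hypothetical type written for this illustration; it does not handle late records and is not Volley's operator.

```rust
use std::collections::BTreeMap;

/// Hypothetical model (not Volley's operator): buffers event times into
/// tumbling windows and fires every window whose end <= watermark.
struct TumblingBuffer {
    size: i64,
    // window start -> event times buffered in that window
    open: BTreeMap<i64, Vec<i64>>,
}

impl TumblingBuffer {
    fn new(size: i64) -> Self {
        Self { size, open: BTreeMap::new() }
    }

    fn add(&mut self, event_time: i64) {
        let start = event_time - event_time.rem_euclid(self.size);
        self.open.entry(start).or_default().push(event_time);
    }

    /// Removes and returns all windows with end <= watermark, oldest first.
    fn advance_watermark(&mut self, watermark: i64) -> Vec<((i64, i64), Vec<i64>)> {
        let size = self.size;
        let ready: Vec<i64> = self
            .open
            .keys()
            .copied()
            .filter(|start| start + size <= watermark)
            .collect();
        let mut fired = Vec::new();
        for start in ready {
            let events = self.open.remove(&start).unwrap();
            fired.push(((start, start + size), events));
        }
        fired
    }
}
```

Adding events at t=1, t=3, and t=7 with a window size of 5 and then advancing the watermark to 10 fires both [0, 5) and [5, 10) in one step.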

Watermarks

Watermarks track event-time progress. The BoundedOutOfOrdernessGenerator allows configurable out-of-orderness before advancing the watermark:

.with_watermark_generator(
    BoundedOutOfOrdernessGenerator::new(Duration::from_secs(5))
)

This allows records to arrive up to 5 seconds late before the watermark advances past them.
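A common way to implement bounded out-of-orderness is to let the watermark trail the largest event time seen so far by the configured bound. The sketch below assumes that formulation; `BoundedOutOfOrderness` is a hypothetical stand-in and may differ from how `BoundedOutOfOrdernessGenerator` works internally.

```rust
/// Hypothetical stand-in (not Volley's type): the watermark trails the
/// largest event time observed so far by a fixed bound.
struct BoundedOutOfOrderness {
    bound_ms: i64,
    max_event_time_ms: Option<i64>,
}

impl BoundedOutOfOrderness {
    fn new(bound_ms: i64) -> Self {
        Self { bound_ms, max_event_time_ms: None }
    }

    /// Records an event time and returns the current watermark.
    /// Older events never move the watermark backwards.
    fn observe(&mut self, event_time_ms: i64) -> i64 {
        let max = self
            .max_event_time_ms
            .map_or(event_time_ms, |m| m.max(event_time_ms));
        self.max_event_time_ms = Some(max);
        max - self.bound_ms
    }
}
```

Under this model, a record stamped 8,000 ms that arrives after one stamped 10,000 ms leaves the watermark at 5,000 ms, so the older record is still on time.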

Late Data

Records that arrive after the watermark has passed their window are considered late. Configure side_output_late_data() with a tag and attach a side output sink via with_late_data_sink() to route late events to a separate destination (e.g., a dead-letter queue):

let late_tag = OutputTag::new("late-events");

StreamExecutionEnvironment::new()
    .from_source(source)
    .with_watermarks(WatermarkConfig::new(Duration::from_secs(5)))
    .with_late_data_sink(late_events_sink)  // side output sink
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .allowed_lateness(Duration::from_secs(10))
    .side_output_late_data(late_tag)
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    .to_sink(main_sink)
    .execute("job")
    .await?;

Late events are written to the side output sink as StreamRecords with the schema from late_event_schema(): event_data (JSON string), event_time, watermark, window_start, window_end, lateness_ms. The side output sink participates in checkpoint barriers for exactly-once semantics.

Without with_late_data_sink(), late events are logged and discarded.
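The routing decision for a single record can be modeled as a comparison between the watermark and the record's window end. This sketch assumes that allowed lateness is measured from the window end, as in other event-time engines; this page does not spell that out. `Arrival` and `classify` are hypothetical names.

```rust
/// Hypothetical classification of a record relative to its window.
#[derive(Debug, PartialEq)]
enum Arrival {
    /// The window has not fired yet.
    OnTime,
    /// The window has fired but is still inside the allowed lateness.
    LateAccepted,
    /// Past the allowed lateness: routed to the late-data side output.
    SideOutput,
}

/// Assumed rule: lateness is measured from the window end against the
/// current watermark. All arguments share the same time unit.
fn classify(window_end: i64, watermark: i64, allowed_lateness: i64) -> Arrival {
    if watermark < window_end {
        Arrival::OnTime
    } else if watermark < window_end + allowed_lateness {
        Arrival::LateAccepted
    } else {
        Arrival::SideOutput
    }
}
```

With a window ending at 60 and an allowed lateness of 10, a record arriving at watermark 65 is still accepted under this model, while one arriving at watermark 70 goes to the side output.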

Batch Processing Performance

The window operator is designed to exploit Arrow’s columnar batch processing. When a RecordBatch arrives with N rows, the operator avoids per-row state backend I/O through two mechanisms:

Write-behind accumulator cache

Accumulator state (partial aggregation results per key per window) is cached in an in-memory HashMap. During process(), the operator reads and updates accumulators entirely in memory. State is only flushed to RocksDB when a checkpoint barrier arrives — the same write-behind pattern used by the non-windowed aggregate_expr() operator.

This means a batch of 1000 rows touching 50 unique (key, window) pairs does zero RocksDB I/O during processing. The only state backend calls happen on cache misses after recovery, or when the pending_windows LRU evicts entries.
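The write-behind pattern can be illustrated with an in-memory map plus a dirty set that is persisted only on flush. In this sketch `backend` is a plain `HashMap` standing in for a durable store such as RocksDB, and `WriteBehindCache` is a hypothetical type. Recovery and cache misses are not modeled.

```rust
use std::collections::{HashMap, HashSet};

type GroupKey = (String, i64); // (key, window start)

/// Hypothetical model of a write-behind accumulator cache.
struct WriteBehindCache {
    cache: HashMap<GroupKey, f64>,
    dirty: HashSet<GroupKey>,
    backend: HashMap<GroupKey, f64>, // stand-in for the state backend
    backend_writes: usize,
}

impl WriteBehindCache {
    fn new() -> Self {
        Self {
            cache: HashMap::new(),
            dirty: HashSet::new(),
            backend: HashMap::new(),
            backend_writes: 0,
        }
    }

    /// Adds `amount` to the running sum for `group`, touching memory only.
    fn update(&mut self, group: GroupKey, amount: f64) {
        *self.cache.entry(group.clone()).or_insert(0.0) += amount;
        self.dirty.insert(group);
    }

    /// Called on a checkpoint barrier: persists only the entries that
    /// changed since the last flush.
    fn flush(&mut self) {
        for group in self.dirty.drain() {
            let value = self.cache[&group];
            self.backend.insert(group, value);
            self.backend_writes += 1;
        }
    }
}
```

Feeding 1000 updates across 50 groups leaves `backend_writes` at zero until `flush()` runs, at which point exactly 50 writes occur.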

Group-then-update strategy

Instead of processing each row individually, the operator groups row indices by (key, window) and then calls update_batch() once per group with a multi-row Arrow array:

Batch: 4000 rows, 100 keys, SlidingWindows(10s, 2s) → 5 windows per row

Step 1 — Assign & group:
  For each row, compute (key, window) pairs → HashMap<(key, window), [row indices]>
  Result: ~500 groups (100 keys × 5 windows), ~40 rows per group

Step 2 — Batch update:
  For each group:
    arrow::compute::take(value_column, group_indices) → multi-row array
    accumulator.update_batch([multi_row_array])       → one call, not 40

This reduces 20,000 update_batch() calls (4000 rows × 5 windows) to 500 calls with ~40-row arrays each (20,000 ÷ 500).
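The two steps can be sketched with the standard library alone. Here plain slices stand in for Arrow arrays, a `Vec` gather stands in for `take()`, and a sum stands in for the accumulator's `update_batch()`. `group_then_update` is a hypothetical function, not the operator's code.

```rust
use std::collections::HashMap;

/// Hypothetical sketch: sums `values` per (key, window start) by first
/// grouping row indices, then doing one batched update per group.
/// `windows_per_row[i]` lists the window starts assigned to row i.
/// Returns the per-group sums and the number of batched updates made.
fn group_then_update(
    keys: &[u32],
    windows_per_row: &[Vec<i64>],
    values: &[f64],
) -> (HashMap<(u32, i64), f64>, usize) {
    // Step 1: assign and group row indices by (key, window).
    let mut groups: HashMap<(u32, i64), Vec<usize>> = HashMap::new();
    for (row, (&key, windows)) in keys.iter().zip(windows_per_row).enumerate() {
        for &w in windows {
            groups.entry((key, w)).or_default().push(row);
        }
    }
    // Step 2: gather each group's rows and apply a single update.
    let mut sums = HashMap::new();
    let mut update_calls = 0;
    for (group, rows) in &groups {
        let gathered: Vec<f64> = rows.iter().map(|&r| values[r]).collect();
        *sums.entry(*group).or_insert(0.0) += gathered.iter().sum::<f64>();
        update_calls += 1;
    }
    (sums, update_calls)
}
```

For 4000 rows spread over 100 keys, with every row assigned to the same 5 windows, this performs 500 batched updates instead of one per row and window.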

Batch-level event time fast path: When all rows share the same event time (no timestamp_column configured), window assignment is computed once for the entire batch rather than per-row. The operator then only groups by key, which is even cheaper.

Session window fallback: Session windows require per-row sequential processing because each event can trigger a merge of overlapping sessions. The batch path is used for tumbling and sliding windows only.

Memory Management

The window operator tracks active (key, window) pairs in an in-memory index called pending_windows. With high-cardinality keys or wide/sliding windows, this index can grow large.

Bounding Window Memory with LRU

Use with_max_pending_windows() to cap the in-memory index size. Evicted entries are persisted to the state backend and scanned during watermark advancement, so no data is lost:

stream
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .with_max_pending_windows(100_000)  // bound in-memory window index
    .aggregate_expr(vec![sum(col("amount"))], state_backend)

When unset (the default), the index is unbounded — identical to previous releases.
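The eviction behaviour can be modeled with a capped map that moves its least recently touched entry to a spill list when it overflows. `PendingWindows` is a hypothetical type for illustration; `spilled` stands in for the state backend, and the linear scan is a simplification that a production LRU would avoid.

```rust
use std::collections::HashMap;

type WindowKey = (String, i64); // (key, window end)

/// Hypothetical model of a bounded pending-window index.
struct PendingWindows {
    capacity: usize,
    tick: u64,
    in_memory: HashMap<WindowKey, u64>, // value = last-touched tick
    spilled: Vec<WindowKey>,            // stand-in for the state backend
    evictions: u64,
}

impl PendingWindows {
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            tick: 0,
            in_memory: HashMap::new(),
            spilled: Vec::new(),
            evictions: 0,
        }
    }

    /// Marks a window as active, evicting the least recently touched
    /// entry if the index grows past its capacity.
    fn touch(&mut self, key: WindowKey) {
        self.tick += 1;
        self.in_memory.insert(key, self.tick);
        if self.in_memory.len() > self.capacity {
            // Linear scan keeps the sketch short.
            let oldest = self
                .in_memory
                .iter()
                .min_by_key(|entry| *entry.1)
                .map(|entry| entry.0.clone())
                .unwrap();
            self.in_memory.remove(&oldest);
            self.spilled.push(oldest); // persisted, not dropped
            self.evictions += 1;
        }
    }
}
```

Evicted windows stay reachable through `spilled`, which mirrors the point that eviction costs extra I/O on watermark advance but loses no data.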

Sizing guidance:

| Key cardinality | Windows per key | Recommended limit | Approx. memory |
|---|---|---|---|
| < 10K | Any | Unbounded (default) | < 10 MB |
| 10K–100K | 1 (tumbling) | 100,000 | ~10 MB |
| 100K–1M | 1 (tumbling) | 500,000 | ~50 MB |
| Any | 10+ (sliding) | Multiply by panes | Varies |

Each pending entry is ~100 bytes. Evicted entries add a small RocksDB I/O cost on watermark advance.
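The sizing guidance reduces to simple arithmetic. The sketch below assumes the ~100 bytes per entry stated above, and reads "multiply by panes" as multiplying by the number of overlapping windows per record. Both helper names are hypothetical.

```rust
/// Assumed per-entry cost, taken from the sizing guidance (~100 bytes).
const BYTES_PER_PENDING_ENTRY: u64 = 100;

/// Hypothetical helper: rough memory for the pending index, in bytes.
fn approx_pending_bytes(active_keys: u64, windows_per_key: u64) -> u64 {
    active_keys * windows_per_key * BYTES_PER_PENDING_ENTRY
}

/// Overlapping windows per record for a sliding window
/// (size divided by slide, rounded up).
fn panes_per_record(size_secs: u64, slide_secs: u64) -> u64 {
    (size_secs + slide_secs - 1) / slide_secs
}
```

For example, 100,000 keys with one tumbling window each come to about 10 MB, while the same keys under a 60-second window sliding every 10 seconds come to about 60 MB.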

Note: with_max_pending_windows() is not recommended with session windows. Session window merging requires scanning the pending index for overlapping sessions, and evicted entries may cause additional state backend I/O during merges.

Monitoring

Monitor the volley_window_pending_size gauge and volley_window_pending_evictions_total counter to tune the limit. A high eviction rate indicates the LRU is too small for the workload. See Observability for the full metrics reference.