
Windowing

Windowing groups records by time intervals for aggregation. Volley supports three window types with event-time semantics.

Window Types

Tumbling Windows

Fixed-size, non-overlapping windows:

```rust
.window(TumblingWindows::of(Duration::from_secs(60)))
```

Each record belongs to exactly one window.
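To illustrate why each record lands in exactly one window, here is a minimal sketch of the assignment rule. It assumes millisecond `u64` timestamps and windows aligned to timestamp 0; Volley's handling of alignment or offsets is not described here, and `tumbling_window` is a hypothetical helper, not part of Volley's API.

```rust
/// Hypothetical helper (not Volley's API): returns the half-open window
/// [start, end) containing a record with timestamp `ts_ms`, assuming
/// windows of `size_ms` aligned to timestamp 0.
fn tumbling_window(ts_ms: u64, size_ms: u64) -> (u64, u64) {
    assert!(size_ms > 0, "window size must be positive");
    // Round down to the nearest multiple of the window size.
    let start = ts_ms - ts_ms % size_ms;
    (start, start + size_ms)
}
```

Because the start is computed by rounding down, a timestamp exactly on a boundary (for example t=5 with size 5) belongs to the window that begins there, not the one that ends there.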

Sliding Windows

Fixed-size, overlapping windows:

```rust
.window(SlidingWindows::of(
    Duration::from_secs(60),  // window size
    Duration::from_secs(10),  // slide interval
))
```

Each record may belong to multiple windows.
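A minimal sketch of how one record maps to several sliding windows, assuming millisecond timestamps and window starts aligned to multiples of the slide interval from timestamp 0. The helper `sliding_windows` is hypothetical, not Volley's implementation.

```rust
/// Hypothetical helper (not Volley's API): lists every sliding window
/// [start, start + size_ms) that contains `ts_ms`, assuming window starts
/// are multiples of `slide_ms` beginning at timestamp 0.
fn sliding_windows(ts_ms: u64, size_ms: u64, slide_ms: u64) -> Vec<(u64, u64)> {
    assert!(size_ms > 0 && slide_ms > 0, "size and slide must be positive");
    let mut windows = Vec::new();
    // Latest window start at or before the timestamp.
    let mut start = ts_ms - ts_ms % slide_ms;
    loop {
        // Stop once a window ends at or before the timestamp.
        if start + size_ms <= ts_ms {
            break;
        }
        windows.push((start, start + size_ms));
        match start.checked_sub(slide_ms) {
            Some(prev) => start = prev,
            None => break, // no windows start before timestamp 0
        }
    }
    windows
}
```

With a 60-second window sliding every 10 seconds, a record away from the start of the timeline falls into 60 / 10 = 6 windows, which is why sliding windows multiply per-key state.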

Session Windows

Dynamic windows that stay open while records keep arriving and close after a gap in activity:

```rust
.window(SessionWindows::with_gap(Duration::from_secs(30)))
```

A new session starts when the gap since the previous record reaches the configured threshold; shorter gaps extend the current session.
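The gap rule can be sketched as a batch function over one key's timestamps. This is a hypothetical illustration, not Volley's session operator: it sorts first, whereas a streaming operator merges sessions incrementally as out-of-order records arrive. It treats a gap equal to the threshold as starting a new session, matching the SessionWindows(gap=3) example in this section.

```rust
/// Hypothetical sketch (not Volley's implementation): groups one key's
/// event timestamps into sessions. A gap >= `gap_ms` between consecutive
/// records starts a new session; smaller gaps extend the current one.
fn sessionize(mut timestamps: Vec<u64>, gap_ms: u64) -> Vec<Vec<u64>> {
    // Process records in event-time order.
    timestamps.sort_unstable();
    let mut sessions: Vec<Vec<u64>> = Vec::new();
    for ts in timestamps {
        // Does this record fall within the gap of the last session's newest record?
        let extends_current = sessions
            .last()
            .and_then(|s| s.last())
            .map_or(false, |&prev| ts - prev < gap_ms);
        if extends_current {
            sessions.last_mut().unwrap().push(ts);
        } else {
            // First record, or the gap reached the threshold.
            sessions.push(vec![ts]);
        }
    }
    sessions
}
```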

Event-Time Semantics

Event stream:  --[e1 t=1]--[e2 t=3]--[e3 t=7]--[watermark t=10]--[e4 t=5]-->

TumblingWindows(size=5):
  Window [0, 5):  e1, e2           -- fires at watermark t=10 (window end <= 10)
  Window [5, 10): e3               -- fires at watermark t=10
  e4 (t=5):       late data        -- [5, 10) already fired; sent to side output if past allowed_lateness

SessionWindows(gap=3):
  Session 1: e1(t=1), e2(t=3)     -- gap(3-1)=2 < 3, merged
  Session 2: e3(t=7)              -- gap(7-3)=4 >= 3, new session

Windows fire when a watermark advances past the window’s end time.
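The firing rule and the e4 case from the diagram can be checked with a small sketch. The function names are hypothetical; windows are assumed to be tumbling and aligned to timestamp 0, and allowed lateness is ignored here. Following the diagram, a window fires once the watermark is at or beyond its end.

```rust
/// Hypothetical sketch of the firing rule: a window [start, end) fires
/// once the watermark reaches its end time.
fn should_fire(window_end: u64, watermark: u64) -> bool {
    watermark >= window_end
}

/// Hypothetical check: is a record late because its tumbling window
/// (aligned to 0) has already fired at the current watermark?
fn is_late_tumbling(event_time: u64, size: u64, watermark: u64) -> bool {
    let window_end = event_time - event_time % size + size;
    should_fire(window_end, watermark)
}
```

For the diagram, `is_late_tumbling(5, 5, 10)` is `true` for e4, because its window [5, 10) fired when the watermark reached t=10.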

Watermarks

Watermarks track event-time progress. The BoundedOutOfOrdernessGenerator emits watermarks that trail the highest event time seen so far by a configurable bound, so records can arrive somewhat out of order without being treated as late:

```rust
.with_watermark_generator(
    BoundedOutOfOrdernessGenerator::new(Duration::from_secs(5))
)
```

With a 5-second bound, a record can arrive up to 5 seconds behind the latest event time seen and still be ahead of the watermark.
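A minimal sketch of the bounded out-of-orderness idea, assuming millisecond timestamps. `BoundedOutOfOrderness` is a hypothetical stand-in, not Volley's BoundedOutOfOrdernessGenerator; exact details such as an off-by-one adjustment may differ in the real generator.

```rust
/// Hypothetical sketch (not Volley's generator): the watermark trails the
/// largest event time seen so far by a fixed bound.
struct BoundedOutOfOrderness {
    max_event_time_ms: Option<u64>,
    bound_ms: u64,
}

impl BoundedOutOfOrderness {
    fn new(bound_ms: u64) -> Self {
        Self { max_event_time_ms: None, bound_ms }
    }

    /// Record an event time; out-of-order events never lower the maximum.
    fn on_event(&mut self, event_time_ms: u64) {
        self.max_event_time_ms = Some(match self.max_event_time_ms {
            Some(max) => max.max(event_time_ms),
            None => event_time_ms,
        });
    }

    /// Current watermark, or None before any event has been seen.
    fn current_watermark(&self) -> Option<u64> {
        self.max_event_time_ms
            .map(|max| max.saturating_sub(self.bound_ms))
    }
}
```

After an event at 20 s, the watermark is 15 s, so a record stamped 17 s that arrives next is still on time.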

Late Data

Records that arrive after the watermark has passed their window are considered late. Configure side_output_late_data() with a tag and attach a side output sink via with_late_data_sink() to route late events to a separate destination (e.g., a dead-letter queue):

```rust
// Runs inside an async fn whose error type can absorb the job's error (required by `.await?`).
let late_tag = OutputTag::new("late-events");

StreamExecutionEnvironment::new()
    .from_source(source)
    .with_watermarks(WatermarkConfig::new(Duration::from_secs(5)))
    .with_late_data_sink(late_events_sink)  // side output sink for late records
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .allowed_lateness(Duration::from_secs(10))
    .side_output_late_data(late_tag)        // route late records to the tagged side output
    .aggregate_expr(sum(col("amount")), state_backend)
    .to_sink(main_sink)
    .execute("job")
    .await?;
```

Late events are written to the side output sink as StreamRecords with the schema from late_event_schema(): event_data (JSON string), event_time, watermark, window_start, window_end, lateness_ms. The side output sink participates in checkpoint barriers for exactly-once semantics.

Without with_late_data_sink(), late events are logged and discarded.
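The routing decision for one record can be sketched as a pure function. This is a hypothetical illustration, not Volley's code: `Route` and `route` are invented names, and the assumption that records arriving within allowed_lateness still update their window (as in similar stream processors) is not confirmed by this page. The exact boundary at window end + allowed lateness is also an assumption.

```rust
/// Hypothetical outcome for a record relative to its window (not Volley's API).
#[derive(Debug, PartialEq)]
enum Route {
    OnTime,       // watermark has not reached the window end
    AcceptedLate, // assumed: within allowed_lateness, still updates the window
    SideOutput,   // too late, and a late-data sink is configured
    Discarded,    // too late, no sink: logged and dropped
}

fn route(window_end_ms: u64, allowed_lateness_ms: u64, watermark_ms: u64, has_late_sink: bool) -> Route {
    if watermark_ms < window_end_ms {
        Route::OnTime
    } else if watermark_ms < window_end_ms + allowed_lateness_ms {
        Route::AcceptedLate
    } else if has_late_sink {
        Route::SideOutput
    } else {
        Route::Discarded
    }
}
```

With the 60-second window and 10-second allowed lateness from the example above, a record for the window ending at 60 s is on time until the watermark reaches 60 s and goes to the side output once the watermark reaches 70 s.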

Memory Management

The window operator tracks active (key, window) pairs in an in-memory index called pending_windows. With high-cardinality keys or wide/sliding windows, this index can grow large.

Bounding Window Memory with LRU

Use with_max_pending_windows() to cap the in-memory index size. Evicted entries are persisted to the state backend and scanned during watermark advancement, so no data is lost:

```rust
stream
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .with_max_pending_windows(100_000)  // bound in-memory window index
    .aggregate_expr(sum(col("amount")), state_backend)
```

When unset (the default), the index is unbounded, matching the behavior of previous releases.
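The eviction mechanism described above can be sketched with standard collections. This is a hypothetical model, not Volley's pending_windows: `PendingWindows`, `touch`, and `fire_up_to` are invented names, a `BTreeSet` ordered by window end stands in for the state backend, and the LRU lookup is a linear scan for brevity.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical model of a bounded pending-window index: at most
/// `max_pending` (key, window_end) pairs stay in memory; the least recently
/// touched one is spilled to `spilled` (stand-in for the state backend).
struct PendingWindows {
    max_pending: usize,
    tick: u64,
    in_memory: HashMap<(String, u64), u64>, // (key, window_end) -> last-touch tick
    spilled: BTreeSet<(u64, String)>,       // (window_end, key), ordered for range scans
    evictions: u64,
}

impl PendingWindows {
    fn new(max_pending: usize) -> Self {
        assert!(max_pending > 0, "limit must be positive");
        Self { max_pending, tick: 0, in_memory: HashMap::new(), spilled: BTreeSet::new(), evictions: 0 }
    }

    /// Mark a (key, window) pair active, evicting the LRU entry if the index is full.
    fn touch(&mut self, key: &str, window_end: u64) {
        self.tick += 1;
        let entry = (key.to_string(), window_end);
        // A previously spilled pair comes back into memory.
        self.spilled.remove(&(window_end, key.to_string()));
        if !self.in_memory.contains_key(&entry) && self.in_memory.len() >= self.max_pending {
            let lru = self
                .in_memory
                .iter()
                .min_by_key(|&(_, &t)| t)
                .map(|(k, _)| k.clone())
                .expect("index is non-empty when full");
            self.in_memory.remove(&lru);
            self.spilled.insert((lru.1, lru.0));
            self.evictions += 1;
        }
        self.in_memory.insert(entry, self.tick);
    }

    /// On watermark advance, remove and return every pair with end <= watermark,
    /// checking both memory and the spilled set, so evicted windows still fire.
    fn fire_up_to(&mut self, watermark: u64) -> Vec<(u64, String)> {
        let mut fired: Vec<(u64, String)> = Vec::new();
        self.in_memory.retain(|(key, end), _| {
            if *end <= watermark {
                fired.push((*end, key.clone()));
                false
            } else {
                true
            }
        });
        // Everything >= (watermark + 1, "") is still pending; the rest fires.
        let still_pending = self.spilled.split_off(&(watermark + 1, String::new()));
        fired.extend(std::mem::replace(&mut self.spilled, still_pending));
        fired.sort();
        fired
    }
}
```

The `evictions` counter plays the same role as the eviction metric under Monitoring: if it climbs quickly, most firings are paying the spill-and-scan cost.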

Sizing guidance:

| Key cardinality | Windows per key | Recommended limit | Approx. memory |
|---|---|---|---|
| < 10K | Any | Unbounded (default) | < 10 MB |
| 10K–100K | 1 (tumbling) | 100,000 | ~10 MB |
| 100K–1M | 1 (tumbling) | 500,000 | ~50 MB |
| Any | 10+ (sliding) | Multiply by windows per key | Varies |

Each pending entry is ~100 bytes. Evicted entries add a small RocksDB I/O cost on watermark advance.
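The table's memory column follows from the ~100-byte figure. A small sketch of that arithmetic, using hypothetical helper names; the per-entry size is the approximation quoted above, not a guarantee. For sliding windows the entry count is scaled by windows per key, taken here as size / slide rounded up (`u64::div_ceil` requires Rust 1.73 or later).

```rust
/// Approximate bytes per pending (key, window) entry, from the figure above.
const APPROX_BYTES_PER_ENTRY: u64 = 100;

/// Hypothetical helper: rough in-memory size of the pending index.
fn approx_pending_bytes(entries: u64) -> u64 {
    entries * APPROX_BYTES_PER_ENTRY
}

/// Hypothetical helper: pending entries when each key has up to
/// ceil(size / slide) sliding windows open at once.
fn sliding_entries(keys: u64, size_ms: u64, slide_ms: u64) -> u64 {
    keys * size_ms.div_ceil(slide_ms)
}
```

For example, 100,000 keys with a 60-second window sliding every 10 seconds gives about 600,000 entries, roughly 60 MB.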

Note: with_max_pending_windows() is not recommended with session windows. Session window merging requires scanning the pending index for overlapping sessions, and evicted entries may cause additional state backend I/O during merges.

Monitoring

Monitor the volley_window_pending_size gauge and volley_window_pending_evictions_total counter to tune the limit. A high eviction rate indicates the LRU is too small for the workload. See Observability for the full metrics reference.