Windowing
Windowing groups records by time intervals for aggregation. Volley supports three window types with event-time semantics.
Window Types
Tumbling Windows
Fixed-size, non-overlapping windows:
```rust
.window(TumblingWindows::of(Duration::from_secs(60)))
```
Each record belongs to exactly one window.
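Here is a minimal sketch of how an event timestamp maps to its one tumbling window. It is not Volley's implementation. It assumes windows are aligned to time 0 and use half-open bounds [start, end), as in the event-time example in this section. `Window` and `tumbling_window` are hypothetical names.

```rust
/// Half-open event-time window [start, end). Hypothetical type for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Window {
    pub start: i64,
    pub end: i64,
}

/// Assigns a timestamp to its single tumbling window.
/// ASSUMPTION: windows are aligned to time 0 (no offset).
pub fn tumbling_window(ts: i64, size: i64) -> Window {
    assert!(size > 0, "window size must be positive");
    // rem_euclid keeps negative timestamps in the correct (earlier) window.
    let start = ts - ts.rem_euclid(size);
    Window { start, end: start + size }
}
```

With a 5-unit window, t=3 maps to [0, 5) and t=5 maps to [5, 10). A record that falls exactly on a boundary therefore belongs to the later window.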
Sliding Windows
Fixed-size, overlapping windows:
```rust
.window(SlidingWindows::of(
    Duration::from_secs(60), // window size
    Duration::from_secs(10), // slide interval
))
```
Each record may belong to multiple windows.
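A sliding window of size S and slide D assigns each record to about S / D windows. The hypothetical helper below enumerates those windows. It assumes window starts are aligned to multiples of the slide and that size >= slide. With the 60 s / 10 s configuration above, every record lands in 6 windows.

```rust
/// Half-open event-time window [start, end). Hypothetical type for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Window {
    pub start: i64,
    pub end: i64,
}

/// Returns every sliding window [start, start + size) that contains `ts`,
/// latest start first. ASSUMPTION: starts are multiples of `slide`, size >= slide > 0.
pub fn sliding_windows(ts: i64, size: i64, slide: i64) -> Vec<Window> {
    assert!(slide > 0 && size >= slide, "need size >= slide > 0");
    // Latest window start that is <= ts.
    let mut start = ts - ts.rem_euclid(slide);
    let mut windows = Vec::new();
    // Walk backwards while the window still covers ts (start > ts - size).
    while start > ts - size {
        windows.push(Window { start, end: start + size });
        start -= slide;
    }
    windows
}
```

For example, `sliding_windows(65, 60, 10)` returns the six windows that start at 60, 50, 40, 30, 20 and 10.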
Session Windows
Dynamic windows that merge on activity:
```rust
.window(SessionWindows::with_gap(Duration::from_secs(30)))
```
A new session starts when the time between consecutive records is greater than or equal to the session gap.
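The gap rule can be sketched as a batch function over one key's timestamps. This is a simplified, hypothetical model that sorts its input first. A streaming operator instead merges sessions incrementally as records arrive. The sketch follows the convention in the event-time example, where a gap equal to or larger than the threshold opens a new session.

```rust
/// One session for a single key. Hypothetical type for illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Session {
    pub first: i64,   // earliest event time in the session
    pub last: i64,    // latest event time in the session
    pub count: usize, // number of records merged into the session
}

/// Groups one key's event times into sessions.
/// A record joins the current session when (ts - last) < gap;
/// otherwise it opens a new session. Sorts a copy, so input order does not matter.
pub fn sessionize(timestamps: &[i64], gap: i64) -> Vec<Session> {
    let mut sorted = timestamps.to_vec();
    sorted.sort_unstable();
    let mut sessions: Vec<Session> = Vec::new();
    for ts in sorted {
        match sessions.last_mut() {
            Some(s) if ts - s.last < gap => {
                s.last = ts;
                s.count += 1;
            }
            _ => sessions.push(Session { first: ts, last: ts, count: 1 }),
        }
    }
    sessions
}
```

`sessionize(&[1, 3, 7], 3)` yields the two sessions from the example. An out-of-order record at t=5 bridges them: `sessionize(&[1, 3, 7, 5], 3)` returns a single session from 1 to 7. This merging is what makes session windows dynamic.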
Event-Time Semantics
```text
Event stream: --[e1 t=1]--[e2 t=3]--[e3 t=7]--[watermark t=10]--[e4 t=5]-->

TumblingWindows(size=5):
  Window [0, 5):  e1, e2  -- fires at watermark t=10 (window end 5 <= 10)
  Window [5, 10): e3      -- fires at watermark t=10 (window end 10 <= 10)
  e4 (t=5): late data for window [5, 10), which has already fired
            -- routed to the side output if it is beyond allowed_lateness

SessionWindows(gap=3):
  Session 1: e1(t=1), e2(t=3)  -- gap(3-1)=2 < 3, merged
  Session 2: e3(t=7)           -- gap(7-3)=4 >= 3, new session
```
Windows fire when the watermark reaches or passes the window's end time (window end <= watermark).
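One way to implement the firing rule is to index pending windows by end time. A watermark advance then visits only the windows that are ready. The sketch below is a hypothetical structure. `PendingWindows` is not Volley's pending_windows index. It uses a `BTreeMap` and treats a window as ready when its end is <= the watermark, which matches the example above.

```rust
use std::collections::BTreeMap;

/// Hypothetical pending-window index keyed by window end.
/// Each end time maps to the start times of the windows ending there.
#[derive(Default)]
pub struct PendingWindows {
    by_end: BTreeMap<i64, Vec<i64>>,
}

impl PendingWindows {
    /// Registers the window [start, end) as pending; duplicates are ignored.
    pub fn insert(&mut self, start: i64, end: i64) {
        let starts = self.by_end.entry(end).or_default();
        if !starts.contains(&start) {
            starts.push(start);
        }
    }

    /// Removes and returns every window with end <= watermark, ordered by end time.
    pub fn fire(&mut self, watermark: i64) -> Vec<(i64, i64)> {
        // Only the ordered prefix of ends <= watermark is visited.
        let ready_ends: Vec<i64> = self.by_end.range(..=watermark).map(|(end, _)| *end).collect();
        let mut fired = Vec::new();
        for end in ready_ends {
            if let Some(starts) = self.by_end.remove(&end) {
                for start in starts {
                    fired.push((start, end));
                }
            }
        }
        fired
    }

    /// Number of windows still waiting to fire.
    pub fn len(&self) -> usize {
        self.by_end.values().map(Vec::len).sum()
    }
}
```

The tumbling example can be replayed with this structure. Insert [0, 5) and [5, 10). A watermark of 4 then fires nothing, and advancing the watermark to 10 fires both windows in end-time order.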
Watermarks
Watermarks track event-time progress. The BoundedOutOfOrdernessGenerator allows configurable out-of-orderness before advancing the watermark:
```rust
.with_watermark_generator(
    BoundedOutOfOrdernessGenerator::new(Duration::from_secs(5))
)
```
This allows records to arrive up to 5 seconds out of order, relative to the latest event time seen, before the watermark advances past them and they become late.
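A bounded-out-of-orderness generator is commonly implemented by tracking the largest event time seen and subtracting the bound. The sketch below assumes that rule and works in milliseconds. Volley's BoundedOutOfOrdernessGenerator may differ in details such as an off-by-one adjustment. `BoundedOutOfOrderness` is a hypothetical stand-in.

```rust
/// Hypothetical watermark generator.
/// ASSUMPTION: watermark = max event time seen - max_out_of_orderness.
pub struct BoundedOutOfOrderness {
    max_out_of_orderness_ms: i64,
    max_seen_ms: Option<i64>,
}

impl BoundedOutOfOrderness {
    pub fn new(max_out_of_orderness_ms: i64) -> Self {
        Self { max_out_of_orderness_ms, max_seen_ms: None }
    }

    /// Observes an event time. The maximum only grows, so the watermark never moves backwards.
    pub fn on_event(&mut self, event_time_ms: i64) {
        self.max_seen_ms = Some(match self.max_seen_ms {
            Some(max) => max.max(event_time_ms),
            None => event_time_ms,
        });
    }

    /// Current watermark, or None before the first event.
    pub fn current_watermark(&self) -> Option<i64> {
        self.max_seen_ms.map(|max| max - self.max_out_of_orderness_ms)
    }
}
```

With a 5,000 ms bound, events at 10,000 and then 7,000 leave the watermark at 5,000, so the out-of-order event at 7,000 is still on time. An event at 12,000 then advances the watermark to 7,000.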
Late Data
Records that arrive after the watermark has passed their window are considered late. Configure side_output_late_data() with a tag and attach a side output sink via with_late_data_sink() to route late events to a separate destination (e.g., a dead-letter queue):
```rust
let late_tag = OutputTag::new("late-events");

StreamExecutionEnvironment::new()
    .from_source(source)
    .with_watermarks(WatermarkConfig::new(Duration::from_secs(5)))
    .with_late_data_sink(late_events_sink) // side output sink for late records
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .allowed_lateness(Duration::from_secs(10)) // grace period after the window end
    .side_output_late_data(late_tag) // tag records that miss their window
    .aggregate_expr(sum(col("amount")), state_backend)
    .to_sink(main_sink)
    .execute("job")
    .await?; // requires an async context whose return type is a Result
```
Late events are written to the side output sink as StreamRecords with the schema from late_event_schema(): event_data (JSON string), event_time, watermark, window_start, window_end, lateness_ms. The side output sink participates in checkpoint barriers for exactly-once semantics.
Without with_late_data_sink(), late events are logged and discarded.
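The routing decision for a single record can be sketched as follows. The sketch makes three assumptions. A window fires once its end is <= the watermark. Records that arrive within allowed_lateness after that still reach the window. Anything later goes to the side output, using the fields listed for late_event_schema(). The types are hypothetical. Computing lateness_ms as watermark minus event time is an assumption, not documented behavior.

```rust
/// Fields named after late_event_schema(); the Rust types are assumptions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LateRecord {
    pub event_data: String, // JSON string
    pub event_time: i64,
    pub watermark: i64,
    pub window_start: i64,
    pub window_end: i64,
    pub lateness_ms: i64, // ASSUMPTION: watermark - event_time
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Routing {
    /// Window has not fired yet (end > watermark).
    OnTime,
    /// Window fired, but the record is within allowed lateness (ASSUMED to still update it).
    WithinAllowedLateness,
    /// Beyond allowed lateness: side output, or logged and discarded without a sink.
    SideOutput(LateRecord),
}

/// Hypothetical routing decision for one record; all times in milliseconds.
pub fn route(
    event_data: &str,
    event_time: i64,
    window_start: i64,
    window_end: i64,
    watermark: i64,
    allowed_lateness_ms: i64,
) -> Routing {
    if window_end > watermark {
        Routing::OnTime
    } else if window_end + allowed_lateness_ms > watermark {
        Routing::WithinAllowedLateness
    } else {
        Routing::SideOutput(LateRecord {
            event_data: event_data.to_string(),
            event_time,
            watermark,
            window_start,
            window_end,
            lateness_ms: watermark - event_time,
        })
    }
}
```

Consider e4 from the event-time example, treating its time units as milliseconds: t=5, window [5, 10), watermark 10. With zero allowed lateness it goes to the side output with lateness_ms = 5. With an allowed lateness of 3 it stays with the window.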
Memory Management
The window operator tracks active (key, window) pairs in an in-memory index called pending_windows. With high-cardinality keys or wide/sliding windows, this index can grow large.
Bounding Window Memory with LRU
Use with_max_pending_windows() to cap the in-memory index size. Evicted entries are persisted to the state backend and scanned during watermark advancement, so no data is lost:
```rust
stream
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .with_max_pending_windows(100_000) // bound in-memory window index
    .aggregate_expr(sum(col("amount")), state_backend)
```
When unset (the default), the index is unbounded, matching the behavior of previous releases.
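The eviction scheme described above can be modeled with an access-ordered map plus a spill set. This is a hypothetical sketch, not Volley's implementation. `BoundedPending` keeps at most `max_in_memory` (key, window_end) entries in memory. It evicts the least recently used entry to `spilled`, which stands in for the state backend. On watermark advance it scans both, so evicted windows still fire.

```rust
use std::collections::{BTreeMap, BTreeSet, HashMap};

/// (key, window_end) pair; the window start is omitted for brevity.
pub type Entry = (String, i64);

/// Hypothetical LRU-bounded pending-window index (not Volley's API).
/// `spilled` stands in for entries persisted to the state backend.
pub struct BoundedPending {
    max_in_memory: usize,
    tick: u64,
    last_used: HashMap<Entry, u64>, // entry -> last access tick
    lru: BTreeMap<u64, Entry>,      // access tick -> entry, oldest first
    spilled: BTreeSet<Entry>,       // evicted ("persisted") entries
    pub evictions: u64,             // analogous to an evictions counter
}

impl BoundedPending {
    pub fn new(max_in_memory: usize) -> Self {
        assert!(max_in_memory > 0, "limit must be positive");
        Self {
            max_in_memory,
            tick: 0,
            last_used: HashMap::new(),
            lru: BTreeMap::new(),
            spilled: BTreeSet::new(),
            evictions: 0,
        }
    }

    /// Marks (key, window_end) as pending and most recently used.
    pub fn touch(&mut self, key: &str, window_end: i64) {
        let entry: Entry = (key.to_string(), window_end);
        self.tick += 1;
        // A spilled entry that becomes active again moves back into memory.
        self.spilled.remove(&entry);
        if let Some(old_tick) = self.last_used.insert(entry.clone(), self.tick) {
            self.lru.remove(&old_tick);
        }
        self.lru.insert(self.tick, entry);
        // Evict least recently used entries until the index is within bounds.
        while self.last_used.len() > self.max_in_memory {
            let (_, victim) = self.lru.pop_first().expect("lru tracks every in-memory entry");
            self.last_used.remove(&victim);
            self.spilled.insert(victim);
            self.evictions += 1;
        }
    }

    /// Removes and returns every entry with window_end <= watermark, scanning
    /// both memory and the spilled set so evicted windows still fire.
    pub fn fire(&mut self, watermark: i64) -> Vec<Entry> {
        let mut ready: Vec<Entry> =
            self.last_used.keys().filter(|(_, end)| *end <= watermark).cloned().collect();
        for entry in &ready {
            if let Some(t) = self.last_used.remove(entry) {
                self.lru.remove(&t);
            }
        }
        let spilled_ready: Vec<Entry> =
            self.spilled.iter().filter(|(_, end)| *end <= watermark).cloned().collect();
        for entry in spilled_ready {
            self.spilled.remove(&entry);
            ready.push(entry);
        }
        ready.sort();
        ready
    }

    pub fn in_memory_len(&self) -> usize {
        self.last_used.len()
    }
}
```

With a limit of 2, touching a, b and then c evicts a. Firing at watermark 60 still returns both a and b: one comes from memory and one from the spilled set.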
Sizing guidance:
| Key cardinality | Windows per key | Recommended limit | Approx. index memory |
|---|---|---|---|
| < 10K | Any | Unbounded (default) | < 10 MB |
| 10K–100K | 1 (tumbling) | 100,000 | ~10 MB |
| 100K–1M | 1 (tumbling) | 500,000 | ~50 MB |
| Any | 10+ (sliding) | Multiply by panes (windows per key) | Varies |
Each pending entry is ~100 bytes. Evicted entries add a small RocksDB I/O cost on watermark advance.
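The table's memory column is the entry count multiplied by the ~100-byte entry size. The helpers below make that arithmetic explicit. The function names are hypothetical. The pane count assumes the slide divides the window size evenly.

```rust
/// Approximate bytes per pending entry, from the figure above (~100 bytes).
const BYTES_PER_ENTRY: u64 = 100;

/// Windows each record belongs to for a sliding window (size / slide).
/// ASSUMPTION: slide divides size evenly.
pub fn sliding_panes(size_secs: u64, slide_secs: u64) -> u64 {
    size_secs / slide_secs
}

/// Estimated concurrently pending (key, window) entries.
pub fn pending_entries(active_keys: u64, windows_per_key: u64) -> u64 {
    active_keys * windows_per_key
}

/// Approximate in-memory index size in bytes.
pub fn index_bytes(entries: u64) -> u64 {
    entries * BYTES_PER_ENTRY
}
```

Take 100,000 keys with 60 s windows sliding every 10 s. That gives 6 panes per key, 600,000 pending entries, and roughly 60 MB of index memory. A limit sized for tumbling windows would need to be scaled up sixfold to hold the same working set in memory.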
Note:
with_max_pending_windows() is not recommended with session windows. Session window merging requires scanning the pending index for overlapping sessions, and evicted entries may cause additional state backend I/O during merges.
Monitoring
Monitor the volley_window_pending_size gauge and volley_window_pending_evictions_total counter to tune the limit. A high eviction rate indicates the LRU is too small for the workload. See Observability for the full metrics reference.