Windowing
Streams never end, so “the average price” only makes sense over a time range: the last minute, the last hour, this user’s current session. Windowing is how you slice an unbounded stream into finite chunks that aggregations can actually compute over.
Volley gives you three shapes of window (tumbling, sliding, and session), all driven by event time (when the record happened) rather than wall-clock time (when it arrived). This means out-of-order data still lands in the right window, provided it arrives within the watermark and allowed-lateness bounds described below.
The shortest path: .key_by(col("user_id")).window(TumblingWindows::of(Duration::from_secs(60))).aggregate_expr(vec![sum(col("amount"))], state_backend). Read on for how to pick the window shape and tune watermarks and late data.
Window Types
Tumbling Windows
Fixed-size, non-overlapping windows:
```rust
.window(TumblingWindows::of(Duration::from_secs(60)))
```
Each record belongs to exactly one window.
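For illustration, a tumbling assigner can map a timestamp to its single window with one modulo operation. The sketch below assumes millisecond timestamps and windows aligned to timestamp 0. `tumbling_window` is a hypothetical helper, not part of Volley's API.

```rust
/// Hypothetical helper: returns the half-open window [start, end) containing
/// `ts_ms`, for tumbling windows of `size_ms` aligned to timestamp 0.
fn tumbling_window(ts_ms: i64, size_ms: i64) -> (i64, i64) {
    assert!(size_ms > 0, "window size must be positive");
    // rem_euclid keeps the start correct for negative timestamps too.
    let start = ts_ms - ts_ms.rem_euclid(size_ms);
    (start, start + size_ms)
}

fn main() {
    // A record at t = 75 s with 60 s windows lands in [60 s, 120 s).
    let (start, end) = tumbling_window(75_000, 60_000);
    println!("[{start}, {end})");
}
```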
Sliding Windows
Fixed-size, overlapping windows:
```rust
.window(SlidingWindows::of(
    Duration::from_secs(60), // window size
    Duration::from_secs(10), // slide interval
))
```
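Each record may belong to multiple windows. When the size is a multiple of the slide, it belongs to exactly size ÷ slide of them (six for a 60-second window sliding every 10 seconds).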
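The overlap becomes concrete if you list every window that contains a given timestamp. This sketch assumes window starts are aligned to multiples of the slide interval and that windows are half-open. `sliding_windows` is a hypothetical helper, not Volley's assigner.

```rust
/// Hypothetical helper: all half-open windows [start, start + size_ms) that
/// contain `ts_ms`, with window starts aligned to multiples of `slide_ms`.
fn sliding_windows(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<(i64, i64)> {
    assert!(size_ms > 0 && slide_ms > 0);
    // Latest aligned window start at or before ts_ms.
    let mut start = ts_ms - ts_ms.rem_euclid(slide_ms);
    let mut windows = Vec::new();
    // Walk backwards while the window still covers ts_ms.
    while start > ts_ms - size_ms {
        windows.push((start, start + size_ms));
        start -= slide_ms;
    }
    windows.reverse(); // oldest window first
    windows
}

fn main() {
    // SlidingWindows(60 s, 10 s): list the windows holding a record at t = 75 s.
    for (start, end) in sliding_windows(75_000, 60_000, 10_000) {
        println!("[{start}, {end})");
    }
}
```

If the slide is larger than the size, the loop can return no windows at all, which is why "may belong" is the right phrasing in general.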
Session Windows
Dynamic windows that merge on activity:
```rust
.window(SessionWindows::with_gap(Duration::from_secs(30)))
```
A new session starts when the gap between consecutive records reaches or exceeds the configured gap.
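One common way to implement session merging gives each event the interval [t, t + gap) and merges intervals that overlap. With half-open intervals, a gap exactly equal to the threshold starts a new session. The sketch below assumes that model and sorts the timestamps up front for simplicity. A streaming operator would instead merge incrementally as events arrive. `sessions` is a hypothetical helper.

```rust
/// Hypothetical helper: groups event timestamps into sessions. Each event opens
/// [t, t + gap_ms); overlapping intervals merge, so a gap >= gap_ms between
/// consecutive events starts a new session.
fn sessions(mut ts: Vec<i64>, gap_ms: i64) -> Vec<(i64, i64)> {
    ts.sort_unstable();
    let mut out: Vec<(i64, i64)> = Vec::new();
    for t in ts {
        if let Some(last) = out.last_mut() {
            if t < last.1 {
                // Overlaps the current session: extend its end.
                last.1 = last.1.max(t + gap_ms);
                continue;
            }
        }
        // No overlap: open a new session.
        out.push((t, t + gap_ms));
    }
    out
}

fn main() {
    // e1, e2, e3 from the event-time example, with gap = 3.
    println!("{:?}", sessions(vec![1, 3, 7], 3));
}
```

Applied to e1, e2, and e3 from the event-time example in the next section, this produces the sessions [1, 6) and [7, 10).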
Event-Time Semantics
```text
Event stream: --[e1 t=1]--[e2 t=3]--[e3 t=7]--[watermark t=10]--[e4 t=5]-->

TumblingWindows(size=5):
  Window [0, 5):  e1, e2 -- fires at watermark t=10 (window end <= 10)
  Window [5, 10): e3     -- fires at watermark t=10
  e4 (t=5): belongs to [5, 10), which has already fired, so it is late;
            routed to the side output if the watermark is already past
            window end + allowed_lateness

SessionWindows(gap=3):
  Session 1: e1(t=1), e2(t=3) -- gap(3-1)=2 < 3, merged
  Session 2: e3(t=7)          -- gap(7-3)=4 >= 3, new session
```
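Windows fire when the watermark reaches the window's end time (window end <= watermark). Because windows are half-open, [5, 10) is complete once the watermark reaches 10.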
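The firing rule can be sketched by keeping pending windows ordered by end time, so each watermark fires a prefix of them. This is a hypothetical model: `Pending` and `fire_ready` are not Volley types. It replays e1 to e3 from the diagram above.

```rust
use std::collections::BTreeMap;

/// Half-open event-time window [start, end).
type Window = (i64, i64);

/// Hypothetical pending-window map keyed by (end, start), so windows that
/// are ready to fire sort first.
type Pending = BTreeMap<(i64, i64), Vec<&'static str>>;

/// Removes and returns every window whose end is <= `watermark`, oldest first.
fn fire_ready(pending: &mut Pending, watermark: i64) -> Vec<(Window, Vec<&'static str>)> {
    // (end, start) <= (watermark, i64::MAX) exactly when end <= watermark.
    let ready: Vec<(i64, i64)> = pending
        .range(..=(watermark, i64::MAX))
        .map(|(k, _)| *k)
        .collect();
    ready
        .into_iter()
        .map(|(end, start)| {
            let events = pending.remove(&(end, start)).unwrap();
            ((start, end), events)
        })
        .collect()
}

fn main() {
    let size: i64 = 5;
    let mut pending = Pending::new();
    // Assign e1..e3 from the diagram to tumbling windows of size 5.
    for (name, t) in [("e1", 1_i64), ("e2", 3), ("e3", 7)] {
        let start = t - t.rem_euclid(size);
        pending.entry((start + size, start)).or_default().push(name);
    }
    // Watermark t=10: both [0, 5) and [5, 10) fire.
    for (window, events) in fire_ready(&mut pending, 10) {
        println!("fire {window:?}: {events:?}");
    }
    // e4 (t=5) maps to [5, 10), whose end is already <= the watermark: late.
    let e4_start = 5 - 5_i64.rem_euclid(size);
    assert!(e4_start + size <= 10);
}
```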
Watermarks
Watermarks track event-time progress. The BoundedOutOfOrdernessGenerator allows configurable out-of-orderness before advancing the watermark:
```rust
.with_watermark_generator(
    BoundedOutOfOrdernessGenerator::new(Duration::from_secs(5))
)
```
With a 5-second bound, a record can trail the largest event time seen so far by up to 5 seconds before the watermark advances past it.
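A bounded-out-of-orderness generator is commonly implemented by tracking the maximum event time seen and holding the watermark a fixed bound behind it. The sketch assumes exactly that formula. Whether Volley's BoundedOutOfOrdernessGenerator subtracts anything extra is not stated here; Apache Flink's equivalent, for instance, subtracts one more millisecond. `BoundedWatermark` is a hypothetical type.

```rust
/// Hypothetical bounded-out-of-orderness watermark: max event time seen
/// minus a fixed bound (the exact formula Volley uses is an assumption).
struct BoundedWatermark {
    bound_ms: i64,
    max_seen: Option<i64>,
}

impl BoundedWatermark {
    fn new(bound_ms: i64) -> Self {
        Self { bound_ms, max_seen: None }
    }

    /// Records an event time; an older event never moves the watermark back.
    fn on_event(&mut self, ts_ms: i64) {
        self.max_seen = Some(self.max_seen.map_or(ts_ms, |m| m.max(ts_ms)));
    }

    /// Current watermark, or None before any event has been seen.
    fn current(&self) -> Option<i64> {
        self.max_seen.map(|m| m - self.bound_ms)
    }
}

fn main() {
    let mut wm = BoundedWatermark::new(5_000);
    for ts in [10_000, 12_000, 9_000, 20_000] {
        wm.on_event(ts);
    }
    // Max seen is 20 s and the bound is 5 s; the out-of-order 9 s event
    // did not pull the watermark backwards.
    println!("{:?}", wm.current());
}
```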
Late Data
Records that arrive after the watermark has passed their window are considered late. Configure side_output_late_data() with a tag and attach a side output sink via with_late_data_sink() to route late events to a separate destination (e.g., a dead-letter queue):
```rust
// Assumes an async context that returns a Result; `source`, `late_events_sink`,
// `main_sink`, and `state_backend` are defined elsewhere.
let late_tag = OutputTag::new("late-events");
StreamExecutionEnvironment::new()
    .from_source(source)
    .with_watermarks(WatermarkConfig::new(Duration::from_secs(5)))
    .with_late_data_sink(late_events_sink) // side output sink for late records
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .allowed_lateness(Duration::from_secs(10))
    .side_output_late_data(late_tag) // tag late records for the side output
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
    .to_sink(main_sink)
    .execute("job")
    .await?;
```
Late events are written to the side output sink as StreamRecords with the schema from late_event_schema(): event_data (JSON string), event_time, watermark, window_start, window_end, lateness_ms. The side output sink participates in checkpoint barriers for exactly-once semantics.
Without with_late_data_sink(), late events are logged and discarded.
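The routing decision can be summarized as a three-way check against the window end. The sketch makes two assumptions that this page does not specify. First, a record that arrives after its window fired but within allowed_lateness still updates the window, as allowed lateness works in Apache Flink. Second, the boundaries are exclusive as written. `Route` and `route` are hypothetical names.

```rust
/// Hypothetical routing outcome for a record, given its window's end, the
/// current watermark, and the allowed lateness (all in milliseconds).
#[derive(Debug, PartialEq)]
enum Route {
    /// The window has not fired yet.
    OnTime,
    /// The window fired but is within allowed lateness (assumed to update it).
    LateWithinAllowance,
    /// Too late: late-data sink if configured, otherwise logged and discarded.
    SideOutput,
}

fn route(window_end: i64, watermark: i64, allowed_lateness: i64) -> Route {
    if window_end > watermark {
        Route::OnTime
    } else if window_end + allowed_lateness > watermark {
        Route::LateWithinAllowance
    } else {
        Route::SideOutput
    }
}

fn main() {
    // e4 from the event-time diagram: window [5, 10), watermark 10, no lateness.
    println!("{:?}", route(10, 10, 0));
    // Window ending at 60 s, watermark at 65 s, allowed_lateness of 10 s.
    println!("{:?}", route(60_000, 65_000, 10_000));
}
```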
Batch Processing Performance
The window operator is designed to exploit Arrow’s columnar batch processing. When a RecordBatch arrives with N rows, the operator avoids per-row state backend I/O through two mechanisms:
Write-behind accumulator cache
Accumulator state (partial aggregation results per key per window) is cached in an in-memory HashMap. During process(), the operator reads and updates accumulators entirely in memory. State is only flushed to RocksDB when a checkpoint barrier arrives — the same write-behind pattern used by the non-windowed aggregate_expr() operator.
This means a batch of 1000 rows touching 50 unique (key, window) pairs does zero RocksDB I/O during processing. The only state backend calls happen on cache misses after recovery, or when the pending_windows LRU evicts entries.
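Here is a minimal model of the write-behind pattern. Accumulators (here, running sums) live in a HashMap, dirty entries are flushed only on checkpoint, and the backend is read only on a cache miss after recovery. `Backend` is a hypothetical in-memory stand-in for RocksDB that just counts reads and writes. All names are hypothetical.

```rust
use std::collections::{HashMap, HashSet};

/// (grouping key, window start): hypothetical accumulator key layout.
type StateKey = (String, i64);

/// Hypothetical stand-in for RocksDB that counts I/O.
#[derive(Default)]
struct Backend {
    data: HashMap<StateKey, f64>,
    reads: usize,
    writes: usize,
}

/// Hypothetical write-behind cache of running sums.
#[derive(Default)]
struct WriteBehindCache {
    cache: HashMap<StateKey, f64>,
    dirty: HashSet<StateKey>,
    recovered: bool, // true when restarted from a checkpoint
}

impl WriteBehindCache {
    /// Restart after a failure: memory is empty, checkpointed state is in the backend.
    fn recover() -> Self {
        Self { recovered: true, ..Self::default() }
    }

    /// Adds `amount` in memory; touches the backend only on a miss after recovery.
    fn update(&mut self, backend: &mut Backend, key: StateKey, amount: f64) {
        if !self.cache.contains_key(&key) {
            // Before any restart every live accumulator is cached, so a miss is a
            // new (key, window). After recovery it may be checkpointed state.
            let initial = if self.recovered {
                backend.reads += 1;
                backend.data.get(&key).copied().unwrap_or(0.0)
            } else {
                0.0
            };
            self.cache.insert(key.clone(), initial);
        }
        *self.cache.get_mut(&key).unwrap() += amount;
        self.dirty.insert(key);
    }

    /// On a checkpoint barrier, flush only accumulators changed since the last one.
    fn checkpoint(&mut self, backend: &mut Backend) {
        for key in self.dirty.drain() {
            let value = self.cache[&key];
            backend.data.insert(key, value);
            backend.writes += 1;
        }
    }
}

fn main() {
    let mut backend = Backend::default();
    let mut cache = WriteBehindCache::default();
    // 1000 rows touching 50 (key, window) pairs, processed entirely in memory.
    for i in 0..1000 {
        cache.update(&mut backend, (format!("user-{}", i % 50), 0), 1.0);
    }
    println!("after batch: reads={} writes={}", backend.reads, backend.writes);
    cache.checkpoint(&mut backend); // one write per dirty accumulator
    println!("after checkpoint: reads={} writes={}", backend.reads, backend.writes);
}
```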
Group-then-update strategy
Instead of processing each row individually, the operator groups row indices by (key, window) and then calls update_batch() once per group with a multi-row Arrow array:
```text
Batch: 4000 rows, 100 keys, SlidingWindows(10s, 2s) → 5 windows per row

Step 1: Assign & group
  For each row, compute (key, window) pairs → HashMap<(key, window), [row indices]>
  Result: ~500 groups (100 keys × 5 windows), ~40 rows per group

Step 2: Batch update
  For each group:
    arrow::compute::take(value_column, group_indices) → multi-row array
    accumulator.update_batch([multi_row_array]) → one call, not 40
```
This reduces 20,000 update_batch() calls (4000 rows × 5 windows) to 500 calls with ~40-row arrays each (20,000 row-window pairs ÷ 500 groups). These figures assume keys are spread evenly and every row in the batch falls into the same five windows.
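The two steps can be sketched without Arrow: group row indices by (key, window start), then gather each group's values and update once. A plain index gather stands in for arrow::compute::take, and a sum stands in for update_batch(). `window_starts` and `group_then_update` are hypothetical names. The example reproduces the batch above, with every row placed in the same 2-second pane.

```rust
use std::collections::HashMap;

/// Hypothetical helper: starts of every sliding window [s, s + size) containing `ts`.
fn window_starts(ts: i64, size: i64, slide: i64) -> Vec<i64> {
    let mut starts = Vec::new();
    let mut s = ts - ts.rem_euclid(slide);
    while s > ts - size {
        starts.push(s);
        s -= slide;
    }
    starts
}

/// Hypothetical group-then-update: returns (update calls made, sum per group).
fn group_then_update(
    keys: &[u32],
    ts: &[i64],
    values: &[f64],
    size: i64,
    slide: i64,
) -> (usize, HashMap<(u32, i64), f64>) {
    // Step 1: assign & group row indices by (key, window start).
    let mut groups: HashMap<(u32, i64), Vec<usize>> = HashMap::new();
    for row in 0..keys.len() {
        for start in window_starts(ts[row], size, slide) {
            groups.entry((keys[row], start)).or_default().push(row);
        }
    }
    // Step 2: gather each group's values and apply one batched update.
    let mut calls = 0;
    let mut sums = HashMap::new();
    for (group, rows) in groups {
        let gathered: Vec<f64> = rows.iter().map(|&r| values[r]).collect(); // ~ take()
        calls += 1; // one update_batch-style call per group
        sums.insert(group, gathered.iter().sum::<f64>());
    }
    (calls, sums)
}

fn main() {
    // 4000 rows, 100 keys, all inside one 2 s pane; SlidingWindows(10 s, 2 s).
    let n: usize = 4000;
    let keys: Vec<u32> = (0..n).map(|i| (i % 100) as u32).collect();
    let ts: Vec<i64> = (0..n).map(|i| 100_000 + (i as i64 % 2_000)).collect();
    let values = vec![1.0; n];
    let (calls, sums) = group_then_update(&keys, &ts, &values, 10_000, 2_000);
    println!("{calls} update calls instead of {}", n * 5);
    println!("rows in group (key 0, window 100000): {}", sums[&(0, 100_000)]);
}
```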
Batch-level event time fast path: When all rows share the same event time (no timestamp_column configured), window assignment is computed once for the entire batch rather than per-row. The operator then only groups by key, which is even cheaper.
Session window fallback: Session windows require per-row sequential processing because each event can trigger a merge of overlapping sessions. The batch path is used for tumbling and sliding windows only.
Memory Management
The window operator tracks active (key, window) pairs in an in-memory index called pending_windows. With high-cardinality keys or wide/sliding windows, this index can grow large.
Bounding Window Memory with LRU
Use with_max_pending_windows() to cap the in-memory index size. Evicted entries are persisted to the state backend and scanned during watermark advancement, so no data is lost:
```rust
stream
    .key_by(col("user_id"))
    .window(TumblingWindows::of(Duration::from_secs(60)))
    .with_max_pending_windows(100_000) // bound in-memory window index
    .aggregate_expr(vec![sum(col("amount"))], state_backend)
```
When unset (the default), the index is unbounded — identical to previous releases.
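To show why eviction loses no data, here is a hypothetical bounded index (`PendingIndex` is not a Volley type). It keeps at most `cap` entries in memory and moves the least recently used one to a set standing in for the state backend. When the watermark advances, it scans both places.

```rust
use std::collections::{BTreeMap, BTreeSet, HashMap};

/// (key id, window end): hypothetical layout of a pending-window entry.
type Entry = (u32, i64);

/// Hypothetical bounded pending-window index with LRU eviction to a
/// stand-in for the state backend.
struct PendingIndex {
    cap: usize,
    tick: u64,
    last_used: HashMap<Entry, u64>,   // entry -> most recent access tick
    by_recency: BTreeMap<u64, Entry>, // access tick -> entry, oldest first
    spilled: BTreeSet<Entry>,         // evicted entries ("state backend")
    evictions: u64,                   // analogous to volley_window_pending_evictions_total
}

impl PendingIndex {
    fn new(cap: usize) -> Self {
        Self {
            cap,
            tick: 0,
            last_used: HashMap::new(),
            by_recency: BTreeMap::new(),
            spilled: BTreeSet::new(),
            evictions: 0,
        }
    }

    /// Records activity for an entry, evicting the LRU entry if over capacity.
    fn touch(&mut self, e: Entry) {
        self.spilled.remove(&e); // an active entry lives in memory again
        self.tick += 1;
        if let Some(old_tick) = self.last_used.insert(e, self.tick) {
            self.by_recency.remove(&old_tick);
        }
        self.by_recency.insert(self.tick, e);
        if self.last_used.len() > self.cap {
            if let Some((_, victim)) = self.by_recency.pop_first() {
                self.last_used.remove(&victim);
                self.spilled.insert(victim); // persisted, not dropped
                self.evictions += 1;
            }
        }
    }

    /// Returns entries with window end <= watermark from memory AND the backend.
    fn fire(&mut self, watermark: i64) -> Vec<Entry> {
        let mut ready: Vec<Entry> =
            self.last_used.keys().copied().filter(|e| e.1 <= watermark).collect();
        for e in &ready {
            if let Some(t) = self.last_used.remove(e) {
                self.by_recency.remove(&t);
            }
        }
        let from_backend: Vec<Entry> =
            self.spilled.iter().copied().filter(|e| e.1 <= watermark).collect();
        for e in &from_backend {
            self.spilled.remove(e);
        }
        ready.extend(from_backend);
        ready.sort();
        ready
    }
}

fn main() {
    let mut idx = PendingIndex::new(2);
    idx.touch((1, 60_000));
    idx.touch((2, 60_000));
    idx.touch((3, 120_000)); // over capacity: (1, 60_000) is evicted
    println!("evictions = {}", idx.evictions);
    println!("fired at 60 s: {:?}", idx.fire(60_000));
}
```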
Sizing guidance:
| Key cardinality | Windows per key | Recommended limit | Approx. memory |
|---|---|---|---|
| < 10K | Any | Unbounded (default) | < 10 MB |
| 10K–100K | 1 (tumbling) | 100,000 | ~10 MB |
| 100K–1M | 1 (tumbling) | 500,000 | ~50 MB |
| Any | 10+ (sliding) | Tumbling limit × windows per key (size ÷ slide) | ~100 bytes × limit |
Each pending entry is ~100 bytes. Evicted entries add a small RocksDB I/O cost on watermark advance.
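The memory column follows from multiplying entries by the per-entry size. A small helper makes the sliding-window case explicit, assuming the ~100-byte figure holds. `pending_index_bytes` is hypothetical.

```rust
/// Hypothetical estimate of pending-index memory, using ~100 bytes per entry.
/// Windows per key is 1 for tumbling windows and size / slide for sliding
/// windows whose size is a multiple of the slide.
fn pending_index_bytes(keys: u64, windows_per_key: u64) -> u64 {
    const BYTES_PER_ENTRY: u64 = 100; // approximate figure from this page
    keys * windows_per_key * BYTES_PER_ENTRY
}

fn main() {
    // 100K keys with tumbling windows: about 10 MB, matching the table.
    println!("{} bytes", pending_index_bytes(100_000, 1));
    // 100K keys with SlidingWindows(60 s, 10 s): six windows per key.
    println!("{} bytes", pending_index_bytes(100_000, 60 / 10));
}
```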
Note: with_max_pending_windows() is not recommended with session windows. Session window merging requires scanning the pending index for overlapping sessions, and evicted entries may cause additional state backend I/O during merges.
Monitoring
Monitor the volley_window_pending_size gauge and volley_window_pending_evictions_total counter to tune the limit. A high eviction rate indicates the LRU is too small for the workload. See Observability for the full metrics reference.