State Management

Volley’s state management is designed for high throughput: most state operations hit an in-memory cache, and RocksDB is touched only at checkpoints and on rare cache misses.

Architecture

+----------------------------------+
|        AggregateOperator         |
|  +-----------------------------+ |
|  | Write-Behind Cache (HashMap)| |
|  | key -> Accumulator          | |
|  +-----------------------------+ |
|         | on_checkpoint()        |
|         v                        |
|  +-----------------------------+ |
|  | BatchStateBackend (RocksDB) | |
|  | WriteBatch for dirty keys   | |
|  | multi_get for bulk reads    | |
|  +-----------------------------+ |
|         | hardlink snapshot      |
|         v                        |
|  +-----------------------------+ |
|  | Checkpoint Directory        | |
|  | /checkpoints/epoch-N/       | |
|  +-----------------------------+ |
+----------------------------------+

Write-Behind Cache

During normal processing, state reads and writes go to an in-memory HashMap cache that holds accumulator values keyed by partition key. This keeps RocksDB off the hot path (the backend is read only on a rare cache miss), which is how Volley sustains 9M+ records/sec.
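The idea can be sketched with toy types (these are illustrative, not Volley’s actual structs): reads and writes touch only a HashMap, and a dirty set remembers which keys need flushing later.

```rust
use std::collections::{HashMap, HashSet};

/// Illustrative sketch of a write-behind cache: all reads and writes
/// are served from memory; mutated keys are tracked so that only they
/// need to be flushed to the backend at checkpoint time.
pub struct WriteBehindCache {
    entries: HashMap<String, i64>, // key -> accumulator value
    dirty: HashSet<String>,        // keys mutated since the last flush
}

impl WriteBehindCache {
    pub fn new() -> Self {
        Self { entries: HashMap::new(), dirty: HashSet::new() }
    }

    /// Update an accumulator purely in memory; no backend I/O.
    pub fn add(&mut self, key: &str, delta: i64) {
        *self.entries.entry(key.to_string()).or_insert(0) += delta;
        self.dirty.insert(key.to_string());
    }

    pub fn get(&self, key: &str) -> Option<i64> {
        self.entries.get(key).copied()
    }

    /// Drain the dirty set, returning the (key, value) pairs that a
    /// checkpoint would write to the backend in one batch.
    pub fn drain_dirty(&mut self) -> Vec<(String, i64)> {
        let keys: Vec<String> = self.dirty.drain().collect();
        let mut out = Vec::with_capacity(keys.len());
        for k in keys {
            if let Some(v) = self.entries.get(&k) {
                out.push((k, *v));
            }
        }
        out
    }
}
```

Between checkpoints, `add` and `get` never leave memory; `drain_dirty` is what a checkpoint would call to collect the flush set.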

BatchStateBackend

When a checkpoint barrier arrives:

  1. on_checkpoint() is called on the operator
  2. Dirty keys from the cache are flushed to RocksDB via a single atomic WriteBatch
  3. RocksDB creates a hardlink snapshot in the checkpoint directory; because hardlinks don’t copy data, the snapshot is nearly instant
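The two backend operations involved in the flush path can be sketched with a toy in-memory backend standing in for RocksDB (the struct and its methods are illustrative, not Volley’s API):

```rust
use std::collections::HashMap;

/// Toy stand-in for the RocksDB-backed state store, showing the two
/// operations the checkpoint path relies on.
pub struct ToyBackend {
    store: HashMap<Vec<u8>, Vec<u8>>,
}

impl ToyBackend {
    pub fn new() -> Self {
        Self { store: HashMap::new() }
    }

    /// Apply all dirty keys as one unit, like a RocksDB WriteBatch:
    /// conceptually, either every write lands or none do.
    pub fn write_batch(&mut self, batch: Vec<(Vec<u8>, Vec<u8>)>) {
        for (k, v) in batch {
            self.store.insert(k, v);
        }
    }

    /// Bulk point lookup, like RocksDB's multi_get; used to serve the
    /// rare cache miss without one round-trip per key.
    pub fn multi_get(&self, keys: &[Vec<u8>]) -> Vec<Option<Vec<u8>>> {
        keys.iter().map(|k| self.store.get(k).cloned()).collect()
    }
}
```

In the real backend, write_batch carries the atomicity guarantee of RocksDB's WriteBatch; here the HashMap version only illustrates the shape of the calls.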

Key operations:

  • write_batch() — Atomic batch write of dirty keys
  • multi_get() — Bulk read for cache misses (rare during steady-state)
  • Hardlink checkpoints — O(1) snapshot via filesystem hardlinks to SST files

Per-Key State Isolation

After .key_by(), each parallel operator instance has its own state namespace. Keys are hash-partitioned across instances, so state never conflicts between parallel operators.
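Hash partitioning of keys can be sketched in a few lines (the function name is hypothetical; Volley's actual partitioner may differ):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative sketch of key-to-instance routing after .key_by():
/// hashing the key modulo the parallelism deterministically selects
/// one operator instance, so each key's state lives in exactly one
/// instance's namespace and parallel operators never conflict.
pub fn instance_for_key<K: Hash>(key: &K, parallelism: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % parallelism
}
```

Because the mapping is a pure function of the key, every record for a given key is routed to the same instance, which is what makes the per-instance namespaces safe.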

Checkpoint Storage

/checkpoints/
  epoch-1/    # hardlink snapshot
  epoch-2/    # hardlink snapshot
  epoch-3/    # latest checkpoint

RocksDB checkpoints use hardlinks to SST files. Incremental growth is small when data doesn’t change between checkpoints. As data mutates, old SST files are retained for older checkpoints.
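A hardlink snapshot can be sketched with the standard library (this is an illustration of the mechanism, not Volley's checkpoint code): link each file from the live directory into the epoch directory instead of copying bytes.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Sketch of a hardlink-style snapshot: each file is hard-linked, not
/// copied, so snapshot cost does not grow with data size. This is the
/// same property RocksDB checkpoints get by hardlinking SST files.
pub fn hardlink_snapshot(live_dir: &Path, checkpoint_dir: &Path) -> io::Result<usize> {
    fs::create_dir_all(checkpoint_dir)?;
    let mut linked = 0;
    for entry in fs::read_dir(live_dir)? {
        let entry = entry?;
        if entry.file_type()?.is_file() {
            // A hardlink shares the file's data blocks; deleting the
            // live file later leaves the checkpoint's copy intact.
            fs::hard_link(entry.path(), checkpoint_dir.join(entry.file_name()))?;
            linked += 1;
        }
    }
    Ok(linked)
}
```

This also shows why old checkpoints retain old SST files: the data blocks stay on disk until the last directory linking them is deleted.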

See the Capacity Planning guide for storage sizing recommendations.

Window State LRU

Window operators (WindowOperator, ExprWindowOperator) maintain an in-memory index called pending_windows that tracks active (key, window) pairs. Unlike the aggregate cache (which is a hot copy of data also in RocksDB), this index is the only record of which windows exist and need to fire on watermark advance.

To bound memory without data loss, the window LRU uses an evict-to-backend strategy:

+----------------------------------+
|        WindowOperator            |
|  +-----------------------------+ |
|  | pending_windows (LRU)       | |
|  | key:window_start ->         | |
|  |   (key, window)             | |
|  +-----------------------------+ |
|         | on eviction            |
|         v                        |
|  +-----------------------------+ |
|  | State Backend (RocksDB)     | |
|  | pending_index/{pending_key} | |
|  +-----------------------------+ |
|         | on watermark advance   |
|         v                        |
|  Scan persisted entries for      |
|  windows ready to fire           |
+----------------------------------+

  1. When the LRU is full, evicted entries are persisted to a pending_index/ namespace in the state backend
  2. On watermark advance, persisted entries are scanned (only when evictions have occurred, tracked by a has_evicted flag)
  3. Fired windows are deleted from both the LRU and the state backend

This ensures no window is orphaned — every pending window eventually fires or is flushed at end-of-stream, regardless of whether it was evicted from the in-memory cache.
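The evict-to-backend strategy can be sketched with toy types (the struct below is illustrative; Volley's real index stores richer window metadata):

```rust
use std::collections::{HashMap, VecDeque};

/// Sketch of evict-to-backend: a bounded in-memory index of pending
/// windows. When full, the oldest entry is persisted to a backend map
/// (standing in for the pending_index/ namespace in RocksDB) instead
/// of being dropped, so no pending window is ever lost.
pub struct PendingWindows {
    capacity: usize,
    order: VecDeque<String>,                   // insertion order, for eviction
    in_memory: HashMap<String, (String, u64)>, // pending_key -> (key, window_start)
    backend: HashMap<String, (String, u64)>,   // stand-in for pending_index/
    has_evicted: bool,
}

impl PendingWindows {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, order: VecDeque::new(), in_memory: HashMap::new(),
               backend: HashMap::new(), has_evicted: false }
    }

    pub fn insert(&mut self, key: &str, window_start: u64) {
        let pending_key = format!("{}:{}", key, window_start);
        if self.in_memory.insert(pending_key.clone(), (key.to_string(), window_start)).is_none() {
            self.order.push_back(pending_key);
        }
        // Evict to the backend, never to the void.
        while self.in_memory.len() > self.capacity {
            if let Some(old) = self.order.pop_front() {
                if let Some(v) = self.in_memory.remove(&old) {
                    self.backend.insert(old, v);
                    self.has_evicted = true;
                }
            }
        }
    }

    /// Windows whose start is below the watermark are ready to fire.
    /// The backend is scanned only if an eviction ever happened.
    pub fn fire_ready(&mut self, watermark: u64) -> Vec<(String, u64)> {
        let mut ready: Vec<(String, u64)> = Vec::new();
        self.in_memory.retain(|_, v| {
            if v.1 < watermark { ready.push(v.clone()); false } else { true }
        });
        if self.has_evicted {
            self.backend.retain(|_, v| {
                if v.1 < watermark { ready.push(v.clone()); false } else { true }
            });
        }
        self.order.retain(|k| self.in_memory.contains_key(k));
        ready
    }
}
```

Note how fire_ready deletes fired windows from both the in-memory index and the backend, matching step 3, and how the has_evicted flag keeps the backend scan off the common path.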

Configure via with_max_pending_windows(n) on WindowedKeyedStream. When unset (the default), the index is unbounded, preserving the behavior of prior releases.

See the design document for full details on trade-offs and the evict-to-backend strategy.

Recovery

On pipeline restart from a checkpoint, the RecoveryCoordinator restores source offsets and opens the RocksDB backend from the checkpoint directory. Stateful operators then lazily recover their in-memory structures on the first process() call:

  • Aggregate operators (ExprAggregateOperator, AggregateOperator) use StateBackend::scan_prefix() (backed by RocksDB’s prefix_iterator) to enumerate all keys with persisted accumulators. The cache and pending_keys maps are rebuilt from the scan results.
  • Window operators (WindowOperator, ExprWindowOperator) persist a WindowOperatorMeta struct (containing pending_windows and current_watermark) to a well-known state key during on_checkpoint(). On recovery, this metadata is loaded to restore window tracking state and watermark progress.

Recovery is lazy rather than eager: fresh pipelines (no state in RocksDB) pay no cost, and recovery happens only once before the first record is processed.
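The lazy, scan-based rebuild for aggregate operators can be sketched with toy types (illustrative names; a sorted map stands in for RocksDB so a prefix scan is possible):

```rust
use std::collections::{BTreeMap, HashMap};

/// Sketch of lazy recovery: the operator keeps a `recovered` flag and
/// rebuilds its in-memory cache from a prefix scan of the backend only
/// on the first process() call. A fresh pipeline has nothing under the
/// prefix, so the scan finds nothing and costs almost nothing.
pub struct ToyAggregateOperator {
    backend: BTreeMap<String, i64>, // stand-in for RocksDB; sorted keys allow prefix scans
    cache: HashMap<String, i64>,
    recovered: bool,
}

impl ToyAggregateOperator {
    pub fn new(backend: BTreeMap<String, i64>) -> Self {
        Self { backend, cache: HashMap::new(), recovered: false }
    }

    fn recover_if_needed(&mut self) {
        if self.recovered { return; }
        // Like StateBackend::scan_prefix(): enumerate every key with a
        // persisted accumulator (here, under a hypothetical "acc/" prefix).
        for (k, v) in self.backend.range("acc/".to_string()..) {
            if !k.starts_with("acc/") { break; }
            self.cache.insert(k["acc/".len()..].to_string(), *v);
        }
        self.recovered = true;
    }

    pub fn process(&mut self, key: &str, delta: i64) -> i64 {
        self.recover_if_needed(); // recovery runs once, before the first record
        let acc = self.cache.entry(key.to_string()).or_insert(0);
        *acc += delta;
        *acc
    }
}
```

The first process() call pays the one-time scan; every later call is a pure in-memory update, as in normal steady-state processing.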