State Management
Volley’s state management is designed for high throughput: most state operations hit an in-memory cache, with RocksDB used only during checkpoints.
Architecture
+----------------------------------+
| AggregateOperator |
| +----------------------------+ |
| | Write-Behind Cache (HashMap)| |
| | key -> Accumulator | |
| +----------------------------+ |
| | on_checkpoint() |
| v |
| +----------------------------+ |
| | BatchStateBackend (RocksDB)| |
| | WriteBatch for dirty keys | |
| | multi_get for bulk reads | |
| +----------------------------+ |
| | hardlink snapshot |
| v |
| +----------------------------+ |
| | Checkpoint Directory | |
| | /checkpoints/epoch-N/ | |
| +----------------------------+ |
+----------------------------------+
Write-Behind Cache
During normal processing, state reads and writes go to an in-memory HashMap cache that holds one accumulator value per partition key. Keeping RocksDB I/O off the hot path entirely is how Volley sustains 9M+ records/sec.
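The hot path above can be sketched as a minimal write-behind cache. The type and field names here are illustrative, not Volley's actual implementation; the point is that reads and updates never leave memory, and the dirty set remembers exactly what a checkpoint must flush.

```rust
use std::collections::{HashMap, HashSet};

/// Simplified sketch of a write-behind cache (illustrative names).
pub struct WriteBehindCache {
    entries: HashMap<String, i64>, // key -> accumulator
    dirty: HashSet<String>,        // keys changed since last checkpoint
}

impl WriteBehindCache {
    pub fn new() -> Self {
        Self { entries: HashMap::new(), dirty: HashSet::new() }
    }

    /// Hot-path read: never touches the backend.
    pub fn get(&self, key: &str) -> Option<i64> {
        self.entries.get(key).copied()
    }

    /// Hot-path write: update in memory and mark the key dirty.
    pub fn update(&mut self, key: &str, delta: i64) {
        *self.entries.entry(key.to_string()).or_insert(0) += delta;
        self.dirty.insert(key.to_string());
    }

    /// Called at checkpoint time: drain the dirty set so the caller
    /// can flush exactly these keys in one batch.
    pub fn take_dirty(&mut self) -> Vec<(String, i64)> {
        self.dirty
            .drain()
            .collect::<Vec<String>>()
            .into_iter()
            .map(|k| {
                let v = self.entries[&k];
                (k, v)
            })
            .collect()
    }
}
```

After `take_dirty()` returns, subsequent checkpoints flush nothing for keys that haven't changed again, which is what keeps checkpoint cost proportional to churn rather than total state size.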
BatchStateBackend
When a checkpoint barrier arrives:
- on_checkpoint() is called on the operator
- Dirty keys from the cache are flushed to RocksDB via WriteBatch (a single atomic write)
- RocksDB creates a hardlink snapshot to the checkpoint directory
- The snapshot is nearly instant because hardlinks don't copy data
Key operations:
- write_batch() — Atomic batch write of dirty keys
- multi_get() — Bulk read for cache misses (rare during steady-state)
- Hardlink checkpoints — O(1) snapshot via filesystem hardlinks to SST files
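The batch-write and bulk-read operations can be sketched with a BTreeMap standing in for RocksDB. The MockBackend and WriteBatch types here are illustrative, not Volley's real backend; the sketch shows the semantics the section describes: all puts in a batch become visible together, and misses are fetched in bulk.

```rust
use std::collections::BTreeMap;

/// Illustrative stand-in for RocksDB.
pub struct MockBackend {
    data: BTreeMap<Vec<u8>, Vec<u8>>,
}

/// Accumulates puts so they can be applied as one atomic unit,
/// mirroring RocksDB WriteBatch's all-or-nothing semantics.
pub struct WriteBatch {
    ops: Vec<(Vec<u8>, Vec<u8>)>,
}

impl WriteBatch {
    pub fn new() -> Self { Self { ops: Vec::new() } }
    pub fn put(&mut self, key: &[u8], value: &[u8]) {
        self.ops.push((key.to_vec(), value.to_vec()));
    }
}

impl MockBackend {
    pub fn new() -> Self { Self { data: BTreeMap::new() } }

    /// Install every write in the batch at once.
    pub fn apply_batch(&mut self, batch: WriteBatch) {
        for (k, v) in batch.ops {
            self.data.insert(k, v);
        }
    }

    /// Bulk read used for cache misses (multi_get analogue).
    pub fn multi_get(&self, keys: &[&[u8]]) -> Vec<Option<Vec<u8>>> {
        keys.iter().map(|k| self.data.get(*k).cloned()).collect()
    }
}
```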
Per-Key State Isolation
After .key_by(), each parallel operator instance has its own state namespace. Keys are hash-partitioned across instances, so state never conflicts between parallel operators.
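The routing rule can be illustrated with a small hash-partitioner sketch. The function name and choice of hasher are assumptions for illustration, not Volley's actual partitioner; what matters is the invariant that a given key always maps to the same instance.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative key-to-instance routing after key_by: the same key
/// always hashes to the same parallel instance, so no two instances
/// ever hold state for the same key.
fn instance_for_key(key: &str, parallelism: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % parallelism
}
```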
Checkpoint Storage
/checkpoints/
epoch-1/ # hardlink snapshot
epoch-2/ # hardlink snapshot
epoch-3/ # latest checkpoint
RocksDB checkpoints hardlink SST files rather than copying them, so incremental growth is small when data doesn't change between checkpoints. As data mutates, new SST files are written while the old ones remain on disk for as long as an older checkpoint still links to them.
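The hardlink mechanism itself can be demonstrated with std::fs::hard_link. This is a simplified sketch, not RocksDB's actual checkpoint code: it links every file in a live directory into a checkpoint directory without copying any bytes.

```rust
use std::fs;
use std::path::Path;

/// Sketch of a hardlink snapshot: each file in the live directory is
/// hard-linked into the checkpoint directory. No data is copied, so
/// the cost is O(number of files), not O(data size).
fn hardlink_checkpoint(live_dir: &Path, ckpt_dir: &Path) -> std::io::Result<()> {
    fs::create_dir_all(ckpt_dir)?;
    for entry in fs::read_dir(live_dir)? {
        let entry = entry?;
        if entry.file_type()?.is_file() {
            fs::hard_link(entry.path(), ckpt_dir.join(entry.file_name()))?;
        }
    }
    Ok(())
}
```

Because a hardlink is just another directory entry for the same inode, deleting the live file later does not affect the checkpoint's copy, which is why older checkpoints keep stale SST files alive.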
See the Capacity Planning guide for storage sizing recommendations.
Window State LRU
Window operators (WindowOperator, ExprWindowOperator) maintain an in-memory
index called pending_windows that tracks active (key, window) pairs. Unlike
the aggregate cache (which is a hot copy of data also in RocksDB), this index
is the only record of which windows exist and need to fire on watermark
advance.
To bound memory without data loss, the window LRU uses an evict-to-backend strategy:
+----------------------------------+
| WindowOperator |
| +----------------------------+ |
| | pending_windows (LRU) | |
| | key:window_start -> (key, | |
| | window) | |
| +----------------------------+ |
| | on eviction |
| v |
| +----------------------------+ |
| | State Backend (RocksDB) | |
| | pending_index/{pending_key}| |
| +----------------------------+ |
| | on watermark advance |
| v |
| Scan persisted entries for |
| windows ready to fire |
+----------------------------------+
- When the LRU is full, evicted entries are persisted to a pending_index/ namespace in the state backend
- On watermark advance, persisted entries are scanned (only when evictions have occurred, tracked by a has_evicted flag)
- Fired windows are deleted from both the LRU and the state backend
This ensures no window is orphaned — every pending window eventually fires or is flushed at end-of-stream, regardless of whether it was evicted from the in-memory cache.
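A condensed sketch of the evict-to-backend strategy, with two simplifications: eviction follows insertion order rather than true LRU, and a BTreeMap stands in for the RocksDB backend. The type and field names are illustrative, not Volley's actual window operator.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Illustrative bounded pending-window index with evict-to-backend.
pub struct PendingWindows {
    cap: usize,
    order: VecDeque<String>,                   // eviction order
    hot: BTreeMap<String, (String, u64)>,      // key:window_start -> (key, window)
    backend: BTreeMap<String, (String, u64)>,  // "pending_index/..." entries
    has_evicted: bool,
}

impl PendingWindows {
    pub fn new(cap: usize) -> Self {
        Self { cap, order: VecDeque::new(), hot: BTreeMap::new(),
               backend: BTreeMap::new(), has_evicted: false }
    }

    pub fn insert(&mut self, key: &str, window_start: u64) {
        let id = format!("{key}:{window_start}");
        if self.hot.insert(id.clone(), (key.to_string(), window_start)).is_none() {
            self.order.push_back(id);
        }
        while self.hot.len() > self.cap {
            let Some(old) = self.order.pop_front() else { break };
            if let Some(entry) = self.hot.remove(&old) {
                // Persist instead of dropping: the window is not lost.
                self.backend.insert(format!("pending_index/{old}"), entry);
                self.has_evicted = true;
            }
        }
    }

    /// Collect windows at or before the watermark, removing them from
    /// both the hot index and the backend. The backend is scanned only
    /// when something was actually evicted.
    pub fn fire_ready(&mut self, watermark: u64) -> Vec<(String, u64)> {
        let mut ready = Vec::new();
        self.hot.retain(|_, (k, w)| {
            if *w <= watermark { ready.push((k.clone(), *w)); false } else { true }
        });
        if self.has_evicted {
            self.backend.retain(|_, (k, w)| {
                if *w <= watermark { ready.push((k.clone(), *w)); false } else { true }
            });
            if self.backend.is_empty() { self.has_evicted = false; }
        }
        ready
    }
}
```

Note how the has_evicted flag keeps the common case cheap: a pipeline that never hits the cap never pays for a backend scan on watermark advance.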
Configure via with_max_pending_windows(n) on WindowedKeyedStream. When
unset, the index is unbounded (default, no behavior change from prior releases).
See the design document for full details on trade-offs and the evict-to-backend strategy.
Recovery
On pipeline restart from a checkpoint, the RecoveryCoordinator restores source offsets and opens the RocksDB backend from the checkpoint directory. Stateful operators then lazily recover their in-memory structures on the first process() call:
- Aggregate operators (ExprAggregateOperator, AggregateOperator) use StateBackend::scan_prefix() (backed by RocksDB's prefix_iterator) to enumerate all keys with persisted accumulators. The cache and pending_keys maps are rebuilt from the scan results.
- Window operators (WindowOperator, ExprWindowOperator) persist a WindowOperatorMeta struct (containing pending_windows and current_watermark) to a well-known state key during on_checkpoint(). On recovery, this metadata is loaded to restore window tracking state and watermark progress.
Recovery is lazy rather than eager: fresh pipelines (no state in RocksDB) pay no cost, and recovery happens only once before the first record is processed.
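The lazy-rebuild pattern can be sketched as follows. Backend and AggState are illustrative stand-ins, not Volley's actual types: a BTreeMap range query mimics scan_prefix(), and an Option around the cache defers the rebuild until the first process() call.

```rust
use std::collections::{BTreeMap, HashMap};

/// Illustrative stand-in for the RocksDB-backed state backend.
pub struct Backend {
    data: BTreeMap<String, i64>,
}

impl Backend {
    /// Mimics a prefix scan using an ordered range query.
    pub fn scan_prefix(&self, prefix: &str) -> Vec<(String, i64)> {
        self.data
            .range(prefix.to_string()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, v)| (k.clone(), *v))
            .collect()
    }
}

/// Illustrative aggregate operator state with lazy recovery.
pub struct AggState {
    cache: Option<HashMap<String, i64>>, // None until first process() call
}

impl AggState {
    pub fn new() -> Self { Self { cache: None } }

    /// First call rebuilds the cache from the backend; later calls are
    /// pure in-memory lookups. A fresh pipeline scans an empty prefix
    /// and pays essentially nothing.
    pub fn process(&mut self, backend: &Backend, key: &str, delta: i64) -> i64 {
        let cache = self.cache.get_or_insert_with(|| {
            backend
                .scan_prefix("acc/")
                .into_iter()
                .map(|(k, v)| (k.trim_start_matches("acc/").to_string(), v))
                .collect()
        });
        let acc = cache.entry(key.to_string()).or_insert(0);
        *acc += delta;
        *acc
    }
}
```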