# Exactly-Once Semantics

Volley uses checkpoint barriers for exactly-once processing. The protocol guarantees that each record's effects (state updates and sink output) are applied exactly once, even in the presence of failures.
## Checkpoint Barrier Protocol

1. Coordinator injects `CheckpointBarrier(epoch=N)` into each source
2. Barrier flows through the pipeline in-band with the data
3. Each operator, on receiving the barrier:
   a. Calls its `on_checkpoint()` hook (flush caches, prepare state)
   b. Forwards the barrier downstream
   c. Snapshots its state via a RocksDB hardlink in the state backend
4. Each sink, on receiving the barrier:
   a. Commits its buffered writes
   b. Acknowledges the barrier to the coordinator
5. Coordinator marks epoch N complete once all sinks have acked
The key insight: barriers flow in-band alongside data records. No global pause is needed. Each operator processes the barrier when it arrives, then continues processing data.
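The in-band barrier handling in step 3 can be sketched with a minimal model. The `Event` and `Operator` types below are illustrative, not Volley's actual API; the point is that a barrier is just another message on the data channel, so the operator snapshots and forwards it without any global pause:

```rust
// Minimal sketch of in-band barrier handling (illustrative types only).
#[derive(Debug, PartialEq)]
enum Event {
    Record(i64),
    Barrier { epoch: u64 },
}

struct Operator {
    processed: Vec<i64>,
    snapshots: Vec<(u64, usize)>, // (epoch, records seen at snapshot time)
}

impl Operator {
    fn new() -> Self {
        Operator { processed: Vec::new(), snapshots: Vec::new() }
    }

    // Handle one event; return the event to forward downstream.
    fn handle(&mut self, event: Event) -> Event {
        match event {
            Event::Record(v) => {
                self.processed.push(v);
                Event::Record(v)
            }
            Event::Barrier { epoch } => {
                // on_checkpoint(): flush caches, then snapshot state.
                self.snapshots.push((epoch, self.processed.len()));
                // Forward the barrier so downstream operators checkpoint too.
                Event::Barrier { epoch }
            }
        }
    }
}

fn main() {
    let mut op = Operator::new();
    for ev in [
        Event::Record(1),
        Event::Record(2),
        Event::Barrier { epoch: 1 },
        Event::Record(3),
    ] {
        op.handle(ev);
    }
    // The epoch-1 snapshot captured exactly the records seen before the barrier.
    assert_eq!(op.snapshots, vec![(1, 2)]);
    assert_eq!(op.processed, vec![1, 2, 3]);
}
```

Because the snapshot point is defined by the barrier's position in the stream, every operator's snapshot for epoch N reflects the same prefix of the input.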
## Per-Sink Commit Strategies

Each sink type uses a different mechanism to achieve exactly-once:
| Sink | Commit Strategy | Recovery |
|---|---|---|
| Kafka | Transaction commit on barrier (+ send_offsets_to_transaction for Kafka↔Kafka EOS) | Abort + replay on recovery |
| Delta Lake | Epoch tag in commit metadata | Skip already-committed epochs |
| Iceberg | Snapshot commit via REST catalog | Epoch tracking in catalog |
## Kafka↔Kafka End-to-End Exactly-Once

When both the source and sink are Kafka, Volley supports the full Flink-style EOS protocol via librdkafka's `send_offsets_to_transaction`:
```rust
// Inside an async context:
let (stream, cgm) = StreamExecutionEnvironment::new()
    .from_kafka_exactly_once(source_config).await?;

stream
    .filter_expr(col("status").eq(lit("active")))
    .to_kafka_exactly_once(sink_config, cgm).await?
    .execute("eos-pipeline").await?;
```
### How It Works

1. **Setup:** `from_kafka_exactly_once()` returns a `(DataStream, ConsumerGroupMetadata)` tuple. The `ConsumerGroupMetadata` (CGM) is a handle to the Kafka consumer group that the source belongs to. Pass it to `to_kafka_exactly_once()` so the sink can commit consumer offsets atomically with produced records.
2. **Barrier injection:** When the checkpoint coordinator injects a barrier, the source records its current Kafka partition offsets in `barrier.source_offsets`.
3. **Barrier alignment:** If the pipeline has parallel branches, the `BarrierAligner` merges source offsets from all branches before forwarding the barrier to the sink. (The `BarrierAligner` is an internal component that ensures all parallel inputs to an operator have delivered the same barrier epoch before proceeding.)
4. **Atomic commit:** When the sink receives the barrier, it:
   - Calls `producer.send_offsets_to_transaction(offsets, cgm)`, which stages the consumer group offset advance inside the producer's open transaction
   - Calls `producer.commit_transaction()`, which atomically commits both the produced output records and the consumer offset advance
5. **No auto-commit:** In EOS mode, the sink disables batch-size and deadline-driven auto-commits; the only commit trigger is a checkpoint barrier. This means `CheckpointConfig::interval` must be shorter than the broker's `transaction.timeout.ms` (default 60s), or the broker will fence the producer.
6. **Recovery:** On restart, the consumer group's last committed offset (written atomically with the transaction) determines where to resume. The runtime skips file-based offset restoration for transactional sources.
## Recovery

When a failure occurs, records may be in various stages of processing:

```text
Checkpoint N          In-flight records         Failure
     |                                             |
     v                                             v
[committed] ---[r1]---[r2]---[r3]---[r4]---[r5]--- X
     ^                        ^
known good             these are lost
```
The `RecoveryCoordinator` restores to the last known-good state:

1. **Find last complete checkpoint**: the most recent epoch where all sinks acknowledged their commit
2. **Restore source offsets**: rewind sources to the offsets recorded at that checkpoint (e.g., Kafka partition offsets, S3 file positions)
3. **Restore operator state**: load the RocksDB hardlink snapshots from the checkpoint directory, restoring window state, aggregation accumulators, and any custom operator state
4. **Resume processing**: records between the checkpoint and the failure (r1–r5 above) are replayed from the source
Replayed records don’t produce duplicates because each sink type has an idempotent commit strategy:

- **Kafka:** The incomplete transaction from the failed run is aborted, and replayed records are written in a new transaction. Consumers with `isolation.level=read_committed` never see the aborted records.
- **Delta Lake:** The sink checks the Delta log for the current epoch’s `pipeline_id`. If an epoch was already committed, the write is skipped entirely.
- **Iceberg:** The sink reads the latest snapshot’s epoch from catalog metadata. Already-committed epochs are skipped.
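The epoch-tag check used by the Delta Lake and Iceberg sinks can be sketched as follows. This is an assumed, simplified model: the commit log records the highest epoch committed per `pipeline_id`, and a replayed epoch is detected and skipped:

```rust
// Hypothetical epoch-tag idempotence check (simplified model of the
// Delta/Iceberg commit metadata lookup).
use std::collections::HashMap;

struct EpochLog {
    // pipeline_id -> highest committed epoch
    committed: HashMap<String, u64>,
}

impl EpochLog {
    // Returns true if the write proceeds, false if the epoch is a replay.
    fn try_commit(&mut self, pipeline_id: &str, epoch: u64) -> bool {
        match self.committed.get(pipeline_id) {
            Some(&last) if epoch <= last => false, // already committed: skip
            _ => {
                self.committed.insert(pipeline_id.to_string(), epoch);
                true
            }
        }
    }
}

fn main() {
    let mut log = EpochLog { committed: HashMap::new() };
    assert!(log.try_commit("eos-pipeline", 7));  // first attempt: writes
    assert!(!log.try_commit("eos-pipeline", 7)); // replay after recovery: skipped
    assert!(log.try_commit("eos-pipeline", 8));  // next epoch: writes
}
```

Because the epoch tag travels with the commit itself, the check and the write are atomic from the table's point of view, so a crash between them cannot double-apply an epoch.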
## Operator Lifecycle

The `on_checkpoint()` hook gives operators a chance to prepare for the snapshot:

```rust
fn on_checkpoint(&mut self, epoch: u64) {
    // Flush any buffered state to RocksDB.
    // The state backend will snapshot after this returns.
}
```

For stateful operators using the write-behind cache, `on_checkpoint()` flushes dirty keys from the in-memory `HashMap` to RocksDB via `BatchStateBackend::write_batch()`.
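That flush can be sketched with an in-memory stand-in for the backend. `BatchStateBackend` below is a hypothetical trait with a single `write_batch()` method (the real backend writes to RocksDB), and `WriteBehindCache` is an illustrative cache, not Volley's actual type:

```rust
// Sketch: flushing a write-behind cache's dirty keys in on_checkpoint().
use std::collections::HashMap;

trait BatchStateBackend {
    fn write_batch(&mut self, batch: Vec<(String, u64)>);
}

struct VecBackend {
    rows: Vec<(String, u64)>, // stand-in for RocksDB
}

impl BatchStateBackend for VecBackend {
    fn write_batch(&mut self, batch: Vec<(String, u64)>) {
        self.rows.extend(batch);
    }
}

struct WriteBehindCache {
    hot: HashMap<String, u64>, // in-memory values
    dirty: Vec<String>,        // keys written since the last checkpoint
}

impl WriteBehindCache {
    fn put(&mut self, key: &str, value: u64) {
        self.hot.insert(key.to_string(), value);
        self.dirty.push(key.to_string());
    }

    // Called from on_checkpoint(): the snapshot taken afterwards then
    // reflects every write up to the barrier.
    fn on_checkpoint(&mut self, backend: &mut impl BatchStateBackend) {
        let dirty = std::mem::take(&mut self.dirty);
        let batch: Vec<(String, u64)> = dirty
            .into_iter()
            .filter_map(|k| self.hot.get(&k).copied().map(|v| (k, v)))
            .collect();
        backend.write_batch(batch);
    }
}

fn main() {
    let mut cache = WriteBehindCache { hot: HashMap::new(), dirty: Vec::new() };
    let mut backend = VecBackend { rows: Vec::new() };

    cache.put("user:1", 10);
    cache.put("user:1", 11); // latest value wins in the cache
    cache.on_checkpoint(&mut backend);

    assert!(backend.rows.contains(&("user:1".to_string(), 11)));
    assert!(cache.dirty.is_empty()); // dirty set cleared after the flush
}
```

Batching the flush at barrier time keeps the hot path free of per-record RocksDB writes while still making the snapshot consistent with the barrier position.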