
Exactly-Once Semantics

Volley uses checkpoint barriers for exactly-once processing. The protocol ensures that every record affects committed output exactly once, even in the presence of failures.

Checkpoint Barrier Protocol

1. Coordinator injects CheckpointBarrier(epoch=N) into source
2. Barrier flows through pipeline in-band with data
3. Each operator receives barrier:
   a. Calls on_checkpoint() hook (flush caches, prepare state)
   b. Forwards barrier downstream
   c. State backend snapshots via RocksDB hardlink
4. Sink receives barrier:
   a. Commits buffered writes
   b. Acknowledges barrier to coordinator
5. Coordinator marks epoch N complete when all sinks ack

The key insight: barriers flow in-band alongside data records. No global pause is needed. Each operator processes the barrier when it arrives, then continues processing data.
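The in-band flow can be sketched with a minimal model: a hypothetical `Message` enum carries both data records and barriers through the same channel, so an operator sees the barrier in stream order, snapshots, and forwards it without pausing (names here are illustrative, not Volley's actual types):

```rust
use std::sync::mpsc;

// Hypothetical message type: barriers travel in-band with data,
// so each operator sees them in stream order with no global pause.
#[derive(Debug, PartialEq)]
enum Message {
    Record(String),
    Barrier { epoch: u64 },
}

// A minimal operator loop: forward records until a barrier arrives,
// snapshot (here just recorded), forward the barrier, continue.
fn run_operator(rx: mpsc::Receiver<Message>, tx: mpsc::Sender<Message>) -> Vec<u64> {
    let mut snapshots = Vec::new();
    for msg in rx {
        match msg {
            Message::Record(r) => {
                tx.send(Message::Record(r)).unwrap(); // normal processing
            }
            Message::Barrier { epoch } => {
                snapshots.push(epoch); // on_checkpoint() would run here
                tx.send(Message::Barrier { epoch }).unwrap(); // forward downstream
            }
        }
    }
    snapshots
}

fn main() {
    let (tx_in, rx_in) = mpsc::channel();
    let (tx_out, rx_out) = mpsc::channel();
    tx_in.send(Message::Record("r1".into())).unwrap();
    tx_in.send(Message::Barrier { epoch: 1 }).unwrap();
    tx_in.send(Message::Record("r2".into())).unwrap();
    drop(tx_in);
    let snaps = run_operator(rx_in, tx_out);
    assert_eq!(snaps, vec![1]);
    // Downstream sees the barrier between r1 and r2: order is preserved.
    let out: Vec<Message> = rx_out.iter().collect();
    assert_eq!(out[1], Message::Barrier { epoch: 1 });
}
```

Records ahead of the barrier are covered by epoch N's snapshot; records behind it belong to epoch N+1.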

Per-Sink Commit Strategies

Each sink type uses a different mechanism to achieve exactly-once:

| Sink | Commit Strategy | Recovery |
|------|-----------------|----------|
| Kafka | Transaction commit on barrier (+ send_offsets_to_transaction for Kafka↔Kafka EOS) | Abort + replay on recovery |
| Delta Lake | Epoch tag in commit metadata | Skip already-committed epochs |
| Iceberg | Snapshot commit via REST catalog | Epoch tracking in catalog |

Kafka↔Kafka End-to-End Exactly-Once

When both source and sink are Kafka, Volley supports the full Flink-style EOS protocol via librdkafka’s send_offsets_to_transaction:

let (stream, cgm) = StreamExecutionEnvironment::new()
    .from_kafka_exactly_once(source_config).await?;

stream
    .filter_expr(col("status").eq(lit("active")))
    .to_kafka_exactly_once(sink_config, cgm).await?
    .execute("eos-pipeline").await?;

How It Works

  1. Setup: from_kafka_exactly_once() returns a (DataStream, ConsumerGroupMetadata) tuple. The ConsumerGroupMetadata (CGM) is a handle to the Kafka consumer group that the source belongs to. Pass it to to_kafka_exactly_once() so the sink can commit consumer offsets atomically with produced records.

  2. Barrier injection: When the checkpoint coordinator injects a barrier, the source records its current Kafka partition offsets in barrier.source_offsets.

  3. Barrier alignment: If the pipeline has parallel branches, the BarrierAligner merges source offsets from all branches before forwarding the barrier to the sink. (The BarrierAligner is an internal component that ensures all parallel inputs to an operator have delivered the same barrier epoch before proceeding.)

  4. Atomic commit: When the sink receives the barrier, it:

    • Calls producer.send_offsets_to_transaction(offsets, cgm) — this stages the consumer group offset advance inside the producer’s open transaction
    • Calls producer.commit_transaction() — this atomically commits both the produced output records and the consumer offset advance
  5. No auto-commit: In EOS mode, the sink disables batch-size and deadline-driven auto-commits. The only commit trigger is a checkpoint barrier. This means CheckpointConfig::interval must be shorter than the broker’s transaction.timeout.ms (default 60s), or the broker will fence the producer.

  6. Recovery: On restart, the consumer group’s last committed offset (written atomically with the transaction) determines where to resume. The runtime skips file-based offset restoration for transactional sources.
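The atomicity at the heart of steps 4–6 can be modeled in miniature. This is a hypothetical in-memory stand-in, not librdkafka: the point it demonstrates is that staged output records and the staged consumer-offset advance become visible together on commit, and vanish together on abort:

```rust
use std::collections::HashMap;

// Hypothetical model of the sink's atomic commit. The real sink calls
// librdkafka's send_offsets_to_transaction / commit_transaction; here the
// output records and the offset advance are only ever published together.
#[derive(Default)]
struct MockTxProducer {
    staged_records: Vec<String>,
    staged_offsets: HashMap<i32, i64>, // partition -> next offset to consume
    committed_records: Vec<String>,
    committed_offsets: HashMap<i32, i64>,
}

impl MockTxProducer {
    fn send(&mut self, record: &str) {
        self.staged_records.push(record.to_string());
    }
    // Stage the consumer group's offset advance inside the open transaction.
    fn send_offsets_to_transaction(&mut self, offsets: HashMap<i32, i64>) {
        self.staged_offsets = offsets;
    }
    // Atomically publish records and offsets together.
    fn commit_transaction(&mut self) {
        self.committed_records.append(&mut self.staged_records);
        self.committed_offsets.extend(self.staged_offsets.drain());
    }
    // On failure, both sides of the transaction vanish together.
    fn abort_transaction(&mut self) {
        self.staged_records.clear();
        self.staged_offsets.clear();
    }
}

fn main() {
    let mut p = MockTxProducer::default();
    p.send("out-1");
    p.send_offsets_to_transaction(HashMap::from([(0, 42)]));
    p.commit_transaction();
    assert_eq!(p.committed_records, vec!["out-1"]);
    assert_eq!(p.committed_offsets[&0], 42);

    // A barrier never arrived: abort leaves nothing partially visible.
    p.send("out-2");
    p.abort_transaction();
    assert_eq!(p.committed_records.len(), 1);
}
```

Because the offsets ride inside the producer's transaction, a crash between "output written" and "offsets committed" is impossible by construction.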

Recovery

When a failure occurs, records may be in various stages of processing:

Checkpoint N          In-flight records           Failure
     |                                                |
     v                                                v
  [committed] ---[r1]---[r2]---[r3]---[r4]---[r5]--- X
              ^                                       ^
         known good                            these are lost

The RecoveryCoordinator restores to the last known-good state:

  1. Find last complete checkpoint — the most recent epoch where all sinks acknowledged their commit
  2. Restore source offsets — rewind sources to the offsets recorded at that checkpoint (e.g., Kafka partition offsets, S3 file positions)
  3. Restore operator state — load RocksDB hardlink snapshots from the checkpoint directory, restoring window state, aggregation accumulators, and any custom operator state
  4. Resume processing — records between the checkpoint and the failure (r1–r5 above) are replayed from the source
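Step 1 above reduces to a simple invariant: an epoch counts as complete only when every sink has acknowledged it. A minimal sketch (the function name and ack map are assumptions, not the RecoveryCoordinator's real interface):

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical sketch of "find last complete checkpoint": the highest epoch
// acknowledged by all sinks is the restore point.
fn last_complete_epoch(
    acks: &HashMap<u64, HashSet<String>>, // epoch -> sinks that acked it
    all_sinks: &HashSet<String>,
) -> Option<u64> {
    acks.iter()
        .filter(|(_, acked)| *acked == all_sinks)
        .map(|(epoch, _)| *epoch)
        .max()
}

fn main() {
    let sinks: HashSet<String> = ["kafka", "delta"].iter().map(|s| s.to_string()).collect();
    let mut acks: HashMap<u64, HashSet<String>> = HashMap::new();
    acks.insert(7, sinks.clone());                        // fully acked
    acks.insert(8, HashSet::from(["kafka".to_string()])); // delta never acked
    // Epoch 8 failed mid-commit, so recovery rewinds to epoch 7.
    assert_eq!(last_complete_epoch(&acks, &sinks), Some(7));
}
```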

Replayed records don’t produce duplicates because each sink type has an idempotent commit strategy:

  • Kafka: The incomplete transaction from the failed run is aborted. Replayed records are written in a new transaction. Consumers with isolation.level=read_committed never see the aborted records.
  • Delta Lake: The sink checks the Delta log for the current epoch’s pipeline_id. If an epoch was already committed, the write is skipped entirely.
  • Iceberg: The sink reads the latest snapshot’s epoch from catalog metadata. Already-committed epochs are skipped.
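The epoch-skip checks for Delta Lake and Iceberg follow the same idempotent pattern, sketched below with a hypothetical `EpochLog` standing in for the table's commit log (the real sinks read this from the Delta log or catalog metadata):

```rust
use std::collections::HashMap;

// Hypothetical model of the epoch tag in commit metadata: a replayed write
// for an already-committed epoch is detected and skipped.
struct EpochLog {
    // pipeline_id -> highest committed epoch (stands in for the Delta log).
    committed: HashMap<String, u64>,
}

impl EpochLog {
    // Returns true if the write was applied, false if skipped as a replay.
    fn commit(&mut self, pipeline_id: &str, epoch: u64) -> bool {
        match self.committed.get(pipeline_id) {
            Some(&last) if epoch <= last => false, // already committed: skip
            _ => {
                self.committed.insert(pipeline_id.to_string(), epoch);
                true
            }
        }
    }
}

fn main() {
    let mut log = EpochLog { committed: HashMap::new() };
    assert!(log.commit("eos-pipeline", 7));  // first attempt commits
    assert!(!log.commit("eos-pipeline", 7)); // replay after recovery is skipped
    assert!(log.commit("eos-pipeline", 8));  // next epoch proceeds normally
}
```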

Operator Lifecycle

The on_checkpoint() hook gives operators a chance to prepare for the snapshot:

fn on_checkpoint(&mut self, epoch: u64) {
    // Flush any buffered state to RocksDB.
    // The state backend will snapshot after this returns.
}

For stateful operators using the write-behind cache, on_checkpoint() flushes dirty keys from the in-memory HashMap to RocksDB via BatchStateBackend::write_batch().
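A minimal sketch of that write-behind pattern, assuming an illustrative `WriteBehindCache` (the real operator flushes via `BatchStateBackend::write_batch()`, modeled here as a plain map):

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical write-behind cache: writes land in memory, dirty keys are
// tracked, and on_checkpoint() flushes them as one batch before the snapshot.
struct WriteBehindCache {
    cache: HashMap<String, u64>,
    dirty: HashSet<String>,
    backend: HashMap<String, u64>, // stands in for RocksDB
}

impl WriteBehindCache {
    fn put(&mut self, key: &str, value: u64) {
        self.cache.insert(key.to_string(), value);
        self.dirty.insert(key.to_string()); // write deferred until checkpoint
    }
    fn on_checkpoint(&mut self, _epoch: u64) {
        // Flush only dirty keys, as one batch, before the snapshot is taken.
        for key in self.dirty.drain() {
            let value = self.cache[&key];
            self.backend.insert(key, value);
        }
    }
}

fn main() {
    let mut c = WriteBehindCache {
        cache: HashMap::new(),
        dirty: HashSet::new(),
        backend: HashMap::new(),
    };
    c.put("window:42", 3);
    assert!(c.backend.is_empty());          // write still deferred
    c.on_checkpoint(1);
    assert_eq!(c.backend["window:42"], 3);  // durable before the snapshot
    assert!(c.dirty.is_empty());
}
```

Flushing only dirty keys keeps the checkpoint-time write amplification proportional to what changed since the last barrier, not to total state size.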