Kafka

Volley provides a Kafka source and sink via the volley-connector-kafka crate. The sink defaults to non-transactional idempotent mode for maximum throughput, with opt-in exactly-once transactional semantics.

Setup

Add the Kafka connector crate to your Cargo.toml:

[dependencies]
volley-connector-kafka = { git = "https://github.com/volley-streams/volley" }

System dependencies: cmake is required for building rdkafka. OpenSSL and libcurl are vendored and statically linked — no additional system packages needed. See Installation.

Authentication

Use KafkaSecurityConfig to configure authentication for both sources and sinks. The same config struct works with .with_security() on either KafkaSourceConfig or KafkaSinkConfig.

Confluent Cloud / SASL_SSL

The most common setup for managed Kafka (Confluent Cloud, AWS MSK Serverless, Aiven):

use volley_connector_kafka::{KafkaSourceConfig, KafkaSinkConfig, KafkaSecurityConfig};

let security = KafkaSecurityConfig::sasl_ssl("API_KEY", "API_SECRET");

let source_config = KafkaSourceConfig::new("pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "events", "my-group")
    .with_security(security.clone());

let sink_config = KafkaSinkConfig::new("pkc-xxxxx.us-east-1.aws.confluent.cloud:9092", "output")
    .with_security(security);

This sets security.protocol=SASL_SSL and sasl.mechanism=PLAIN, which is the correct combination for Confluent Cloud and most SASL-based providers.
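In raw librdkafka terms, this corresponds to properties along the following lines (the exact keys written by sasl_ssl() are an assumption for illustration):

```properties
# Illustrative librdkafka client properties (assumed) for
# KafkaSecurityConfig::sasl_ssl("API_KEY", "API_SECRET"):
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=API_KEY
sasl.password=API_SECRET
```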

SCRAM Authentication

For brokers that use SCRAM (e.g., AWS MSK with IAM disabled, self-hosted clusters):

let security = KafkaSecurityConfig::sasl_ssl("username", "password")
    .with_sasl_mechanism("SCRAM-SHA-256");

Supported SASL mechanisms: PLAIN (default), SCRAM-SHA-256, SCRAM-SHA-512.

mTLS (Mutual TLS)

For clusters that authenticate clients via X.509 certificates:

let security = KafkaSecurityConfig::ssl()
    .with_ssl_ca("/etc/kafka/ca.pem")
    .with_ssl_cert("/etc/kafka/client.pem")
    .with_ssl_key("/etc/kafka/client.key");

let source_config = KafkaSourceConfig::new("broker:9093", "events", "my-group")
    .with_security(security);

Key Types

| Type | Description |
|------|-------------|
| KafkaSourceConfig | Configuration for the Kafka consumer (brokers, topic, group, format, batch size) |
| KafkaSinkConfig | Configuration for the Kafka producer (brokers, topic, format, delivery mode) |
| KafkaSecurityConfig | Shared authentication/TLS config for source and sink |
| KafkaSourceFormat | Message format enum: Json (default), Protobuf, Avro |
| KafkaSinkFormat | Serialization format enum: Json (default), Protobuf, Avro |
| KafkaSource | Source that polls Kafka for records |
| KafkaSink | Sink that writes records to Kafka |
| KafkaEnvExt | Extension trait for StreamExecutionEnvironment |
| KafkaStreamExt | Extension trait for DataStream |
| SchemaRegistryClient | Confluent Schema Registry HTTP client with caching |
| env.from_kafka_protobuf::<M>() | Trait method: Kafka source + ProtobufDecodeOperator |
| env.from_kafka_avro() | Trait method: Kafka source + AvroDecodeOperator + Schema Registry |

Example

use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt, KafkaSourceConfig, KafkaSinkConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let source_config = KafkaSourceConfig::new("localhost:9092", "events", "my-group");
    let sink_config = KafkaSinkConfig::new("localhost:9092", "output");

    StreamExecutionEnvironment::new()
        .from_kafka(source_config).await?
        .filter(|record| true)
        .key_by(col("user_id"))
        .window(TumblingWindows::of(Duration::from_secs(60)))
        .aggregate(AggregationType::Sum, "amount")
        .to_kafka(sink_config).await?
        .execute("kafka-aggregation-job")
        .await?;

    Ok(())
}

Protobuf Format

For Protobuf-encoded Kafka messages, use KafkaSourceConfig::new_protobuf() and the from_kafka_protobuf trait method on KafkaEnvExt:

use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};

mod proto {
    include!(concat!(env!("OUT_DIR"), "/events.rs"));
}

let config = KafkaSourceConfig::new_protobuf("localhost:9092", "events", "my-group")
    .with_batch_size(100);

let stream = StreamExecutionEnvironment::new()
    .from_kafka_protobuf::<proto::Event>(config).await?;

The source emits raw binary payloads in a Binary “payload” column. The ProtobufDecodeOperator (wired automatically by from_kafka_protobuf) decodes them into typed Arrow columns using the compiled prost type. The type M must implement ToArrow, typically via the impl_to_arrow! proc-macro from volley-derive.

Scaffolding: volley new with Kafka source and protobuf format generates build.rs, proto/events.proto, and prost dependencies.

Avro Format (Schema Registry)

For Avro-encoded Kafka messages with Confluent Schema Registry, use KafkaSourceConfig::new_avro() and the from_kafka_avro trait method:

use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig};

let config = KafkaSourceConfig::new_avro(
    "localhost:9092",
    "events",
    "my-group",
    "http://localhost:8081",  // Schema Registry URL
)
.with_schema_registry_auth("user", "password")  // optional basic auth
.with_batch_size(100);

let stream = StreamExecutionEnvironment::new()
    .from_kafka_avro(config).await?;

The source emits raw binary payloads. The AvroDecodeOperator (wired automatically by from_kafka_avro):

  1. Strips the Confluent 5-byte wire format header ([0x00][4-byte schema ID][payload])
  2. Fetches the Avro schema from the Schema Registry by ID (cached after first fetch)
  3. Decodes the Avro binary into apache_avro::types::Value
  4. Converts directly to Arrow arrays using typed ArrayBuilders (no JSON intermediary)

The Arrow schema is inferred from the Avro schema. Supported type mappings:

| Avro Type | Arrow Type |
|-----------|------------|
| boolean | Boolean |
| int | Int32 |
| long | Int64 |
| float | Float32 |
| double | Float64 |
| string | Utf8 |
| bytes | Binary |
| fixed | Binary |
| enum | Utf8 |
| union(null, T) | T (nullable) |
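As a quick reference, the primitive rows of this mapping can be expressed as a lookup (an illustrative sketch over type names, not the connector's inference code):

```rust
// Illustrative Avro-name -> Arrow-name lookup mirroring the table above.
// Complex types (record, array, map, union) need real schema inference
// and are out of scope for this sketch.
fn arrow_type_for(avro: &str) -> Option<&'static str> {
    Some(match avro {
        "boolean" => "Boolean",
        "int" => "Int32",
        "long" => "Int64",
        "float" => "Float32",
        "double" => "Float64",
        "string" | "enum" => "Utf8",
        "bytes" | "fixed" => "Binary",
        _ => return None,
    })
}
```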

Plain Avro (No Schema Registry)

For Avro messages without the Confluent wire format, use AvroDecodeOperator directly with a static schema:

use volley_core::operators::avro::AvroDecodeOperator;

let avro_schema = apache_avro::Schema::parse_str(r#"{ ... }"#).unwrap();
let stream = env.from_kafka(config).await?
    .with_operator(AvroDecodeOperator::with_static_schema("payload", avro_schema));

Batch Reads

By default, KafkaSource reads one message per poll_next() call (single-row RecordBatch). For high-throughput pipelines, enable batch reads:

let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group")
    .with_batch_size(100)       // accumulate up to 100 messages
    .with_batch_timeout(Duration::from_millis(50));  // wait 50ms for next message

The source accumulates up to batch_size messages, concatenating them into a single multi-row RecordBatch. All downstream operators handle multi-row batches correctly. Partial batches are returned on timeout (no blocking).
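The accumulation loop can be sketched with a plain channel standing in for the Kafka consumer (illustrative only; the real source polls rdkafka, not an mpsc channel):

```rust
use std::sync::mpsc::Receiver;
use std::time::Duration;

// Sketch of batch accumulation: block for the first message, then keep
// appending until `batch_size` is reached or `batch_timeout` elapses with
// no new message. On timeout the partial batch is returned, not discarded.
fn accumulate(rx: &Receiver<String>, batch_size: usize, batch_timeout: Duration) -> Vec<String> {
    let mut batch = Vec::new();
    if let Ok(first) = rx.recv() {
        batch.push(first);
    }
    while batch.len() < batch_size {
        match rx.recv_timeout(batch_timeout) {
            Ok(msg) => batch.push(msg),
            Err(_) => break, // timeout: return the partial batch
        }
    }
    batch
}
```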

Batch metadata: event_time_ms is set to the max timestamp in the batch (correct for watermark advancement). For per-row event times in windowed operations, configure timestamp_column on your WindowConfig.

Consumer Tuning

Fine-tune the Kafka consumer’s fetch behavior for throughput vs. latency:

let config = KafkaSourceConfig::new("localhost:9092", "events", "my-group")
    .with_batch_size(500)
    .with_fetch_min_bytes(32_768)         // wait for 32KB of data (default: 1)
    .with_fetch_max_wait_ms(100)          // broker waits up to 100ms to fill (default: 500)
    .with_max_partition_fetch_bytes(2_097_152); // 2MB per partition (default: 1MB)

| Method | Default | Effect |
|--------|---------|--------|
| with_fetch_min_bytes(bytes) | 1 | Minimum bytes the broker accumulates before responding. Higher values reduce fetch requests at the cost of latency. |
| with_fetch_max_wait_ms(ms) | 500 | Max time the broker waits to fill fetch_min_bytes. Acts as an upper bound on added latency. |
| with_max_partition_fetch_bytes(bytes) | 1,048,576 (1MB) | Max data returned per partition per fetch. Increase for partitions with large messages. |
| with_property(key, value) | (none) | Escape hatch for any rdkafka consumer setting not covered above. |

Tip: For high-throughput pipelines, combine with_batch_size() with with_fetch_min_bytes() so the broker pre-aggregates data and the source fills batches faster.

Retry on Transient Errors

Both the source and sink support optional retry with exponential backoff for transient broker errors (e.g., broker unavailable, request timeouts, leader changes, network exceptions). Configure via with_retry():

use std::time::Duration;
use volley_core::resilience::RetryConfig;

// Source with retry
let source_config = KafkaSourceConfig::new("localhost:9092", "topic", "group")
    .with_retry(
        RetryConfig::new()
            .with_max_retries(5)
            .with_initial_backoff(Duration::from_millis(200))
            .with_max_backoff(Duration::from_secs(30)),
    );

// Sink with retry
let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "txn-1")
    .with_retry(
        RetryConfig::new()
            .with_max_retries(3)
            .with_initial_backoff(Duration::from_millis(100)),
    );

When retry is not configured (the default), errors propagate immediately — existing behavior is unchanged.
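The resulting delay schedule can be sketched as follows. This assumes doubling from the initial backoff with a cap at the maximum, which is a common exponential-backoff interpretation; the crate's exact policy (jitter, multiplier) is not specified here:

```rust
use std::time::Duration;

// Illustrative backoff schedule: initial * 2^attempt, capped at `max`.
// The doubling policy is an assumption, not taken from RetryConfig's docs.
fn backoff_schedule(max_retries: u32, initial: Duration, max: Duration) -> Vec<Duration> {
    (0..max_retries)
        .map(|attempt| {
            // `1u32 << attempt` overflows for attempt >= 32; fine for a sketch.
            initial
                .checked_mul(1u32 << attempt)
                .unwrap_or(max)
                .min(max)
        })
        .collect()
}
```

With max_retries(5) and an initial backoff of 200ms, this yields 200ms, 400ms, 800ms, 1.6s, 3.2s before giving up.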

Source behavior: Both single-message and batch accumulation paths retry recv() calls. Subsequent messages within a batch (after the first) are not retried — a partial batch is returned instead.

Sink behavior: Individual send() calls are retried. Only the send is retried, not the surrounding transaction. If retries are exhausted, the transaction is aborted (existing behavior).

Metrics: volley_retry_attempts_total counter with component label (kafka_source or kafka_sink).

Sink Delivery Modes

The Kafka sink supports three delivery modes, from lowest overhead to strongest guarantees:

Fire-and-Forget (acks=0)

No acknowledgment from the broker. Highest throughput, but messages can be lost:

let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
    .with_acks("0");

When acks is "0", idempotence is automatically disabled since the broker never confirms delivery.

Idempotent (default)

The default mode. The producer uses Kafka’s idempotent delivery (enable.idempotence=true) to guarantee no duplicates from producer retries, without the overhead of transactions:

let sink_config = KafkaSinkConfig::new("localhost:9092", "output");

This is the right choice for most pipelines. It guarantees at-least-once delivery with no duplicate writes from the producer side, and has no per-batch transaction commit overhead.

Transactional (exactly-once)

Opt-in via new_transactional(). Wraps writes in Kafka transactions for exactly-once semantics:

let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");

See Exactly-Once Semantics below for details.

Breaking change: The old 3-argument KafkaSinkConfig::new(servers, topic, txn_id) has been renamed to new_transactional(). The 2-argument new(servers, topic) now creates a non-transactional idempotent sink.

Producer Tuning

Fine-tune the Kafka producer for throughput vs. latency:

let sink_config = KafkaSinkConfig::new("localhost:9092", "output")
    .with_linger_ms(20)                // batch for 20ms before sending (default: 5)
    .with_compression("lz4")           // compress batches (default: "none")
    .with_acks("1");                   // leader-only ack (default: "all")

| Method | Default | Effect |
|--------|---------|--------|
| with_acks(acks) | "all" | "all" = full ISR ack, "1" = leader only, "0" = fire-and-forget. |
| with_linger_ms(ms) | 5 | How long rdkafka waits to batch messages before sending. Higher values increase batching and throughput at the cost of latency. |
| with_compression(codec) | "none" | Compression codec: "none", "gzip", "snappy", "lz4", "zstd". LZ4 and Zstd offer the best throughput-to-ratio tradeoff. |
| with_property(key, value) | (none) | Escape hatch for any rdkafka producer setting not covered above. |

Tip: For high-throughput sinks, with_linger_ms(20) + with_compression("lz4") is a good starting point. This lets rdkafka batch more messages per broker request and compress them in a single pass.

Exactly-Once Semantics

The Kafka sink supports exactly-once delivery when configured with new_transactional():

let sink_config = KafkaSinkConfig::new_transactional("localhost:9092", "output", "my-txn-id");

How it works:

  1. Records are buffered during processing
  2. On checkpoint barrier, the sink commits the Kafka transaction
  3. On failure recovery, uncommitted transactions are aborted
  4. Processing resumes from the last checkpoint, replaying records

The consumer group offset is committed as part of the transaction, ensuring source and sink are in sync.
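The buffer/commit/abort life cycle above can be sketched with a mock in place of the real transactional producer (illustrative only; `MockTxn` is a hypothetical stand-in, not the sink's implementation):

```rust
// Sketch of the checkpoint-commit protocol.
#[derive(Default)]
struct MockTxn {
    pending: Vec<String>,   // records buffered in the open transaction
    committed: Vec<String>, // records made visible to consumers
}

impl MockTxn {
    // Step 1: records are buffered inside the open transaction.
    fn send(&mut self, record: &str) {
        self.pending.push(record.to_string());
    }
    // Step 2: a checkpoint barrier commits everything buffered so far.
    fn commit(&mut self) {
        self.committed.append(&mut self.pending);
    }
    // Step 3: on failure recovery the uncommitted tail is aborted; those
    // records are replayed from the last checkpoint (step 4), not lost.
    fn abort(&mut self) {
        self.pending.clear();
    }
}
```

Read-committed consumers only ever observe the `committed` records, which is what makes the replay after an abort safe.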

Note: Transactional mode adds a commit round-trip per checkpoint. For pipelines where at-least-once is acceptable, the default idempotent mode (KafkaSinkConfig::new()) avoids this overhead entirely.

Trace Context Propagation

The Kafka source and sink automatically propagate W3C Trace Context (traceparent header) for distributed tracing:

  • Source: If a consumed message has a traceparent header, the record inherits that trace context. The runtime’s sampler respects upstream sampling decisions (parent-based sampling).
  • Sink: If a record being written has trace context, a traceparent header is injected into the produced Kafka message.

This works automatically when per-record tracing is enabled via with_tracing(). No additional configuration is needed for Kafka trace propagation.
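For reference, a traceparent header value has the shape version-traceid-spanid-flags, and the sampled bit lives in the flags byte. A minimal parsing sketch (hypothetical helper, not the connector's API):

```rust
// Sketch of parsing a W3C `traceparent` header value as carried in Kafka
// message headers, e.g. "00-<32 hex trace-id>-<16 hex span-id>-<2 hex flags>".
// Returns (trace_id, span_id, sampled).
fn parse_traceparent(value: &str) -> Option<(String, String, bool)> {
    let mut parts = value.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    let flags = parts.next()?;
    if version.len() != 2 || trace_id.len() != 32 || span_id.len() != 16 || flags.len() != 2 {
        return None;
    }
    // Bit 0 of the flags byte is the "sampled" flag; parent-based sampling
    // respects this upstream decision.
    let sampled = u8::from_str_radix(flags, 16).ok()? & 0x01 == 0x01;
    Some((trace_id.to_string(), span_id.to_string(), sampled))
}
```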

Consumer Lag Monitoring

The volley.kafka.consumer_lag metric tracks consumer group lag. Use it for KEDA autoscaling:

sum(volley_kafka_consumer_lag) by (job_name)

See Capacity Planning for KEDA trigger configuration.