Delta Lake

Write your stream to a Delta Lake table — queryable by Spark, DataFusion, Databricks, Trino, and anything else that speaks Delta. Pick this sink when downstream consumers want ACID tables with time travel and schema evolution rather than raw files. Volley commits Parquet files on every checkpoint and tags each commit with an epoch so replays after a crash never duplicate rows.

Key Types

| Type | Description |
|------|-------------|
| DeltaSink | Sink that writes Parquet data to a Delta table |
| DeltaSinkConfig | Configuration (table URI, partitioning, buffer size, file size) |
| PartitionSpec | Specification for Hive-style partitioned writes |

Usage

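```rust
use volley_connector_delta::{DeltaSink, DeltaSinkConfig};

// Assumes `env` (the stream environment), `source`, and the `col`/`lit`
// expression helpers are already in scope from your pipeline setup, and
// that this code runs inside an async fn that returns a Result.
let config = DeltaSinkConfig::new("s3://my-bucket/output-table");
let sink = DeltaSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active"))) // keep only active records
    .to_sink(sink)
    .execute("delta-writer")
    .await?;
```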

Configuration Reference

```rust
use volley_connector_delta::{DeltaSinkConfig, PartitionSpec};

let partition_spec = PartitionSpec::columns(vec!["date".to_string()]);

let config = DeltaSinkConfig::new("s3://my-bucket/output-table")
    .with_pipeline_id("my-pipeline")          // idempotent commit tag (default: "volley-default")
    .with_max_records(50_000)                 // records buffered before flush (default: 10,000)
    .with_target_file_size(256 * 1024 * 1024) // target Parquet file size (default: 128 MB)
    .with_partition_spec(partition_spec);     // Hive-style partitioning (default: none)
```
| Method | Default | Description |
|--------|---------|-------------|
| with_pipeline_id() | "volley-default" | Identifies the pipeline in commit metadata for idempotent writes. Use a unique ID per pipeline writing to the same table. |
| with_max_records() | 10,000 | Number of records buffered in memory before a Parquet file is written. Higher values produce fewer, larger files. |
| with_target_file_size() | 128 MB | Target size of each Parquet file. The sink flushes when buffered data approaches this size. |
| with_partition_spec() | None | Enables Hive-style partitioned writes (see Partitioned Writes). |
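
The two buffer settings interact: a flush happens as soon as either limit is reached. The sketch below models that decision with a hypothetical FlushPolicy type. It is not Volley's implementation, and reading "approaches this size" as 90% of the target is an assumption made only for illustration.

```rust
/// Hypothetical flush policy mirroring the two documented thresholds:
/// a record-count limit (max_records) and a byte-size target
/// (target_file_size). Not Volley's actual implementation.
pub struct FlushPolicy {
    pub max_records: usize,
    pub target_file_size: usize,
    /// Assumption: "approaches the target" means this fraction of it.
    pub size_threshold: f64,
}

impl Default for FlushPolicy {
    fn default() -> Self {
        FlushPolicy {
            max_records: 10_000,                 // documented default
            target_file_size: 128 * 1024 * 1024, // documented default: 128 MB
            size_threshold: 0.9,                 // assumed, not documented
        }
    }
}

impl FlushPolicy {
    /// True when either the record limit or the size limit is reached.
    pub fn should_flush(&self, buffered_records: usize, buffered_bytes: usize) -> bool {
        let size_limit = (self.target_file_size as f64 * self.size_threshold) as usize;
        buffered_records >= self.max_records || buffered_bytes >= size_limit
    }
}
```

With the defaults, 9,999 small records stay buffered, the 10,000th record triggers a flush, and a buffer of a few large records flushes once it passes roughly 115 MB.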

Partitioned Writes

DeltaSink supports Hive-style partitioned writes, which let query engines skip (prune) partitions that a filter rules out:

```rust
use volley_connector_delta::{DeltaSinkConfig, PartitionSpec};

// Partition by date first, then region; directories nest in this order.
let config = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_partition_spec(PartitionSpec::columns(vec![
        "date".to_string(),
        "region".to_string(),
    ]));
```

This produces a directory layout like:

```text
s3://my-bucket/events/
  date=2026-03-31/region=us-east-1/part-0001.parquet
  date=2026-03-31/region=eu-west-1/part-0002.parquet
```
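
Each record's directory is derived from its partition column values, in the order given to PartitionSpec. A minimal sketch of that mapping follows. partition_path is a hypothetical helper, records are modeled as a HashMap from column name to string value, and real writers also escape special characters (such as '/' or '=') in values, which this sketch skips.

```rust
use std::collections::HashMap;

/// Hypothetical helper: builds the Hive-style directory for one record,
/// following the order of the partition columns. Values are NOT escaped.
pub fn partition_path(columns: &[&str], record: &HashMap<&str, String>) -> Option<String> {
    let mut segments = Vec::with_capacity(columns.len());
    for col in columns {
        // A record missing a partition column cannot be placed: return None.
        let value = record.get(col)?;
        segments.push(format!("{}={}", col, value));
    }
    Some(segments.join("/"))
}
```

For a record with date 2026-03-31 and region us-east-1, partitioning on ["date", "region"] yields date=2026-03-31/region=us-east-1; non-partition columns such as status do not affect the path.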

Partition column guidelines:

  • Use low-cardinality columns (date, region, status) to avoid excessive small files
  • Partition columns must exist in the record schema
  • Order matters: put the most selective column first for query engines
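
The second guideline can be enforced before any data is written. The function below is a hypothetical sketch of such a check (validate_partition_columns is not a Volley API, and the schema is reduced to a list of field names); it reports every missing column at once rather than stopping at the first.

```rust
/// Hypothetical check: every partition column must exist in the record
/// schema. Returns the names of missing columns, in the order given.
pub fn validate_partition_columns(
    partition_columns: &[&str],
    schema_fields: &[&str],
) -> Result<(), Vec<String>> {
    let missing: Vec<String> = partition_columns
        .iter()
        .filter(|c| !schema_fields.contains(*c)) // not present in the schema
        .map(|c| c.to_string())
        .collect();
    if missing.is_empty() {
        Ok(())
    } else {
        Err(missing)
    }
}
```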

Exactly-Once Semantics

Delta commits include an epoch tag in the commit metadata. On recovery:

  1. The sink reads the Delta log to find the latest committed epoch for this pipeline_id
  2. If the current epoch was already committed, the write is skipped
  3. As a result, records replayed after a failure never produce duplicate data

The pipeline_id scopes idempotency — multiple pipelines can write to the same table without conflicts as long as each uses a unique ID.
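
The recovery steps and the pipeline_id scoping can be modeled with an in-memory stand-in for the Delta log. This is a sketch under stated assumptions, not Volley's code: CommitLog is hypothetical, and it assumes epochs increase monotonically per pipeline so that "already committed" means "not newer than the latest committed epoch".

```rust
/// Hypothetical in-memory stand-in for the Delta log: each commit records
/// the pipeline_id and epoch it was tagged with.
#[derive(Default)]
pub struct CommitLog {
    commits: Vec<(String, u64)>,
}

impl CommitLog {
    /// Step 1: latest committed epoch for this pipeline_id, if any.
    pub fn latest_epoch(&self, pipeline_id: &str) -> Option<u64> {
        self.commits
            .iter()
            .filter(|(id, _)| id == pipeline_id)
            .map(|(_, epoch)| *epoch)
            .max()
    }

    /// Step 2: skip the write if this epoch was already committed for this
    /// pipeline_id. Returns false when the write is skipped.
    pub fn commit(&mut self, pipeline_id: &str, epoch: u64) -> bool {
        if let Some(last) = self.latest_epoch(pipeline_id) {
            if epoch <= last {
                return false; // replayed epoch: no duplicate rows
            }
        }
        self.commits.push((pipeline_id.to_string(), epoch));
        true
    }
}
```

Replaying epoch 2 for "orders" after a crash is a no-op, while a second pipeline with its own ID commits its epoch 1 independently. Had that second pipeline reused the "orders" ID, its epoch 1 commit would have been skipped, which is the shared-ID failure mode listed under Troubleshooting.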

Storage Backends

Delta tables can be stored on any of the following backends; the URI format selects the backend:

| Storage | URI Format | Credentials |
|---------|------------|-------------|
| AWS S3 | s3://bucket/path | Default credential chain (env, ~/.aws/credentials, instance profile) |
| Azure Blob | az://container/path | DefaultAzureCredential chain |
| Local filesystem | /path/to/table | File permissions |

Performance Tuning

| Scenario | Recommendation |
|----------|----------------|
| High throughput (>100K records/s) | Increase with_max_records() (e.g. to 50,000) to flush less often and write fewer, larger files |
| Large records (>1 KB each) | Decrease with_max_records() or increase with_target_file_size() |
| Many partitions | Monitor file count; consider periodic compaction |
| Low-latency requirements | Decrease with_max_records() for more frequent flushes |
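
A back-of-the-envelope estimate helps pick values from the table above. The hypothetical files_per_second helper assumes a flush happens at whichever limit is hit first, treats the size target as a hard limit, and ignores checkpoint-triggered flushes and Parquet compression, so real file counts will differ.

```rust
/// Hypothetical estimate of Parquet files written per second.
/// Assumes a flush at whichever limit is reached first; ignores
/// checkpoint-triggered flushes and compression.
pub fn files_per_second(
    records_per_sec: f64,
    avg_record_bytes: f64,
    max_records: f64,
    target_file_size_bytes: f64,
) -> f64 {
    // How many records fit before the size target is reached.
    let records_by_size = target_file_size_bytes / avg_record_bytes;
    // The smaller of the two limits decides the records per file.
    let records_per_file = max_records.min(records_by_size);
    records_per_sec / records_per_file
}
```

At 100K records/s of 200-byte records with a 128 MB target, the record limit binds first: the default of 10,000 gives 10 files per second, and 50,000 gives 2. With 4 KB records, the size target binds at 32,768 records, so raising max_records to 50,000 no longer changes the file rate.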

Table Compaction

With many partitions and frequent checkpoints, Delta tables accumulate small files that degrade read performance. Run compaction periodically:

```sql
-- Spark SQL with Delta Lake, or Databricks
OPTIMIZE 's3://my-bucket/events';
```

Automated compaction within the Volley sink is planned for a future release.
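
Delta Lake documents OPTIMIZE as bin-packing compaction: small files are grouped and rewritten into fewer files near a target size. The hypothetical plan_compaction function sketches only the grouping step, using greedy first-fit decreasing over file sizes; actual engines may group files differently and also work per partition.

```rust
/// Hypothetical bin-packing plan: groups small files so each group's total
/// size stays at or under `target`. Greedy first-fit decreasing.
pub fn plan_compaction(file_sizes: &[u64], target: u64) -> Vec<Vec<u64>> {
    // Files already at or above the target are left alone.
    let mut sizes: Vec<u64> = file_sizes.iter().copied().filter(|&s| s < target).collect();
    sizes.sort_unstable_by(|a, b| b.cmp(a)); // largest first
    let mut bins: Vec<(u64, Vec<u64>)> = Vec::new();
    for size in sizes {
        // Put the file in the first group that still has room.
        match bins.iter_mut().find(|(total, _)| total + size <= target) {
            Some((total, files)) => {
                *total += size;
                files.push(size);
            }
            None => bins.push((size, vec![size])),
        }
    }
    // A group with a single file gains nothing from being rewritten.
    bins.into_iter()
        .filter(|(_, files)| files.len() > 1)
        .map(|(_, files)| files)
        .collect()
}
```

With a target of 100, files of sizes 60, 30, 30, 20, 10 and 150 are planned as two rewrites ([60, 30, 10] and [30, 20]); the 150 file is already large enough and is skipped.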

Troubleshooting

“Table not found” or empty table

  • Delta tables are created automatically on first write if they don’t exist
  • Verify the storage URI is correct and credentials have write access

Duplicate data after recovery

  • Ensure each pipeline uses a unique pipeline_id
  • If two pipelines share the same ID, epoch deduplication may skip valid writes

Small files accumulating

  • Increase with_max_records() to buffer more records per file
  • Run OPTIMIZE periodically from a query engine
  • Consider adjusting checkpoint interval to reduce commit frequency

Slow writes to S3

  • S3 has higher latency per PUT than local disk; increase with_target_file_size() to reduce PUT count
  • Ensure the application runs in the same AWS region as the S3 bucket