Delta Lake

The volley-connector-delta crate provides a Delta Lake sink with exactly-once semantics via epoch-tagged commits.

Key Types

| Type | Description |
|---|---|
| DeltaSink | Sink that writes Parquet data to a Delta table |
| DeltaSinkConfig | Configuration (table URI, partitioning, buffer size, file size) |
| PartitionSpec | Specification for Hive-style partitioned writes |

Usage

use volley_connector_delta::{DeltaSink, DeltaSinkConfig};

let config = DeltaSinkConfig::new("s3://my-bucket/output-table");
let sink = DeltaSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("delta-writer")
    .await?;

Configuration Reference

let config = DeltaSinkConfig::new("s3://my-bucket/output-table")
    .with_pipeline_id("my-pipeline")          // idempotent commit tag (default: "volley-default")
    .with_max_records(50_000)                 // records buffered before flush (default: 10,000)
    .with_target_file_size(256 * 1024 * 1024) // target Parquet file size (default: 128 MB)
    .with_partition_spec(partition_spec);     // Hive-style partitioning (default: none)
| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Identifies the pipeline in commit metadata for idempotent writes. Use a unique ID per pipeline writing to the same table. |
| with_max_records() | 10,000 | Number of records buffered in memory before writing a Parquet file. Higher values produce fewer, larger files. |
| with_target_file_size() | 128 MB | Target size for individual Parquet files. The sink flushes when buffered data approaches this size. |
| with_partition_spec() | None | Enables Hive-style partitioned writes; see Partitioned Writes. |

Partitioned Writes

DeltaSink supports Hive-style partitioned writes for efficient query pruning:

use volley_connector_delta::PartitionSpec;

let config = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_partition_spec(PartitionSpec::columns(vec![
        "date".to_string(),
        "region".to_string(),
    ]));

This produces a directory layout like:

s3://my-bucket/events/
  date=2026-03-31/region=us-east-1/part-0001.parquet
  date=2026-03-31/region=eu-west-1/part-0002.parquet
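The layout above follows the Hive convention of `key=value` path segments joined in partition-column order. A minimal sketch of how such a path is derived; the partition_path helper is hypothetical and not part of the crate's API:

```rust
// Hypothetical helper illustrating Hive-style partition paths; not part of
// volley-connector-delta's public API.
fn partition_path(columns: &[(&str, &str)]) -> String {
    columns
        .iter()
        .map(|(name, value)| format!("{name}={value}"))
        .collect::<Vec<_>>()
        .join("/")
}

fn main() {
    let path = partition_path(&[("date", "2026-03-31"), ("region", "us-east-1")]);
    assert_eq!(path, "date=2026-03-31/region=us-east-1");
}
```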

Partition column guidelines:

  • Use low-cardinality columns (date, region, status) to avoid excessive small files
  • Partition columns must exist in the record schema
  • Order matters: put the most selective column first for query engines

Exactly-Once Semantics

Delta commits include an epoch tag in the commit metadata. On recovery:

  1. The sink reads the Delta log to find the latest committed epoch for this pipeline_id
  2. If the current epoch was already committed, the write is skipped
  3. Replayed records after a failure never produce duplicate data

The pipeline_id scopes idempotency — multiple pipelines can write to the same table without conflicts as long as each uses a unique ID.
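The recovery check boils down to a comparison between the current epoch and the latest epoch found in the Delta log for this pipeline_id. An illustrative sketch of that decision, not the crate's actual implementation:

```rust
// Illustrative sketch of epoch-based deduplication. In the real sink, the
// last committed epoch for a pipeline_id comes from reading the Delta log.
fn should_skip_commit(last_committed_epoch: Option<u64>, current_epoch: u64) -> bool {
    // Skip the write if this epoch (or a later one) was already committed.
    matches!(last_committed_epoch, Some(committed) if current_epoch <= committed)
}

fn main() {
    assert!(should_skip_commit(Some(7), 7));  // replay of a committed epoch: skip
    assert!(!should_skip_commit(Some(7), 8)); // new epoch: write
    assert!(!should_skip_commit(None, 0));    // fresh table: write
}
```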

Storage Backends

Delta tables can be stored on any Delta-compatible storage:

| Storage | URI Format | Credentials |
|---|---|---|
| AWS S3 | s3://bucket/path | Default credential chain (env, ~/.aws/credentials, instance profile) |
| Azure Blob | az://container/path | DefaultAzureCredential chain |
| Local filesystem | /path/to/table | File permissions |

Performance Tuning

| Scenario | Recommendation |
|---|---|
| High throughput (>100K rec/s) | Increase with_max_records() (e.g. to 50,000) to reduce commit frequency |
| Large records (>1 KB each) | Decrease with_max_records() or increase with_target_file_size() |
| Many partitions | Monitor file count; consider periodic compaction |
| Low latency requirements | Decrease with_max_records() for more frequent flushes |
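As one way to apply these recommendations, the builder methods documented above can be combined into per-scenario presets (the URI and exact values here are illustrative):

```rust
use volley_connector_delta::DeltaSinkConfig;

// High-throughput preset: larger buffers mean fewer commits and larger files.
let high_throughput = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_max_records(50_000)
    .with_target_file_size(256 * 1024 * 1024);

// Low-latency preset: smaller buffers flush sooner, at the cost of more,
// smaller Parquet files (run OPTIMIZE more often to compensate).
let low_latency = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_max_records(1_000);
```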

Table Compaction

With many partitions and frequent checkpoints, Delta tables accumulate small files that degrade read performance. Run compaction periodically:

-- In a query engine (Spark, DataFusion, etc.)
OPTIMIZE 's3://my-bucket/events';

Automated compaction within the Volley sink is planned for a future release.

Troubleshooting

“Table not found” or empty table

  • Delta tables are created automatically on first write if they don’t exist
  • Verify the storage URI is correct and credentials have write access

Duplicate data after recovery

  • Ensure each pipeline uses a unique pipeline_id
  • If two pipelines share the same ID, epoch deduplication may skip valid writes

Small files accumulating

  • Increase with_max_records() to buffer more records per file
  • Run OPTIMIZE periodically from a query engine
  • Consider adjusting checkpoint interval to reduce commit frequency

Slow writes to S3

  • S3 has higher latency per PUT than local disk; increase with_target_file_size() to reduce PUT count
  • Ensure the application runs in the same AWS region as the S3 bucket