# Delta Lake

The `volley-connector-delta` crate provides a Delta Lake sink with exactly-once semantics via epoch-tagged commits.
## Key Types

| Type | Description |
|---|---|
| `DeltaSink` | Sink that writes Parquet data to a Delta table |
| `DeltaSinkConfig` | Configuration (table URI, partitioning, buffer size, file size) |
| `PartitionSpec` | Specification for Hive-style partitioned writes |
## Usage

```rust
use volley_connector_delta::{DeltaSink, DeltaSinkConfig};

let config = DeltaSinkConfig::new("s3://my-bucket/output-table");
let sink = DeltaSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("delta-writer")
    .await?;
```
## Configuration Reference

```rust
let config = DeltaSinkConfig::new("s3://my-bucket/output-table")
    .with_pipeline_id("my-pipeline")          // idempotent commit tag (default: "volley-default")
    .with_max_records(50_000)                 // records buffered before flush (default: 10,000)
    .with_target_file_size(256 * 1024 * 1024) // target Parquet file size (default: 128 MB)
    .with_partition_spec(partition_spec);     // Hive-style partitioning (default: none)
```
| Method | Default | Description |
|---|---|---|
| `with_pipeline_id()` | `"volley-default"` | Identifies the pipeline in commit metadata for idempotent writes. Use a unique ID per pipeline writing to the same table. |
| `with_max_records()` | 10,000 | Number of records buffered in memory before writing a Parquet file. Higher values produce fewer, larger files. |
| `with_target_file_size()` | 128 MB | Target size for individual Parquet files. The sink flushes when buffered data approaches this size. |
| `with_partition_spec()` | None | Enables Hive-style partitioned writes; see Partitioned Writes. |
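The interplay between `with_max_records()` and `with_target_file_size()` amounts to a dual-threshold flush policy: whichever limit is reached first triggers a Parquet file write. A minimal sketch of that idea, assuming a hypothetical buffer type (the struct and field names are illustrative, not the crate's internals):

```rust
// Illustrative dual-threshold flush policy: flush when either the record
// count or the estimated buffered size crosses its configured limit.
struct WriteBuffer {
    records: usize,      // records currently buffered
    bytes: usize,        // estimated encoded size of buffered data
    max_records: usize,  // with_max_records() (default 10_000)
    target_bytes: usize, // with_target_file_size() (default 128 MB)
}

impl WriteBuffer {
    fn should_flush(&self) -> bool {
        self.records >= self.max_records || self.bytes >= self.target_bytes
    }
}

fn main() {
    let buf = WriteBuffer {
        records: 10_000,
        bytes: 4 * 1024 * 1024,
        max_records: 10_000,
        target_bytes: 128 * 1024 * 1024,
    };
    // The record count hit its limit first, so the buffer flushes even
    // though it is far below the target file size.
    assert!(buf.should_flush());
}
```

This is why small records with the default settings tend to produce files well under the 128 MB target: the record-count threshold fires first.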
## Partitioned Writes

`DeltaSink` supports Hive-style partitioned writes for efficient query pruning:

```rust
use volley_connector_delta::PartitionSpec;

let config = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_partition_spec(PartitionSpec::columns(vec![
        "date".to_string(),
        "region".to_string(),
    ]));
```
This produces a directory layout like:

```text
s3://my-bucket/events/
  date=2026-03-31/region=us-east-1/part-0001.parquet
  date=2026-03-31/region=eu-west-1/part-0002.parquet
```
Partition column guidelines:

- Use low-cardinality columns (`date`, `region`, `status`) to avoid excessive small files
- Partition columns must exist in the record schema
- Order matters: put the most selective column first so query engines can prune effectively
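For intuition, the partition directory for a record is just the `column=value` segments joined in spec order. A self-contained sketch of that path construction (not the crate's actual code):

```rust
// Builds a Hive-style partition path such as "date=2026-03-31/region=us-east-1"
// from the partition columns (in spec order) and a record's values.
fn partition_path(columns: &[&str], values: &[&str]) -> String {
    columns
        .iter()
        .zip(values)
        .map(|(col, val)| format!("{col}={val}"))
        .collect::<Vec<_>>()
        .join("/")
}

fn main() {
    let path = partition_path(&["date", "region"], &["2026-03-31", "us-east-1"]);
    assert_eq!(path, "date=2026-03-31/region=us-east-1");
}
```

Because the column order fixes the directory nesting, reordering the spec changes the layout (and which prefix scans a query engine can prune), not just the file names.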
## Exactly-Once Semantics

Delta commits include an epoch tag in the commit metadata. On recovery:

- The sink reads the Delta log to find the latest committed epoch for this `pipeline_id`
- If the current epoch was already committed, the write is skipped
- Replayed records after a failure never produce duplicate data

The `pipeline_id` scopes idempotency: multiple pipelines can write to the same table without conflicts as long as each uses a unique ID.
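The recovery logic above reduces to a compare-against-log check keyed by `pipeline_id`. A simplified sketch of the idea (the map type and function are illustrative, not the crate's API):

```rust
// Simplified epoch deduplication: a commit for (pipeline_id, epoch) is
// skipped if the Delta log already records that epoch (or a later one)
// for the same pipeline.
use std::collections::HashMap;

/// Latest committed epoch per pipeline_id, as read back from the Delta log.
type CommittedEpochs = HashMap<String, u64>;

fn should_commit(log: &CommittedEpochs, pipeline_id: &str, epoch: u64) -> bool {
    match log.get(pipeline_id) {
        Some(&latest) => epoch > latest, // replayed epoch -> skip the write
        None => true,                    // no prior commit for this pipeline
    }
}

fn main() {
    let mut log = CommittedEpochs::new();
    log.insert("my-pipeline".to_string(), 7);

    assert!(!should_commit(&log, "my-pipeline", 7));   // replayed: skipped
    assert!(should_commit(&log, "my-pipeline", 8));    // new epoch: committed
    assert!(should_commit(&log, "other-pipeline", 7)); // different pipeline_id: independent
}
```

The last assertion shows why sharing a `pipeline_id` across pipelines is dangerous: the dedup check cannot tell their epochs apart, so one pipeline's commit can suppress another's valid write.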
## Storage Backends

Delta tables can be stored on any Delta-compatible storage:

| Storage | URI Format | Credentials |
|---|---|---|
| AWS S3 | `s3://bucket/path` | Default credential chain (env, `~/.aws/credentials`, instance profile) |
| Azure Blob | `az://container/path` | `DefaultAzureCredential` chain |
| Local filesystem | `/path/to/table` | File permissions |
## Performance Tuning

| Scenario | Recommendation |
|---|---|
| High throughput (>100K rec/s) | Increase `with_max_records()` (e.g. to 50,000) to reduce commit frequency |
| Large records (>1 KB each) | Decrease `with_max_records()` or increase `with_target_file_size()` |
| Many partitions | Monitor file count; consider periodic compaction |
| Low latency requirements | Decrease `with_max_records()` for more frequent flushes |
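A back-of-envelope calculation shows why record size drives these recommendations. With the default `with_max_records(10_000)` and roughly 1 KB records, each flush writes only about 10 MB, far below the 128 MB target; the figures below are illustrative estimates, not measured behavior:

```rust
// Rough estimate: flush size = average record size x max_records.
fn estimated_flush_bytes(avg_record_bytes: usize, max_records: usize) -> usize {
    avg_record_bytes * max_records
}

fn main() {
    let target = 128 * 1024 * 1024; // default with_target_file_size()

    // Default settings with ~1 KB records flush at ~10 MB: small files.
    let default_flush = estimated_flush_bytes(1024, 10_000);
    assert!(default_flush < target / 10);

    // Raising max_records toward ~135,000 brings flushes near the target.
    let tuned_flush = estimated_flush_bytes(1024, 135_000);
    assert!(tuned_flush > target);
}
```

In short, pick `with_max_records()` so that average record size times the record limit lands near your target file size, rather than tuning either knob in isolation.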
## Table Compaction

With many partitions and frequent checkpoints, Delta tables accumulate small files that degrade read performance. Run compaction periodically:

```sql
-- In a query engine (Spark, DataFusion, etc.)
OPTIMIZE 's3://my-bucket/events';
```

Automated compaction within the Volley sink is planned for a future release.
## Troubleshooting

### "Table not found" or empty table

- Delta tables are created automatically on first write if they don't exist
- Verify the storage URI is correct and credentials have write access

### Duplicate data after recovery

- Ensure each pipeline uses a unique `pipeline_id`
- If two pipelines share the same ID, epoch deduplication may skip valid writes

### Small files accumulating

- Increase `with_max_records()` to buffer more records per file
- Run `OPTIMIZE` periodically from a query engine
- Consider adjusting the checkpoint interval to reduce commit frequency

### Slow writes to S3

- S3 has higher latency per PUT than local disk; increase `with_target_file_size()` to reduce the PUT count
- Ensure the application runs in the same AWS region as the S3 bucket