Delta Lake
Write your stream to a Delta Lake table — queryable by Spark, DataFusion, Databricks, Trino, and anything else that speaks Delta. Pick this sink when downstream consumers want ACID tables with time travel and schema evolution rather than raw files. Volley commits Parquet files on every checkpoint and tags each commit with an epoch so replays after a crash never duplicate rows.
Key Types
| Type | Description |
|---|---|
| DeltaSink | Sink that writes Parquet data to a Delta table |
| DeltaSinkConfig | Configuration (table URI, partitioning, buffer size, file size) |
| PartitionSpec | Specification for Hive-style partitioned writes |
Usage
use volley_connector_delta::{DeltaSink, DeltaSinkConfig};

// Runs inside an async context that returns a Result;
// `env` and `source` are defined elsewhere in the pipeline.
let config = DeltaSinkConfig::new("s3://my-bucket/output-table");
let sink = DeltaSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active"))) // keep only active records
    .to_sink(sink)
    .execute("delta-writer")
    .await?;
Configuration Reference
// `partition_spec` is a PartitionSpec built as shown under Partitioned Writes.
let config = DeltaSinkConfig::new("s3://my-bucket/output-table")
    .with_pipeline_id("my-pipeline")          // idempotent commit tag (default: "volley-default")
    .with_max_records(50_000)                 // records buffered before flush (default: 10,000)
    .with_target_file_size(256 * 1024 * 1024) // target Parquet file size (default: 128 MB)
    .with_partition_spec(partition_spec);     // Hive-style partitioning (default: none)
| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Identifies the pipeline in commit metadata for idempotent writes. Use a unique ID per pipeline writing to the same table |
| with_max_records() | 10,000 | Number of records buffered in memory before writing a Parquet file. Higher values produce fewer, larger files |
| with_target_file_size() | 128 MB | Target size for individual Parquet files. The sink flushes when buffered data approaches this size |
| with_partition_spec() | None | Enables Hive-style partitioned writes; see Partitioned Writes |
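The interaction between the two buffer thresholds can be sketched as follows. This is a minimal illustration using only the standard library, not Volley's implementation: `FlushPolicy` and `should_flush` are hypothetical names, and the sketch assumes a flush fires as soon as either limit is reached, whereas the real sink is documented only as flushing when data "approaches" the target size.

```rust
/// Hypothetical thresholds mirroring with_max_records() and with_target_file_size().
#[derive(Debug, Clone, Copy)]
pub struct FlushPolicy {
    pub max_records: usize,
    pub target_file_size: usize, // bytes
}

impl Default for FlushPolicy {
    fn default() -> Self {
        // Documented defaults: 10,000 records and 128 MB.
        Self {
            max_records: 10_000,
            target_file_size: 128 * 1024 * 1024,
        }
    }
}

impl FlushPolicy {
    /// Assumption: flush once either the record count or the byte size hits its limit.
    pub fn should_flush(&self, buffered_records: usize, buffered_bytes: usize) -> bool {
        buffered_records >= self.max_records || buffered_bytes >= self.target_file_size
    }
}
```

With the defaults, `should_flush(9_999, 1024)` is false, while reaching 10,000 records or 128 MB of buffered bytes makes it true, whichever comes first.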
Partitioned Writes
DeltaSink supports Hive-style partitioned writes for efficient query pruning:
use volley_connector_delta::{DeltaSinkConfig, PartitionSpec};

// Partition by date first, then by region.
let config = DeltaSinkConfig::new("s3://my-bucket/events")
    .with_partition_spec(PartitionSpec::columns(vec![
        "date".to_string(),
        "region".to_string(),
    ]));
This produces a directory layout like:
s3://my-bucket/events/
date=2026-03-31/region=us-east-1/part-0001.parquet
date=2026-03-31/region=eu-west-1/part-0002.parquet
Partition column guidelines:
- Use low-cardinality columns (date, region, status) to avoid excessive small files
- Partition columns must exist in the record schema
- Order matters: put the most selective column first for query engines
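To make the directory layout above concrete, here is a small sketch of how a Hive-style partition directory is assembled from column names and values. `partition_dir` is a hypothetical helper written for illustration, not part of the connector, and it skips the escaping of special characters that real writers apply to partition values.

```rust
/// Builds a relative Hive-style directory such as `date=2026-03-31/region=us-east-1`.
/// Returns None when the number of columns and values differ.
pub fn partition_dir(columns: &[&str], values: &[&str]) -> Option<String> {
    if columns.len() != values.len() {
        return None;
    }
    let parts: Vec<String> = columns
        .iter()
        .zip(values.iter())
        .map(|(column, value)| format!("{}={}", column, value))
        .collect();
    // Column order in the spec determines directory nesting order.
    Some(parts.join("/"))
}
```

Calling it with columns `["date", "region"]` and values `["2026-03-31", "us-east-1"]` yields `date=2026-03-31/region=us-east-1`, which matches the first path in the layout. Every distinct combination of values becomes its own directory, which is why high-cardinality columns multiply the file count.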
Exactly-Once Semantics
Delta commits include an epoch tag in the commit metadata. On recovery:
- The sink reads the Delta log to find the latest committed epoch for this pipeline_id
- If the current epoch was already committed, the write is skipped
- Replayed records after a failure never produce duplicate data
The pipeline_id scopes idempotency — multiple pipelines can write to the same table without conflicts as long as each uses a unique ID.
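The recovery rule can be modeled with a toy in-memory log. This is a sketch of the general idea under stated assumptions, not the sink's actual code: `CommitLog` and `commit` are hypothetical names, a `HashMap` stands in for the commit metadata in the Delta log, and any epoch at or below the latest committed one is treated as already written.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the epoch tags stored in Delta commit metadata.
#[derive(Default)]
pub struct CommitLog {
    /// Latest committed epoch per pipeline_id.
    latest_epoch: HashMap<String, u64>,
    /// Rows that were actually written to the "table".
    pub rows: Vec<String>,
}

impl CommitLog {
    /// Writes `batch` for `epoch` unless this pipeline_id already committed
    /// that epoch (or a later one). Returns true if the batch was written.
    pub fn commit(&mut self, pipeline_id: &str, epoch: u64, batch: &[&str]) -> bool {
        if let Some(&last) = self.latest_epoch.get(pipeline_id) {
            if epoch <= last {
                return false; // replay after a failure: skip the write
            }
        }
        self.rows.extend(batch.iter().map(|row| row.to_string()));
        self.latest_epoch.insert(pipeline_id.to_string(), epoch);
        true
    }
}
```

Committing epoch 1 twice under the same ID writes the rows once; a second pipeline with its own ID can still commit its epoch 1. The same model shows why sharing an ID is harmful: the second pipeline's epoch 1 would be mistaken for a replay and dropped.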
Storage Backends
Delta tables can be stored on any Delta-compatible storage:
| Storage | URI Format | Credentials |
|---|---|---|
| AWS S3 | s3://bucket/path | Default credential chain (env, ~/.aws/credentials, instance profile) |
| Azure Blob | az://container/path | DefaultAzureCredential chain |
| Local filesystem | /path/to/table | File permissions |
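A pre-flight check on the table URI can catch typos before the sink tries to connect. The sketch below maps the URI formats from the table above to a backend; `Backend` and `backend_for` are hypothetical names for illustration, and the connector's own URI parsing may accept more forms than these three.

```rust
/// Storage backends listed in the table above.
#[derive(Debug, PartialEq, Eq)]
pub enum Backend {
    S3,
    AzureBlob,
    LocalFs,
}

/// Classifies a table URI by its prefix; returns None for unrecognized forms.
pub fn backend_for(uri: &str) -> Option<Backend> {
    if uri.starts_with("s3://") {
        Some(Backend::S3)
    } else if uri.starts_with("az://") {
        Some(Backend::AzureBlob)
    } else if uri.starts_with('/') {
        // Absolute local path, e.g. /path/to/table
        Some(Backend::LocalFs)
    } else {
        None
    }
}
```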
Performance Tuning
| Scenario | Recommendation |
|---|---|
| High throughput (>100K rec/s) | Raise with_max_records() (for example, to 50_000) to flush less often and write fewer, larger files |
| Large records (>1 KB each) | Decrease with_max_records() or increase with_target_file_size() |
| Many partitions | Monitor file count; consider periodic compaction |
| Low latency requirements | Decrease with_max_records() for more frequent flushes |
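A quick back-of-the-envelope calculation helps when choosing these values. The helpers below are hypothetical and work on uncompressed sizes only; Parquet encoding and compression usually shrink the files on disk, so treat the results as rough upper bounds.

```rust
/// Uncompressed bytes buffered at the moment the record-count limit is hit.
pub fn bytes_at_record_limit(max_records: u64, avg_record_bytes: u64) -> u64 {
    max_records * avg_record_bytes
}

/// Records needed to reach the target file size, or None if the record size is zero.
pub fn records_to_fill(target_file_size: u64, avg_record_bytes: u64) -> Option<u64> {
    target_file_size.checked_div(avg_record_bytes)
}
```

For example, 10,000 records of 1,024 bytes each amount to 10,240,000 bytes (about 10 MB), far below the 128 MB default target, so the record limit trips first. Filling a 128 MB file with records of that size would take 131,072 records.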
Table Compaction
With many partitions and frequent checkpoints, Delta tables accumulate small files that degrade read performance. Run compaction periodically:
-- In a query engine (Spark, DataFusion, etc.)
OPTIMIZE 's3://my-bucket/events';
Automated compaction within the Volley sink is planned for a future release.
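Until then, a simple heuristic can decide when a manual compaction run is worthwhile. The sketch below assumes you already have the file sizes for a table or partition (for example, from a storage listing); the function names and the "smaller than half the target" cutoff are arbitrary choices for illustration, not Volley or Delta Lake defaults.

```rust
/// Counts files smaller than half the target file size (an arbitrary cutoff).
pub fn small_file_count(file_sizes: &[u64], target_file_size: u64) -> usize {
    let threshold = target_file_size / 2;
    file_sizes.iter().filter(|&&size| size < threshold).count()
}

/// Suggests compaction once the number of small files exceeds `max_small_files`.
pub fn needs_compaction(file_sizes: &[u64], target_file_size: u64, max_small_files: usize) -> bool {
    small_file_count(file_sizes, target_file_size) > max_small_files
}
```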
Troubleshooting
“Table not found” or empty table
- Delta tables are created automatically on first write if they don’t exist
- Verify the storage URI is correct and credentials have write access
Duplicate data after recovery
- Ensure each pipeline uses a unique pipeline_id
- If two pipelines share the same ID, epoch deduplication may skip valid writes
Small files accumulating
- Increase with_max_records() to buffer more records per file
- Run OPTIMIZE periodically from a query engine
- Consider lengthening the checkpoint interval to reduce commit frequency
Slow writes to S3
- S3 has higher latency per PUT than local disk; increase with_target_file_size() to reduce PUT count
- Ensure the application runs in the same AWS region as the S3 bucket