# Lance
The `volley-connector-lance` crate provides a sink that writes Arrow
`RecordBatch`es to a Lance columnar dataset with
exactly-once semantics. Lance is a modern columnar format optimized for ML,
vector search, and random-access workloads; it is the on-disk format behind
LanceDB and is written in Rust from the ground up.
## Key Types

| Type | Description |
|---|---|
| `LanceSink` | Sink that writes Arrow records to a Lance dataset |
| `LanceSinkConfig` | Builder-style configuration (URI, pipeline id, file sizing, storage, recovery limit) |
| `LanceStorage` | Enum configuring the object store backend (`Local`, `S3`, `Gcs`, `Azure`) |
| `LanceStreamExt` | Extension trait adding `DataStream::to_lance(config)`; re-exported from the `volley_connectors` umbrella crate |
## Quick Start

Attach the sink to a pipeline via the `LanceStreamExt` trait re-exported
from `volley_connectors`:
```rust
use volley_connectors::blob_store::lance::{LanceSinkConfig, LanceStorage, LanceStreamExt};
use volley_core::prelude::*;

let config = LanceSinkConfig::new("s3://my-bucket/datasets/events")
    .with_pipeline_id("events-v1")
    .with_storage(LanceStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,
        access_key_id: None,
        secret_access_key: None,
    });

StreamExecutionEnvironment::new()
    .from_kafka(kafka_cfg).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_lance(config).await?
    .execute("kafka-to-lance")
    .await?;
```
Enable the umbrella feature in your `Cargo.toml`:

```toml
volley-connectors = { version = "0.8", features = ["lance-sink"] }
```
Or depend on the standalone crate directly:

```toml
volley-connector-lance = "0.8"
```
## Configuration Reference

```rust
let config = LanceSinkConfig::new("s3://bucket/path")
    .with_pipeline_id("my-pipeline")                   // default: "volley-default"
    .with_storage(LanceStorage::Local)                 // default: Local
    .with_max_rows_per_file(1_048_576)                 // default: 1_048_576
    .with_max_rows_per_group(1024)                     // default: 1024
    .with_max_bytes_per_file(90 * 1024 * 1024 * 1024)  // default: 90 GB
    .with_max_records_buffer(100_000)                  // default: 100_000
    .with_recovery_history_limit(256);                 // default: 256
```
| Method | Default | Description |
|---|---|---|
| `with_pipeline_id()` | `"volley-default"` | Stable identifier embedded in commit metadata. Used for idempotent recovery; use a unique value per pipeline writing to the same dataset |
| `with_storage()` | `LanceStorage::Local` | Object store backend; see Storage Backends |
| `with_max_rows_per_file()` | 1,048,576 | Maximum rows per Lance data file. Larger values produce fewer, bigger files |
| `with_max_rows_per_group()` | 1,024 | Maximum rows per row group (the columnar encoding unit) |
| `with_max_bytes_per_file()` | 90 GB | Soft per-file size ceiling. Lance has a hard 100 GB limit on S3 |
| `with_max_records_buffer()` | 100,000 | Soft in-memory row budget between checkpoints. Exceeding it logs a warning but does not force an untagged flush; all commits wait for the next checkpoint barrier so the epoch tag is always present |
| `with_recovery_history_limit()` | 256 | Maximum number of historical Lance transactions scanned on startup to recover `last_committed_epoch`. Increase if you retain a long uncompacted history |
## Exactly-Once Semantics

`LanceSink` uses a two-phase commit built on Lance's native transaction
API:

1. **Stage.** Buffered batches are materialized through
   `InsertBuilder::execute_uncommitted_stream`, producing an uncommitted
   `Transaction` that represents the staged data files without making
   them visible.
2. **Tag.** The sink sets the transaction's `transaction_properties`
   field to `{"volley.pipeline_id": id, "volley.epoch": N}` before
   committing. These key-value pairs are persisted to the Lance
   transaction file on disk.
3. **Commit.** `CommitBuilder::execute(transaction)` atomically
   publishes the new dataset version.
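The important ordering is that the tag is attached before the atomic
publish, so the epoch metadata becomes visible in the same transaction
record as the data. A toy in-memory model of the three steps, using
illustrative types rather than the real Lance API:

```rust
use std::collections::HashMap;

// Toy stand-ins for a dataset and an uncommitted transaction; these
// are illustrative types, not the real Lance API.
struct Dataset {
    version: u64,
    committed: Vec<HashMap<String, String>>, // properties per version
}

struct Uncommitted {
    transaction_properties: HashMap<String, String>,
}

impl Dataset {
    // 1. Stage: data files exist, but no new version is visible yet.
    fn stage(&self) -> Uncommitted {
        Uncommitted { transaction_properties: HashMap::new() }
    }

    // 3. Commit: atomically publish the staged files and their tags
    //    as one new dataset version.
    fn commit(&mut self, txn: Uncommitted) -> u64 {
        self.committed.push(txn.transaction_properties);
        self.version += 1;
        self.version
    }
}

fn main() {
    let mut ds = Dataset { version: 3, committed: Vec::new() };
    let mut txn = ds.stage();
    // 2. Tag: attach pipeline id and epoch before committing, so they
    //    land in the same transaction record as the data.
    txn.transaction_properties.insert("volley.pipeline_id".into(), "events-v1".into());
    txn.transaction_properties.insert("volley.epoch".into(), "42".into());
    assert_eq!(ds.commit(txn), 4);
}
```

If the process crashes between stage and commit, the staged files are
never published and no version is created, which is what makes replaying
the epoch safe.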
On startup, `LanceSink::new` calls `Dataset::get_transactions` to fetch
up to `recovery_history_limit` recent transactions, scans each one for a
matching `volley.pipeline_id`, and takes the maximum `volley.epoch` as
`last_committed_epoch`. After a restart, any checkpoint barrier at or
below that epoch is dropped as a duplicate.
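The recovery scan amounts to a fold over recent transaction properties.
This sketch uses a plain `HashMap` per transaction in place of Lance's
`Transaction` type:

```rust
use std::collections::HashMap;

/// Sketch of the startup recovery scan: given the persisted properties
/// of recent transactions (plain maps standing in for Lance's
/// `Transaction`), find this pipeline's highest committed epoch.
fn last_committed_epoch(
    txns: &[HashMap<String, String>],
    pipeline_id: &str,
) -> Option<u64> {
    txns.iter()
        .filter(|t| t.get("volley.pipeline_id").map(String::as_str) == Some(pipeline_id))
        .filter_map(|t| {
            let raw = t.get("volley.epoch")?;
            match raw.parse::<u64>() {
                Ok(epoch) => Some(epoch),
                Err(_) => {
                    // Mirrors the warn! on a malformed volley.epoch.
                    eprintln!("warn: malformed volley.epoch: {raw:?}");
                    None
                }
            }
        })
        .max()
}

fn main() {
    let tag = |pid: &str, epoch: &str| {
        HashMap::from([
            ("volley.pipeline_id".to_string(), pid.to_string()),
            ("volley.epoch".to_string(), epoch.to_string()),
        ])
    };
    let txns = [tag("events-v1", "7"), tag("other", "99"), tag("events-v1", "9")];
    // Epochs from other pipelines are ignored; the max for ours wins.
    assert_eq!(last_committed_epoch(&txns, "events-v1"), Some(9));
}
```

This is also why the `pipeline_id` must be stable across restarts: a
renamed pipeline finds no matching transactions and starts from scratch.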
> **Note:** Lance 4.0's `CommitBuilder::with_transaction_properties`
> stores the map on the builder but does not forward it to the staged
> `Transaction` during `execute`. To work around this, the sink writes
> the properties directly to `Transaction.transaction_properties`, which
> is a public field on the struct, before calling `execute`. This is a
> tracked upstream issue; when Lance fixes it, the workaround can be
> replaced with the documented builder method.
## Storage Backends

Lance delegates I/O to `object_store` under the hood, and `LanceStorage`
maps cleanly onto its configuration keys.
| Variant | URI | Credentials |
|---|---|---|
| `LanceStorage::Local` | filesystem path or `file://…` | none |
| `LanceStorage::S3 { region, bucket, endpoint, access_key_id, secret_access_key }` | `s3://bucket/path` | inline fields, or `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` env vars, or IAM instance role |
| `LanceStorage::Gcs { project_id, bucket, service_account_json }` | `gs://bucket/path` | inline JSON, or `GOOGLE_APPLICATION_CREDENTIALS` |
| `LanceStorage::Azure { storage_account, container, access_key }` | `az://container/path` | inline key, or `AZURE_STORAGE_ACCOUNT_KEY` / `DefaultAzureCredential` |
Setting `endpoint = Some("http://…")` on S3 also emits `allow_http=true`,
so MinIO / localstack work without extra plumbing.
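The endpoint-to-options translation can be sketched as below. The exact
option key names the sink emits are an assumption here, not taken from
the crate; the point is the conditional `allow_http` pairing:

```rust
use std::collections::HashMap;

// Illustrative translation of an S3 endpoint override into
// object_store-style string options. Key names are assumptions.
fn s3_options(region: &str, endpoint: Option<&str>) -> HashMap<String, String> {
    let mut opts = HashMap::new();
    opts.insert("region".to_string(), region.to_string());
    if let Some(ep) = endpoint {
        opts.insert("endpoint".to_string(), ep.to_string());
        // Custom endpoints (MinIO, localstack) are typically plain
        // HTTP, so allow_http rides along automatically.
        opts.insert("allow_http".to_string(), "true".to_string());
    }
    opts
}

fn main() {
    let minio = s3_options("us-east-1", Some("http://localhost:9000"));
    assert_eq!(minio.get("allow_http").map(String::as_str), Some("true"));

    // Without a custom endpoint, HTTPS stays required.
    let aws = s3_options("us-east-1", None);
    assert!(!aws.contains_key("allow_http"));
}
```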
## Limitations

- **No Hive-style partitioning in v1.** Lance uses fragments internally
  rather than filesystem-visible partitions, and the partition story is
  deferred.
- **No schema evolution within a single `pipeline_id`.** The first
  batch's schema is assumed to hold for every subsequent batch.
- **Single writer per (`dataset_uri`, `pipeline_id`).** Concurrent
  writers conflict on commit and return an error rather than silently
  serializing.
- **Recovery horizon is bounded by `recovery_history_limit`.** If a
  pipeline accumulates more uncompacted versions than the limit without
  a successful recovery, older commits may be invisible to the scan and
  epochs can be re-executed. Compact regularly or raise the limit.
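Because schema drift fails only at write time, it can be worth guarding
upstream. A minimal sketch of such a guard, modeling a schema as field
name/type pairs instead of the Arrow `Schema` type:

```rust
// Toy schema model: (field name, type name) pairs. The real sink
// works with Arrow schemas; this stand-in is an assumption.
type Schema = Vec<(String, String)>;

/// Rejects a batch whose schema diverges from the first batch's,
/// mirroring the "no schema evolution" limitation above.
fn check_schema(first: &Schema, batch: &Schema) -> Result<(), String> {
    if first == batch {
        Ok(())
    } else {
        Err(format!("schema drift: expected {first:?}, got {batch:?}"))
    }
}

fn main() {
    let first = vec![("id".to_string(), "Int64".to_string())];
    let drifted = vec![("id".to_string(), "Utf8".to_string())];
    assert!(check_schema(&first, &first.clone()).is_ok());
    assert!(check_schema(&first, &drifted).is_err());
}
```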
## Troubleshooting

**Recovery does not pick up the last committed epoch.** Ensure the new
sink uses the same `pipeline_id` as the writer, and that
`recovery_history_limit` covers the relevant version range. A `warn!`
log is emitted if a transaction's `volley.epoch` property is malformed.
**Build fails with "Could not find protoc".** Lance's build scripts
compile protobuf definitions at build time. Install the `protoc` binary
(`apt-get install protobuf-compiler` on Debian/Ubuntu, `brew install
protobuf` on macOS) before building.
## Example

See `volley-examples/examples/memory_to_lance.rs` for a complete
end-to-end example that runs a `MemorySource` → `LanceSink` pipeline,
writes 5 trade records to a local Lance dataset via the full Volley
runtime (including the final shutdown barrier), then reopens the
dataset and prints all rows.

```sh
cargo run --example memory_to_lance -p volley-examples --release
```