Lance

The volley-connector-lance crate provides a sink that writes Arrow RecordBatches to a Lance columnar dataset with exactly-once semantics. Lance is a modern columnar format optimized for ML, vector search, and random-access workloads; it is the on-disk format behind LanceDB and is written in Rust from the ground up.

Key Types

| Type | Description |
|---|---|
| LanceSink | Sink that writes Arrow records to a Lance dataset |
| LanceSinkConfig | Builder-style configuration (URI, pipeline id, file sizing, storage, recovery limit) |
| LanceStorage | Enum configuring the object store backend (Local, S3, Gcs, Azure) |
| LanceStreamExt | Umbrella extension trait adding DataStream::to_lance(config) |

Quick Start

Attach the sink to a pipeline via the LanceStreamExt trait re-exported from volley_connectors:

```rust
use volley_connectors::blob_store::lance::{LanceStreamExt, LanceSinkConfig, LanceStorage};
use volley_core::prelude::*;

// Runs inside an async function that returns a Result; `kafka_cfg` is a Kafka
// source configuration built elsewhere.
let config = LanceSinkConfig::new("s3://my-bucket/datasets/events")
    .with_pipeline_id("events-v1")
    .with_storage(LanceStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,          // default AWS endpoint
        access_key_id: None,     // fall back to env vars or the IAM instance role
        secret_access_key: None,
    });

StreamExecutionEnvironment::new()
    .from_kafka(kafka_cfg).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_lance(config).await?
    .execute("kafka-to-lance")
    .await?;
```

Enable the umbrella feature in your Cargo.toml:

```toml
[dependencies]
volley-connectors = { version = "0.8", features = ["lance-sink"] }
```

Or depend on the standalone crate directly:

```toml
[dependencies]
volley-connector-lance = "0.8"
```

Configuration Reference

```rust
use volley_connectors::blob_store::lance::{LanceSinkConfig, LanceStorage};

// Every setter is optional; the trailing comments give each default.
// A local filesystem path pairs with LanceStorage::Local.
let config = LanceSinkConfig::new("/data/lance/my-dataset")
    .with_pipeline_id("my-pipeline")                   // default: "volley-default"
    .with_storage(LanceStorage::Local)                 // default: Local
    .with_max_rows_per_file(1_048_576)                 // default: 1_048_576
    .with_max_rows_per_group(1024)                     // default: 1024
    .with_max_bytes_per_file(90 * 1024 * 1024 * 1024)  // default: 90 GB
    .with_max_records_buffer(100_000)                  // default: 100_000
    .with_recovery_history_limit(256);                 // default: 256
```
| Method | Default | Description |
|---|---|---|
| with_pipeline_id() | "volley-default" | Stable identifier embedded in commit metadata and used for idempotent recovery. Use a unique value per pipeline writing to the same dataset |
| with_storage() | LanceStorage::Local | Object store backend; see Storage Backends |
| with_max_rows_per_file() | 1,048,576 | Maximum rows per Lance data file. Larger values produce fewer, bigger files |
| with_max_rows_per_group() | 1,024 | Maximum rows per row group (the columnar encoding unit) |
| with_max_bytes_per_file() | 90 GB | Soft per-file size ceiling. Lance has a hard 100 GB limit on S3 |
| with_max_records_buffer() | 100,000 | Soft in-memory row budget between checkpoints. Exceeding it logs a warning but does not force an untagged flush; all commits wait for the next checkpoint barrier, so the epoch tag is always present |
| with_recovery_history_limit() | 256 | Maximum number of historical Lance transactions scanned on startup to recover last_committed_epoch. Increase it if you retain a long uncompacted history |
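The interaction between with_max_records_buffer and checkpoint barriers can be modeled in a few lines of std-only Rust. EpochBuffer, TaggedFlush, and on_barrier are hypothetical names, not the crate's API. The sketch shows only the policy from the table: going over the soft budget logs a warning, and rows leave the buffer only when a barrier supplies an epoch to tag them with. Skipping empty epochs is an extra assumption of this model.

```rust
/// Hypothetical model of the sink's buffer: rows accumulate between
/// checkpoint barriers, and exceeding the soft budget only warns.
struct EpochBuffer {
    max_records_buffer: usize,
    rows: Vec<String>,
    warned: bool,
}

/// A flush produced at a checkpoint barrier, always tagged with its epoch.
#[derive(Debug, PartialEq)]
struct TaggedFlush {
    epoch: u64,
    row_count: usize,
}

impl EpochBuffer {
    fn new(max_records_buffer: usize) -> Self {
        Self { max_records_buffer, rows: Vec::new(), warned: false }
    }

    /// Buffer a row. Never flushes, even when the soft budget is exceeded.
    fn push(&mut self, row: String) {
        self.rows.push(row);
        if self.rows.len() > self.max_records_buffer && !self.warned {
            eprintln!(
                "warn: {} buffered rows exceed max_records_buffer={}; waiting for the next barrier",
                self.rows.len(),
                self.max_records_buffer
            );
            self.warned = true; // warn once per epoch
        }
    }

    /// Called only when a checkpoint barrier arrives: drain and tag.
    /// Assumption of this sketch: an empty epoch produces no flush.
    fn on_barrier(&mut self, epoch: u64) -> Option<TaggedFlush> {
        if self.rows.is_empty() {
            return None;
        }
        let row_count = self.rows.len();
        self.rows.clear();
        self.warned = false;
        Some(TaggedFlush { epoch, row_count })
    }
}

fn main() {
    // A tiny hypothetical budget of 3 rows makes the behaviour visible.
    let mut buf = EpochBuffer::new(3);
    for i in 0..5 {
        buf.push(format!("row-{i}"));
    }
    // All five rows are still buffered: the soft budget did not force a flush.
    assert_eq!(buf.rows.len(), 5);
    // The next barrier drains the buffer and tags the flush with epoch 7.
    let flush = buf.on_barrier(7);
    assert_eq!(flush, Some(TaggedFlush { epoch: 7, row_count: 5 }));
    println!("{flush:?}");
}
```

Because nothing is flushed without an epoch, every commit carries the tag that recovery relies on; the trade-off is that memory use can grow past the budget until the barrier arrives.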

Exactly-Once Semantics

LanceSink uses a two-phase commit built on Lance’s native transaction API:

  1. Stage. Buffered batches are materialized through InsertBuilder::execute_uncommitted_stream, producing an uncommitted Transaction that represents the staged data files without making them visible.
  2. Tag. The sink sets the transaction’s transaction_properties field to {"volley.pipeline_id": id, "volley.epoch": N} before committing. These key-value pairs are persisted to the Lance transaction file on disk.
  3. Commit. CommitBuilder::execute(transaction) atomically publishes the new dataset version.
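The three steps can be illustrated with a small in-memory model. Dataset, Transaction, Version, stage, commit, and commit_epoch here are hypothetical stand-ins named after, but not equivalent to, Lance's types; the sketch only demonstrates that staged files stay invisible until commit and that the epoch tag travels on the transaction itself, set through a public field as in the Note on the upstream workaround.

```rust
use std::collections::HashMap;

/// Hypothetical staged transaction: files are written but not yet part of any version.
struct Transaction {
    staged_files: Vec<String>,
    transaction_properties: Option<HashMap<String, String>>,
}

/// One committed dataset version: the files it added plus its commit metadata.
struct Version {
    files: Vec<String>,
    properties: HashMap<String, String>,
}

#[derive(Default)]
struct Dataset {
    versions: Vec<Version>,
}

impl Dataset {
    /// Stage: produce an uncommitted transaction without touching any version.
    fn stage(&self, batch_files: Vec<String>) -> Transaction {
        Transaction { staged_files: batch_files, transaction_properties: None }
    }

    /// Commit: publish a new version in one step (in this model, the first commit is version 1).
    fn commit(&mut self, txn: Transaction) -> usize {
        self.versions.push(Version {
            files: txn.staged_files,
            properties: txn.transaction_properties.unwrap_or_default(),
        });
        self.versions.len()
    }

    /// Readers only see files referenced by committed versions.
    fn visible_files(&self) -> Vec<&str> {
        self.versions
            .iter()
            .flat_map(|v| v.files.iter().map(String::as_str))
            .collect()
    }
}

/// Stage, tag, and commit one checkpoint epoch.
fn commit_epoch(ds: &mut Dataset, pipeline_id: &str, epoch: u64, files: Vec<String>) -> usize {
    let mut txn = ds.stage(files);
    // Tag: write the properties directly onto the transaction before committing.
    let mut props = HashMap::new();
    props.insert("volley.pipeline_id".to_string(), pipeline_id.to_string());
    props.insert("volley.epoch".to_string(), epoch.to_string());
    txn.transaction_properties = Some(props);
    ds.commit(txn)
}

fn main() {
    let mut ds = Dataset::default();
    // A staged but uncommitted transaction leaves the dataset unchanged.
    let _abandoned = ds.stage(vec!["frag-0.lance".to_string()]);
    assert!(ds.visible_files().is_empty());

    let v = commit_epoch(&mut ds, "events-v1", 1, vec!["frag-1.lance".to_string()]);
    println!("version {v}: files {:?}, props {:?}", ds.visible_files(), ds.versions[v - 1].properties);
}
```

If the process dies between staging and committing, the staged files are never referenced by a version, which is why a replayed epoch cannot produce duplicate rows.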

On startup, LanceSink::new calls Dataset::get_transactions to fetch up to recovery_history_limit recent transactions, scans each one for a matching volley.pipeline_id, and takes the maximum volley.epoch as last_committed_epoch. After recovery, any checkpoint barrier whose epoch is at or below last_committed_epoch is dropped as a duplicate, because its data was already committed before the restart.
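The recovery scan reduces to a filter and a max over recent transaction properties. This sketch assumes the history is ordered newest first and represents each transaction's properties as a HashMap<String, String>; recover_last_committed_epoch, is_duplicate, and props are illustrative helpers, not the sink's actual functions.

```rust
use std::collections::HashMap;

/// Properties of one historical transaction (hypothetical representation).
type TxnProps = HashMap<String, String>;

/// Scan at most `limit` transactions (assumed newest first) and return the
/// highest `volley.epoch` tagged with `pipeline_id`, or None if none is found.
fn recover_last_committed_epoch(history: &[TxnProps], pipeline_id: &str, limit: usize) -> Option<u64> {
    history
        .iter()
        .take(limit) // the recovery horizon
        .filter(|p| p.get("volley.pipeline_id").map(String::as_str) == Some(pipeline_id))
        .filter_map(|p| {
            let raw = p.get("volley.epoch")?;
            match raw.parse::<u64>() {
                Ok(epoch) => Some(epoch),
                Err(_) => {
                    // Malformed tags are skipped with a warning instead of aborting recovery.
                    eprintln!("warn: malformed volley.epoch {raw:?}; skipping");
                    None
                }
            }
        })
        .max()
}

/// A barrier at or below the recovered epoch was already committed, so it is dropped.
fn is_duplicate(epoch: u64, last_committed: Option<u64>) -> bool {
    last_committed.map_or(false, |last| epoch <= last)
}

/// Helper for building property maps in the example.
fn props(pairs: &[(&str, &str)]) -> TxnProps {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

fn main() {
    // Three untagged commits (newest first), followed by the last tagged one.
    let mut history = vec![TxnProps::new(); 3];
    history.push(props(&[("volley.pipeline_id", "events-v1"), ("volley.epoch", "12")]));

    let full = recover_last_committed_epoch(&history, "events-v1", 256);
    assert_eq!(full, Some(12));
    assert!(is_duplicate(12, full) && !is_duplicate(13, full));

    // With a horizon of 2 the tagged commit is never reached,
    // so epochs up to 12 would be re-executed.
    let short = recover_last_committed_epoch(&history, "events-v1", 2);
    assert_eq!(short, None);
    println!("full scan: {full:?}, short scan: {short:?}");
}
```

The second call illustrates the recovery-horizon limitation: when the last tagged commit falls outside the scanned window, the sink has no record of it and replays already-committed epochs.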

Note: Lance 4.0’s CommitBuilder::with_transaction_properties stores the map on the builder but does not forward it to the staged Transaction during execute. To work around this, the sink writes the properties directly to Transaction.transaction_properties — which is a public field on the struct — before calling execute. This is a tracked upstream issue; when Lance fixes it, the workaround can be replaced with the documented builder method.

Storage Backends

Lance delegates I/O to object_store under the hood, and LanceStorage maps cleanly onto its configuration keys.

| Variant | URI | Credentials |
|---|---|---|
| LanceStorage::Local | filesystem path or file://… | none |
| LanceStorage::S3 { region, bucket, endpoint, access_key_id, secret_access_key } | s3://bucket/path | inline fields, the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables, or an IAM instance role |
| LanceStorage::Gcs { project_id, bucket, service_account_json } | gs://bucket/path | inline JSON, or GOOGLE_APPLICATION_CREDENTIALS |
| LanceStorage::Azure { storage_account, container, access_key } | az://container/path | inline key, or AZURE_STORAGE_ACCOUNT_KEY / DefaultAzureCredential |

Setting endpoint = Some("http://…") on S3 also sets the object_store option allow_http=true, so MinIO and LocalStack endpoints work without extra configuration.
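A sketch of how the storage variants might translate into object_store configuration keys follows. The Storage enum is a trimmed, hypothetical mirror of LanceStorage (bucket, container, and project fields are omitted because the URI already names the location), and the key strings are assumed to match object_store's configuration keys; the exact map the sink builds is an assumption. This sketch adds allow_http only for http:// endpoints, which is also an assumption.

```rust
use std::collections::HashMap;

/// Trimmed, hypothetical mirror of LanceStorage.
enum Storage {
    Local,
    S3 {
        region: String,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    Gcs { service_account_json: Option<String> },
    Azure { storage_account: String, access_key: Option<String> },
}

/// Build an object_store-style options map. Absent credentials are left out
/// so the store can fall back to environment variables or ambient identity.
fn storage_options(storage: &Storage) -> HashMap<String, String> {
    let mut opts: HashMap<String, String> = HashMap::new();
    match storage {
        Storage::Local => {} // local paths need no object-store options
        Storage::S3 { region, endpoint, access_key_id, secret_access_key } => {
            opts.insert("aws_region".into(), region.clone());
            if let Some(ep) = endpoint {
                opts.insert("aws_endpoint".into(), ep.clone());
                // Plain-HTTP endpoints (MinIO, LocalStack) need allow_http.
                if ep.starts_with("http://") {
                    opts.insert("allow_http".into(), "true".into());
                }
            }
            if let Some(id) = access_key_id {
                opts.insert("aws_access_key_id".into(), id.clone());
            }
            if let Some(secret) = secret_access_key {
                opts.insert("aws_secret_access_key".into(), secret.clone());
            }
        }
        Storage::Gcs { service_account_json } => {
            if let Some(json) = service_account_json {
                opts.insert("google_service_account_key".into(), json.clone());
            }
        }
        Storage::Azure { storage_account, access_key } => {
            opts.insert("azure_storage_account_name".into(), storage_account.clone());
            if let Some(key) = access_key {
                opts.insert("azure_storage_account_key".into(), key.clone());
            }
        }
    }
    opts
}

fn main() {
    let minio = Storage::S3 {
        region: "us-east-1".into(),
        endpoint: Some("http://localhost:9000".into()),
        access_key_id: None,
        secret_access_key: None,
    };
    let opts = storage_options(&minio);
    let mut keys: Vec<&String> = opts.keys().collect();
    keys.sort();
    println!("{keys:?}"); // prints ["allow_http", "aws_endpoint", "aws_region"]
    assert!(storage_options(&Storage::Local).is_empty());
}
```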

Limitations

  • No Hive-style partitioning in v1. Lance uses fragments internally rather than filesystem-visible partitions, and partitioning support is deferred.
  • No schema evolution within a single pipeline_id — the first batch’s schema is assumed to hold for every subsequent batch.
  • Single writer per (dataset_uri, pipeline_id) — concurrent writers conflict on commit and return an error rather than silently serializing.
  • Recovery horizon is bounded by recovery_history_limit. If a pipeline accumulates more uncompacted versions than the limit without a successful recovery, older commits may be invisible to the scan and epochs can be re-executed. Compact regularly or raise the limit.

Troubleshooting

Recovery does not pick up the last committed epoch. Ensure the restarted sink uses the same pipeline_id as the original writer and that recovery_history_limit covers the relevant version range. A warn! log is emitted if a transaction’s volley.epoch property is malformed.

Build fails with “Could not find protoc”. Lance’s build scripts compile protobuf definitions at build time, so install the protoc binary (apt-get install protobuf-compiler on Debian/Ubuntu, brew install protobuf on macOS) before building.

Example

See volley-examples/examples/memory_to_lance.rs for a complete end-to-end example that runs a MemorySource → LanceSink pipeline, writes 5 trade records to a local Lance dataset through the full Volley runtime (including the final shutdown barrier), then reopens the dataset and prints all rows.

```shell
cargo run --example memory_to_lance -p volley-examples --release
```