# ADBC (Arrow Database Connectivity)
Volley provides ADBC-based database connectivity via two crates:
- `volley-connector-adbc` – A sink that writes Arrow RecordBatches to any database with an ADBC driver (PostgreSQL, BigQuery, Snowflake, ClickHouse, Databricks, MySQL, Redshift, Exasol, and more).
- `volley-enrichment-adbc` – A real-time enrichment operator that queries external databases to enrich streaming records with lookup data, backed by an LRU+TTL cache.
ADBC uses a C-API driver model (dlopen/dlsym), so adding support for a new database only requires installing the corresponding ADBC driver shared library – no connector-specific code needed.
## Setup

### Add crate dependencies
```toml
[dependencies]
# Sink (write to a database)
volley-connector-adbc = { git = "https://github.com/volley-streams/volley" }
# Enrichment operator (query a database to enrich records)
volley-enrichment-adbc = { git = "https://github.com/volley-streams/volley" }
```
### Install the ADBC driver shared library
ADBC drivers are platform-specific shared libraries. Install the driver for your target database:
**PostgreSQL:**

```shell
# macOS (Homebrew)
brew install apache-arrow-adbc-driver-postgresql

# Ubuntu/Debian
apt-get install libadbc-driver-postgresql

# Or download from https://github.com/apache/arrow-adbc/releases
```
**BigQuery:**

```shell
# Download the adbc_driver_bigquery shared library from
# https://github.com/apache/arrow-adbc/releases
```
The driver name (e.g., `"adbc_driver_postgresql"`) is resolved via platform library search paths (`LD_LIBRARY_PATH` on Linux, `DYLD_LIBRARY_PATH` on macOS). Alternatively, pass a full file path to the shared library.
## ADBC Sink
The sink writes Arrow RecordBatches to a database table. It supports insert (bulk ingest) and upsert (ON CONFLICT) modes with write-behind buffering.
### Basic insert
```rust
use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig, AdbcStreamExt};

let adbc = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb")
    .with_pool_size(4);

let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_batch_size(5000)
    .with_create_table(true);

env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_adbc(sink_config).await?
    .execute("kafka-to-postgres").await?;
```
### Upsert mode

Set `with_upsert_key()` to enable upsert semantics. The sink auto-generates `INSERT ... ON CONFLICT (key) DO UPDATE SET` SQL:
```rust
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_key("event_id")
    .with_batch_size(5000);
```
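For intuition, here is roughly the statement the configuration above would produce. The helper below is an illustrative sketch, not the crate's actual code generator; the real sink may quote identifiers or order columns differently:

```rust
// Illustrative sketch of upsert SQL generation from a table name,
// column list, and conflict key. Not the crate's implementation.
fn upsert_sql(table: &str, columns: &[&str], key: &str) -> String {
    let cols = columns.join(", ");
    let placeholders: Vec<String> =
        (1..=columns.len()).map(|i| format!("${i}")).collect();
    // The conflict key itself is not rewritten on conflict.
    let updates: Vec<String> = columns
        .iter()
        .filter(|c| **c != key)
        .map(|c| format!("{c} = EXCLUDED.{c}"))
        .collect();
    format!(
        "INSERT INTO {table} ({cols}) VALUES ({}) ON CONFLICT ({key}) DO UPDATE SET {}",
        placeholders.join(", "),
        updates.join(", ")
    )
}

fn main() {
    // INSERT INTO events (event_id, name) VALUES ($1, $2)
    //   ON CONFLICT (event_id) DO UPDATE SET name = EXCLUDED.name
    println!("{}", upsert_sql("events", &["event_id", "name"], "event_id"));
}
```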
For databases with non-standard upsert syntax, provide custom SQL:
```rust
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_query(
        "INSERT INTO events (id, name, value) VALUES ($1, $2, $3) \
         ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, value = EXCLUDED.value"
    );
```
### Sink configuration reference

| Method | Default | Description |
|---|---|---|
| `AdbcSinkConfig::new(adbc, table)` | – | Required: ADBC config and target table name |
| `.with_upsert_key(column)` | `None` | Column for upsert (ON CONFLICT) semantics. `None` = insert-only |
| `.with_upsert_query(sql)` | `None` | Custom SQL for upsert. Overrides auto-generated SQL |
| `.with_batch_size(n)` | 1000 | Number of buffered rows before auto-flush |
| `.with_create_table(bool)` | `false` | Auto-create the table from the first batch's Arrow schema |
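The write-behind buffering behind `with_batch_size` can be pictured as follows. This is a simplified model for intuition (plain strings standing in for Arrow RecordBatches), not the crate's internals:

```rust
// Sketch: rows accumulate in memory; a flush (triggered by reaching
// batch_size, or by a checkpoint) turns each full buffer into one bulk write.
struct WriteBehindBuffer {
    batch_size: usize,
    buffered: Vec<String>,      // stand-in for buffered rows
    flushed: Vec<Vec<String>>,  // each flush becomes one bulk INSERT
}

impl WriteBehindBuffer {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, buffered: Vec::new(), flushed: Vec::new() }
    }

    fn write(&mut self, row: &str) {
        self.buffered.push(row.to_string());
        if self.buffered.len() >= self.batch_size {
            self.flush(); // auto-flush once the buffer is full
        }
    }

    // In the real sink, a flush also happens on checkpoint.
    fn flush(&mut self) {
        if !self.buffered.is_empty() {
            self.flushed.push(std::mem::take(&mut self.buffered));
        }
    }
}

fn main() {
    let mut buf = WriteBehindBuffer::new(2);
    buf.write("a");
    buf.write("b"); // triggers auto-flush
    buf.write("c");
    buf.flush();    // checkpoint-style flush of the remainder
    println!("{} flushes", buf.flushed.len()); // 2 flushes
}
```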
### Delivery guarantees

- **At-least-once (default)** – `on_checkpoint()` flushes buffered batches. If the pipeline crashes after flushing but before the checkpoint completes, some records may be written twice on recovery.
- **Effectively exactly-once via upsert** – When `upsert_key` is set, duplicate writes are idempotent. Choose a natural key that makes re-delivery safe.
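To see why an upsert key makes re-delivery safe, consider this toy model of a table keyed by a natural key: replaying the same batch after a crash converges to the same state, so duplicates never appear.

```rust
use std::collections::HashMap;

// Toy model of upsert semantics: the table is a map keyed by the
// natural key (e.g. event_id), and writes are last-write-wins.
fn apply_upserts(table: &mut HashMap<String, String>, batch: &[(&str, &str)]) {
    for (key, value) in batch {
        table.insert(key.to_string(), value.to_string());
    }
}

fn main() {
    let batch = [("e1", "login"), ("e2", "click")];
    let mut table = HashMap::new();
    apply_upserts(&mut table, &batch);
    apply_upserts(&mut table, &batch); // re-delivery after recovery
    println!("{} rows", table.len()); // still 2 rows, no duplicates
}
```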
## ADBC Enrichment Operator
The enrichment operator queries an external database to enrich streaming records. It extracts lookup keys from each batch, issues a single micro-batch query, and appends enrichment columns to the output. An LRU+TTL cache avoids redundant queries.
### Key-column lookup

The simplest mode: specify a stream column and a remote table. The operator generates `SELECT [columns] FROM table WHERE target_key IN (k1, k2, ...)`.
```rust
use volley_enrichment_adbc::{AdbcEnrichConfig, AdbcEnrichStreamExt, EnrichMissStrategy};
use volley_connector_adbc::AdbcConfig;
use std::time::Duration;

let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new(pg, "customers", "customer_id")
            .with_select_columns(vec!["tier", "region", "lifetime_value"])
            .with_prefix("customer_")
            .with_cache_ttl(Duration::from_secs(300))
            .with_on_miss(EnrichMissStrategy::NullFill)
    )
    .filter_expr(col("customer_tier").eq(lit("premium")))
    .to_kafka(sink_config).await?
    .execute("enrich-orders").await?;
```
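The IN-list lookup described above can be sketched as below. This helper is purely illustrative: the real operator binds parameters rather than interpolating strings into SQL, and its exact SELECT list may differ.

```rust
// Sketch of building the micro-batch lookup query from a table name,
// remote key column, selected columns, and the batch's extracted keys.
fn lookup_sql(table: &str, target_key: &str, columns: &[&str], keys: &[&str]) -> String {
    format!(
        "SELECT {}, {} FROM {} WHERE {} IN ({})",
        target_key,
        columns.join(", "),
        table,
        target_key,
        // Real implementations use bound parameters here, not quoting.
        keys.iter()
            .map(|k| format!("'{k}'"))
            .collect::<Vec<_>>()
            .join(", ")
    )
}

fn main() {
    // SELECT customer_id, tier, region FROM customers
    //   WHERE customer_id IN ('c1', 'c2')
    println!("{}", lookup_sql("customers", "customer_id", &["tier", "region"], &["c1", "c2"]));
}
```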
### Parameterized query

For complex lookups (joins, filters, computed columns), provide a SQL template containing `IN $1`:
```rust
let bq = AdbcConfig::new("adbc_driver_bigquery", "bigquery://project-id");
let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new_with_query(
            bq,
            "SELECT category, risk_score FROM products WHERE sku IN $1 AND active = true",
            "product_sku"
        )
    )
    .to_adbc(AdbcSinkConfig::new(pg, "enriched_events")).await?
    .execute("enrich-and-store").await?;
```
### Miss strategies

When a lookup key has no match in the database:

| Strategy | Behavior |
|---|---|
| `NullFill` (default) | Add enrichment columns filled with nulls |
| `DropRecord` | Drop the record entirely |
| `PassThrough` | Keep the original record without enrichment columns |
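Per unmatched record, the three strategies behave roughly like this. The variant names mirror the table above; the function itself is a hypothetical sketch, not the operator's code:

```rust
// Sketch of applying a miss strategy to one record whose key found no match.
enum EnrichMissStrategy { NullFill, DropRecord, PassThrough }

// Returns the output record as (original fields, appended enrichment columns),
// or None when the record is dropped from the output.
fn handle_miss(
    strategy: &EnrichMissStrategy,
    record: Vec<&'static str>,
    enrich_width: usize,
) -> Option<(Vec<&'static str>, Vec<Option<&'static str>>)> {
    match strategy {
        // Keep the record, append a null for every enrichment column.
        EnrichMissStrategy::NullFill => Some((record, vec![None; enrich_width])),
        // Remove the record from the output entirely.
        EnrichMissStrategy::DropRecord => None,
        // Keep the record but append no enrichment columns at all.
        EnrichMissStrategy::PassThrough => Some((record, Vec::new())),
    }
}

fn main() {
    let rec = vec!["order-1", "unknown-customer"];
    assert!(handle_miss(&EnrichMissStrategy::DropRecord, rec.clone(), 2).is_none());
    let (_, cols) = handle_miss(&EnrichMissStrategy::NullFill, rec, 2).unwrap();
    println!("{} null columns appended", cols.len()); // 2
}
```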
### Enrichment configuration reference

| Method | Default | Description |
|---|---|---|
| `AdbcEnrichConfig::new(adbc, table, key)` | – | Key-column lookup mode |
| `AdbcEnrichConfig::new_with_query(adbc, sql, key)` | – | Parameterized query mode |
| `.with_target_key(column)` | Same as lookup key | Remote column to match against (if different from the stream column) |
| `.with_select_columns(vec)` | All columns | Which columns to fetch from the remote table |
| `.with_prefix(prefix)` | `None` | Prefix for enriched columns (e.g., `"customer_"`) |
| `.with_cache_ttl(duration)` | 5 minutes | Time-to-live for cached entries |
| `.with_cache_max_entries(n)` | 10,000 | Maximum entries in the LRU cache |
| `.with_on_miss(strategy)` | `NullFill` | Strategy for unmatched records |
### Cache behavior

The enrichment operator uses an LRU cache with per-entry TTL:

- Cache hits return the stored enrichment row without querying the database.
- Cache misses (key not found) are cached as `None` to avoid repeated lookups for non-existent keys.
- TTL expiry evicts stale entries so the operator picks up changes in the remote database.
- On recovery, the cache starts cold and repopulates organically. Enrichment data is external and authoritative, so there is no state to recover.
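A minimal model of these semantics, including the negative caching of absent keys, might look like the following. This is a sketch for intuition only; the crate's actual cache presumably uses a more efficient LRU structure.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Sketch of an LRU+TTL cache where a stored None means
// "looked up before, known to be absent" (negative caching).
struct EnrichCache {
    ttl: Duration,
    max_entries: usize,
    entries: HashMap<String, (Option<String>, Instant)>, // value + insertion time
    order: Vec<String>,                                  // least-recently-used first
}

impl EnrichCache {
    fn new(ttl: Duration, max_entries: usize) -> Self {
        Self { ttl, max_entries, entries: HashMap::new(), order: Vec::new() }
    }

    fn len(&self) -> usize { self.entries.len() }

    // Outer Option: was there a cache entry at all?
    // Inner Option: the enrichment row, or None for a known-absent key.
    fn get(&mut self, key: &str) -> Option<Option<String>> {
        let fresh = match self.entries.get(key) {
            Some((value, inserted)) if inserted.elapsed() < self.ttl => Some(value.clone()),
            Some(_) => None, // present but expired
            None => return None,
        };
        self.order.retain(|k| k != key);
        match fresh {
            Some(value) => {
                self.order.push(key.to_string()); // refresh LRU position on hit
                Some(value)
            }
            None => {
                self.entries.remove(key); // TTL expiry: drop the stale entry
                None
            }
        }
    }

    fn put(&mut self, key: &str, value: Option<String>) {
        if self.entries.len() >= self.max_entries && !self.entries.contains_key(key) {
            // Evict the least-recently-used entry to make room.
            if let Some(lru) = self.order.first().cloned() {
                self.order.remove(0);
                self.entries.remove(&lru);
            }
        }
        self.order.retain(|k| k != key);
        self.order.push(key.to_string());
        self.entries.insert(key.to_string(), (value, Instant::now()));
    }
}

fn main() {
    let mut cache = EnrichCache::new(Duration::from_secs(300), 2);
    cache.put("c1", Some("gold".to_string()));
    cache.put("missing", None); // negative cache for a key with no match
    assert_eq!(cache.get("c1"), Some(Some("gold".to_string())));
    assert_eq!(cache.get("missing"), Some(None)); // hit: known-absent key
    println!("entries: {}", cache.len());
}
```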
## Connection Configuration Reference

`AdbcConfig` is shared between the sink and enrichment operator:

| Method | Default | Description |
|---|---|---|
| `AdbcConfig::new(driver, uri)` | – | Required: driver name/path and connection URI |
| `.with_option(key, value)` | `{}` | Add a driver-specific key-value option |
| `.with_pool_size(n)` | 4 | Connection pool size (round-robin) |
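Round-robin selection over a fixed-size pool can be as simple as an atomic counter modulo the pool size. The crate documents round-robin behavior; the exact mechanism below is an illustrative sketch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch: each caller takes the next connection index, wrapping around,
// so load spreads evenly across the pool without any locking.
struct RoundRobin {
    counter: AtomicUsize,
    pool_size: usize,
}

impl RoundRobin {
    fn new(pool_size: usize) -> Self {
        Self { counter: AtomicUsize::new(0), pool_size }
    }

    fn next_index(&self) -> usize {
        self.counter.fetch_add(1, Ordering::Relaxed) % self.pool_size
    }
}

fn main() {
    let rr = RoundRobin::new(4);
    let picks: Vec<usize> = (0..6).map(|_| rr.next_index()).collect();
    println!("{picks:?}"); // [0, 1, 2, 3, 0, 1]
}
```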
**Driver name resolution:** If the driver name contains `/` or `\`, it is treated as a file path. Otherwise, it is resolved via platform library search paths (e.g., `LD_LIBRARY_PATH`, `DYLD_LIBRARY_PATH`).
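The name-vs-path rule reduces to a separator check, roughly like this (an illustrative helper, not the crate's exact code):

```rust
// Anything containing a path separator is loaded as a file path;
// bare names go through the platform loader's search paths.
fn is_driver_file_path(driver: &str) -> bool {
    driver.contains('/') || driver.contains('\\')
}

fn main() {
    assert!(!is_driver_file_path("adbc_driver_postgresql"));
    assert!(is_driver_file_path("/usr/local/lib/libadbc_driver_postgresql.so"));
    println!("resolution rule holds");
}
```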
Common driver names:
| Database | Driver Name |
|---|---|
| PostgreSQL | adbc_driver_postgresql |
| BigQuery | adbc_driver_bigquery |
| Snowflake | adbc_driver_snowflake |
| SQLite | adbc_driver_sqlite |
| Flight SQL | adbc_driver_flightsql |
## Observability

| Metric | Type | Description |
|---|---|---|
| `adbc_sink_rows_written_total` | counter | Rows written, by table |
| `adbc_sink_flush_duration_seconds` | histogram | Flush latency |
| `adbc_enrich_cache_hit_total` | counter | Cache hits |
| `adbc_enrich_cache_miss_total` | counter | Cache misses |
| `adbc_enrich_cache_size` | gauge | Current cache entry count |
| `adbc_enrich_query_duration_seconds` | histogram | Micro-batch query latency |
| `adbc_enrich_query_errors_total` | counter | Failed enrichment queries |
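As a usage example, the two cache counters combine into a hit rate for a dashboard panel or alert threshold. The helper below is hypothetical, not part of the crates:

```rust
// Hit rate from the adbc_enrich_cache_hit_total and
// adbc_enrich_cache_miss_total counter values.
fn cache_hit_rate(hits: u64, misses: u64) -> f64 {
    let total = hits + misses;
    if total == 0 {
        return 0.0; // no lookups yet
    }
    hits as f64 / total as f64
}

fn main() {
    // e.g. hits = 900, misses = 100
    println!("hit rate: {:.0}%", cache_hit_rate(900, 100) * 100.0); // 90%
}
```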
## Integration Testing

The ADBC integration tests require Docker (for PostgreSQL via testcontainers) and the `adbc_driver_postgresql` shared library.

Set the `ADBC_POSTGRESQL_DRIVER` environment variable if the driver is not on the default library search path:
```shell
# Point to the driver if not on LD_LIBRARY_PATH
export ADBC_POSTGRESQL_DRIVER=/usr/local/lib/libadbc_driver_postgresql.so

# Run the sink integration tests
cargo test -p volley-connector-adbc --test sink_integration -- --ignored

# Run the enrichment integration tests
cargo test -p volley-enrichment-adbc --test enrich_integration -- --ignored
```
Tests use `volley_test_harness::containers::PostgresContainer` to spin up a PostgreSQL instance automatically.
## Troubleshooting

### “Failed to load ADBC driver”

- Verify the driver shared library is installed and on the library search path.
- On macOS: check `DYLD_LIBRARY_PATH`. On Linux: check `LD_LIBRARY_PATH`.
- Try passing the full file path instead of just the driver name.
### “Failed to create ADBC connection”

- Verify the connection URI is correct and the database is reachable.
- Check that driver-specific options (credentials, project ID, etc.) are set via `with_option()`.
### Slow enrichment queries

- Increase `cache_ttl` and `cache_max_entries` to reduce query frequency.
- Add an index on the lookup column in the remote database.
- Use `with_select_columns()` to fetch only the columns you need.
### Duplicate rows after recovery

- Use `with_upsert_key()` to make writes idempotent on a natural key.
- Without upsert, the sink provides at-least-once delivery.