
ADBC (Arrow Database Connectivity)

Volley provides ADBC-based database connectivity via two crates:

  • volley-connector-adbc – A sink that writes Arrow RecordBatches to any database with an ADBC driver (PostgreSQL, BigQuery, Snowflake, ClickHouse, Databricks, MySQL, Redshift, Exasol, and more).
  • volley-enrichment-adbc – A real-time enrichment operator that queries external databases to enrich streaming records with lookup data, backed by an LRU+TTL cache.

ADBC uses a C-API driver model (dlopen/dlsym), so adding support for a new database only requires installing the corresponding ADBC driver shared library – no connector-specific code needed.

Setup

Add crate dependencies

[dependencies]
# Sink (write to a database)
volley-connector-adbc = { git = "https://github.com/volley-streams/volley" }

# Enrichment operator (query a database to enrich records)
volley-enrichment-adbc = { git = "https://github.com/volley-streams/volley" }

Install the ADBC driver shared library

ADBC drivers are platform-specific shared libraries. Install the driver for your target database:

PostgreSQL:

# macOS (Homebrew)
brew install apache-arrow-adbc-driver-postgresql

# Ubuntu/Debian
apt-get install libadbc-driver-postgresql

# Or download from https://github.com/apache/arrow-adbc/releases

BigQuery:

# Download the adbc_driver_bigquery shared library from
# https://github.com/apache/arrow-adbc/releases

The driver name (e.g., "adbc_driver_postgresql") is resolved via platform library search paths (LD_LIBRARY_PATH on Linux, DYLD_LIBRARY_PATH on macOS). Alternatively, pass a full file path to the shared library.
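Both resolution modes go through the same constructor; a sketch (the `.so` path below is illustrative for a typical Linux install):

```rust
use volley_connector_adbc::AdbcConfig;

// Bare driver name: resolved via the platform library search path.
let by_name = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

// Full file path: the shared library is loaded directly, bypassing the search path.
let by_path = AdbcConfig::new(
    "/usr/local/lib/libadbc_driver_postgresql.so",
    "postgresql://localhost/mydb",
);
```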

ADBC Sink

The sink writes Arrow RecordBatches to a database table. It supports insert (bulk ingest) and upsert (ON CONFLICT) modes with write-behind buffering.

Basic insert

#![allow(unused)]
fn main() {
use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig, AdbcStreamExt};

let adbc = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb")
    .with_pool_size(4);

let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_batch_size(5000)
    .with_create_table(true);

env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_adbc(sink_config).await?
    .execute("kafka-to-postgres").await?;
}

Upsert mode

Set with_upsert_key() to enable upsert semantics. The sink auto-generates INSERT ... ON CONFLICT (key) DO UPDATE SET SQL:

#![allow(unused)]
fn main() {
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_key("event_id")
    .with_batch_size(5000);
}

For databases with non-standard upsert syntax, provide custom SQL:

#![allow(unused)]
fn main() {
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_query(
        "INSERT INTO events (id, name, value) VALUES ($1, $2, $3) \
         ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, value = EXCLUDED.value"
    );
}

Sink configuration reference

| Method | Default | Description |
|--------|---------|-------------|
| `AdbcSinkConfig::new(adbc, table)` | – | Required: ADBC config and target table name |
| `.with_upsert_key(column)` | `None` | Column for upsert (ON CONFLICT) semantics. `None` = insert-only |
| `.with_upsert_query(sql)` | `None` | Custom SQL for upsert. Overrides auto-generated SQL |
| `.with_batch_size(n)` | `1000` | Number of buffered rows before auto-flush |
| `.with_create_table(bool)` | `false` | Auto-create the table from the first batch's Arrow schema |
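The options compose; a sketch combining them (table and column names are illustrative):

```rust
use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig};

let adbc = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

// Upsert on event_id, flush every 10,000 buffered rows, and create the
// table from the first batch's Arrow schema if it does not exist yet.
let sink_config = AdbcSinkConfig::new(adbc, "events")
    .with_upsert_key("event_id")
    .with_batch_size(10_000)
    .with_create_table(true);
```

Note that .with_upsert_query() overrides the SQL generated from .with_upsert_key(), so setting both is redundant.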

Delivery guarantees

  • At-least-once (default) – on_checkpoint() flushes buffered batches. If the pipeline crashes after flushing but before the checkpoint completes, some records may be written twice on recovery.
  • Effectively exactly-once via upsert – When upsert_key is set, duplicate writes are idempotent. Choose a natural key that makes re-delivery safe.

ADBC Enrichment Operator

The enrichment operator queries an external database to enrich streaming records. It extracts lookup keys from each batch, issues a single micro-batch query, and appends enrichment columns to the output. An LRU+TTL cache avoids redundant queries.

Key-column lookup

The simplest mode: specify a stream column and a remote table. The operator generates SELECT [columns] FROM table WHERE target_key IN (k1, k2, ...).

#![allow(unused)]
fn main() {
use volley_enrichment_adbc::{AdbcEnrichConfig, AdbcEnrichStreamExt, EnrichMissStrategy};
use volley_connector_adbc::AdbcConfig;
use std::time::Duration;

let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new(pg, "customers", "customer_id")
            .with_select_columns(vec!["tier", "region", "lifetime_value"])
            .with_prefix("customer_")
            .with_cache_ttl(Duration::from_secs(300))
            .with_on_miss(EnrichMissStrategy::NullFill)
    )
    .filter_expr(col("customer_tier").eq(lit("premium")))
    .to_kafka(sink_config).await?
    .execute("enrich-orders").await?;
}

Parameterized query

For complex lookups (joins, filters, computed columns), provide a SQL template with IN $1:

#![allow(unused)]
fn main() {
use volley_connector_adbc::{AdbcConfig, AdbcSinkConfig, AdbcStreamExt};
use volley_enrichment_adbc::{AdbcEnrichConfig, AdbcEnrichStreamExt};

let bq = AdbcConfig::new("adbc_driver_bigquery", "bigquery://project-id");
let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

env.from_kafka(source_config).await?
    .enrich_with_adbc(
        AdbcEnrichConfig::new_with_query(
            bq,
            "SELECT category, risk_score FROM products WHERE sku IN $1 AND active = true",
            "product_sku"
        )
    )
    .to_adbc(AdbcSinkConfig::new(pg, "enriched_events")).await?
    .execute("enrich-and-store").await?;
}

Miss strategies

When a lookup key has no match in the database:

| Strategy | Behavior |
|----------|----------|
| `NullFill` (default) | Add enrichment columns filled with nulls |
| `DropRecord` | Drop the record entirely |
| `PassThrough` | Keep the original record without enrichment columns |
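For example, treating enrichment as an inner join (a sketch reusing the customers lookup from earlier):

```rust
use volley_connector_adbc::AdbcConfig;
use volley_enrichment_adbc::{AdbcEnrichConfig, EnrichMissStrategy};

let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

// DropRecord removes records whose key has no match in the remote
// table, so only successfully enriched records continue downstream.
let config = AdbcEnrichConfig::new(pg, "customers", "customer_id")
    .with_on_miss(EnrichMissStrategy::DropRecord);
```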

Enrichment configuration reference

| Method | Default | Description |
|--------|---------|-------------|
| `AdbcEnrichConfig::new(adbc, table, key)` | – | Key-column lookup mode |
| `AdbcEnrichConfig::new_with_query(adbc, sql, key)` | – | Parameterized query mode |
| `.with_target_key(column)` | Same as lookup key | Remote column to match against (if different from the stream column) |
| `.with_select_columns(vec)` | All columns | Which columns to fetch from the remote table |
| `.with_prefix(prefix)` | `None` | Prefix for enriched columns (e.g., `"customer_"`) |
| `.with_cache_ttl(duration)` | 5 minutes | Time-to-live for cached entries |
| `.with_cache_max_entries(n)` | 10,000 | Maximum entries in the LRU cache |
| `.with_on_miss(strategy)` | `NullFill` | Strategy for unmatched records |
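All enrichment knobs together (a sketch; the column names and limits are illustrative):

```rust
use volley_connector_adbc::AdbcConfig;
use volley_enrichment_adbc::{AdbcEnrichConfig, EnrichMissStrategy};
use std::time::Duration;

let pg = AdbcConfig::new("adbc_driver_postgresql", "postgresql://localhost/mydb");

let config = AdbcEnrichConfig::new(pg, "customers", "customer_id")
    .with_target_key("id")                        // stream customer_id matches remote id
    .with_select_columns(vec!["tier", "region"])  // fetch only what is needed
    .with_prefix("customer_")                     // output columns: customer_tier, customer_region
    .with_cache_ttl(Duration::from_secs(600))
    .with_cache_max_entries(50_000)
    .with_on_miss(EnrichMissStrategy::PassThrough);
```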

Cache behavior

The enrichment operator uses an LRU cache with per-entry TTL:

  • Cache hits return the stored enrichment row without querying the database.
  • Cache misses (key not found) are cached as None to avoid repeated lookups for non-existent keys.
  • TTL expiry evicts stale entries so the operator picks up changes in the remote database.
  • On recovery, the cache starts cold and repopulates organically. Enrichment data is external and authoritative, so there is no state to recover.
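The rules above can be modeled as a small TTL cache with negative caching. This is an illustrative sketch using only the standard library, not the crate's actual implementation (which also tracks LRU recency; the sketch falls back to insertion-order eviction to stay short):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Illustrative model of the enrichment cache: per-entry TTL plus
/// negative caching of confirmed database misses.
struct TtlCache<V> {
    entries: HashMap<String, (Instant, Option<V>)>,
    ttl: Duration,
    max_entries: usize,
}

impl<V: Clone> TtlCache<V> {
    fn new(ttl: Duration, max_entries: usize) -> Self {
        Self { entries: HashMap::new(), ttl, max_entries }
    }

    /// `Some(Some(v))` = hit, `Some(None)` = cached miss (key known absent),
    /// `None` = not cached: the caller must query the database.
    fn get(&mut self, key: &str) -> Option<Option<V>> {
        if let Some((inserted, value)) = self.entries.get(key) {
            if inserted.elapsed() < self.ttl {
                return Some(value.clone());
            }
            self.entries.remove(key); // TTL expired: force a fresh lookup
        }
        None
    }

    /// Store a lookup result; `None` records a confirmed database miss.
    fn insert(&mut self, key: String, value: Option<V>) {
        if self.entries.len() >= self.max_entries {
            // Evict the oldest entry (insertion-order approximation of LRU).
            if let Some(oldest) = self
                .entries
                .iter()
                .min_by_key(|(_, (t, _))| *t)
                .map(|(k, _)| k.clone())
            {
                self.entries.remove(&oldest);
            }
        }
        self.entries.insert(key, (Instant::now(), value));
    }
}
```

Caching misses as `None` is what prevents a hot stream of unknown keys from hammering the database; the trade-off is that a key created in the remote table becomes visible only after its cached miss expires.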

Connection Configuration Reference

AdbcConfig is shared between the sink and enrichment operator:

| Method | Default | Description |
|--------|---------|-------------|
| `AdbcConfig::new(driver, uri)` | – | Required: driver name/path and connection URI |
| `.with_option(key, value)` | `{}` | Add a driver-specific key-value option |
| `.with_pool_size(n)` | `4` | Connection pool size (round-robin) |

Driver name resolution: If the driver name contains / or \, it is treated as a file path. Otherwise, it is resolved via platform library search paths (e.g., LD_LIBRARY_PATH, DYLD_LIBRARY_PATH).
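The stated rule reduces to a single check; a sketch (illustrative, not the crate's actual code):

```rust
/// A driver string containing a path separator is treated as a file
/// path to the shared library; a bare name goes through the platform
/// library search path instead.
fn is_driver_path(driver: &str) -> bool {
    driver.contains('/') || driver.contains('\\')
}
```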

Common driver names:

| Database | Driver Name |
|----------|-------------|
| PostgreSQL | `adbc_driver_postgresql` |
| BigQuery | `adbc_driver_bigquery` |
| Snowflake | `adbc_driver_snowflake` |
| SQLite | `adbc_driver_sqlite` |
| Flight SQL | `adbc_driver_flightsql` |

Observability

| Metric | Type | Description |
|--------|------|-------------|
| `adbc_sink_rows_written_total` | counter | Rows written, by table |
| `adbc_sink_flush_duration_seconds` | histogram | Flush latency |
| `adbc_enrich_cache_hit_total` | counter | Cache hits |
| `adbc_enrich_cache_miss_total` | counter | Cache misses |
| `adbc_enrich_cache_size` | gauge | Current cache entry count |
| `adbc_enrich_query_duration_seconds` | histogram | Micro-batch query latency |
| `adbc_enrich_query_errors_total` | counter | Failed enrichment queries |

Integration Testing

The ADBC integration tests require Docker (for PostgreSQL via testcontainers) and the adbc_driver_postgresql shared library.

Set the ADBC_POSTGRESQL_DRIVER environment variable if the driver is not on the default library search path:

# Point to the driver if not on LD_LIBRARY_PATH
export ADBC_POSTGRESQL_DRIVER=/usr/local/lib/libadbc_driver_postgresql.so

# Run the sink integration tests
cargo test -p volley-connector-adbc --test sink_integration -- --ignored

# Run the enrichment integration tests
cargo test -p volley-enrichment-adbc --test enrich_integration -- --ignored

Tests use volley-test-harness::containers::PostgresContainer to spin up a PostgreSQL instance automatically.

Troubleshooting

“Failed to load ADBC driver”

  • Verify the driver shared library is installed and on the library search path.
  • On macOS: check DYLD_LIBRARY_PATH. On Linux: check LD_LIBRARY_PATH.
  • Try passing the full file path instead of just the driver name.

“Failed to create ADBC connection”

  • Verify the connection URI is correct and the database is reachable.
  • Check that driver-specific options (credentials, project ID, etc.) are set via with_option().

Slow enrichment queries

  • Increase cache_ttl and cache_max_entries to reduce query frequency.
  • Add an index on the lookup column in the remote database.
  • Use with_select_columns() to fetch only the columns you need.

Duplicate rows after recovery

  • Use with_upsert_key() to make writes idempotent on a natural key.
  • Without upsert, the sink provides at-least-once delivery.