
ScyllaDB / Cassandra

Volley provides ScyllaDB (and Cassandra-compatible) integration via two crates:

  • volley-connector-scylla — A sink that writes Arrow RecordBatches to a ScyllaDB table using micro-batch CQL writes, with full 2PC support and first-class vector column handling.
  • volley-enrichment-scylla — A real-time enrichment operator that queries ScyllaDB to enrich streaming records, with LRU+TTL caching, compound key support, and ANN vector similarity search.

Both crates use the scylla Rust driver, which provides native token-aware routing and efficient CQL batch execution.

Targets: ScyllaDB 5.x / 6.x and Apache Cassandra 4.x / 5.x.

Setup

Add crate dependencies

[dependencies]
# Sink (write to ScyllaDB)
volley-connector-scylla = { git = "https://github.com/volley-streams/volley" }

# Enrichment operator (query ScyllaDB to enrich records)
volley-enrichment-scylla = { git = "https://github.com/volley-streams/volley" }

Prerequisites

A running ScyllaDB (or Cassandra) cluster is required. For local development, start a single node with Docker:

docker run --rm -d -p 9042:9042 --name scylla scylladb/scylla:6.1

ScyllaDB Sink

The sink buffers incoming rows and flushes them to ScyllaDB as CQL batch statements. It supports append mode (plain INSERT), upsert mode (INSERT ... IF NOT EXISTS, a lightweight transaction that keeps the first write for each primary key), and checkpoint-aligned 2PC.
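The buffering rule can be sketched in plain Rust. This is a minimal, std-only model that assumes only the behaviour stated on this page: rows auto-flush when the configured batch size is reached, and a checkpoint forces a flush of whatever is buffered. MicroBatchBuffer and its methods are hypothetical names and are not part of volley-connector-scylla.

```rust
/// Hypothetical micro-batch buffer: rows accumulate until `batch_size` is
/// reached (auto-flush) or a checkpoint drains a partial batch.
pub struct MicroBatchBuffer<R> {
    batch_size: usize,
    rows: Vec<R>,
}

impl<R> MicroBatchBuffer<R> {
    pub fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self { batch_size, rows: Vec::with_capacity(batch_size) }
    }

    /// Buffers one row and returns a full batch once the threshold is hit.
    pub fn push(&mut self, row: R) -> Option<Vec<R>> {
        self.rows.push(row);
        if self.rows.len() >= self.batch_size {
            Some(std::mem::take(&mut self.rows))
        } else {
            None
        }
    }

    /// Checkpoint path: drains everything buffered, even a partial batch.
    pub fn flush(&mut self) -> Vec<R> {
        std::mem::take(&mut self.rows)
    }

    pub fn len(&self) -> usize {
        self.rows.len()
    }

    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }
}
```

Each batch returned by push() or flush() corresponds to one CQL batch execution in this model.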

Basic append

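use volley_core::prelude::*; // col, lit, and the stream API
use volley_connector_scylla::{ScyllaConfig, ScyllaSinkConfig, ScyllaStreamExt};

// Connection settings: one contact point and a default keyspace.
let scylla = ScyllaConfig::new(vec!["127.0.0.1:9042"])
    .with_keyspace("my_keyspace");

// Append mode: flush every 500 buffered rows and create `events` if it is missing.
let sink_config = ScyllaSinkConfig::new(scylla, "events")
    .with_batch_size(500)
    .with_create_table(true);

// `env` (a StreamExecutionEnvironment) and `source_config` are set up as in the
// full pipeline example below; this fragment runs inside an async fn returning Result.
env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_scylla(sink_config).await?
    .execute("kafka-to-scylla").await?;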

Upsert mode

Use ScyllaSinkConfig::new_upsert() for idempotent writes. Pass the table's primary key columns; a record whose primary key already exists is silently skipped (the first write wins):

// event_id is the table's primary key
let sink_config = ScyllaSinkConfig::new_upsert(scylla, "events", vec!["event_id"])
    .with_batch_size(500);

For tables with a composite primary key, pass all key columns:

// Composite primary key: tenant_id and event_id
let sink_config = ScyllaSinkConfig::new_upsert(scylla, "events", vec!["tenant_id", "event_id"])
    .with_batch_size(500);
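Both modes reduce to the CQL text sent for each row. The helper below is hypothetical (insert_cql is not a crate function) and only shows the statement shapes described on this page, including the optional USING TTL clause set by with_ttl. CQL checks IF NOT EXISTS against the primary key declared in the table schema, which is why the keys passed to new_upsert() should match that key. In the CQL grammar, IF NOT EXISTS comes before the USING clause.

```rust
/// Hypothetical helper: renders the per-row INSERT for append or upsert mode.
/// `upsert = true` adds IF NOT EXISTS; `ttl` adds USING TTL n.
pub fn insert_cql(table: &str, columns: &[&str], upsert: bool, ttl: Option<u32>) -> String {
    // One bind marker per column.
    let placeholders = vec!["?"; columns.len()].join(", ");
    let mut cql = format!(
        "INSERT INTO {} ({}) VALUES ({})",
        table,
        columns.join(", "),
        placeholders
    );
    if upsert {
        // Lightweight transaction: not applied if the primary key already exists.
        cql.push_str(" IF NOT EXISTS");
    }
    if let Some(secs) = ttl {
        // USING must follow IF NOT EXISTS.
        cql.push_str(&format!(" USING TTL {}", secs));
    }
    cql
}
```

Lightweight transactions add a Paxos round per statement, so upsert mode trades some write throughput for idempotency.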

Auto-create table

When with_create_table(true) is set, the sink issues a CREATE TABLE IF NOT EXISTS statement before writing the first batch, deriving the table schema from that batch's Arrow schema. Arrow-to-CQL type mappings:

| Arrow type | CQL type |
|---|---|
| Int32 | int |
| Int64 | bigint |
| Float32 | float |
| Float64 | double |
| Utf8 / LargeUtf8 | text |
| Boolean | boolean |
| Timestamp(ms, ...) | timestamp |
| FixedSizeList<Float16/32/64, N> | vector<float, N> |
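The mapping and the DDL it produces can be sketched as follows. ArrowType is a simplified, hypothetical stand-in for arrow::datatypes::DataType. This page does not say how the sink chooses the primary key of an auto-created table, so the sketch takes the key columns as a parameter.

```rust
/// Hypothetical, simplified stand-in for the Arrow types in the table above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ArrowType {
    Int32,
    Int64,
    Float32,
    Float64,
    Utf8,
    LargeUtf8,
    Boolean,
    TimestampMillis,
    /// FixedSizeList of Float16/Float32/Float64 with N elements.
    FloatList(usize),
}

/// Maps one Arrow type to its CQL column type, following the table above.
pub fn cql_type(t: ArrowType) -> String {
    match t {
        ArrowType::Int32 => "int".to_string(),
        ArrowType::Int64 => "bigint".to_string(),
        ArrowType::Float32 => "float".to_string(),
        ArrowType::Float64 => "double".to_string(),
        ArrowType::Utf8 | ArrowType::LargeUtf8 => "text".to_string(),
        ArrowType::Boolean => "boolean".to_string(),
        ArrowType::TimestampMillis => "timestamp".to_string(),
        ArrowType::FloatList(n) => format!("vector<float, {}>", n),
    }
}

/// Renders CREATE TABLE IF NOT EXISTS; the primary key is supplied by the caller.
pub fn create_table_cql(table: &str, fields: &[(&str, ArrowType)], primary_key: &[&str]) -> String {
    let columns: Vec<String> = fields
        .iter()
        .map(|(name, t)| format!("{} {}", name, cql_type(*t)))
        .collect();
    format!(
        "CREATE TABLE IF NOT EXISTS {} ({}, PRIMARY KEY ({}))",
        table,
        columns.join(", "),
        primary_key.join(", ")
    )
}
```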

Vector columns

ScyllaDB’s native vector<float, N> type is fully supported. Arrow FixedSizeList columns with Float16, Float32, or Float64 children are automatically serialised as vector<float, N> values; because CQL float is 32-bit, Float16 and Float64 elements are converted. This is the recommended way to store embedding vectors for ANN search:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema}; // assumes the arrow crate is a dependency

// Arrow schema with a 384-dimensional Float32 embedding column;
// batches with this schema reaching the sink are written as vector<float, 384>
let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Utf8, false),
    Field::new(
        "embedding",
        DataType::FixedSizeList(
            Arc::new(Field::new("item", DataType::Float32, false)),
            384,
        ),
        false,
    ),
]));

let sink_config = ScyllaSinkConfig::new(scylla, "embeddings")
    .with_create_table(true)
    .with_vector_index("embedding"); // CREATE CUSTOM INDEX for ANN
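Arrow stores a FixedSizeList column as one contiguous child array, with row i occupying elements i*N through (i+1)*N - 1. The sketch below splits such a child buffer into per-row vectors and narrows the elements to the 32-bit floats that vector<float, N> holds. The function name is hypothetical, and the sketch ignores nulls and array offsets.

```rust
/// Splits a FixedSizeList child buffer (row-major, `dim` values per row)
/// into per-row f32 vectors suitable for a CQL vector<float, dim> column.
pub fn rows_from_fixed_size_list(child: &[f64], dim: usize) -> Result<Vec<Vec<f32>>, String> {
    // chunks_exact panics on 0, and a ragged buffer cannot be a FixedSizeList.
    if dim == 0 || child.len() % dim != 0 {
        return Err(format!(
            "child length {} is not a multiple of dimension {}",
            child.len(),
            dim
        ));
    }
    Ok(child
        .chunks_exact(dim)
        // `as f32` rounds each f64 to the nearest f32 (precision is lost).
        .map(|row| row.iter().map(|&v| v as f32).collect::<Vec<f32>>())
        .collect())
}
```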

Sink configuration reference

| Method | Default | Description |
|---|---|---|
| ScyllaSinkConfig::new(config, table) | | Append mode. Required: ScyllaDB config and target table name |
| ScyllaSinkConfig::new_upsert(config, table, keys) | | Upsert mode. keys lists partition + clustering key columns |
| .with_batch_size(n) | 500 | Number of buffered rows before auto-flush |
| .with_create_table(bool) | false | Auto-create the table from the first batch’s Arrow schema |
| .with_vector_index(column) | None | Create a ScyllaDB custom ANN index on this vector column |
| .with_ttl(seconds) | None | Per-row TTL in seconds (USING TTL n on every INSERT) |

Delivery guarantees

  • Append mode (default): at-least-once. On checkpoint, buffered rows are flushed. If the pipeline crashes after the flush but before the checkpoint completes, the rows may be written again on recovery.
  • Upsert mode: effectively exactly-once for tables with a natural primary key. Duplicate INSERTs on recovery are idempotent because INSERT ... IF NOT EXISTS is not applied when a row with the same primary key already exists.
  • 2PC (transactional pipelines): prepare_commit stages the CQL batch, finalize_commit executes it, and abort_commit discards it. Sinks in a multi-sink DAG all commit atomically per epoch.
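The 2PC contract can be modelled as a small state machine. TwoPhaseSink is hypothetical. It keeps committed rows in memory in place of ScyllaDB and assumes at most one epoch is in flight at a time. It only illustrates that nothing staged by prepare_commit becomes visible until finalize_commit runs for the same epoch.

```rust
/// Hypothetical model of the sink's 2PC hooks.
#[derive(Default)]
pub struct TwoPhaseSink {
    buffer: Vec<String>,                // rows received since the last prepare
    staged: Option<(u64, Vec<String>)>, // (epoch, rows) awaiting commit
    committed: Vec<String>,             // stands in for rows written to ScyllaDB
}

impl TwoPhaseSink {
    pub fn write(&mut self, row: &str) {
        self.buffer.push(row.to_string());
    }

    /// Phase 1 (checkpoint barrier): move buffered rows into a staged batch.
    pub fn prepare_commit(&mut self, epoch: u64) {
        let rows = std::mem::take(&mut self.buffer);
        self.staged = Some((epoch, rows));
    }

    /// Phase 2 (checkpoint complete): execute the staged batch for `epoch`.
    pub fn finalize_commit(&mut self, epoch: u64) -> bool {
        match self.staged.take() {
            Some((e, rows)) if e == epoch => {
                self.committed.extend(rows);
                true
            }
            other => {
                // Nothing staged, or a different epoch: leave state unchanged.
                self.staged = other;
                false
            }
        }
    }

    /// Checkpoint failed: discard the staged batch without writing it.
    pub fn abort_commit(&mut self) {
        self.staged = None;
    }

    pub fn committed(&self) -> &[String] {
        &self.committed
    }
}
```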

ScyllaDB Enrichment Operator

The enrichment operator queries ScyllaDB to join lookup data into the stream. For each incoming batch it extracts the unique keys, issues one CQL SELECT per key that is not already cached (run concurrently as async queries, not as a CQL batch), and left-joins the results back into the batch. An LRU+TTL cache avoids redundant queries.
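Here is a std-only sketch of that per-batch flow. It assumes a plain HashMap as the cache, with no eviction or TTL, and a `fetch` closure that stands in for the concurrent CQL SELECTs. enrich_batch is a hypothetical name. Keys are deduplicated, only uncached keys are fetched, and every input row appears in the output (a left join), with None where no match exists.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical sketch of one enrichment pass over a batch's key column.
pub fn enrich_batch<F>(
    keys: &[&str],
    cache: &mut HashMap<String, String>,
    mut fetch: F,
) -> Vec<Option<String>>
where
    F: FnMut(&str) -> Option<String>,
{
    // 1. Deduplicate so each key is queried at most once per batch.
    let unique: HashSet<&str> = keys.iter().copied().collect();
    // 2. Query only keys missing from the cache; cache the hits.
    for key in unique {
        if !cache.contains_key(key) {
            if let Some(value) = fetch(key) {
                cache.insert(key.to_string(), value);
            }
        }
    }
    // 3. Left join: one output per input row, None when unmatched.
    keys.iter().map(|k| cache.get(*k).cloned()).collect()
}
```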

Simple key lookup

use volley_core::prelude::*; // col, lit, and the stream API
use volley_enrichment_scylla::{ScyllaEnrichConfig, ScyllaEnrichStreamExt, EnrichMissStrategy};
use volley_connector_scylla::ScyllaConfig;
use std::time::Duration;

let scylla = ScyllaConfig::new(vec!["127.0.0.1:9042"])
    .with_keyspace("my_keyspace");

// `env`, `source_config`, and `sink_config` are set up as in the full pipeline
// example; from_kafka/to_kafka come from volley_connector_kafka's extension traits.
env.from_kafka(source_config).await?
    .enrich_with_scylla(
        ScyllaEnrichConfig::new(scylla, "customers", "customer_id")
            .with_select_columns(vec!["tier", "region", "lifetime_value"])
            .with_prefix("customer_")                   // tier -> customer_tier, ...
            .with_cache_ttl(Duration::from_secs(300))
            .with_on_miss(EnrichMissStrategy::NullFill)
    )
    .filter_expr(col("customer_tier").eq(lit("premium")))
    .to_kafka(sink_config).await?
    .execute("enrich-orders").await?;

For each unique key in the batch that is not already cached, this issues SELECT tier, region, lifetime_value FROM customers WHERE customer_id = ?.

Compound key lookup

For tables with a composite primary key, specify all key columns and the corresponding stream columns:

let enrich_config = ScyllaEnrichConfig::new_compound(
    scylla,
    "order_items",
    vec![
        // (stream_column, table_column)
        ("order_id", "order_id"),
        ("item_sku", "sku"),
    ],
)
.with_select_columns(vec!["price", "weight_kg"])
.with_prefix("item_");

This issues SELECT price, weight_kg FROM order_items WHERE order_id = ? AND sku = ?.
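The query text for both lookup modes follows from the configuration. The helpers below are hypothetical (lookup_cql and bind_values are not crate functions). lookup_cql renders the WHERE clause from the (stream_column, table_column) pairs, and bind_values pulls the bind values for one stream row in the same order. An empty select list falls back to SELECT *, matching the "All columns" default in the configuration reference.

```rust
use std::collections::HashMap;

/// Hypothetical helper: the point-lookup SELECT for simple or compound keys.
/// `key_pairs` holds (stream_column, table_column).
pub fn lookup_cql(table: &str, select: &[&str], key_pairs: &[(&str, &str)]) -> String {
    let projection = if select.is_empty() { "*".to_string() } else { select.join(", ") };
    let predicate: Vec<String> = key_pairs
        .iter()
        .map(|(_, table_col)| format!("{} = ?", table_col))
        .collect();
    format!("SELECT {} FROM {} WHERE {}", projection, table, predicate.join(" AND "))
}

/// Bind values for one stream row, in key order; None if a key column is absent.
pub fn bind_values<'a>(row: &HashMap<&str, &'a str>, key_pairs: &[(&str, &str)]) -> Option<Vec<&'a str>> {
    key_pairs
        .iter()
        .map(|(stream_col, _)| row.get(*stream_col).copied())
        .collect()
}
```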

ANN vector search

For vector similarity search, use ScyllaEnrichConfig::new_ann(). The query vector is read from a designated vector column in each stream row:

let enrich_config = ScyllaEnrichConfig::new_ann(
    scylla,
    "product_embeddings",   // table
    "embedding",            // vector column in the table
    "query_embedding",      // vector column in the stream batch
)
.with_select_columns(vec!["product_id", "title", "category"])
.with_ann_limit(10)         // LIMIT n in the ANN query
.with_prefix("nearest_");

This issues SELECT product_id, title, category FROM product_embeddings ORDER BY embedding ANN OF ? LIMIT 10 for each row in the batch, binding that row's query_embedding value, which enables real-time semantic search enrichment.
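The ANN statement can be rendered the same way. The sketch also includes an exact, brute-force top-k by cosine similarity, which is the ranking an ANN index approximates. On a small table it can serve as ground truth when checking enrichment results. Both functions are hypothetical. The choice of cosine is an assumption, because the similarity function an index uses depends on how the index was created.

```rust
/// Hypothetical helper: the per-row ANN query; `?` is bound to the row's query vector.
pub fn ann_cql(table: &str, select: &[&str], vector_col: &str, limit: usize) -> String {
    assert!(limit > 0, "ANN limit must be at least 1");
    format!(
        "SELECT {} FROM {} ORDER BY {} ANN OF ? LIMIT {}",
        select.join(", "),
        table,
        vector_col,
        limit
    )
}

/// Exact top-k ids by cosine similarity (brute force, for validation only).
pub fn exact_top_k(query: &[f32], rows: &[(&str, Vec<f32>)], k: usize) -> Vec<String> {
    fn cosine(a: &[f32], b: &[f32]) -> f32 {
        let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
        let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
        let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
        if na == 0.0 || nb == 0.0 { 0.0 } else { dot / (na * nb) }
    }
    let mut scored: Vec<(f32, &str)> = rows.iter().map(|(id, v)| (cosine(query, v), *id)).collect();
    // Highest similarity first; total_cmp gives a total order even with NaN.
    scored.sort_by(|a, b| b.0.total_cmp(&a.0));
    scored.into_iter().take(k).map(|(_, id)| id.to_string()).collect()
}
```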

Miss strategies

When a lookup key has no match in ScyllaDB:

| Strategy | Behavior |
|---|---|
| NullFill (default) | Add enrichment columns filled with nulls |
| DropRecord | Drop the record entirely |
| PassThrough | Keep the original record without enrichment columns |
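The three strategies can be sketched on a row-oriented record. The crate works on Arrow batches, so this is only an illustration. MissStrategy, Record, and apply_miss_strategy are hypothetical names, and `columns` holds the already-prefixed enrichment column names.

```rust
use std::collections::BTreeMap;

/// Hypothetical mirror of the strategies in the table above.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MissStrategy {
    NullFill,
    DropRecord,
    PassThrough,
}

/// A record as column name -> nullable value (sketch only).
pub type Record = BTreeMap<String, Option<String>>;

/// Applies one lookup result; `found` is None on a miss. Returns None to drop the record.
pub fn apply_miss_strategy(
    mut record: Record,
    columns: &[&str],
    found: Option<&[Option<String>]>,
    strategy: MissStrategy,
) -> Option<Record> {
    match (found, strategy) {
        // A hit is enriched the same way under every strategy.
        (Some(values), _) => {
            for (col, v) in columns.iter().zip(values) {
                record.insert(col.to_string(), v.clone());
            }
            Some(record)
        }
        (None, MissStrategy::NullFill) => {
            for col in columns {
                record.insert(col.to_string(), None);
            }
            Some(record)
        }
        (None, MissStrategy::DropRecord) => None,
        (None, MissStrategy::PassThrough) => Some(record),
    }
}
```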

Enrichment configuration reference

| Method | Default | Description |
|---|---|---|
| ScyllaEnrichConfig::new(config, table, key) | | Simple-key lookup mode |
| ScyllaEnrichConfig::new_compound(config, table, key_pairs) | | Compound-key lookup mode |
| ScyllaEnrichConfig::new_ann(config, table, table_vec_col, stream_vec_col) | | ANN vector search mode |
| .with_select_columns(vec) | All columns | Columns to fetch from the remote table |
| .with_prefix(prefix) | None | Prefix for enriched columns (e.g., "customer_") |
| .with_cache_ttl(duration) | 5 minutes | Time-to-live for cached lookup entries |
| .with_cache_max_entries(n) | 10,000 | Maximum entries in the LRU cache |
| .with_on_miss(strategy) | NullFill | Strategy for unmatched records |
| .with_prefetch(bool) | false | Overlap ScyllaDB queries with batch processing for reduced latency |
| .with_ann_limit(n) | 5 | Number of ANN results to return per query (ANN mode only) |
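The two cache settings describe an LRU cache with a time-to-live. Below is a minimal std-only sketch, not the crate's implementation. Entries expire `ttl` after insertion, and when the cache is full, the least recently read or written entry is evicted. The caller passes `now` explicitly so expiry is deterministic in tests. LruTtlCache is a hypothetical name.

```rust
use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;
use std::time::{Duration, Instant};

/// Hypothetical LRU cache with per-entry TTL.
pub struct LruTtlCache<K, V> {
    ttl: Duration,
    max_entries: usize,
    tick: u64,
    entries: HashMap<K, (V, Instant, u64)>, // value, inserted_at, last-use tick
    recency: BTreeMap<u64, K>,              // last-use tick -> key, oldest first
}

impl<K: Hash + Eq + Clone, V: Clone> LruTtlCache<K, V> {
    pub fn new(ttl: Duration, max_entries: usize) -> Self {
        assert!(max_entries > 0, "max_entries must be positive");
        Self { ttl, max_entries, tick: 0, entries: HashMap::new(), recency: BTreeMap::new() }
    }

    /// Returns a live entry and marks it most recently used; expired entries are removed.
    pub fn get(&mut self, key: &K, now: Instant) -> Option<V> {
        let (inserted_at, old_tick) = match self.entries.get(key) {
            Some((_, at, t)) => (*at, *t),
            None => return None,
        };
        self.recency.remove(&old_tick);
        if now.saturating_duration_since(inserted_at) >= self.ttl {
            // Expired: drop it so the caller re-queries ScyllaDB.
            self.entries.remove(key);
            return None;
        }
        self.tick += 1;
        self.recency.insert(self.tick, key.clone());
        let entry = self.entries.get_mut(key).unwrap();
        entry.2 = self.tick;
        Some(entry.0.clone())
    }

    /// Inserts or replaces an entry, evicting the LRU entry when full.
    pub fn insert(&mut self, key: K, value: V, now: Instant) {
        if let Some((_, _, old_tick)) = self.entries.remove(&key) {
            self.recency.remove(&old_tick);
        } else if self.entries.len() >= self.max_entries {
            if let Some((_, lru_key)) = self.recency.pop_first() {
                self.entries.remove(&lru_key);
            }
        }
        self.tick += 1;
        self.recency.insert(self.tick, key.clone());
        self.entries.insert(key, (value, now, self.tick));
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }

    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
```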

Connection configuration reference

ScyllaConfig is shared between the sink and enrichment operator:

| Method | Default | Description |
|---|---|---|
| ScyllaConfig::new(contact_points) | | Required: list of "host:port" contact points used to discover the cluster |
| .with_keyspace(keyspace) | None | Default keyspace for all queries |
| .with_username(user) | None | CQL authentication username |
| .with_password(pass) | None | CQL authentication password |
| .with_connect_timeout(duration) | 5 s | Connection establishment timeout |
| .with_request_timeout(duration) | 30 s | Per-query timeout |
| .with_consistency(level) | LocalQuorum | CQL consistency level for writes |

Full pipeline example

A Kafka-to-ScyllaDB pipeline that enriches incoming order events with product data before writing to a destination table:

use std::sync::Arc;
use std::time::Duration;

use arrow::datatypes::{DataType, Field, Schema}; // assumes the arrow crate is a dependency
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaSourceConfig, KafkaStreamExt};
use volley_connector_scylla::{ScyllaConfig, ScyllaSinkConfig, ScyllaStreamExt};
use volley_enrichment_scylla::{ScyllaEnrichConfig, ScyllaEnrichStreamExt};

#[tokio::main]
async fn main() -> Result<()> {
    let scylla = ScyllaConfig::new(vec!["scylla-node1:9042", "scylla-node2:9042"])
        .with_keyspace("ecommerce")
        .with_username("volley")
        .with_password("secret"); // load from a secret store in production

    // Arrow schema of the incoming order events (field list assumed for this example)
    let schema = Arc::new(Schema::new(vec![
        Field::new("order_id", DataType::Utf8, false),
        Field::new("customer_id", DataType::Utf8, false),
        Field::new("product_id", DataType::Utf8, false),
        Field::new("quantity", DataType::Int64, false),
    ]));

    let source_config = KafkaSourceConfig::new(
        "broker:9092",
        "orders",
        "volley-order-processor",
        schema,
    );

    // Idempotent writes keyed on order_id; the table is created from the first batch.
    let sink_config = ScyllaSinkConfig::new_upsert(
        scylla.clone(),
        "enriched_orders",
        vec!["order_id"],
    )
    .with_batch_size(1000)
    .with_create_table(true);

    StreamExecutionEnvironment::new()
        .with_checkpoints(CheckpointConfig::new(Duration::from_secs(30)))
        .from_kafka(source_config).await?
        // Adds product_name, product_category, and product_unit_price
        .enrich_with_scylla(
            ScyllaEnrichConfig::new(scylla, "products", "product_id")
                .with_select_columns(vec!["name", "category", "unit_price"])
                .with_prefix("product_")
                .with_cache_ttl(Duration::from_secs(600))
        )
        .filter_expr(col("product_category").eq(lit("electronics")))
        .select_expr(vec![
            col("order_id"),
            col("customer_id"),
            col("product_name"),
            col("product_unit_price"),
            col("quantity"),
            (col("product_unit_price") * col("quantity")).alias("total"),
        ])
        .to_scylla(sink_config).await?
        .execute("orders-enrich-to-scylla")
        .await?;

    Ok(())
}

Observability

| Metric | Type | Description |
|---|---|---|
| scylla_sink_rows_written_total | counter | Rows written, labelled by table |
| scylla_sink_batch_duration_seconds | histogram | CQL batch execution latency |
| scylla_sink_retries_total | counter | Batch retries due to transient errors |
| scylla_enrich_cache_hit_total | counter | Cache hits |
| scylla_enrich_cache_miss_total | counter | Cache misses |
| scylla_enrich_cache_size | gauge | Current cache entry count |
| scylla_enrich_query_duration_seconds | histogram | Per-key query latency |
| scylla_enrich_query_errors_total | counter | Failed enrichment queries |

Integration testing

The integration tests require Docker and a ScyllaDB image. They run via volley-test-harness, which starts a scylladb/scylla container automatically through testcontainers. The tests are marked #[ignore], so pass --ignored to run them:

# Run sink integration tests
cargo test -p volley-connector-scylla --test sink_integration -- --ignored

# Run enrichment integration tests
cargo test -p volley-enrichment-scylla --test enrich_integration -- --ignored

Troubleshooting

“Unable to connect to any node”

  • Verify the contact points include the correct host and port (default: 9042).
  • Check network reachability from the pipeline process to the ScyllaDB nodes.
  • For Docker, ensure port 9042 is published to the host (for example with -p 9042:9042, as in the Prerequisites command) and that the pipeline connects to the host's address.

“Keyspace does not exist”

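  • Create the keyspace before starting the pipeline. SimpleStrategy with a replication factor of 1 suits a single-node development cluster only; production clusters should use NetworkTopologyStrategy:
    CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};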

Slow enrichment queries

  • Increase the cache TTL and size (with_cache_ttl and with_cache_max_entries) to reduce query frequency.
  • Ensure the lookup columns form the table's primary key (at minimum, the full partition key). Queries that filter on non-key columns require ALLOW FILTERING or a secondary index and can scan large parts of the table.
  • Use with_prefetch(true) to overlap ScyllaDB I/O with downstream processing.

ANN search returns no results

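  • Verify that a vector index exists on the vector column. On Apache Cassandra 5.0+, this is a Storage-Attached Index (SAI):
    CREATE CUSTOM INDEX ON table_name (vector_col) USING 'StorageAttachedIndex';
  • ANN queries require a server with vector index support: Apache Cassandra 5.0+, where SAI is built in, or a ScyllaDB release that supports vector search. ScyllaDB uses its own index syntax, so check the ScyllaDB documentation for your version.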