ScyllaDB / Cassandra
Volley provides ScyllaDB (and Cassandra-compatible) integration via two crates:
- volley-connector-scylla: A sink that writes Arrow RecordBatches to a ScyllaDB table using micro-batch CQL writes, with full 2PC support and first-class vector column handling.
- volley-enrichment-scylla: A real-time enrichment operator that queries ScyllaDB to enrich streaming records, with LRU+TTL caching, compound key support, and ANN vector similarity search.
Both crates use the scylla Rust driver, which provides native token-aware routing and efficient CQL batch execution.
Targets: ScyllaDB 5.x / 6.x and Apache Cassandra 4.x / 5.x.
Setup
Add crate dependencies
[dependencies]
# Sink (write to ScyllaDB)
volley-connector-scylla = { git = "https://github.com/volley-streams/volley" }
# Enrichment operator (query ScyllaDB to enrich records)
volley-enrichment-scylla = { git = "https://github.com/volley-streams/volley" }
Prerequisites
A running ScyllaDB (or Cassandra) cluster is required. For local development, start a single node with Docker:
docker run --rm -d -p 9042:9042 --name scylla scylladb/scylla:6.1
ScyllaDB Sink
The sink buffers incoming rows and flushes them as CQL batch statements. It supports append mode (INSERT), upsert mode (INSERT IF NOT EXISTS), and checkpoint-aligned 2PC.
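The buffer-then-flush behaviour can be pictured with a small, driver-free sketch. `MicroBatchBuffer` and its methods are hypothetical names, not part of volley-connector-scylla; flushed batches are simply returned to the caller rather than sent as CQL.

```rust
/// Hypothetical stand-in for the sink's row buffer (not the Volley API).
pub struct MicroBatchBuffer<T> {
    rows: Vec<T>,
    batch_size: usize,
}

impl<T> MicroBatchBuffer<T> {
    pub fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self { rows: Vec::with_capacity(batch_size), batch_size }
    }

    /// Buffer one row; returns a full batch once `batch_size` rows are held.
    pub fn push(&mut self, row: T) -> Option<Vec<T>> {
        self.rows.push(row);
        if self.rows.len() >= self.batch_size {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Drain whatever is buffered, e.g. when a checkpoint arrives.
    pub fn flush(&mut self) -> Vec<T> {
        std::mem::take(&mut self.rows)
    }

    /// Number of rows currently buffered.
    pub fn len(&self) -> usize {
        self.rows.len()
    }
}
```

With a batch size of 3, the third `push` returns the full batch and leaves the buffer empty; `flush` covers the checkpoint case, where a partial batch has to be written out.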
Basic append
use volley_connector_scylla::{ScyllaConfig, ScyllaSinkConfig, ScyllaStreamExt};
let scylla = ScyllaConfig::new(vec!["127.0.0.1:9042"])
    .with_keyspace("my_keyspace");
let sink_config = ScyllaSinkConfig::new(scylla, "events")
    .with_batch_size(500)
    .with_create_table(true);
env.from_kafka(source_config).await?
    .filter_expr(col("status").eq(lit("active")))
    .to_scylla(sink_config).await?
    .execute("kafka-to-scylla").await?;
Upsert mode
Use ScyllaSinkConfig::new_upsert() for idempotent writes. A record whose
primary key already exists in the table is silently discarded:
let sink_config = ScyllaSinkConfig::new_upsert(scylla, "events", vec!["event_id"])
    .with_batch_size(500);
For tables with a composite primary key, pass all key columns:
let sink_config = ScyllaSinkConfig::new_upsert(scylla, "events", vec!["tenant_id", "event_id"])
    .with_batch_size(500);
Auto-create table
When with_create_table(true) is set, the sink issues a CREATE TABLE IF NOT EXISTS
DDL statement on startup. The table schema is derived from the Arrow schema of the
first batch. Arrow-to-CQL type mappings:
| Arrow type | CQL type |
|---|---|
| Int32 | int |
| Int64 | bigint |
| Float32 | float |
| Float64 | double |
| Utf8 / LargeUtf8 | text |
| Boolean | boolean |
| Timestamp(ms, ...) | timestamp |
| FixedSizeList<Float16/32/64, N> | vector<float, N> |
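As a rough illustration of how such a mapping could drive DDL generation, the sketch below uses a simplified `ColumnType` enum in place of Arrow's `DataType`. The enum, `cql_type`, and `create_table_cql` are hypothetical names, and the primary key is passed in explicitly because the source does not say how the sink chooses it.

```rust
/// Simplified, hypothetical stand-in for Arrow's `DataType` (not the arrow crate).
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnType {
    Int32,
    Int64,
    Float32,
    Float64,
    Utf8,
    Boolean,
    TimestampMillis,
    /// Fixed-size list of floats with the given number of elements.
    FloatVector(usize),
}

/// Map a column type to the CQL type listed in the mapping table.
pub fn cql_type(t: &ColumnType) -> String {
    match t {
        ColumnType::Int32 => "int".to_string(),
        ColumnType::Int64 => "bigint".to_string(),
        ColumnType::Float32 => "float".to_string(),
        ColumnType::Float64 => "double".to_string(),
        ColumnType::Utf8 => "text".to_string(),
        ColumnType::Boolean => "boolean".to_string(),
        ColumnType::TimestampMillis => "timestamp".to_string(),
        ColumnType::FloatVector(n) => format!("vector<float, {n}>"),
    }
}

/// Build a CREATE TABLE IF NOT EXISTS statement with a single-column primary key.
/// Assumption: the caller names the key column; the real sink may derive it differently.
pub fn create_table_cql(table: &str, columns: &[(&str, ColumnType)], primary_key: &str) -> String {
    let cols: Vec<String> = columns
        .iter()
        .map(|(name, ty)| format!("{name} {}", cql_type(ty)))
        .collect();
    format!(
        "CREATE TABLE IF NOT EXISTS {table} ({}, PRIMARY KEY ({primary_key}))",
        cols.join(", ")
    )
}
```

For a table with a text `id` and a 384-dimensional `embedding`, this yields `CREATE TABLE IF NOT EXISTS embeddings (id text, embedding vector<float, 384>, PRIMARY KEY (id))`.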
Vector columns
ScyllaDB’s native vector<float> type is fully supported. Arrow
FixedSizeList columns with Float16, Float32, or Float64 children are
automatically serialised as vector values. This is the recommended way to
store embedding vectors for ANN search:
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
// Arrow schema with a vector embedding column
let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Utf8, false),
    Field::new(
        "embedding",
        // 384-dimensional Float32 vector, stored as vector<float, 384>
        DataType::FixedSizeList(
            Arc::new(Field::new("item", DataType::Float32, false)),
            384,
        ),
        false,
    ),
]));
let sink_config = ScyllaSinkConfig::new(scylla, "embeddings")
    .with_create_table(true)
    .with_vector_index("embedding"); // CREATE CUSTOM INDEX for ANN
Sink configuration reference
| Method | Default | Description |
|---|---|---|
| ScyllaSinkConfig::new(config, table) | - | Append mode. Required: ScyllaDB config and target table name |
| ScyllaSinkConfig::new_upsert(config, table, keys) | - | Upsert mode. keys lists partition + clustering key columns |
| .with_batch_size(n) | 500 | Number of buffered rows before auto-flush |
| .with_create_table(bool) | false | Auto-create the table from the first batch’s Arrow schema |
| .with_vector_index(column) | None | Create a ScyllaDB custom ANN index on this vector column |
| .with_ttl(seconds) | None | Per-row TTL in seconds (USING TTL n on every INSERT) |
Delivery guarantees
- Append mode (default): at-least-once. On checkpoint, buffered rows are flushed. If the pipeline crashes after the flush but before the checkpoint completes, the rows may be written again on recovery.
- Upsert mode: effectively exactly-once for tables with a natural primary key. Duplicate INSERTs on recovery are idempotent because ScyllaDB discards rows whose primary key already exists.
- 2PC (transactional pipelines): prepare_commit stages the CQL batch; finalize_commit executes it; abort_commit discards it. Sinks in a multi-sink DAG all commit atomically per epoch.
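The three 2PC calls can be read as a small state machine. The sketch below is a simplified model, not the connector's implementation: `TwoPhaseSink` is a hypothetical type, statements are plain strings, and "executing" a batch just appends it to an in-memory list. The method names mirror those in the list above, but the signatures are assumptions.

```rust
/// Hypothetical two-phase sink; statements are plain strings, not driver types.
#[derive(Default)]
pub struct TwoPhaseSink {
    /// Rows accepted since the last prepare.
    buffered: Vec<String>,
    /// Batch staged for one checkpoint epoch, not yet executed.
    staged: Option<(u64, Vec<String>)>,
    /// Stands in for rows that are visible in the table.
    pub committed: Vec<String>,
}

impl TwoPhaseSink {
    pub fn write(&mut self, stmt: &str) {
        self.buffered.push(stmt.to_string());
    }

    /// Phase 1: stage everything buffered for `epoch`; nothing is executed yet.
    pub fn prepare_commit(&mut self, epoch: u64) {
        let batch = std::mem::take(&mut self.buffered);
        self.staged = Some((epoch, batch));
    }

    /// Phase 2: execute the staged batch if it belongs to `epoch`.
    pub fn finalize_commit(&mut self, epoch: u64) -> bool {
        match self.staged.take() {
            Some((e, batch)) if e == epoch => {
                self.committed.extend(batch);
                true
            }
            other => {
                // Keep a batch that was staged for a different epoch.
                self.staged = other;
                false
            }
        }
    }

    /// Abort: drop the staged batch for `epoch` without executing it.
    pub fn abort_commit(&mut self, epoch: u64) {
        if matches!(self.staged, Some((e, _)) if e == epoch) {
            self.staged = None;
        }
    }
}
```

In this model nothing reaches `committed` until `finalize_commit` is called for the matching epoch, which is what lets a coordinator finalize or abort every sink in the DAG together.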
ScyllaDB Enrichment Operator
The enrichment operator queries ScyllaDB to join lookup data into the stream.
For each incoming batch it extracts the unique keys, issues one CQL SELECT per
key (run concurrently as async queries), and left-joins the results back into
the batch. An LRU+TTL cache avoids redundant queries.
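The lookup flow (deduplicate keys, consult the cache, query on a miss, join back per row) might look roughly like the following. Everything here is a hypothetical sketch: `LookupCache` and `enrich` are not Volley types, the `fetch` closure stands in for a CQL SELECT, lookups run sequentially rather than concurrently, and caching of misses is an assumption the source does not confirm.

```rust
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

struct Entry {
    value: Option<String>,
    inserted: Instant,
    last_used: u64,
}

/// Hypothetical LRU+TTL cache. Eviction scans linearly for the least recently
/// used entry, which is fine for a sketch but not for a large cache.
pub struct LookupCache {
    ttl: Duration,
    max_entries: usize,
    tick: u64,
    entries: HashMap<String, Entry>,
}

impl LookupCache {
    pub fn new(ttl: Duration, max_entries: usize) -> Self {
        Self { ttl, max_entries, tick: 0, entries: HashMap::new() }
    }

    /// Return the cached value if present and younger than the TTL.
    pub fn get(&mut self, key: &str, now: Instant) -> Option<Option<String>> {
        self.tick += 1;
        let expired = match self.entries.get_mut(key) {
            Some(e) if now.duration_since(e.inserted) < self.ttl => {
                e.last_used = self.tick;
                return Some(e.value.clone());
            }
            Some(_) => true,
            None => false,
        };
        if expired {
            self.entries.remove(key);
        }
        None
    }

    /// Insert a value, evicting the least recently used entry when full.
    pub fn put(&mut self, key: String, value: Option<String>, now: Instant) {
        if !self.entries.contains_key(&key) && self.entries.len() >= self.max_entries {
            let lru = self.entries.iter().min_by_key(|(_, e)| e.last_used).map(|(k, _)| k.clone());
            if let Some(k) = lru {
                self.entries.remove(&k);
            }
        }
        self.tick += 1;
        self.entries.insert(key, Entry { value, inserted: now, last_used: self.tick });
    }
}

/// Resolve each distinct key once (cache first, then `fetch`) and return one
/// looked-up value per input row, preserving row order (a left join).
pub fn enrich<F>(keys: &[&str], cache: &mut LookupCache, now: Instant, mut fetch: F) -> Vec<Option<String>>
where
    F: FnMut(&str) -> Option<String>,
{
    let unique: HashSet<&str> = keys.iter().copied().collect();
    let mut resolved: HashMap<&str, Option<String>> = HashMap::new();
    for key in unique {
        let value = match cache.get(key, now) {
            Some(hit) => hit,
            None => {
                let fetched = fetch(key);
                cache.put(key.to_string(), fetched.clone(), now);
                fetched
            }
        };
        resolved.insert(key, value);
    }
    keys.iter().map(|k| resolved[k].clone()).collect()
}
```

The clock is passed in as `now` so that TTL expiry can be exercised deterministically; a batch with keys `a, b, a` triggers only two `fetch` calls.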
Simple key lookup
use volley_enrichment_scylla::{ScyllaEnrichConfig, ScyllaEnrichStreamExt, EnrichMissStrategy};
use volley_connector_scylla::ScyllaConfig;
use std::time::Duration;
let scylla = ScyllaConfig::new(vec!["127.0.0.1:9042"])
    .with_keyspace("my_keyspace");
env.from_kafka(source_config).await?
    .enrich_with_scylla(
        ScyllaEnrichConfig::new(scylla, "customers", "customer_id")
            .with_select_columns(vec!["tier", "region", "lifetime_value"])
            .with_prefix("customer_")
            .with_cache_ttl(Duration::from_secs(300))
            .with_on_miss(EnrichMissStrategy::NullFill)
    )
    .filter_expr(col("customer_tier").eq(lit("premium")))
    .to_kafka(sink_config).await?
    .execute("enrich-orders").await?;
This issues SELECT tier, region, lifetime_value FROM customers WHERE customer_id = ?
for each unique key in the batch.
Compound key lookup
For tables with a composite primary key, specify all key columns and the corresponding stream columns:
ScyllaEnrichConfig::new_compound(
    scylla,
    "order_items",
    vec![
        ("order_id", "order_id"), // (stream_column, table_column)
        ("item_sku", "sku"),
    ],
)
.with_select_columns(vec!["price", "weight_kg"])
.with_prefix("item_")
This issues SELECT price, weight_kg FROM order_items WHERE order_id = ? AND sku = ?.
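To make the relationship between configuration and generated CQL concrete, here is a minimal sketch of how such statements could be assembled. `select_by_key` and `prefixed` are hypothetical helpers, and falling back to `*` when no columns are selected is an assumption about how "all columns" might be expressed.

```rust
/// Build the parameterised lookup statement for one or more key columns.
/// Hypothetical helper; an empty `select` slice falls back to `*`.
pub fn select_by_key(table: &str, select: &[&str], key_columns: &[&str]) -> String {
    let projection = if select.is_empty() {
        "*".to_string()
    } else {
        select.join(", ")
    };
    // One `column = ?` placeholder per key column, joined with AND.
    let predicate = key_columns
        .iter()
        .map(|c| format!("{c} = ?"))
        .collect::<Vec<_>>()
        .join(" AND ");
    format!("SELECT {projection} FROM {table} WHERE {predicate}")
}

/// Name the enriched output columns by prepending the configured prefix.
pub fn prefixed(prefix: &str, select: &[&str]) -> Vec<String> {
    select.iter().map(|c| format!("{prefix}{c}")).collect()
}
```

Called with the table columns `order_id` and `sku`, `select_by_key` reproduces the statement quoted above, and `prefixed("item_", ...)` gives `item_price` and `item_weight_kg` as the output column names.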
ANN vector similarity search
For vector similarity search, use ScyllaEnrichConfig::new_ann(). The query
vector is read from a designated column in the stream batch:
ScyllaEnrichConfig::new_ann(
    scylla,
    "product_embeddings", // table
    "embedding",          // vector column in the table
    "query_embedding",    // vector column in the stream batch
)
.with_select_columns(vec!["product_id", "title", "category"])
.with_ann_limit(10) // LIMIT n in the ANN query
.with_prefix("nearest_")
This issues SELECT product_id, title, category FROM product_embeddings ORDER BY embedding ANN OF ? LIMIT 10
for each row in the batch, enabling real-time semantic search enrichment.
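For intuition about what an ANN query returns, the sketch below computes the exact top-n neighbours by brute force, which is the result an ANN index approximates at much lower cost. It assumes cosine similarity; the similarity function actually used depends on how the index is configured, and `cosine` and `nearest` are illustrative names only.

```rust
/// Cosine similarity of two equal-length vectors; 0.0 if either has zero norm.
pub fn cosine(a: &[f32], b: &[f32]) -> f32 {
    assert_eq!(a.len(), b.len(), "vectors must have the same dimension");
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a * norm_b)
    }
}

/// Exact top-`limit` search by brute force: score every row, sort by
/// descending similarity, and keep the ids of the best matches.
pub fn nearest<'a>(query: &[f32], rows: &'a [(&'a str, Vec<f32>)], limit: usize) -> Vec<&'a str> {
    let mut scored: Vec<(f32, &str)> = rows
        .iter()
        .map(|(id, v)| (cosine(query, v), *id))
        .collect();
    scored.sort_by(|a, b| b.0.total_cmp(&a.0));
    scored.into_iter().take(limit).map(|(_, id)| id).collect()
}
```

The `limit` argument plays the role of `with_ann_limit(n)`: each stream row's query vector yields at most that many matching table rows.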
Miss strategies
When a lookup key has no match in ScyllaDB:
| Strategy | Behavior |
|---|---|
| NullFill (default) | Add enrichment columns filled with nulls |
| DropRecord | Drop the record entirely |
| PassThrough | Keep the original record without enrichment columns |
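The three strategies can be modelled row by row as below. This is a simplified sketch: `MissStrategy`, `Record`, and `apply_miss_strategy` are hypothetical, and the real operator works on Arrow batches rather than individual records.

```rust
/// Hypothetical mirror of the three miss strategies.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum MissStrategy {
    NullFill,
    DropRecord,
    PassThrough,
}

/// A record is its key plus optional enrichment columns.
/// `enriched == None` means the columns are absent;
/// `Some(vec![None, ..])` means they are present but null.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    pub key: String,
    pub enriched: Option<Vec<Option<String>>>,
}

/// Combine one lookup result with the configured strategy.
/// Returns `None` when the record should be dropped from the output.
pub fn apply_miss_strategy(
    key: &str,
    lookup: Option<Vec<String>>,
    n_columns: usize,
    strategy: MissStrategy,
) -> Option<Record> {
    let key = key.to_string();
    match (lookup, strategy) {
        // A hit is enriched the same way regardless of strategy.
        (Some(values), _) => Some(Record {
            key,
            enriched: Some(values.into_iter().map(Some).collect()),
        }),
        (None, MissStrategy::NullFill) => Some(Record {
            key,
            enriched: Some(vec![None; n_columns]),
        }),
        (None, MissStrategy::DropRecord) => None,
        (None, MissStrategy::PassThrough) => Some(Record { key, enriched: None }),
    }
}
```

The strategy only matters on a miss: a key that is found is enriched identically under all three settings.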
Enrichment configuration reference
| Method | Default | Description |
|---|---|---|
| ScyllaEnrichConfig::new(config, table, key) | - | Simple-key lookup mode |
| ScyllaEnrichConfig::new_compound(config, table, key_pairs) | - | Compound-key lookup mode |
| ScyllaEnrichConfig::new_ann(config, table, table_vec_col, stream_vec_col) | - | ANN vector search mode |
| .with_select_columns(vec) | All columns | Columns to fetch from the remote table |
| .with_prefix(prefix) | None | Prefix for enriched columns (e.g., "customer_") |
| .with_cache_ttl(duration) | 5 minutes | Time-to-live for cached lookup entries |
| .with_cache_max_entries(n) | 10,000 | Maximum entries in the LRU cache |
| .with_on_miss(strategy) | NullFill | Strategy for unmatched records |
| .with_prefetch(bool) | false | Overlap ScyllaDB queries with batch processing for reduced latency |
| .with_ann_limit(n) | 5 | Number of ANN results to return per query (ANN mode only) |
Connection configuration reference
ScyllaConfig is shared between the sink and enrichment operator:
| Method | Default | Description |
|---|---|---|
| ScyllaConfig::new(contact_points) | - | Required: list of "host:port" seed nodes |
| .with_keyspace(keyspace) | None | Default keyspace for all queries |
| .with_username(user) | None | CQL authentication username |
| .with_password(pass) | None | CQL authentication password |
| .with_connect_timeout(duration) | 5 s | Connection establishment timeout |
| .with_request_timeout(duration) | 30 s | Per-query timeout |
| .with_consistency(level) | LocalQuorum | CQL consistency level for writes |
Full pipeline example
A Kafka-to-ScyllaDB pipeline that enriches incoming order events with product data before writing to a destination table:
use volley_core::prelude::*;
use volley_connector_kafka::{KafkaEnvExt, KafkaStreamExt};
use volley_connector_scylla::{ScyllaConfig, ScyllaSinkConfig, ScyllaStreamExt};
use volley_enrichment_scylla::{ScyllaEnrichConfig, ScyllaEnrichStreamExt};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<()> {
    // Connection settings shared by the enrichment operator and the sink.
    let scylla = ScyllaConfig::new(vec!["scylla-node1:9042", "scylla-node2:9042"])
        .with_keyspace("ecommerce")
        .with_username("volley")
        .with_password("secret");
    // `schema` is the Arrow schema of the orders topic, assumed to be defined elsewhere.
    let source_config = KafkaSourceConfig::new(
        "broker:9092",
        "orders",
        "volley-order-processor",
        schema,
    );
    // Upsert on order_id so that replays after recovery are idempotent.
    let sink_config = ScyllaSinkConfig::new_upsert(
        scylla.clone(),
        "enriched_orders",
        vec!["order_id"],
    )
    .with_batch_size(1000)
    .with_create_table(true);
    StreamExecutionEnvironment::new()
        .with_checkpoints(CheckpointConfig::new(Duration::from_secs(30)))
        .from_kafka(source_config).await?
        .enrich_with_scylla(
            ScyllaEnrichConfig::new(scylla, "products", "product_id")
                .with_select_columns(vec!["name", "category", "unit_price"])
                .with_prefix("product_")
                .with_cache_ttl(Duration::from_secs(600))
        )
        // Enriched columns carry the "product_" prefix from here on.
        .filter_expr(col("product_category").eq(lit("electronics")))
        .select_expr(vec![
            col("order_id"),
            col("customer_id"),
            col("product_name"),
            col("product_unit_price"),
            col("quantity"),
            (col("product_unit_price") * col("quantity")).alias("total"),
        ])
        .to_scylla(sink_config).await?
        .execute("orders-enrich-to-scylla")
        .await?;
    Ok(())
}
Observability
| Metric | Type | Description |
|---|---|---|
| scylla_sink_rows_written_total | counter | Rows written, labelled by table |
| scylla_sink_batch_duration_seconds | histogram | CQL batch execution latency |
| scylla_sink_retries_total | counter | Batch retries due to transient errors |
| scylla_enrich_cache_hit_total | counter | Cache hits |
| scylla_enrich_cache_miss_total | counter | Cache misses |
| scylla_enrich_cache_size | gauge | Current cache entry count |
| scylla_enrich_query_duration_seconds | histogram | Per-key query latency |
| scylla_enrich_query_errors_total | counter | Failed enrichment queries |
Integration testing
The integration tests require Docker and a ScyllaDB image. They run through
volley-test-harness, which uses testcontainers to start a scylladb/scylla
container automatically:
# Run sink integration tests
cargo test -p volley-connector-scylla --test sink_integration -- --ignored
# Run enrichment integration tests
cargo test -p volley-enrichment-scylla --test enrich_integration -- --ignored
Troubleshooting
“Unable to connect to any node”
- Verify the contact points include the correct host and port (default: 9042).
- Check network reachability from the pipeline process to the ScyllaDB nodes.
- For Docker, ensure the container exposes port 9042 and the pipeline connects to the host IP.
“Keyspace does not exist”
- Create the keyspace before starting the pipeline:
CREATE KEYSPACE my_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
Slow enrichment queries
- Increase cache_ttl and cache_max_entries to reduce query frequency.
- Ensure the lookup column is the partition key (range queries on non-key columns require ALLOW FILTERING and are slow).
- Use with_prefetch(true) to overlap ScyllaDB I/O with downstream processing.
ANN search returns no results
- Verify a custom index exists on the vector column:
CREATE CUSTOM INDEX ON table_name (vector_col) USING 'StorageAttachedIndex';
- ANN search requires ScyllaDB 6.0+ or the SAI extension on Cassandra 5+.