# Integration Testing

Volley provides the `volley-test-harness` crate — a shared test framework inspired by Apache Flink's testing patterns. It gives you containers, assertion harnesses, and a standardized connector test suite so you can validate pipelines end-to-end with real infrastructure.
## Architecture

The test harness is layered — each layer builds on the one below it:

```text
┌──────────────────────────────────────────────────┐
│ End-to-End Tests (tests/)                        │
│   Full pipelines with real connectors            │
├──────────────────────────────────────────────────┤
│ Connector Framework (connector/)                 │
│   ConnectorTestContext + source/sink suites      │
├──────────────────────────────────────────────────┤
│ Assertion Harnesses (assertions/)                │
│ MetricsSnapshot, TraceCapture, ThroughputRecorder│
├──────────────────────────────────────────────────┤
│ TestCluster (cluster.rs)                         │
│   In-process pipeline runner + observability     │
├──────────────────────────────────────────────────┤
│ Container Management (containers/)               │
│   Kafka, MinIO with retry logic + ARM64 images   │
└──────────────────────────────────────────────────┘
```
## Quick Start

Add `volley-test-harness` as a dev-dependency in your connector crate:

```toml
[dev-dependencies]
volley-test-harness = { path = "../volley-test-harness" }
async-trait = { workspace = true }
arrow = { workspace = true }
anyhow = { workspace = true }
```
## Container Management

Shared testcontainers with retry logic and a one-container-per-test-binary lifecycle via `OnceCell`. Containers start lazily on first use and are cleaned up when the test process exits.
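The once-per-binary lifecycle can be sketched with the standard library's `OnceLock` (the stable `std` counterpart of `OnceCell`). This is an illustrative stand-in, not the harness implementation: the container start is simulated, and `start_container_with_retry` and `bootstrap_servers` are hypothetical names.

```rust
use std::sync::OnceLock;

/// Hypothetical stand-in for launching a container and returning its
/// connection string. The real harness would start a testcontainer here
/// and retry on transient failures.
fn start_container_with_retry(max_attempts: u32) -> Result<String, String> {
    for _attempt in 1..=max_attempts {
        // Simulated start that succeeds immediately.
        let started = true;
        if started {
            return Ok("localhost:9092".to_string());
        }
    }
    Err("container failed to start".into())
}

static BOOTSTRAP: OnceLock<String> = OnceLock::new();

/// Every test in the binary shares one container: the init closure runs
/// once, and later callers get the cached value.
fn bootstrap_servers() -> &'static str {
    BOOTSTRAP
        .get_or_init(|| start_container_with_retry(3).expect("container start"))
        .as_str()
}

fn main() {
    let a = bootstrap_servers();
    let b = bootstrap_servers();
    // Both calls return the same cached string — the "container" started once.
    assert!(std::ptr::eq(a, b));
    println!("{a}");
}
```

The same pattern generalizes to any shared fixture: the first test to touch it pays the startup cost, the rest reuse it.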
### Kafka

```rust
use volley_test_harness::containers::KafkaContainer;

// Starts a native ARM64 Kafka container on first call.
let servers = KafkaContainer::bootstrap_servers().await;

// Each test gets an isolated topic.
let topic = KafkaContainer::unique_topic("my-test");
KafkaContainer::create_topic(&topic, 1).await;

// Seed test data.
KafkaContainer::produce_json(&topic, &[
    serde_json::json!({"name": "alice", "score": 42}),
]).await;

// Verify output.
let messages = KafkaContainer::consume_json(&topic, 1, 10).await;
assert_eq!(messages[0]["name"], "alice");
```
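Per-test isolation of this kind usually comes down to generating names that cannot collide. A minimal sketch of one way a `unique_topic`-style helper can work — the actual naming scheme used by the harness is an assumption here:

```rust
use std::process;
use std::sync::atomic::{AtomicU64, Ordering};

static COUNTER: AtomicU64 = AtomicU64::new(0);

/// Illustrative sketch: combine the caller's prefix with the process id
/// and a monotonically increasing counter so no two tests (even across
/// concurrently running test binaries) share a topic name.
fn unique_topic(prefix: &str) -> String {
    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{prefix}-{}-{n}", process::id())
}

fn main() {
    let a = unique_topic("my-test");
    let b = unique_topic("my-test");
    // Two calls with the same prefix never collide.
    assert_ne!(a, b);
    assert!(a.starts_with("my-test-"));
    println!("{a}\n{b}");
}
```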
### MinIO (S3-compatible)

```rust
use volley_test_harness::containers::{MinioContainer, MINIO_ACCESS_KEY, MINIO_SECRET_KEY};

let endpoint = MinioContainer::endpoint().await;
let bucket = MinioContainer::unique_bucket("test");
MinioContainer::create_bucket(&bucket).await;
```
### Polaris (Apache Iceberg REST catalog)

```rust
use volley_test_harness::containers::PolarisContainer;

// Starts a Polaris container backed by MinIO. MinIO must be started first.
let catalog_url = PolarisContainer::catalog_url().await;

// Each test gets an isolated catalog name.
let catalog = PolarisContainer::unique_catalog("my-test");
PolarisContainer::create_catalog(&catalog).await;
```

`PolarisContainer` depends on `MinioContainer` for the underlying storage — start `MinioContainer` before calling any `PolarisContainer` method.
## Connector Test Framework

Like Flink's Connector Testing Framework — implement `ConnectorTestContext` for your connector and get a standardized source/sink test suite for free.
### Implementing ConnectorTestContext

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use volley_test_harness::connector::ConnectorTestContext;

struct MyConnectorTestContext { /* ... */ }

#[async_trait]
impl ConnectorTestContext for MyConnectorTestContext {
    fn name(&self) -> &str { "MyConnector" }

    fn test_schema(&self) -> Arc<Schema> { /* ... */ }

    async fn seed_test_data(&self, records: &[RecordBatch]) -> anyhow::Result<()> {
        // Write records into the external system.
    }

    async fn create_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Create a source pointing at the seeded data.
    }

    async fn create_sink(&self) -> anyhow::Result<Option<Box<dyn DynSink>>> {
        // Create a sink writing to a test destination.
    }

    async fn read_sink_results(&self) -> anyhow::Result<Vec<RecordBatch>> {
        // Read back what the sink wrote for verification.
    }

    async fn create_empty_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Optional: source pointing at an empty data set.
        Ok(None) // Return None to skip the empty-source test.
    }
}
```
### Running the Suites

```rust
#[tokio::test]
async fn my_connector_source_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_source_suite(&ctx).await;
}

#[tokio::test]
async fn my_connector_sink_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_sink_suite(&ctx).await;
}
```
The source suite runs three tests:

- `test_basic_read` — seeds 5 records, verifies the source returns data
- `test_read_all_records` — seeds 10 records, reads until all are consumed
- `test_empty_source` — verifies graceful handling of empty data sets
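The shape of those checks can be sketched with an in-memory stand-in. Everything here is illustrative — `InMemoryContext` and its methods are not harness API; the real suite drives a `ConnectorTestContext` against external infrastructure:

```rust
/// Minimal in-memory stand-in for a connector under test.
struct InMemoryContext {
    seeded: Vec<u64>,
}

impl InMemoryContext {
    /// Stands in for seed_test_data: records land in the "external" system.
    fn seed(&mut self, records: &[u64]) {
        self.seeded.extend_from_slice(records);
    }

    /// Stands in for draining the source until all records are consumed.
    fn read_all(&self) -> Vec<u64> {
        self.seeded.clone()
    }
}

fn main() {
    // test_read_all_records, in spirit: seed 10 records, read until done,
    // assert nothing was lost or duplicated.
    let mut ctx = InMemoryContext { seeded: Vec::new() };
    let input: Vec<u64> = (0..10).collect();
    ctx.seed(&input);
    assert_eq!(ctx.read_all(), input);

    // test_empty_source, in spirit: an empty source yields no records.
    let empty = InMemoryContext { seeded: Vec::new() };
    assert!(empty.read_all().is_empty());
}
```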
The sink suite runs three tests:

- `test_basic_write` — writes records, flushes, verifies they're readable
- `test_write_and_flush` — multiple writes followed by a flush
- `test_checkpoint_triggers_flush` — verifies `on_checkpoint()` commits data

See `volley-connector-kafka/tests/connector_suite.rs` for the reference implementation.
## TestCluster

An in-process pipeline execution wrapper with built-in observability capture. It registers an `InMemorySpanExporter` as the global OpenTelemetry tracer so spans created during pipeline execution are captured for assertions.

```rust
use volley_test_harness::cluster::TestCluster;

let cluster = TestCluster::new("my-test");

let pipeline = StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(cluster.tracing_config()) // 100% sampling, in-memory export
    .with_operator(my_operator)
    .with_operator_id("my-op")
    .to_sink(sink);

pipeline.execute("my-pipeline").await?;

// Inspect captured spans.
let traces = cluster.traces();
println!("Captured {} spans", traces.span_count());
traces.print_timing_summary();
```
## Assertion Harnesses

### TraceCapture

Assert on OpenTelemetry spans captured by the `TestCluster`:

```rust
let traces = cluster.traces();

// Verify span counts.
traces.assert_span_count("volley.operator", 10);

// Verify trace structure.
traces.assert_valid_tree(); // No orphan spans.

// Print per-operator timing breakdown.
traces.print_timing_summary();
```
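A trace tree with no orphans means every span's parent was itself captured. A self-contained sketch of that check, using simplified span records rather than real OpenTelemetry span/trace ids (`Span` and `is_valid_tree` are illustrative names, not harness API):

```rust
use std::collections::HashSet;

/// Simplified span record: an id plus an optional parent id.
struct Span {
    id: u64,
    parent: Option<u64>,
}

/// A tree is valid when every non-root span's parent is also present in
/// the capture. Roots (parent == None) are always fine.
fn is_valid_tree(spans: &[Span]) -> bool {
    let ids: HashSet<u64> = spans.iter().map(|s| s.id).collect();
    spans
        .iter()
        .all(|s| s.parent.map_or(true, |p| ids.contains(&p)))
}

fn main() {
    let ok = vec![
        Span { id: 1, parent: None },
        Span { id: 2, parent: Some(1) },
    ];
    assert!(is_valid_tree(&ok));

    // Span 3 points at a parent that was never captured — an orphan,
    // typically a symptom of broken context propagation.
    let orphaned = vec![Span { id: 3, parent: Some(99) }];
    assert!(!is_valid_tree(&orphaned));
}
```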
The timing summary shows a per-span-name breakdown with operator identity:

```text
Span                              Count   Total ms    Avg ms    Max ms
----------------------------------------------------------------------
volley.record                        10      559.2      55.9     100.3
volley.sink                           8       72.0       9.0      11.0
volley.operator (ml-embed)            8       56.8       7.1      25.9
volley.operator (status-filter)      10        1.1       0.1       0.8
volley.operator (proto-decode)       10        0.8       0.1       0.5
volley.source                        10        0.4       0.0       0.3
```

Use `.with_operator_id("name")` on your pipeline to assign human-readable names.
### MetricsSnapshot

Scrape and assert on Prometheus metrics:

```rust
let metrics = MetricsSnapshot::scrape(prometheus_port).await?;

metrics.assert_counter_exists("volley_source_records_polled");
metrics.assert_counter_gte("volley_records_processed_total", 100.0);
metrics.assert_gauge_in_range("volley_pipeline_health", 1.0, 1.0);
```
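Under the hood, a snapshot like this boils down to parsing the Prometheus text exposition format into name/value pairs and asserting on them. A minimal sketch, assuming unlabeled metrics (labels would need extra parsing) — `parse_metrics` is an illustrative helper, not harness API:

```rust
use std::collections::HashMap;

/// Parse the Prometheus text exposition format into name -> value,
/// skipping comment lines. Assumes metrics carry no labels.
fn parse_metrics(body: &str) -> HashMap<String, f64> {
    body.lines()
        .filter(|l| !l.starts_with('#') && !l.trim().is_empty())
        .filter_map(|l| {
            // Each sample line is "<name> <value>".
            let (name, value) = l.rsplit_once(' ')?;
            Some((name.to_string(), value.parse().ok()?))
        })
        .collect()
}

fn main() {
    let body = "\
# TYPE volley_records_processed_total counter
volley_records_processed_total 120
volley_pipeline_health 1
";
    let metrics = parse_metrics(body);

    // assert_counter_gte, in spirit.
    assert!(metrics["volley_records_processed_total"] >= 100.0);

    // assert_gauge_in_range, in spirit: exact-value check as a degenerate range.
    let health = metrics["volley_pipeline_health"];
    assert!((1.0..=1.0).contains(&health));
}
```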
### ThroughputRecorder

Measure and assert on pipeline throughput:

```rust
let mut recorder = ThroughputRecorder::new();

recorder.start();
pipeline.execute("bench").await?;
recorder.stop(record_count);

recorder.assert_throughput_gte(100.0);    // min records/sec
recorder.assert_elapsed_under_secs(30.0); // max wall time

println!("{:.1} records/sec", recorder.records_per_second().unwrap());
```
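The arithmetic behind such a recorder is just records divided by elapsed wall-clock seconds. A self-contained sketch under that assumption — `Recorder` is a hypothetical stand-in for `ThroughputRecorder`, not its actual implementation:

```rust
use std::time::{Duration, Instant};

/// Illustrative throughput recorder: wall-clock timing plus a record count.
struct Recorder {
    started: Option<Instant>,
    elapsed: Option<Duration>,
    records: u64,
}

impl Recorder {
    fn new() -> Self {
        Recorder { started: None, elapsed: None, records: 0 }
    }

    fn start(&mut self) {
        self.started = Some(Instant::now());
    }

    /// Freeze the clock and record how many records went through.
    fn stop(&mut self, records: u64) {
        self.elapsed = self.started.map(|t| t.elapsed());
        self.records = records;
    }

    /// None until start/stop have both been called (or if no time elapsed).
    fn records_per_second(&self) -> Option<f64> {
        let secs = self.elapsed?.as_secs_f64();
        (secs > 0.0).then(|| self.records as f64 / secs)
    }
}

fn main() {
    let mut r = Recorder::new();
    r.start();
    std::thread::sleep(Duration::from_millis(10));
    r.stop(1_000);

    let rps = r.records_per_second().unwrap();
    // 1,000 records in roughly 10 ms is well above 100 records/sec,
    // so an assert_throughput_gte(100.0)-style check passes.
    assert!(rps > 100.0);
    println!("{rps:.1} records/sec");
}
```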
## Test Data Generators

The `data` module provides test data factories:

```rust
use volley_test_harness::data::*;

// Protobuf messages (serialized bytes).
let protos = generate_crawl_results(50);
// Every 5th record is a failure (HTTP 503), the rest are HTTP 200.

// Arrow RecordBatch (for non-protobuf tests).
let batch = generate_crawl_batch(50);

// The schema used by both generators.
let schema = crawl_result_schema();
```
The `CrawlResult` type has `impl_to_arrow!`, so it works with `auto_decode_operator::<CrawlResult>("payload")`.
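The every-5th-record failure pattern gives tests a deterministic mix of successes and failures to filter on. A sketch of just that status logic — which index within each group of five fails is an assumption here, as is the `generate_statuses` name:

```rust
/// Illustrative sketch of the seeded-failure pattern: every 5th record
/// gets HTTP 503, the rest HTTP 200. (Which offset fails is assumed.)
fn generate_statuses(n: usize) -> Vec<u16> {
    (0..n).map(|i| if i % 5 == 4 { 503 } else { 200 }).collect()
}

fn main() {
    let statuses = generate_statuses(50);

    let failures = statuses.iter().filter(|&&s| s == 503).count();
    let successes = statuses.iter().filter(|&&s| s == 200).count();

    // 50 records -> exactly 10 deterministic failures and 40 successes,
    // so a status filter's output size is predictable in assertions.
    assert_eq!(failures, 10);
    assert_eq!(successes, 40);
}
```

Deterministic seeding like this is what lets suite assertions use exact counts instead of tolerances.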
## Running Tests

All integration tests require Docker for testcontainers.

```sh
# Kafka connector suite (6 tests, ~45s)
cargo test -p volley-connector-kafka --test connector_suite -- --nocapture

# Iceberg connector suite (Polaris + MinIO, ~60s)
cargo test -p volley-connector-iceberg --test connector_suite -- --nocapture

# E2E golden path — full pipeline with windowing, metrics, traces, and ML (1 test, ~30s)
cargo test -p volley-test-harness --test e2e_golden_path -- --nocapture

# With Metal GPU disabled (CPU fallback)
cargo test -p volley-test-harness --test e2e_golden_path --no-default-features -- --nocapture
```
## Where Tests Live

Connector-specific integration tests belong in their connector crate, importing shared infrastructure from `volley-test-harness`:

| Crate | Tests | Purpose |
|---|---|---|
| `volley-connector-kafka` | `tests/connector_suite.rs` | Kafka source/sink through `ConnectorTestContext` |
| `volley-connector-iceberg` | `tests/connector_suite.rs` | Iceberg source/sink through `ConnectorTestContext` (Polaris + MinIO) |
| `volley-test-harness` | `tests/e2e_golden_path.rs` | Cross-connector end-to-end pipeline test |

The pattern: `volley-test-harness` provides the framework; each connector crate owns its own integration tests.
## Golden Path Test Coverage

The E2E golden path test (`e2e_golden_path.rs`) exercises the full pipeline:

- Correctness — Kafka protobuf → decode → filter → ML embed → Kafka sink, asserting output records match expectations
- Windowed aggregation — tumbling window over the event stream with watermark advancement
- Throughput — `ThroughputRecorder` asserts sustained records/sec
- Metrics — `MetricsSnapshot` scrapes the embedded Prometheus endpoint and asserts counters (`volley_records_processed_total`, `volley_source_records_polled`, pipeline health gauge)
- Traces — `TraceCapture` from `TestCluster` asserts span count, a valid parent–child tree, and per-operator timing via `print_timing_summary()`