Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Integration Testing

Volley provides the volley-test-harness crate — a shared test framework inspired by Apache Flink’s testing patterns. It gives you containers, assertion harnesses, and a standardized connector test suite so you can validate pipelines end-to-end with real infrastructure.

Architecture

The test harness is layered — each layer builds on the one below it:

┌──────────────────────────────────────────────────┐
│  End-to-End Tests (tests/)                       │
│  Full pipelines with real connectors             │
├──────────────────────────────────────────────────┤
│  Connector Framework (connector/)                │
│  ConnectorTestContext + source/sink suites        │
├──────────────────────────────────────────────────┤
│  Assertion Harnesses (assertions/)               │
│  MetricsSnapshot, TraceCapture, ThroughputRecorder│
├──────────────────────────────────────────────────┤
│  TestCluster (cluster.rs)                        │
│  In-process pipeline runner + observability       │
├──────────────────────────────────────────────────┤
│  Container Management (containers/)              │
│  Kafka, MinIO with retry logic + ARM64 images     │
└──────────────────────────────────────────────────┘

Quick Start

Add volley-test-harness as a dev-dependency in your connector crate:

[dev-dependencies]
volley-test-harness = { path = "../volley-test-harness" }
async-trait = { workspace = true }
arrow = { workspace = true }
anyhow = { workspace = true }

Container Management

Shared testcontainers with retry logic and one-container-per-binary lifecycle via OnceCell. Containers start lazily on first use and are cleaned up when the test process exits.

Kafka

#![allow(unused)]
fn main() {
use volley_test_harness::containers::KafkaContainer;

// Starts a native ARM64 Kafka container on first call.
let servers = KafkaContainer::bootstrap_servers().await;

// Each test gets an isolated topic.
let topic = KafkaContainer::unique_topic("my-test");
KafkaContainer::create_topic(&topic, 1).await;

// Seed test data.
KafkaContainer::produce_json(&topic, &[
    serde_json::json!({"name": "alice", "score": 42}),
]).await;

// Verify output.
let messages = KafkaContainer::consume_json(&topic, 1, 10).await;
assert_eq!(messages[0]["name"], "alice");
}

MinIO (S3-compatible)

#![allow(unused)]
fn main() {
use volley_test_harness::containers::{MinioContainer, MINIO_ACCESS_KEY, MINIO_SECRET_KEY};

let endpoint = MinioContainer::endpoint().await;
let bucket = MinioContainer::unique_bucket("test");
MinioContainer::create_bucket(&bucket).await;
}

Polaris (Apache Iceberg REST catalog)

#![allow(unused)]
fn main() {
use volley_test_harness::containers::PolarisContainer;

// Starts a Polaris container backed by MinIO. MinIO must be started first.
let catalog_url = PolarisContainer::catalog_url().await;

// Each test gets an isolated catalog name.
let catalog = PolarisContainer::unique_catalog("my-test");
PolarisContainer::create_catalog(&catalog).await;
}

PolarisContainer depends on MinioContainer for the underlying storage — start MinioContainer before calling any PolarisContainer method.

Connector Test Framework

Like Flink’s Connector Testing Framework — implement ConnectorTestContext for your connector and get a standardized source/sink test suite for free.

Implementing ConnectorTestContext

#![allow(unused)]
fn main() {
use volley_test_harness::connector::ConnectorTestContext;

struct MyConnectorTestContext { /* ... */ }

#[async_trait]
impl ConnectorTestContext for MyConnectorTestContext {
    fn name(&self) -> &str { "MyConnector" }
    fn test_schema(&self) -> Arc<Schema> { /* ... */ }

    async fn seed_test_data(&self, records: &[RecordBatch]) -> anyhow::Result<()> {
        // Write records into the external system.
    }

    async fn create_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Create a source pointing at the seeded data.
    }

    async fn create_sink(&self) -> anyhow::Result<Option<Box<dyn DynSink>>> {
        // Create a sink writing to a test destination.
    }

    async fn read_sink_results(&self) -> anyhow::Result<Vec<RecordBatch>> {
        // Read back what the sink wrote for verification.
    }

    async fn create_empty_source(&self) -> anyhow::Result<Option<Box<dyn DynSource>>> {
        // Optional: source pointing at an empty data set.
        Ok(None) // Return None to skip the empty-source test.
    }
}
}

Running the Suites

#![allow(unused)]
fn main() {
#[tokio::test]
async fn my_connector_source_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_source_suite(&ctx).await;
}

#[tokio::test]
async fn my_connector_sink_suite() {
    let ctx = MyConnectorTestContext::new().await;
    volley_test_harness::connector::run_sink_suite(&ctx).await;
}
}

The source suite runs three tests:

  • test_basic_read — seeds 5 records, verifies the source returns data
  • test_read_all_records — seeds 10 records, reads until all consumed
  • test_empty_source — verifies graceful handling of empty data sets

The sink suite runs three tests:

  • test_basic_write — writes records, flushes, verifies they’re readable
  • test_write_and_flush — multiple writes followed by flush
  • test_checkpoint_triggers_flush — verifies on_checkpoint() commits data

See volley-connector-kafka/tests/connector_suite.rs for the reference implementation.

TestCluster

An in-process pipeline execution wrapper with built-in observability capture. Registers an InMemorySpanExporter as the global OpenTelemetry tracer so spans created during pipeline execution are captured for assertions.

#![allow(unused)]
fn main() {
use volley_test_harness::cluster::TestCluster;

let cluster = TestCluster::new("my-test");

let pipeline = StreamExecutionEnvironment::new()
    .from_source(source)
    .with_tracing(cluster.tracing_config())  // 100% sampling, in-memory export
    .with_operator(my_operator)
    .with_operator_id("my-op")
    .to_sink(sink);

pipeline.execute("my-pipeline").await?;

// Inspect captured spans.
let traces = cluster.traces();
println!("Captured {} spans", traces.span_count());
traces.print_timing_summary();
}

Assertion Harnesses

TraceCapture

Assert on OpenTelemetry spans captured by the TestCluster:

#![allow(unused)]
fn main() {
let traces = cluster.traces();

// Verify span counts.
traces.assert_span_count("volley.operator", 10);

// Verify trace structure.
traces.assert_valid_tree();  // No orphan spans.

// Print per-operator timing breakdown.
traces.print_timing_summary();
}

The timing summary shows per-span-name breakdown with operator identity:

  Span                                      Count   Total ms     Avg ms     Max ms
  --------------------------------------------------------------------------------
  volley.record                                10      559.2       55.9      100.3
  volley.sink                                   8       72.0        9.0       11.0
  volley.operator (ml-embed)                    8       56.8        7.1       25.9
  volley.operator (status-filter)              10        1.1        0.1        0.8
  volley.operator (proto-decode)               10        0.8        0.1        0.5
  volley.source                                10        0.4        0.0        0.3

Use .with_operator_id("name") on your pipeline to assign human-readable names.

MetricsSnapshot

Scrape and assert on Prometheus metrics:

#![allow(unused)]
fn main() {
let metrics = MetricsSnapshot::scrape(prometheus_port).await?;

metrics.assert_counter_exists("volley_source_records_polled");
metrics.assert_counter_gte("volley_records_processed_total", 100.0);
metrics.assert_gauge_in_range("volley_pipeline_health", 1.0, 1.0);
}

ThroughputRecorder

Measure and assert on pipeline throughput:

#![allow(unused)]
fn main() {
let mut recorder = ThroughputRecorder::new();
recorder.start();
pipeline.execute("bench").await?;
recorder.stop(record_count);

recorder.assert_throughput_gte(100.0);        // min records/sec
recorder.assert_elapsed_under_secs(30.0);     // max wall time
println!("{:.1} records/sec", recorder.records_per_second().unwrap());
}

Test Data Generators

The data module provides test data factories:

#![allow(unused)]
fn main() {
use volley_test_harness::data::*;

// Protobuf messages (serialized bytes).
let protos = generate_crawl_results(50);
// Every 5th record is a failure (HTTP 503), rest are HTTP 200.

// Arrow RecordBatch (for non-protobuf tests).
let batch = generate_crawl_batch(50);

// The schema used by both generators.
let schema = crawl_result_schema();
}

The CrawlResult type has impl_to_arrow! so it works with auto_decode_operator::<CrawlResult>("payload").

Running Tests

All integration tests require Docker for testcontainers.

# Kafka connector suite (6 tests, ~45s)
cargo test -p volley-connector-kafka --test connector_suite -- --nocapture

# Iceberg connector suite (Polaris + MinIO, ~60s)
cargo test -p volley-connector-iceberg --test connector_suite -- --nocapture

# E2E golden path — full pipeline with windowing, metrics, traces, and ML (1 test, ~30s)
cargo test -p volley-test-harness --test e2e_golden_path -- --nocapture

# With Metal GPU disabled (CPU fallback)
cargo test -p volley-test-harness --test e2e_golden_path --no-default-features -- --nocapture

Where Tests Live

Connector-specific integration tests belong in their connector crate, importing shared infrastructure from volley-test-harness:

CrateTestsPurpose
volley-connector-kafkatests/connector_suite.rsKafka source/sink through ConnectorTestContext
volley-connector-icebergtests/connector_suite.rsIceberg source/sink through ConnectorTestContext (Polaris + MinIO)
volley-test-harnesstests/e2e_golden_path.rsCross-connector end-to-end pipeline test

The pattern: volley-test-harness provides the framework, each connector crate owns its own integration tests.

Golden Path Test Coverage

The E2E golden path test (e2e_golden_path.rs) exercises the full pipeline:

  • Correctness — Kafka protobuf → decode → filter → ML embed → Kafka sink, asserting output records match expectations
  • Windowed aggregation — tumbling window over the event stream with watermark advancement
  • ThroughputThroughputRecorder asserts sustained records/sec
  • MetricsMetricsSnapshot scrapes the embedded Prometheus endpoint and asserts counters (volley_records_processed_total, volley_source_records_polled, pipeline health gauge)
  • TracesTraceCapture from TestCluster asserts span count, valid parent–child tree, and per-operator timing via print_timing_summary()