Blob Store Connectors

The `volley-connector-blob-store` crate provides a cloud-agnostic abstraction for reading from object storage. The Azure Blob Storage and Google Cloud Storage connectors are built on this shared foundation.

Shared Architecture

All cloud blob store connectors follow the same pattern:

```
Cloud Queue/Topic          Cloud Storage
(SQS / Azure Queue /      (S3 / Azure Blob /
 Pub/Sub)                   GCS)
      |                        |
      v                        v
NotificationSource         BlobReader
      |                        |
      +--------+   +-----------+
               |   |
               v   v
          BlobSource
               |
               v
          Decoder (Parquet / JSON / CSV / Avro)
               |
               v
          RecordBatch --> Pipeline
```

To add a new cloud backend, implement `NotificationSource` and `BlobReader`. The decoder layer is shared.
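The two extension points can be sketched as traits. This is an illustrative sketch only: the real traits in `volley-connector-blob-store` are likely async and differ in detail, and the `BlobNotification` type and method signatures here are assumptions.

```rust
use std::error::Error;

// Illustrative sketch of the two extension points; not the crate's
// actual API. A real implementation would almost certainly be async.

/// A "blob created" event delivered by the cloud queue/topic
/// (SQS, Azure Queue Storage, Pub/Sub).
pub struct BlobNotification {
    pub container: String,
    pub key: String,
}

/// Polls the cloud-specific queue for blob notifications.
pub trait NotificationSource {
    fn poll(&mut self) -> Result<Vec<BlobNotification>, Box<dyn Error>>;
}

/// Fetches the raw bytes of the blob named by a notification.
pub trait BlobReader {
    fn read(&self, notification: &BlobNotification) -> Result<Vec<u8>, Box<dyn Error>>;
}
```

A new backend supplies one implementation of each; the shared `BlobSource` drives them and hands the fetched bytes to the common decoder layer.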

Azure Blob Storage

From volley-connector-azure-blob:

| Type | Description |
|------|-------------|
| `AzureBlobSourceBuilder` | Builder with automatic credential discovery |
| `AzureBlobSource` | Configured Azure Blob source |
| `AzureBlobReader` | Reads blobs from Azure Blob Storage |
| `AzureQueueNotificationSource` | Polls Azure Queue Storage for blob notifications |

Enable with the `blob-store-azure` feature flag on `volley-connectors`.

Authentication

By default, Azure connectors use `DefaultAzureCredential`, which discovers credentials automatically. The credential chain tries, in order:

  1. Environment variables — `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`
  2. Workload Identity — Kubernetes pods with Azure Workload Identity federation
  3. Managed Identity — Azure VM, App Service, or Container Instance identity
  4. Azure CLI — Cached az login credentials (local development)

To override with an explicit storage account key, use with_access_key():

```rust
use volley_connector_azure_blob::AzureBlobSourceBuilder;

// DefaultAzureCredential (recommended for production)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .build(config).await?;

// Explicit access key (override)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .with_access_key("MYACCESSKEY...")
    .build(config).await?;
```

Google Cloud Storage

From volley-connector-gcp-gcs:

| Type | Description |
|------|-------------|
| `GcsSource` | Configured GCS source |
| `GcsBlobReader` | Reads objects from GCS |
| `PubSubNotificationSource` | Polls Pub/Sub for GCS object notifications |

Enable with the `blob-store-gcs` feature flag on `volley-connectors`.

Authentication

GCS connectors use Application Default Credentials (ADC), a mechanism that discovers credentials automatically. The credential chain tries, in order:

  1. Environment variable — `GOOGLE_APPLICATION_CREDENTIALS` pointing to a service account key file
  2. Workload Identity — Kubernetes pods with GKE Workload Identity federation
  3. Compute Engine metadata — GCE VM, Cloud Run, or GKE node identity
  4. gcloud CLI — Cached gcloud auth application-default login credentials (local development)

Usage

```rust
use volley_connector_gcp_gcs::GcsSource;

let source = GcsSource::new("my-gcs-bucket", "my-pubsub-subscription")
    .build(config).await?;
```

Note: The GCS Pub/Sub notification integration is currently experimental. The PubSubNotificationSource depends on the google-cloud-pubsub crate’s streaming pull API, which is not yet fully stabilized. The blob reader and event parser are functional — the gap is in notification polling reliability under sustained load.

Blob Store Sinks

The `volley-connectors` crate also provides buffered blob store sinks for writing output files to S3 or Azure Blob Storage. These sinks batch records and flush them as files based on record count or time interval.
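The flush policy described above (record count or time interval, whichever limit is hit first) can be sketched as follows. This is a simplified illustration under assumed names, not the sink's internal code:

```rust
use std::time::{Duration, Instant};

/// Illustrative flush policy: flush when either the buffered record
/// count or the elapsed time since the last flush crosses its limit.
/// (Hypothetical type for explanation; not part of volley-connectors.)
struct FlushPolicy {
    max_records: usize,
    flush_interval: Duration,
    buffered: usize,
    last_flush: Instant,
}

impl FlushPolicy {
    fn new(max_records: usize, flush_interval: Duration) -> Self {
        Self {
            max_records,
            flush_interval,
            buffered: 0,
            last_flush: Instant::now(),
        }
    }

    /// Account for one buffered record; returns true when the sink
    /// should flush the buffer out as a file.
    fn on_record(&mut self) -> bool {
        self.buffered += 1;
        self.buffered >= self.max_records
            || self.last_flush.elapsed() >= self.flush_interval
    }

    /// Reset the counters after a flush completes.
    fn on_flush(&mut self) {
        self.buffered = 0;
        self.last_flush = Instant::now();
    }
}
```

Checking both conditions on every record keeps the policy simple; a production sink would typically also run a timer so that a quiet stream still flushes on the interval.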

Encoders

| Encoder | Output Format | File Extension |
|---------|---------------|----------------|
| `ParquetEncoder` | Apache Parquet | `.parquet` |
| `JsonLinesEncoder` | JSON Lines (newline-delimited JSON) | `.jsonl` |
| `CsvEncoder` | CSV with optional header | `.csv` |

Usage

```rust
use std::time::Duration;

use volley_connectors::blob_store::{BufferedBlobSink, BufferedBlobSinkConfig};
use volley_connectors::blob_store::encoders::ParquetEncoder;
use volley_connectors::blob_store::aws::S3BlobWriter;

let encoder = Box::new(ParquetEncoder::new());
let config = BufferedBlobSinkConfig::new("my-bucket", "output/")
    .with_max_records(100_000)
    .with_flush_interval(Duration::from_secs(60));
let sink = BufferedBlobSink::new(
    Box::new(S3BlobWriter::new(s3_client)),
    encoder,
    config,
);
```

For Azure Blob Storage, use `AzureBlobWriter` instead of `S3BlobWriter`:

```rust
use volley_connectors::blob_store::azure::AzureBlobWriter;

let sink = BufferedBlobSink::new(
    Box::new(AzureBlobWriter::new(blob_service_client, "my-container")),
    encoder,
    config,
);
```

Enable with the `blob-store-aws` or `blob-store-azure` feature flags on `volley-connectors`.

Supported Source Formats

All blob store sources share the same decoder layer:

| Format | File Extensions |
|--------|-----------------|
| Parquet | `.parquet` |
| JSON | `.json`, `.jsonl` |
| CSV | `.csv` |
| Avro | `.avro` |
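Decoder selection by file extension can be sketched like this. The enum and function are illustrative only, not the crate's actual decoder API:

```rust
/// Illustrative mapping from file extension to source format; the real
/// decoder layer lives in volley-connector-blob-store.
#[derive(Debug, PartialEq)]
enum SourceFormat {
    Parquet,
    Json,
    Csv,
    Avro,
}

/// Pick a format from the blob key's extension, or None if unsupported.
fn format_for_key(key: &str) -> Option<SourceFormat> {
    let ext = key.rsplit('.').next()?;
    match ext {
        "parquet" => Some(SourceFormat::Parquet),
        "json" | "jsonl" => Some(SourceFormat::Json),
        "csv" => Some(SourceFormat::Csv),
        "avro" => Some(SourceFormat::Avro),
        _ => None,
    }
}
```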

Feature Flags

```toml
[dependencies.volley-connectors]
git = "https://github.com/volley-streams/volley"
features = ["blob-store-azure", "blob-store-gcs"]
```

See also: AWS S3 for the S3-specific connector.