Blob Store Connectors

The volley-connector-blob-store crate provides a cloud-agnostic abstraction for reading from object storage. The Azure Blob Storage source is built on this shared foundation; Google Cloud Storage is currently supported only as a sink (see below).

Shared Architecture

All cloud blob store connectors follow the same pattern:

Cloud Queue/Topic          Cloud Storage
(SQS / Azure Queue)       (S3 / Azure Blob)
      |                        |
      v                        v
NotificationSource         BlobReader
      |                        |
      +--------+   +-----------+
               |   |
               v   v
          BlobSource
               |
               v
          Decoder (Parquet / JSON / CSV / Avro)
               |
               v
          RecordBatch --> Pipeline

To add a new cloud backend, implement NotificationSource and BlobReader. The decoder layer is shared.
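To make the backend contract concrete, here is a minimal, self-contained sketch of the two roles and of a BlobSource-style loop that combines them. The trait shapes, the BlobNotification type, and the in-memory backends are hypothetical simplifications written for illustration. They are not volley's actual NotificationSource and BlobReader definitions, which are likely asynchronous and have different signatures.

```rust
// Hypothetical, synchronous stand-ins for the two backend traits.
// Not volley's real definitions; written only to illustrate the split.
use std::collections::{HashMap, VecDeque};

/// A "new object landed" event, as a queue or topic would deliver it.
#[derive(Debug, Clone, PartialEq)]
struct BlobNotification {
    container: String,
    key: String,
}

/// Hypothetical NotificationSource: yields blob-created events.
trait NotificationSource {
    fn poll(&mut self) -> Option<BlobNotification>;
}

/// Hypothetical BlobReader: fetches an object's bytes.
trait BlobReader {
    fn read(&self, container: &str, key: &str) -> Result<Vec<u8>, String>;
}

/// In-memory queue backend (useful for tests).
struct InMemoryQueue(VecDeque<BlobNotification>);

impl NotificationSource for InMemoryQueue {
    fn poll(&mut self) -> Option<BlobNotification> {
        self.0.pop_front()
    }
}

/// In-memory object store keyed by (container, key).
struct InMemoryStore(HashMap<(String, String), Vec<u8>>);

impl BlobReader for InMemoryStore {
    fn read(&self, container: &str, key: &str) -> Result<Vec<u8>, String> {
        self.0
            .get(&(container.to_string(), key.to_string()))
            .cloned()
            .ok_or_else(|| format!("blob not found: {container}/{key}"))
    }
}

/// Mirrors the BlobSource box in the diagram: pair each notification with
/// the blob's bytes, which the shared decoder layer would then decode.
fn drain<N: NotificationSource, R: BlobReader>(
    notifications: &mut N,
    reader: &R,
) -> Vec<(String, Result<Vec<u8>, String>)> {
    let mut out = Vec::new();
    while let Some(n) = notifications.poll() {
        let bytes = reader.read(&n.container, &n.key);
        out.push((n.key, bytes));
    }
    out
}

fn main() {
    let mut queue = InMemoryQueue(VecDeque::from(vec![
        BlobNotification { container: "c".into(), key: "a.jsonl".into() },
        BlobNotification { container: "c".into(), key: "missing.csv".into() },
    ]));
    let mut objects = HashMap::new();
    objects.insert(("c".to_string(), "a.jsonl".to_string()), b"{\"x\":1}\n".to_vec());
    let store = InMemoryStore(objects);

    for (key, result) in drain(&mut queue, &store) {
        match result {
            Ok(bytes) => println!("{key}: {} bytes", bytes.len()),
            Err(e) => println!("{key}: error: {e}"),
        }
    }
}
```

Because the loop depends only on the two traits, replacing the in-memory types with a real queue client and object-store client would leave it, and the decoder stage after it, unchanged.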

Azure Blob Storage

From volley-connector-azure-blob:

Type                            Description
AzureBlobSourceBuilder          Builder with automatic credential discovery
AzureBlobSource                 Configured Azure Blob source
AzureBlobReader                 Reads blobs from Azure Blob Storage
AzureQueueNotificationSource    Polls Azure Queue Storage for blob notifications

Enable with feature flag blob-store-azure on volley-connectors.

Authentication

By default, Azure connectors use DefaultAzureCredential, which discovers credentials automatically. The credential chain tries, in order:

  1. Environment variables: AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET
  2. Workload Identity: Kubernetes pods with Azure Workload Identity federation
  3. Managed Identity: Azure VM, App Service, or Container Instance identity
  4. Azure CLI: cached az login credentials (local development)
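The ordered fallback amounts to "return the first provider that yields a credential". The sketch below illustrates that rule with a hypothetical CredentialProvider trait; it is not the azure_identity API, and the workload identity, managed identity, and CLI providers are reduced to a fixed availability flag so the example stays self-contained.

```rust
// Hypothetical ordered credential chain; not the azure_identity API.
#[derive(Debug, Clone, PartialEq)]
struct Credential {
    source: &'static str,
}

trait CredentialProvider {
    fn name(&self) -> &'static str;
    /// Some(credential) if this provider can authenticate in this environment.
    fn try_get(&self) -> Option<Credential>;
}

/// Succeeds only if every named environment variable is set.
struct EnvVars(&'static [&'static str]);

impl CredentialProvider for EnvVars {
    fn name(&self) -> &'static str {
        "environment"
    }
    fn try_get(&self) -> Option<Credential> {
        self.0
            .iter()
            .all(|v| std::env::var(v).is_ok())
            .then(|| Credential { source: self.name() })
    }
}

/// Stand-in for environment-dependent providers (workload identity,
/// managed identity, Azure CLI): availability is just a flag here.
struct Fixed {
    name: &'static str,
    available: bool,
}

impl CredentialProvider for Fixed {
    fn name(&self) -> &'static str {
        self.name
    }
    fn try_get(&self) -> Option<Credential> {
        self.available.then(|| Credential { source: self.name })
    }
}

/// Try providers in order; the first one that succeeds wins.
fn resolve(chain: &[Box<dyn CredentialProvider>]) -> Option<Credential> {
    chain.iter().find_map(|p| p.try_get())
}

fn main() {
    let chain: Vec<Box<dyn CredentialProvider>> = vec![
        Box::new(EnvVars(&["AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"])),
        Box::new(Fixed { name: "workload_identity", available: false }),
        Box::new(Fixed { name: "managed_identity", available: false }),
        Box::new(Fixed { name: "azure_cli", available: true }),
    ];
    match resolve(&chain) {
        Some(c) => println!("authenticated via {}", c.source),
        None => println!("no credential found"),
    }
}
```

With the three environment variables unset, this sketch falls through to the last provider, which is why cached CLI credentials are the usual path on a developer machine.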

To override with an explicit storage account key, use with_access_key():

use volley_connector_azure_blob::AzureBlobSourceBuilder;

// Runs inside an async fn that returns a Result; `config` is the source
// configuration supplied by your application. The two builders are alternatives.

// DefaultAzureCredential (recommended for production)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .build(config).await?;

// Explicit access key (override)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .with_access_key("MYACCESSKEY...")
    .build(config).await?;

Google Cloud Storage

GCS is supported as a sink only. The previous Pub/Sub-driven source was removed in v1.1.0 because the google-cloud-pubsub streaming pull API was not stable enough for production use.

From volley-connector-gcp-gcs:

Type             Description
GcsBlobWriter    Writes objects to GCS (for use with BufferedBlobSink)

Enable with feature flag blob-store-gcs on volley-connectors. GCS credentials are discovered through Application Default Credentials (GOOGLE_APPLICATION_CREDENTIALS, GKE Workload Identity, GCE metadata, or gcloud auth application-default login).

Blob Store Sinks

The volley-connectors crate also provides buffered blob store sinks for writing output files to S3, Azure Blob Storage, or GCS. These sinks batch records and flush them as files based on record count or time interval.
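The count-or-interval flush rule can be sketched as a small buffer. This is a hypothetical illustration, not BufferedBlobSink's implementation: the FlushBuffer type and its push/tick methods are made up, and the current time is passed in explicitly so the logic is deterministic and easy to test.

```rust
use std::time::{Duration, Instant};

/// Hypothetical size-or-time flush buffer (not BufferedBlobSink's real code).
struct FlushBuffer<T> {
    records: Vec<T>,
    max_records: usize,
    flush_interval: Duration,
    last_flush: Instant,
}

impl<T> FlushBuffer<T> {
    fn new(max_records: usize, flush_interval: Duration, now: Instant) -> Self {
        Self { records: Vec::new(), max_records, flush_interval, last_flush: now }
    }

    /// Add a record; returns a full batch once the record-count threshold is hit.
    fn push(&mut self, record: T, now: Instant) -> Option<Vec<T>> {
        self.records.push(record);
        if self.records.len() >= self.max_records {
            return Some(self.take(now));
        }
        None
    }

    /// Called periodically; returns the pending batch if the interval has
    /// elapsed since the last flush and there is something to write.
    fn tick(&mut self, now: Instant) -> Option<Vec<T>> {
        if !self.records.is_empty()
            && now.duration_since(self.last_flush) >= self.flush_interval
        {
            return Some(self.take(now));
        }
        None
    }

    /// Hand off the buffered records and restart the interval timer.
    fn take(&mut self, now: Instant) -> Vec<T> {
        self.last_flush = now;
        std::mem::take(&mut self.records)
    }
}

fn main() {
    let start = Instant::now();
    let mut buf = FlushBuffer::new(3, Duration::from_secs(60), start);

    // Count-based flush: the third record completes a batch.
    assert!(buf.push(1, start).is_none());
    assert!(buf.push(2, start).is_none());
    println!("{:?}", buf.push(3, start)); // Some([1, 2, 3])

    // Time-based flush: a lone record waits until the interval has passed.
    let _ = buf.push(4, start);
    println!("{:?}", buf.tick(start + Duration::from_secs(30))); // None
    println!("{:?}", buf.tick(start + Duration::from_secs(61))); // Some([4])
}
```

In this sketch a count-triggered flush also resets the interval timer, so a file is never written just seconds after a full one; an implementation could reasonably choose otherwise.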

Encoders

Encoder             Output format                          File extension
ParquetEncoder      Apache Parquet                         .parquet
JsonLinesEncoder    JSON Lines (newline-delimited JSON)    .jsonl
CsvEncoder          CSV with optional header               .csv
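Each encoder turns one buffered batch into the bytes of one output file and determines its extension. The sketch below shows that role with a hypothetical BatchEncoder trait and a SimpleCsvEncoder that writes an optional header and quotes fields RFC 4180 style. Both names are invented for illustration, and rows are simplified to strings; the real encoders in volley_connectors::blob_store::encoders have their own interface.

```rust
/// Hypothetical encoder interface, simplified to string rows.
trait BatchEncoder {
    /// File extension (without the dot) for files this encoder produces.
    fn file_extension(&self) -> &'static str;
    /// Encode one batch of rows into the bytes of a single output file.
    fn encode(&self, header: &[&str], rows: &[Vec<String>]) -> Vec<u8>;
}

/// Hypothetical CSV encoder with an optional header row.
struct SimpleCsvEncoder {
    write_header: bool,
}

/// Quote a field if it contains a comma, quote, or line break,
/// doubling any embedded quotes (RFC 4180 style).
fn csv_field(s: &str) -> String {
    if s.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
        format!("\"{}\"", s.replace('"', "\"\""))
    } else {
        s.to_string()
    }
}

/// Escape each field and join them into one CSV line ending in '\n'.
fn csv_line<S: AsRef<str>>(fields: &[S]) -> String {
    let escaped: Vec<String> = fields.iter().map(|f| csv_field(f.as_ref())).collect();
    let mut line = escaped.join(",");
    line.push('\n');
    line
}

impl BatchEncoder for SimpleCsvEncoder {
    fn file_extension(&self) -> &'static str {
        "csv"
    }

    fn encode(&self, header: &[&str], rows: &[Vec<String>]) -> Vec<u8> {
        let mut out = String::new();
        if self.write_header {
            out.push_str(&csv_line(header));
        }
        for row in rows {
            out.push_str(&csv_line(row.as_slice()));
        }
        out.into_bytes()
    }
}

fn main() {
    let enc = SimpleCsvEncoder { write_header: true };
    let rows = vec![
        vec!["1".to_string(), "plain".to_string()],
        vec!["2".to_string(), "has, comma".to_string()],
    ];
    let bytes = enc.encode(&["id", "note"], &rows);
    print!("{}", String::from_utf8(bytes).unwrap());
    println!("extension: .{}", enc.file_extension());
}
```

Writing the header once per encoded batch means every flushed file is independently readable.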

Usage

use std::time::Duration;

use volley_connectors::blob_store::{BufferedBlobSink, BufferedBlobSinkConfig};
use volley_connectors::blob_store::encoders::ParquetEncoder;
use volley_connectors::blob_store::aws::S3BlobWriter;

// `s3_client` is an S3 client constructed elsewhere in your application.
let encoder = Box::new(ParquetEncoder::new());
// Flush a file after 100,000 records or 60 seconds.
let config = BufferedBlobSinkConfig::new("my-bucket", "output/")
    .with_max_records(100_000)
    .with_flush_interval(Duration::from_secs(60));
let sink = BufferedBlobSink::new(
    Box::new(S3BlobWriter::new(s3_client)),
    encoder,
    config,
);

For Azure Blob Storage, use AzureBlobWriter instead of S3BlobWriter:

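use volley_connectors::blob_store::BufferedBlobSink;
use volley_connectors::blob_store::azure::AzureBlobWriter;

// `blob_service_client` is an Azure Blob service client created elsewhere;
// `encoder` and `config` are built as in the S3 example above.
let sink = BufferedBlobSink::new(
    Box::new(AzureBlobWriter::new(blob_service_client, "my-container")),
    encoder,
    config,
);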

For Google Cloud Storage, use GcsBlobWriter:

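use volley_connectors::blob_store::BufferedBlobSink;
use volley_connectors::blob_store::gcs::GcsBlobWriter;

// `gcs_client` is a GCS client created elsewhere;
// `encoder` and `config` are built as in the S3 example above.
let sink = BufferedBlobSink::new(
    Box::new(GcsBlobWriter::new(gcs_client)),
    encoder,
    config,
);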

Enable with feature flags blob-store-aws, blob-store-azure, or blob-store-gcs on volley-connectors.

Supported Source Formats

All blob store sources share the same decoder layer:

Format     File extensions
Parquet    .parquet
JSON       .json, .jsonl
CSV        .csv
Avro       .avro
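One way a shared decoder layer can dispatch on these formats is by the object key's extension. The sketch below assumes that approach purely for illustration; this page does not say whether volley infers the format from the extension or from explicit configuration. The Format enum, format_for_key function, and case-insensitive matching are all hypothetical.

```rust
/// Hypothetical decoder selector keyed on the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Format {
    Parquet,
    Json,
    Csv,
    Avro,
}

/// Map an object key to a format using the extension of its last path
/// segment (matched case-insensitively). Unknown extensions return None.
fn format_for_key(key: &str) -> Option<Format> {
    // rsplit always yields at least one item, so this never falls back in practice.
    let name = key.rsplit('/').next().unwrap_or(key);
    let ext = name.rsplit_once('.')?.1.to_ascii_lowercase();
    match ext.as_str() {
        "parquet" => Some(Format::Parquet),
        "json" | "jsonl" => Some(Format::Json),
        "csv" => Some(Format::Csv),
        "avro" => Some(Format::Avro),
        _ => None,
    }
}

fn main() {
    for key in ["events/2024/part-0001.parquet", "logs/app.JSONL", "raw/readme.txt"] {
        println!("{key} -> {:?}", format_for_key(key));
    }
}
```

Looking only at the last path segment avoids misreading dotted directory names such as `v1.2/` as extensions.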

Feature Flags

[dependencies]
volley-connectors = { git = "https://github.com/volley-streams/volley", features = ["blob-store-azure", "blob-store-gcs"] }

See also: AWS S3 for the S3-specific connector.