Blob Store Connectors
The volley-connector-blob-store crate provides a cloud-agnostic abstraction for reading from object storage. Cloud-specific sources such as Azure Blob Storage are built on this shared foundation; Google Cloud Storage is currently supported as a sink only.
Shared Architecture
All cloud blob store connectors follow the same pattern:
```
Cloud Queue/Topic              Cloud Storage
(SQS / Azure Queue)            (S3 / Azure Blob)
        |                             |
        v                             v
NotificationSource                BlobReader
        |                             |
        +--------------+  +-----------+
                       |  |
                       v  v
                    BlobSource
                        |
                        v
      Decoder (Parquet / JSON / CSV / Avro)
                        |
                        v
            RecordBatch --> Pipeline
```
To add a new cloud backend, implement NotificationSource and BlobReader. The decoder layer is shared.
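A minimal sketch of that split, in plain synchronous Rust: BlobSource is written once, and each backend only supplies its own NotificationSource and BlobReader. The trait methods (poll, read), the BlobEvent type, and the in-memory backend are hypothetical simplifications for illustration; the crate's real traits are likely async and carry richer types, so check the volley-connector-blob-store docs for their actual signatures.

```rust
use std::collections::{HashMap, VecDeque};

/// A "new object" notification (hypothetical, simplified).
#[derive(Debug, Clone, PartialEq)]
struct BlobEvent {
    container: String,
    key: String,
}

/// Hypothetical shape of the notification side (queue or topic).
trait NotificationSource {
    /// Return the next pending notification, if any.
    fn poll(&mut self) -> Option<BlobEvent>;
}

/// Hypothetical shape of the storage side.
trait BlobReader {
    /// Fetch the raw bytes of one object.
    fn read(&self, container: &str, key: &str) -> Result<Vec<u8>, String>;
}

/// In-memory stand-ins for a real queue and object store.
struct InMemoryQueue(VecDeque<BlobEvent>);
struct InMemoryStore(HashMap<(String, String), Vec<u8>>);

impl NotificationSource for InMemoryQueue {
    fn poll(&mut self) -> Option<BlobEvent> {
        self.0.pop_front()
    }
}

impl BlobReader for InMemoryStore {
    fn read(&self, container: &str, key: &str) -> Result<Vec<u8>, String> {
        self.0
            .get(&(container.to_string(), key.to_string()))
            .cloned()
            .ok_or_else(|| format!("missing object {}/{}", container, key))
    }
}

/// Shared glue: turns notifications into (key, bytes) pairs for the decoder layer.
/// BlobSource is written once; a new backend only supplies its own N and R.
struct BlobSource<N: NotificationSource, R: BlobReader> {
    notifications: N,
    reader: R,
}

impl<N: NotificationSource, R: BlobReader> BlobSource<N, R> {
    /// Poll one notification and fetch the object it points to.
    fn next_blob(&mut self) -> Option<Result<(String, Vec<u8>), String>> {
        let event = self.notifications.poll()?;
        let bytes = self.reader.read(&event.container, &event.key);
        Some(bytes.map(|b| (event.key, b)))
    }
}

fn main() {
    let mut objects = HashMap::new();
    objects.insert(
        ("my-container".to_string(), "events/part-0.jsonl".to_string()),
        b"{\"id\":1}\n".to_vec(),
    );
    let queue = InMemoryQueue(VecDeque::from(vec![BlobEvent {
        container: "my-container".to_string(),
        key: "events/part-0.jsonl".to_string(),
    }]));
    let mut source = BlobSource { notifications: queue, reader: InMemoryStore(objects) };

    // Each fetched blob would be handed to the shared decoder next.
    while let Some(result) = source.next_blob() {
        match result {
            Ok((key, bytes)) => println!("{}: {} bytes", key, bytes.len()),
            Err(e) => eprintln!("read failed: {}", e),
        }
    }
}
```

Because BlobSource is generic over both traits, a real Azure or S3 backend would slot in by replacing the two in-memory types, leaving the decoding path untouched.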
Azure Blob Storage
From volley-connector-azure-blob:
| Type | Description |
|---|---|
| AzureBlobSourceBuilder | Builder with automatic credential discovery |
| AzureBlobSource | Configured Azure Blob source |
| AzureBlobReader | Reads blobs from Azure Blob Storage |
| AzureQueueNotificationSource | Polls Azure Queue Storage for blob notifications |
Enable with the blob-store-azure feature flag on volley-connectors.
Authentication
By default, Azure connectors use DefaultAzureCredential, which discovers credentials automatically. The credential chain tries, in order:
- Environment variables: AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET
- Workload Identity: Kubernetes pods with Azure Workload Identity federation
- Managed Identity: Azure VM, App Service, or Container Instance identity
- Azure CLI: cached az login credentials (local development)
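The ordered fallback amounts to "try each provider in turn and keep the first that can authenticate". The sketch below models only that control flow and is not the Azure SDK's implementation: the Probe struct is a hypothetical stand-in for checks that in reality involve environment variables, token files, a managed identity endpoint, and the Azure CLI token cache. It relies on one real detail: the Azure Workload Identity webhook injects AZURE_FEDERATED_TOKEN_FILE (alongside AZURE_TENANT_ID and AZURE_CLIENT_ID, but no client secret) into pods.

```rust
use std::collections::HashSet;

/// Which link in the chain supplied the credential.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CredentialSource {
    Environment,
    WorkloadIdentity,
    ManagedIdentity,
    AzureCli,
}

/// Hypothetical snapshot of what each provider would check.
struct Probe {
    /// Names of environment variables that are set.
    vars: HashSet<&'static str>,
    /// Whether a managed identity endpoint answered (a network call in reality).
    managed_identity_available: bool,
    /// Whether `az login` has left cached credentials.
    cli_logged_in: bool,
}

/// Try providers in the documented order; return the first that can authenticate.
fn resolve(probe: &Probe) -> Option<CredentialSource> {
    let chain: [(CredentialSource, fn(&Probe) -> bool); 4] = [
        // 1. Service principal secret from environment variables.
        (CredentialSource::Environment, |p| {
            ["AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"]
                .iter()
                .all(|name| p.vars.contains(name))
        }),
        // 2. Workload Identity: federated token file projected into the pod.
        (CredentialSource::WorkloadIdentity, |p| {
            p.vars.contains("AZURE_FEDERATED_TOKEN_FILE")
        }),
        // 3. Managed Identity of the VM, App Service, or Container Instance.
        (CredentialSource::ManagedIdentity, |p| p.managed_identity_available),
        // 4. Cached Azure CLI login (local development).
        (CredentialSource::AzureCli, |p| p.cli_logged_in),
    ];
    chain
        .iter()
        .find(|(_, available)| available(probe))
        .map(|(source, _)| *source)
}

fn main() {
    // A pod using Workload Identity: IDs and a token file, but no client secret.
    let pod = Probe {
        vars: ["AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_FEDERATED_TOKEN_FILE"]
            .iter()
            .copied()
            .collect(),
        managed_identity_available: false,
        cli_logged_in: false,
    };
    println!("{:?}", resolve(&pod)); // Some(WorkloadIdentity)
}
```

In the example pod the environment link fails because AZURE_CLIENT_SECRET is absent, so resolution falls through to Workload Identity; a developer laptop with only a cached CLI login would fall all the way to the last link.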
To override with an explicit storage account key, use with_access_key():
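```rust
use volley_connector_azure_blob::AzureBlobSourceBuilder;

// Runs inside an async fn that returns a Result; `config` comes from the
// surrounding pipeline setup (not shown). Arguments, as the placeholder names
// suggest: storage account, notification queue, blob container.

// DefaultAzureCredential (recommended for production)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .build(config)
    .await?;

// Explicit access key (override); load real keys from configuration or a secret store
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .with_access_key("MYACCESSKEY...")
    .build(config)
    .await?;
```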
Google Cloud Storage
GCS is supported as a sink only. The previous Pub/Sub-driven source was
removed in v1.1.0 because the google-cloud-pubsub streaming pull API was not
stable enough for production use.
From volley-connector-gcp-gcs:
| Type | Description |
|---|---|
| GcsBlobWriter | Writes objects to GCS (for use with BufferedBlobSink) |
Enable with the blob-store-gcs feature flag on volley-connectors. GCS credentials
are discovered through Application Default Credentials
(GOOGLE_APPLICATION_CREDENTIALS, GKE Workload Identity, the GCE metadata server, or
gcloud auth application-default login).
Blob Store Sinks
The volley-connectors crate also provides buffered blob store sinks for writing output files to S3, Azure Blob Storage, or GCS. These sinks batch records and flush them as files based on record count or time interval.
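The count-or-interval rule can be sketched as a buffer that hands back a batch when either threshold is reached. FlushPolicy and RecordBuffer are hypothetical names, not part of the BufferedBlobSink API, and measuring the interval from the oldest buffered record is one reasonable reading of "time interval"; the real sink may measure it differently or also flush on shutdown. Timestamps are passed in explicitly so the logic is deterministic.

```rust
use std::time::{Duration, Instant};

/// Hypothetical thresholds mirroring with_max_records / with_flush_interval.
struct FlushPolicy {
    max_records: usize,
    flush_interval: Duration,
}

/// Buffers records and hands back a batch once either threshold is reached.
struct RecordBuffer<T> {
    policy: FlushPolicy,
    records: Vec<T>,
    /// Arrival time of the oldest record still buffered (None when empty).
    oldest: Option<Instant>,
}

impl<T> RecordBuffer<T> {
    fn new(policy: FlushPolicy) -> Self {
        RecordBuffer { policy, records: Vec::new(), oldest: None }
    }

    /// Add a record; returns a batch to encode and upload if a threshold was hit.
    fn push(&mut self, record: T, now: Instant) -> Option<Vec<T>> {
        if self.records.is_empty() {
            self.oldest = Some(now);
        }
        self.records.push(record);
        self.poll(now)
    }

    /// Check thresholds without adding a record (for example from a periodic timer).
    fn poll(&mut self, now: Instant) -> Option<Vec<T>> {
        if self.records.is_empty() {
            return None;
        }
        let full = self.records.len() >= self.policy.max_records;
        let stale = self
            .oldest
            .map_or(false, |t| now.duration_since(t) >= self.policy.flush_interval);
        if full || stale {
            self.oldest = None;
            Some(std::mem::take(&mut self.records))
        } else {
            None
        }
    }
}

fn main() {
    let start = Instant::now();
    let policy = FlushPolicy { max_records: 3, flush_interval: Duration::from_secs(60) };
    let mut buffer = RecordBuffer::new(policy);

    // Count threshold: the third record completes a batch.
    assert!(buffer.push("a", start).is_none());
    assert!(buffer.push("b", start).is_none());
    println!("{:?}", buffer.push("c", start)); // Some(["a", "b", "c"])

    // Time threshold: a lone record is flushed once it is at least 60 s old.
    assert!(buffer.push("d", start).is_none());
    println!("{:?}", buffer.poll(start + Duration::from_secs(61))); // Some(["d"])
}
```

Tying the timer to the oldest buffered record means a quiet stream still produces a file within one interval, while an idle, empty buffer never writes an empty file.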
Encoders
| Encoder | Output Format | File Extension |
|---|---|---|
| ParquetEncoder | Apache Parquet | .parquet |
| JsonLinesEncoder | JSON Lines (newline-delimited JSON) | .jsonl |
| CsvEncoder | CSV with optional header | .csv |
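To show what "CSV with optional header" involves, here is a std-only sketch that writes string rows with RFC 4180-style quoting. encode_csv, push_row, and escape_field are hypothetical helpers, not CsvEncoder's API; the real encoder's input type, delimiter, and null handling may differ. The sketch uses LF line endings, whereas RFC 4180 itself specifies CRLF.

```rust
/// Quote a field only if it contains a comma, quote, or line break,
/// doubling embedded quotes (RFC 4180 style).
fn escape_field(field: &str) -> String {
    if field.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
        format!("\"{}\"", field.replace('"', "\"\""))
    } else {
        field.to_string()
    }
}

/// Append one comma-separated row terminated by '\n'.
fn push_row(out: &mut String, fields: &[&str]) {
    let escaped: Vec<String> = fields.iter().map(|f| escape_field(f)).collect();
    out.push_str(&escaped.join(","));
    out.push('\n');
}

/// Hypothetical helper: encode rows as CSV, optionally preceded by a header row.
fn encode_csv(header: Option<&[&str]>, rows: &[Vec<&str>]) -> String {
    let mut out = String::new();
    if let Some(names) = header {
        push_row(&mut out, names);
    }
    for row in rows {
        push_row(&mut out, row);
    }
    out
}

fn main() {
    let header: &[&str] = &["id", "note"];
    let rows = vec![vec!["1", "plain"], vec!["2", "has, comma"], vec!["3", "say \"hi\""]];
    print!("{}", encode_csv(Some(header), &rows));
    // id,note
    // 1,plain
    // 2,"has, comma"
    // 3,"say ""hi"""
}
```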
Usage
```rust
use std::time::Duration;

use volley_connectors::blob_store::{BufferedBlobSink, BufferedBlobSinkConfig};
use volley_connectors::blob_store::encoders::ParquetEncoder;
use volley_connectors::blob_store::aws::S3BlobWriter;

// Encode buffered records as Parquet files.
let encoder = Box::new(ParquetEncoder::new());

// Bucket "my-bucket", key prefix "output/", with a 100,000-record limit
// and a 60-second flush interval.
let config = BufferedBlobSinkConfig::new("my-bucket", "output/")
    .with_max_records(100_000)
    .with_flush_interval(Duration::from_secs(60));

// `s3_client` is an already-configured S3 client (construction not shown).
let sink = BufferedBlobSink::new(
    Box::new(S3BlobWriter::new(s3_client)),
    encoder,
    config,
);
```
For Azure Blob Storage, use AzureBlobWriter instead of S3BlobWriter:
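```rust
use volley_connectors::blob_store::azure::AzureBlobWriter;

// `encoder` and `config` are built as in the S3 example; `blob_service_client`
// is an already-configured Azure Blob service client (construction not shown).
let sink = BufferedBlobSink::new(
    Box::new(AzureBlobWriter::new(blob_service_client, "my-container")),
    encoder,
    config,
);
```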
For Google Cloud Storage, use GcsBlobWriter:
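```rust
use volley_connectors::blob_store::gcs::GcsBlobWriter;

// `encoder` and `config` are built as in the S3 example; `gcs_client`
// is an already-configured GCS client (construction not shown).
let sink = BufferedBlobSink::new(
    Box::new(GcsBlobWriter::new(gcs_client)),
    encoder,
    config,
);
```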
Enable the sink for your backend with the blob-store-aws, blob-store-azure, or blob-store-gcs feature flag on volley-connectors.
Supported Source Formats
All blob store sources share the same decoder layer:
| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |
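One plausible way for a shared decoder layer to choose a format is by object key extension, following the table above. SourceFormat and detect_format are hypothetical, and the connector may instead use an explicitly configured format, so treat this as a sketch of the mapping only.

```rust
use std::path::Path;

/// Formats from the table above (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq)]
enum SourceFormat {
    Parquet,
    Json,
    Csv,
    Avro,
}

/// Map an object key's extension to a format, case-insensitively; unknown -> None.
fn detect_format(key: &str) -> Option<SourceFormat> {
    let ext = Path::new(key).extension()?.to_str()?.to_ascii_lowercase();
    match ext.as_str() {
        "parquet" => Some(SourceFormat::Parquet),
        "json" | "jsonl" => Some(SourceFormat::Json),
        "csv" => Some(SourceFormat::Csv),
        "avro" => Some(SourceFormat::Avro),
        _ => None,
    }
}

fn main() {
    for key in ["events/2024/part-0001.parquet", "logs/app.jsonl", "raw/data.csv.gz"] {
        println!("{} -> {:?}", key, detect_format(key));
    }
    // events/2024/part-0001.parquet -> Some(Parquet)
    // logs/app.jsonl -> Some(Json)
    // raw/data.csv.gz -> None
}
```

A compressed key such as raw/data.csv.gz maps to None in this sketch; compressed inputs are not covered in this section.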
Feature Flags
```toml
[dependencies]
volley-connectors = { git = "https://github.com/volley-streams/volley", features = ["blob-store-azure", "blob-store-gcs"] }
```
See also: AWS S3 for the S3-specific connector.