# Blob Store Connectors

The `volley-connector-blob-store` crate provides a cloud-agnostic abstraction for reading from object storage. Azure Blob Storage and Google Cloud Storage are built on this shared foundation.
## Shared Architecture
All cloud blob store connectors follow the same pattern:
```text
Cloud Queue/Topic                 Cloud Storage
(SQS / Azure Queue /              (S3 / Azure Blob /
 Pub/Sub)                          GCS)
        |                              |
        v                              v
NotificationSource                BlobReader
        |                              |
        +--------------+---------------+
                       |
                       v
                  BlobSource
                       |
                       v
    Decoder (Parquet / JSON / CSV / Avro)
                       |
                       v
            RecordBatch --> Pipeline
```
To add a new cloud backend, implement `NotificationSource` and `BlobReader`. The decoder layer is shared.
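As a sketch of that implementation surface (the two trait names come from the text above, but their exact signatures are assumptions, shown here in simplified synchronous form), a new backend reduces to two small impls:

```rust
// Hypothetical trait shapes; the real definitions live in
// volley-connector-blob-store and may differ (e.g. async, error types).
pub struct BlobNotification {
    pub bucket: String,
    pub key: String,
}

pub trait NotificationSource {
    /// Poll the cloud queue/topic for new-object notifications.
    fn poll(&mut self) -> Vec<BlobNotification>;
}

pub trait BlobReader {
    /// Fetch the raw bytes of one object.
    fn read(&self, notification: &BlobNotification) -> Vec<u8>;
}

// A toy in-memory backend showing the implementation surface.
pub struct InMemoryBackend {
    pub pending: Vec<BlobNotification>,
    pub objects: std::collections::HashMap<String, Vec<u8>>,
}

impl NotificationSource for InMemoryBackend {
    fn poll(&mut self) -> Vec<BlobNotification> {
        // Drain pending notifications, leaving the queue empty.
        std::mem::take(&mut self.pending)
    }
}

impl BlobReader for InMemoryBackend {
    fn read(&self, n: &BlobNotification) -> Vec<u8> {
        self.objects.get(&n.key).cloned().unwrap_or_default()
    }
}
```

Everything downstream of these two traits — decoding into `RecordBatch`es and feeding the pipeline — comes from the shared layer.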
## Azure Blob Storage

From `volley-connector-azure-blob`:

| Type | Description |
|---|---|
| `AzureBlobSourceBuilder` | Builder with automatic credential discovery |
| `AzureBlobSource` | Configured Azure Blob source |
| `AzureBlobReader` | Reads blobs from Azure Blob Storage |
| `AzureQueueNotificationSource` | Polls Azure Queue Storage for blob notifications |

Enable with the `blob-store-azure` feature flag on `volley-connectors`.
### Authentication
By default, Azure connectors use `DefaultAzureCredential`, which discovers credentials automatically. The credential chain tries, in order:

- Environment variables — `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`
- Workload Identity — Kubernetes pods with Azure Workload Identity federation
- Managed Identity — Azure VM, App Service, or Container Instance identity
- Azure CLI — cached `az login` credentials (local development)
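For local development against a service principal, the environment-variable branch of the chain can be exercised like this (all values below are placeholders):

```shell
# Service-principal credentials picked up by DefaultAzureCredential.
export AZURE_TENANT_ID="00000000-0000-0000-0000-000000000000"
export AZURE_CLIENT_ID="11111111-1111-1111-1111-111111111111"
export AZURE_CLIENT_SECRET="<client-secret>"
```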
To override with an explicit storage account key, use `with_access_key()`:
```rust
use volley_connector_azure_blob::AzureBlobSourceBuilder;

// DefaultAzureCredential (recommended for production)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .build(config).await?;

// Explicit access key (override)
let source = AzureBlobSourceBuilder::new("myaccount", "my-queue", "my-container")
    .with_access_key("MYACCESSKEY...")
    .build(config).await?;
```
## Google Cloud Storage

From `volley-connector-gcp-gcs`:

| Type | Description |
|---|---|
| `GcsSource` | Configured GCS source |
| `GcsBlobReader` | Reads objects from GCS |
| `PubSubNotificationSource` | Polls Pub/Sub for GCS object notifications |

Enable with the `blob-store-gcs` feature flag on `volley-connectors`.
### Authentication
GCS connectors use Application Default Credentials, which discovers credentials automatically. The credential chain tries, in order:

- Environment variable — `GOOGLE_APPLICATION_CREDENTIALS` pointing to a service account key file
- Workload Identity — Kubernetes pods with GKE Workload Identity federation
- Compute Engine metadata — GCE VM, Cloud Run, or GKE node identity
- gcloud CLI — cached `gcloud auth application-default login` credentials (local development)
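For local development, the two most common branches of that chain look like this (the key-file path is a placeholder):

```shell
# Local development: cache credentials once with
#   gcloud auth application-default login
# or point the connector at a service account key file explicitly:
export GOOGLE_APPLICATION_CREDENTIALS="$HOME/keys/volley-sa.json"
```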
### Usage
```rust
use volley_connector_gcp_gcs::GcsSource;

let source = GcsSource::new("my-gcs-bucket", "my-pubsub-subscription")
    .build(config).await?;
```
Note: The GCS Pub/Sub notification integration is currently experimental. The `PubSubNotificationSource` depends on the `google-cloud-pubsub` crate's streaming pull API, which is not yet fully stabilized. The blob reader and event parser are functional — the gap is in notification polling reliability under sustained load.
## Blob Store Sinks
The `volley-connectors` crate also provides buffered blob store sinks for writing output files to S3 or Azure Blob Storage. These sinks batch records and flush them as files based on record count or time interval.
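The two flush triggers can be sketched as a small policy: flush when the buffer reaches the record limit or when the flush interval has elapsed, whichever comes first. This mirrors the behavior described for `BufferedBlobSinkConfig` but is not the crate's actual implementation:

```rust
use std::time::{Duration, Instant};

// Hypothetical flush policy illustrating the sink's two triggers.
struct FlushPolicy {
    max_records: usize,
    flush_interval: Duration,
    buffered: usize,
    last_flush: Instant,
}

impl FlushPolicy {
    fn new(max_records: usize, flush_interval: Duration) -> Self {
        Self { max_records, flush_interval, buffered: 0, last_flush: Instant::now() }
    }

    /// Account for one incoming record; returns true when the sink
    /// should flush the buffer to a new output file.
    fn on_record(&mut self, now: Instant) -> bool {
        self.buffered += 1;
        if self.buffered >= self.max_records
            || now.duration_since(self.last_flush) >= self.flush_interval
        {
            self.buffered = 0;
            self.last_flush = now;
            true
        } else {
            false
        }
    }
}
```

The interval trigger matters for low-volume streams: without it, a trickle of records could sit in the buffer indefinitely and never reach the record-count threshold.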
### Encoders
| Encoder | Output Format | File Extension |
|---|---|---|
| `ParquetEncoder` | Apache Parquet | `.parquet` |
| `JsonLinesEncoder` | JSON Lines (newline-delimited JSON) | `.jsonl` |
| `CsvEncoder` | CSV with optional header | `.csv` |
### Usage
```rust
use std::time::Duration;

use volley_connectors::blob_store::{BufferedBlobSink, BufferedBlobSinkConfig};
use volley_connectors::blob_store::encoders::ParquetEncoder;
use volley_connectors::blob_store::aws::S3BlobWriter;

let encoder = Box::new(ParquetEncoder::new());

let config = BufferedBlobSinkConfig::new("my-bucket", "output/")
    .with_max_records(100_000)
    .with_flush_interval(Duration::from_secs(60));

let sink = BufferedBlobSink::new(
    Box::new(S3BlobWriter::new(s3_client)),
    encoder,
    config,
);
```
For Azure Blob Storage, use `AzureBlobWriter` instead of `S3BlobWriter`:
```rust
use volley_connectors::blob_store::azure::AzureBlobWriter;

let sink = BufferedBlobSink::new(
    Box::new(AzureBlobWriter::new(blob_service_client, "my-container")),
    encoder,
    config,
);
```
Enable with the `blob-store-aws` or `blob-store-azure` feature flag on `volley-connectors`.
## Supported Source Formats
All blob store sources share the same decoder layer:
| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |
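A minimal sketch of how a shared decoder layer can dispatch on the object key's extension, per the table above. The enum and function here are illustrative assumptions, not the crate's API:

```rust
// Illustrative extension-based format selection; the crate's actual
// dispatch mechanism may differ.
#[derive(Debug, PartialEq)]
enum SourceFormat {
    Parquet,
    Json,
    Csv,
    Avro,
}

fn format_for_key(key: &str) -> Option<SourceFormat> {
    // rsplit('.') yields the extension first; keys without a dot
    // yield the whole key, which matches no known extension.
    match key.rsplit('.').next()? {
        "parquet" => Some(SourceFormat::Parquet),
        "json" | "jsonl" => Some(SourceFormat::Json),
        "csv" => Some(SourceFormat::Csv),
        "avro" => Some(SourceFormat::Avro),
        _ => None,
    }
}
```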
## Feature Flags
```toml
[dependencies]
volley-connectors = { git = "https://github.com/volley-streams/volley", features = ["blob-store-azure", "blob-store-gcs"] }
```
See also: AWS S3 for the S3-specific connector.