AWS S3
Ingest files from an S3 bucket as a streaming source. Drop new objects in the bucket, and S3’s event notifications push their keys to an SQS queue; the volley-connector-aws-s3 crate polls that queue, reads each object, decodes it (Parquet, JSON, CSV, or Avro), and emits Arrow record batches. Use this when your upstream system writes files to S3 rather than publishing to Kafka.
Architecture
    SQS Queue                    S3 Bucket
    (object notifications)       (data files)
           |                          |
           v                          v
    SqsNotificationSource        S3BlobReader
           |                          |
           +----------+    +----------+
                      |    |
                      v    v
                    BlobSource
                        |
                        v
        Decoder (Parquet / JSON / CSV / Avro)
                        |
                        v
              RecordBatch --> Pipeline
- S3 sends object-created notifications to an SQS queue
- SqsNotificationSource polls the queue for new notifications
- S3BlobReader reads the object data from S3
- The shared decoder layer converts the data to Arrow RecordBatch
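The flow above can be modelled with in-memory stand-ins. This is only an illustrative sketch of the poll, read, decode loop: the traits `NotificationSource` and `BlobReader`, the `InMemoryQueue` and `InMemoryBucket` types, and the `decode_rows` and `drain` functions are hypothetical names invented here, not the crate's API, and the toy decoder produces rows of strings rather than Arrow record batches.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for the notification side (SQS in the real connector).
trait NotificationSource {
    /// Returns the next pending object key, if any.
    fn poll(&mut self) -> Option<String>;
}

/// Hypothetical stand-in for the blob side (the S3 bucket in the real connector).
trait BlobReader {
    /// Returns the object's bytes, or `None` if the key does not exist.
    fn read(&self, key: &str) -> Option<Vec<u8>>;
}

struct InMemoryQueue(VecDeque<String>);

impl NotificationSource for InMemoryQueue {
    fn poll(&mut self) -> Option<String> {
        self.0.pop_front()
    }
}

struct InMemoryBucket(HashMap<String, Vec<u8>>);

impl BlobReader for InMemoryBucket {
    fn read(&self, key: &str) -> Option<Vec<u8>> {
        self.0.get(key).cloned()
    }
}

/// Toy decoder: splits a CSV-like payload into rows of trimmed fields.
/// The real decoder layer emits Arrow record batches instead.
fn decode_rows(bytes: &[u8]) -> Vec<Vec<String>> {
    String::from_utf8_lossy(bytes)
        .lines()
        .filter(|line| !line.trim().is_empty())
        .map(|line| line.split(',').map(|f| f.trim().to_string()).collect())
        .collect()
}

/// Drains the queue: for each notified key, read the object and decode it.
fn drain<N: NotificationSource, R: BlobReader>(queue: &mut N, bucket: &R) -> Vec<Vec<String>> {
    let mut rows = Vec::new();
    while let Some(key) = queue.poll() {
        // A notification may point at an object that is no longer there; skip it.
        if let Some(bytes) = bucket.read(&key) {
            rows.extend(decode_rows(&bytes));
        }
    }
    rows
}
```

Keeping the notification side and the blob side behind separate interfaces mirrors the diagram: either half can be swapped or tested on its own, and the decoder only ever sees bytes.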
Key Types
| Type | Description |
|---|---|
| S3Source | Configured S3 source (combines SQS + S3 reader) |
| S3BlobReader | Reads object data from S3 |
| SqsNotificationSource | Polls SQS for S3 event notifications |
Supported Formats
| Format | File Extensions |
|---|---|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |
Format detection is automatic based on file extension, or can be configured explicitly.
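As a rough illustration of extension-based detection, the mapping in the table could be written as below. The `Format` enum and `detect_format` function are hypothetical names for this sketch, not the crate's API. Case-insensitive matching and returning `None` for unknown extensions are assumptions; the connector's actual behaviour may differ.

```rust
use std::path::Path;

/// Formats listed in the table above (hypothetical enum for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Format {
    Parquet,
    Json,
    Csv,
    Avro,
}

/// Maps an object key to a format by its file extension.
/// Assumption: matching is case-insensitive, and unknown or missing
/// extensions yield `None` so the caller can fall back to explicit config.
pub fn detect_format(key: &str) -> Option<Format> {
    let ext = Path::new(key).extension()?.to_str()?.to_ascii_lowercase();
    match ext.as_str() {
        "parquet" => Some(Format::Parquet),
        "json" | "jsonl" => Some(Format::Json),
        "csv" => Some(Format::Csv),
        "avro" => Some(Format::Avro),
        _ => None,
    }
}
```

For a key such as `logs/2024/events.jsonl` this sketch yields `Some(Format::Json)`. A key with no recognised extension yields `None`, which is the case where an explicitly configured format would be needed.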
Usage
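    use volley_connector_aws_s3::S3Source;
    use volley_core::prelude::*;

    // Build the source from the SQS queue that receives the bucket's notifications.
    let source = S3Source::builder()
        .sqs_queue_url("https://sqs.us-east-1.amazonaws.com/123456789/my-queue")
        .build()
        .await?;

    // `env` and `sink` are assumed to be created elsewhere; the `.await?` calls
    // require an async context that returns a `Result`.
    env.from_source(source)
        .filter_expr(col("status").eq(lit("active")))
        .to_sink(sink)
        .execute("s3-ingest")
        .await?;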
AWS credentials are resolved via the standard AWS credential chain (environment variables, IAM role, credential file).