AWS S3

Ingest files from an S3 bucket as a streaming source. Drop new objects in the bucket, and S3’s event notifications push their keys to an SQS queue; the volley-connector-aws-s3 crate polls that queue, reads each object, decodes it (Parquet, JSON, CSV, or Avro), and emits Arrow record batches. Use this when your upstream system writes files to S3 rather than publishing to Kafka.

Architecture

SQS Queue                    S3 Bucket
(object notifications)       (data files)
      |                          |
      v                          v
SqsNotificationSource        S3BlobReader
      |                          |
      +----------+    +----------+
                 |    |
                 v    v
            BlobSource
                 |
                 v
            Decoder (Parquet / JSON / CSV / Avro)
                 |
                 v
            RecordBatch --> Pipeline

1. S3 sends object-created notifications to an SQS queue.
2. SqsNotificationSource polls the queue for new notifications.
3. S3BlobReader reads the object data from S3.
4. The shared decoder layer converts the data to Arrow RecordBatch.
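
One detail of step 2 is easy to miss: S3 URL-encodes object keys inside event notifications (a space arrives as +, and reserved bytes arrive as %XX), so a key has to be decoded before it can be used to fetch the object. The function below is a minimal, standard-library-only sketch of that decoding. The name decode_s3_key is hypothetical and is not part of volley-connector-aws-s3; how the crate handles this internally is not shown in this page.

```rust
/// Decode an object key as it appears in an S3 event notification.
/// Hypothetical helper for illustration; not the connector's API.
/// Returns `None` on a malformed escape or if the result is not valid UTF-8.
fn decode_s3_key(encoded: &str) -> Option<String> {
    let bytes = encoded.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            // Spaces are encoded as `+` in notification keys.
            b'+' => {
                out.push(b' ');
                i += 1;
            }
            // A percent escape needs exactly two hex digits after it.
            b'%' => {
                let hex = bytes.get(i + 1..i + 3)?;
                let hi = (hex[0] as char).to_digit(16)?;
                let lo = (hex[1] as char).to_digit(16)?;
                out.push((hi * 16 + lo) as u8);
                i += 3;
            }
            // Everything else passes through unchanged.
            b => {
                out.push(b);
                i += 1;
            }
        }
    }
    String::from_utf8(out).ok()
}
```

For example, decode_s3_key("red+flower.jpg") yields Some("red flower.jpg"), while a truncated escape such as "bad%2" yields None.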

Key Types

| Type | Description |
|------|-------------|
| S3Source | Configured S3 source (combines SQS + S3 reader) |
| S3BlobReader | Reads object data from S3 |
| SqsNotificationSource | Polls SQS for S3 event notifications |
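
To make the division of labor between these types concrete, here is a rough sketch of how a notification source and a blob reader might be paired. Everything in it is an assumption made for illustration: the traits NotificationSource and BlobReader, the BlobSourceSketch struct, and the in-memory fakes are hypothetical and do not reflect the crate's actual signatures (the real types are async and talk to AWS). Skipping keys whose object is missing is also a choice made for this sketch, not documented connector behavior.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for the notification side (SQS in the real connector).
trait NotificationSource {
    /// Returns the next pending object key, if any.
    fn next_key(&mut self) -> Option<String>;
}

/// Hypothetical stand-in for the blob-reading side (S3 in the real connector).
trait BlobReader {
    /// Returns the object's bytes, or `None` if the key does not exist.
    fn read(&self, key: &str) -> Option<Vec<u8>>;
}

/// Pairs a notification source with a reader: each notified key is fetched
/// and handed downstream as raw bytes for a decoder to consume.
struct BlobSourceSketch<N, R> {
    notifications: N,
    reader: R,
}

impl<N: NotificationSource, R: BlobReader> BlobSourceSketch<N, R> {
    /// Yields the next (key, bytes) pair, skipping keys whose object is missing.
    fn next_blob(&mut self) -> Option<(String, Vec<u8>)> {
        while let Some(key) = self.notifications.next_key() {
            if let Some(bytes) = self.reader.read(&key) {
                return Some((key, bytes));
            }
        }
        None
    }
}

/// In-memory fakes so the sketch runs without AWS.
struct FakeQueue(VecDeque<String>);
struct FakeBucket(HashMap<String, Vec<u8>>);

impl NotificationSource for FakeQueue {
    fn next_key(&mut self) -> Option<String> {
        self.0.pop_front()
    }
}

impl BlobReader for FakeBucket {
    fn read(&self, key: &str) -> Option<Vec<u8>> {
        self.0.get(key).cloned()
    }
}
```

Keeping the two sides behind separate traits is what lets the decoder layer stay shared: it only ever sees a key and a byte buffer, regardless of where they came from.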

Supported Formats

| Format | File Extensions |
|--------|-----------------|
| Parquet | .parquet |
| JSON | .json, .jsonl |
| CSV | .csv |
| Avro | .avro |

Format detection is automatic based on file extension, or can be configured explicitly.
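
As an illustration of the extension-based rule, the sketch below maps an object key to a format using the table above, with an explicit setting taking precedence. The Format enum and detect_format function are hypothetical names, not the crate's API, and the case-insensitive match is an assumption; the connector's real detection logic and configuration option are not shown in this page.

```rust
use std::path::Path;

/// Hypothetical format enum mirroring the supported-formats table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Format {
    Parquet,
    Json,
    Csv,
    Avro,
}

/// Pick a format for an object key.
/// An explicitly configured format wins; otherwise fall back to the
/// file extension. Returns `None` when the extension is not recognized.
fn detect_format(key: &str, explicit: Option<Format>) -> Option<Format> {
    if explicit.is_some() {
        return explicit;
    }
    // Assumption: extensions are matched case-insensitively.
    let ext = Path::new(key).extension()?.to_str()?.to_ascii_lowercase();
    match ext.as_str() {
        "parquet" => Some(Format::Parquet),
        "json" | "jsonl" => Some(Format::Json),
        "csv" => Some(Format::Csv),
        "avro" => Some(Format::Avro),
        _ => None,
    }
}
```

With this sketch, a key such as logs/part-0001.parquet resolves to Parquet, while a key with no recognizable extension resolves to nothing unless a format is passed explicitly.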

Usage

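use volley_connector_aws_s3::S3Source;
use volley_core::prelude::*;

// Build the source from the SQS queue that receives the bucket's
// object-created notifications.
let source = S3Source::builder()
    .sqs_queue_url("https://sqs.us-east-1.amazonaws.com/123456789/my-queue")
    .build()
    .await?;

// `env` (the execution environment) and `sink` are assumed to be defined
// elsewhere. The `.await?` calls require an async function that returns a Result.
env.from_source(source)
    .filter_expr(col("status").eq(lit("active"))) // keep rows where status == "active"
    .to_sink(sink)
    .execute("s3-ingest")
    .await?;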

AWS credentials are resolved via the standard AWS credential chain (environment variables, IAM role, credential file).