Apache Iceberg

The `volley-connector-iceberg` crate provides an Iceberg sink, backed by a REST catalog, with exactly-once semantics.

Key Types

| Type | Description |
|------|-------------|
| `IcebergSink` | Sink that writes Parquet data files to an Iceberg table |
| `IcebergSinkConfig` | Configuration (catalog URI, warehouse, namespace, table, storage) |
| `IcebergStorage` | Storage backend enum (S3, Azure, GCS) |

Usage

```rust
use volley_connector_iceberg::{IcebergSink, IcebergSinkConfig, IcebergStorage};

let config = IcebergSinkConfig::new(
    "http://polaris:8181",           // REST catalog URI
    "my-warehouse",                  // catalog warehouse
    vec!["db".into()],               // namespace
    "events",                        // table name
    IcebergStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,
    },
);

let sink = IcebergSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("iceberg-writer")
    .await?;
```

Configuration Reference

Required Parameters

| Parameter | Description |
|-----------|-------------|
| `catalog_uri` | REST catalog endpoint (e.g., `http://polaris:8181`) |
| `catalog_warehouse` | Warehouse identifier in the catalog |
| `namespace` | Iceberg namespace as a list of strings (e.g., `["db", "schema"]`) |
| `table_name` | Target table name |
| `storage` | Storage backend (see Storage Backends below) |

Optional Parameters

```rust
use std::time::Duration;

let config = IcebergSinkConfig::new(catalog_uri, warehouse, namespace, table, storage)
    .with_credential("client-id:client-secret")    // OAuth2 credential for Polaris
    .with_max_records(10_000)                      // buffer size before flush (default: 10,000)
    .with_flush_interval(Duration::from_secs(30)); // time-based flush trigger
```
| Method | Default | Description |
|--------|---------|-------------|
| `with_credential()` | None | OAuth2 credential for authenticated catalogs (Polaris, Tabular) |
| `with_max_records()` | 10,000 | Number of records buffered before writing a Parquet data file |
| `with_flush_interval()` | None | Time-based flush trigger; writes a file even if `max_records` hasn't been reached |

Storage Backends

S3

```rust
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "my-iceberg-data".into(),
    endpoint: None,  // use None for AWS, Some("http://localhost:9000") for MinIO
}
```

AWS credentials are resolved via the default credential chain (environment variables, `~/.aws/credentials`, instance profile).

For local development with MinIO:

```rust
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "test-bucket".into(),
    endpoint: Some("http://localhost:9000".into()),
}
```

Azure Blob Storage

```rust
IcebergStorage::Azure {
    storage_account: "mystorageaccount".into(),
    container: "iceberg-data".into(),
    access_key: None,  // uses DefaultAzureCredential when None
}
```

When `access_key` is `None`, authentication uses Azure's `DefaultAzureCredential` chain (environment variables, managed identity, Azure CLI).

Google Cloud Storage

```rust
IcebergStorage::Gcs {
    project_id: "my-gcp-project".into(),
    bucket: "iceberg-data".into(),
}
```

GCS credentials are resolved via Application Default Credentials (`GOOGLE_APPLICATION_CREDENTIALS` or the metadata server).

Exactly-Once Semantics

The Iceberg sink achieves exactly-once via snapshot commits with epoch tracking:

  1. Records are buffered in memory and written as Parquet data files
  2. On checkpoint barrier, a new Iceberg snapshot is committed via the REST catalog
  3. The checkpoint epoch is stored in the snapshot’s summary metadata
  4. On recovery, the sink reads the latest snapshot’s epoch and skips already-committed epochs

This means replayed records after a failure never produce duplicate data in the table.
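The recovery check in step 4 can be sketched as a pure function. This is an illustrative sketch, not the crate's actual internals: the summary key `"volley.epoch"` and the helper names are assumptions. The idea is that the sink parses the epoch recorded in the latest snapshot's summary map and only commits epochs strictly greater than it.

```rust
use std::collections::HashMap;

/// Parse the checkpoint epoch stored in an Iceberg snapshot's summary
/// metadata. The key name "volley.epoch" is assumed for illustration.
fn committed_epoch(summary: &HashMap<String, String>) -> Option<u64> {
    summary.get("volley.epoch")?.parse().ok()
}

/// Decide whether a replayed epoch should be committed after recovery.
/// Epochs at or below the last committed epoch are skipped, so replayed
/// records never produce duplicate data files in the table.
fn should_commit(replayed_epoch: u64, summary: &HashMap<String, String>) -> bool {
    match committed_epoch(summary) {
        Some(last) => replayed_epoch > last,
        None => true, // no snapshot yet: the first commit always proceeds
    }
}

fn main() {
    let mut summary = HashMap::new();
    summary.insert("volley.epoch".to_string(), "7".to_string());

    assert!(!should_commit(7, &summary)); // epoch 7 already committed: skip
    assert!(should_commit(8, &summary));  // epoch 8 is new: commit
    println!("epoch dedup check ok");
}
```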

REST Catalog Compatibility

Volley uses the Iceberg REST catalog spec for table management. Compatible catalogs:

| Catalog | Notes |
|---------|-------|
| Apache Polaris | Recommended. Use `with_credential()` for OAuth2 auth |
| Tabular | Hosted REST catalog. Use `with_credential()` |
| AWS Glue (via REST adapter) | Requires a REST adapter in front of Glue |
| Gravitino | REST-compatible catalog |

Troubleshooting

“Connection refused” to catalog URI

  • Verify the catalog is running and reachable from your application
  • Check firewall rules and network policies (especially in Kubernetes)

“Unauthorized” or “Forbidden” from catalog

  • Ensure with_credential() is set with a valid OAuth2 client credential
  • For Polaris: verify the principal has TABLE_WRITE_DATA privilege on the target table

“Table not found”

  • Iceberg tables must be created in the catalog before the sink writes to them
  • Verify `namespace` and `table_name` match the catalog's table identifier exactly

Small files accumulating

  • Increase with_max_records() to buffer more records per file
  • Set with_flush_interval() to a longer duration
  • Consider running Iceberg’s rewrite_data_files procedure periodically
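For example, a configuration that favors fewer, larger files might combine the two builder methods from the reference above (the specific values here are illustrative, not recommendations):

```rust
use std::time::Duration;

// Buffer up to 100,000 records per data file, but still flush at least
// every five minutes so data stays reasonably fresh. Larger buffers and
// longer intervals produce fewer, larger Parquet files.
let config = IcebergSinkConfig::new(catalog_uri, warehouse, namespace, table, storage)
    .with_max_records(100_000)
    .with_flush_interval(Duration::from_secs(300));
```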