# Apache Iceberg

The `volley-connector-iceberg` crate provides an Iceberg sink with exactly-once semantics, using a REST catalog for table management.
## Key Types

| Type | Description |
|---|---|
| `IcebergSink` | Sink that writes Parquet data files to an Iceberg table |
| `IcebergSinkConfig` | Configuration (catalog URI, warehouse, namespace, table, storage) |
| `IcebergStorage` | Storage backend enum (S3, Azure, GCS) |
## Usage

```rust
use volley_connector_iceberg::{IcebergSink, IcebergSinkConfig, IcebergStorage};

let config = IcebergSinkConfig::new(
    "http://polaris:8181",   // REST catalog URI
    "my-warehouse",          // catalog warehouse
    vec!["db".into()],       // namespace
    "events",                // table name
    IcebergStorage::S3 {
        region: "us-east-1".into(),
        bucket: "my-bucket".into(),
        endpoint: None,
    },
);

let sink = IcebergSink::new(config).await?;

env.from_source(source)
    .filter_expr(col("status").eq(lit("active")))
    .to_sink(sink)
    .execute("iceberg-writer")
    .await?;
```
## Configuration Reference
### Required Parameters

| Parameter | Description |
|---|---|
| `catalog_uri` | REST catalog endpoint (e.g., `http://polaris:8181`) |
| `catalog_warehouse` | Warehouse identifier in the catalog |
| `namespace` | Iceberg namespace as a list of strings (e.g., `["db", "schema"]`) |
| `table_name` | Target table name |
| `storage` | Storage backend — see Storage Backends below |
### Optional Parameters

```rust
use std::time::Duration;

let config = IcebergSinkConfig::new(catalog_uri, warehouse, namespace, table, storage)
    .with_credential("client-id:client-secret")    // OAuth2 credential for Polaris
    .with_max_records(10_000)                      // buffer size before flush (default: 10,000)
    .with_flush_interval(Duration::from_secs(30)); // time-based flush trigger
```
| Method | Default | Description |
|---|---|---|
| `with_credential()` | `None` | OAuth2 credential for authenticated catalogs (Polaris, Tabular) |
| `with_max_records()` | 10,000 | Number of records buffered before writing a Parquet data file |
| `with_flush_interval()` | `None` | Time-based flush trigger; writes a file even if `max_records` hasn’t been reached |
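The interplay between the two triggers can be sketched as follows. This is a simplified model of the flush decision, not the crate’s actual internals; `FlushPolicy` and `should_flush` are illustrative names:

```rust
use std::time::{Duration, Instant};

/// Illustrative model of the sink's flush decision: a buffer is written
/// out as a Parquet data file when either trigger fires.
struct FlushPolicy {
    max_records: usize,
    flush_interval: Option<Duration>,
}

impl FlushPolicy {
    /// Decide whether to flush, given the current buffer length and the
    /// time of the last flush.
    fn should_flush(&self, buffered: usize, last_flush: Instant) -> bool {
        if buffered >= self.max_records {
            return true; // count-based trigger
        }
        match self.flush_interval {
            // Time-based trigger: flush even a partially full buffer,
            // but never write an empty file.
            Some(interval) => buffered > 0 && last_flush.elapsed() >= interval,
            None => false,
        }
    }
}
```

Without a flush interval, a slow stream can leave records buffered indefinitely until `max_records` is reached, which is why the time-based trigger exists.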
## Storage Backends
### S3

```rust
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "my-iceberg-data".into(),
    endpoint: None, // use None for AWS, Some("http://localhost:9000") for MinIO
}
```

AWS credentials are resolved via the default credential chain (environment variables, `~/.aws/credentials`, instance profile).
For local development with MinIO:

```rust
IcebergStorage::S3 {
    region: "us-east-1".into(),
    bucket: "test-bucket".into(),
    endpoint: Some("http://localhost:9000".into()),
}
```
### Azure Blob Storage

```rust
IcebergStorage::Azure {
    storage_account: "mystorageaccount".into(),
    container: "iceberg-data".into(),
    access_key: None, // uses DefaultAzureCredential when None
}
```

When `access_key` is `None`, authentication uses Azure’s `DefaultAzureCredential` chain (environment variables, managed identity, Azure CLI).
### Google Cloud Storage

```rust
IcebergStorage::Gcs {
    project_id: "my-gcp-project".into(),
    bucket: "iceberg-data".into(),
}
```

GCS credentials are resolved via Application Default Credentials (`GOOGLE_APPLICATION_CREDENTIALS` or the metadata server).
## Exactly-Once Semantics

The Iceberg sink achieves exactly-once delivery via snapshot commits with epoch tracking:

1. Records are buffered in memory and written as Parquet data files
2. On a checkpoint barrier, a new Iceberg snapshot is committed via the REST catalog
3. The checkpoint epoch is stored in the snapshot’s summary metadata
4. On recovery, the sink reads the latest snapshot’s epoch and skips already-committed epochs
This means replayed records after a failure never produce duplicate data in the table.
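The recovery step can be modeled in a few lines. This is a sketch of the deduplication rule only, not the crate’s actual implementation; `should_commit` and its types are illustrative:

```rust
/// Illustrative model of epoch-based deduplication on recovery: the epoch
/// recorded in the latest committed snapshot's summary decides whether a
/// replayed checkpoint's data is committed or discarded.
fn should_commit(latest_committed_epoch: Option<u64>, replay_epoch: u64) -> bool {
    match latest_committed_epoch {
        // Commit only epochs strictly newer than the last committed one;
        // replayed epochs at or below it were already written to the table.
        Some(committed) => replay_epoch > committed,
        // No snapshot yet: every epoch is new.
        None => true,
    }
}
```

Because the epoch travels inside the snapshot’s summary metadata, the data-file write and the “which epoch is committed” record are updated in one atomic catalog commit.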
## REST Catalog Compatibility
Volley uses the Iceberg REST catalog spec for table management. Compatible catalogs:
| Catalog | Notes |
|---|---|
| Apache Polaris | Recommended. Use `with_credential()` for OAuth2 auth |
| Tabular | Hosted REST catalog. Use `with_credential()` |
| AWS Glue (via REST adapter) | Requires a REST adapter in front of Glue |
| Gravitino | REST-compatible catalog |
## Troubleshooting
### “Connection refused” to catalog URI
- Verify the catalog is running and reachable from your application
- Check firewall rules and network policies (especially in Kubernetes)
### “Unauthorized” or “Forbidden” from catalog

- Ensure `with_credential()` is set with a valid OAuth2 client credential
- For Polaris: verify the principal has the `TABLE_WRITE_DATA` privilege on the target table
### “Table not found”

- Iceberg tables must be created in the catalog before the sink writes to them
- Verify `namespace` and `table_name` match the catalog’s table identifier exactly
### Small files accumulating

- Increase `with_max_records()` to buffer more records per file
- Set `with_flush_interval()` to a longer duration
- Consider running Iceberg’s `rewrite_data_files` procedure periodically