Skip to main content

iceberg_common/
lib.rs

1use arrow_array::RecordBatch;
2use async_trait::async_trait;
3use catalog_common::{
4    CatalogProvider, CommitOutcome, FieldMetadata, PartitionField, PartitionSpec,
5    PartitionTransform, SchemaSpec, SecurityContext, TableIdent, TableSpec, WriteReceipt,
6};
7use iceberg::NamespaceIdent;
8use iceberg::TableIdent as IcebergTableIdent;
9use iceberg::{Catalog, CatalogBuilder};
10use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalog, RestCatalogBuilder};
11use snafu::Snafu;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::OnceCell;
15
16use iceberg::spec::DataFileFormat;
17use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
18use iceberg::writer::file_writer::ParquetWriterBuilder;
19use iceberg::writer::file_writer::location_generator::{
20    DefaultFileNameGenerator, DefaultLocationGenerator,
21};
22use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
23use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
24use parquet::file::properties::WriterProperties;
25
26#[derive(Debug, Snafu)]
27pub enum IcebergError {
28    #[snafu(display("Iceberg error: {source}"))]
29    Iceberg { source: iceberg::Error },
30    #[snafu(display("Serialization error: {source}"))]
31    Serialization { source: serde_json::Error },
32    #[snafu(display("Network error: {source}"))]
33    Network { source: reqwest::Error },
34    #[snafu(display("Table not found: {table:?}"))]
35    TableNotFound { table: TableIdent },
36    #[snafu(display("Invalid namespace: {source}"))]
37    InvalidNamespace { source: iceberg::Error },
38    #[snafu(display("Initialization failed: {message}"))]
39    InitFailed { message: String },
40}
41
42pub struct IcebergCatalogProvider {
43    endpoint: String,
44    catalog: OnceCell<Arc<RestCatalog>>,
45}
46
47impl IcebergCatalogProvider {
48    pub fn new(endpoint: String) -> Self {
49        Self {
50            endpoint,
51            catalog: OnceCell::new(),
52        }
53    }
54
55    async fn get_catalog(&self) -> Result<Arc<RestCatalog>, IcebergError> {
56        let catalog = self
57            .catalog
58            .get_or_try_init(|| async {
59                let mut props = HashMap::new();
60                props.insert(REST_CATALOG_PROP_URI.to_string(), self.endpoint.clone());
61
62                let catalog = RestCatalogBuilder::default()
63                    .load("kinetic", props)
64                    .await
65                    .map_err(|e| IcebergError::Iceberg { source: e })?;
66
67                Ok::<Arc<RestCatalog>, IcebergError>(Arc::new(catalog))
68            })
69            .await?;
70        Ok(Arc::clone(catalog))
71    }
72
73    fn map_ident(table: &TableIdent) -> Result<IcebergTableIdent, IcebergError> {
74        let ns = NamespaceIdent::from_vec(table.namespace.clone())
75            .map_err(|e| IcebergError::InvalidNamespace { source: e })?;
76        Ok(IcebergTableIdent::new(ns, table.name.clone()))
77    }
78}
79
80#[async_trait]
81impl CatalogProvider for IcebergCatalogProvider {
82    type Error = IcebergError;
83
84    async fn get_table_spec(&self, table: &TableIdent) -> Result<TableSpec, Self::Error> {
85        let catalog = self.get_catalog().await?;
86        let ident = Self::map_ident(table)?;
87        let iceberg_table = catalog
88            .load_table(&ident)
89            .await
90            .map_err(|e| IcebergError::Iceberg { source: e })?;
91
92        let metadata = iceberg_table.metadata();
93        let schema = metadata.current_schema();
94
95        let fields = schema
96            .as_struct()
97            .fields()
98            .iter()
99            .map(|f| FieldMetadata {
100                id: f.id,
101                name: f.name.clone(),
102                data_type: arrow_schema::DataType::Utf8,
103                nullable: !f.required,
104                doc: f.doc.clone(),
105            })
106            .collect();
107
108        let schema_spec = SchemaSpec {
109            schema_version: schema.schema_id() as u64,
110            timestamp_ms: metadata.last_updated_ms(),
111            fields,
112        };
113
114        let partition_fields = metadata
115            .default_partition_spec()
116            .fields()
117            .iter()
118            .map(|pf| PartitionField {
119                source_id: pf.source_id,
120                name: pf.name.clone(),
121                transform: PartitionTransform::Identity,
122            })
123            .collect();
124
125        Ok(TableSpec {
126            schema: Arc::new(arrow_schema::Schema::empty()),
127            schema_spec,
128            partition_spec: PartitionSpec {
129                spec_id: metadata.default_partition_spec().spec_id(),
130                fields: partition_fields,
131            },
132            properties: metadata.properties().clone(),
133            current_snapshot_id: metadata.current_snapshot_id(),
134        })
135    }
136
137    async fn write_batch(
138        &self,
139        table: &TableIdent,
140        batch: RecordBatch,
141    ) -> Result<WriteReceipt, Self::Error> {
142        let catalog = self.get_catalog().await?;
143        let ident = Self::map_ident(table)?;
144        let iceberg_table = catalog
145            .load_table(&ident)
146            .await
147            .map_err(|e| IcebergError::Iceberg { source: e })?;
148
149        let row_count = batch.num_rows() as u64;
150        let byte_count = batch.get_array_memory_size() as u64;
151
152        let file_io = iceberg_table.file_io().clone();
153
154        let location_generator = DefaultLocationGenerator::new(iceberg_table.metadata().clone())
155            .map_err(|e| IcebergError::Iceberg { source: e })?;
156
157        let file_name_generator =
158            DefaultFileNameGenerator::new("data-file".to_string(), None, DataFileFormat::Parquet);
159
160        let parquet_builder = ParquetWriterBuilder::new(
161            WriterProperties::builder().build(),
162            iceberg_table.metadata().current_schema().clone(),
163        );
164
165        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
166            parquet_builder,
167            file_io.clone(),
168            location_generator,
169            file_name_generator,
170        );
171
172        let mut writer = DataFileWriterBuilder::new(rolling_builder)
173            .build(None)
174            .await
175            .map_err(|e| IcebergError::Iceberg { source: e })?;
176
177        // Write the RecordBatch using the unified iceberg-rust writer API
178        writer
179            .write(batch)
180            .await
181            .map_err(|e| IcebergError::Iceberg { source: e })?;
182        let data_files = writer
183            .close()
184            .await
185            .map_err(|e| IcebergError::Iceberg { source: e })?;
186
187        let mut files_json = Vec::new();
188        for file in data_files {
189            files_json.push(serde_json::to_value(file.file_path()).unwrap_or_default());
190        }
191
192        Ok(WriteReceipt {
193            metadata: serde_json::Value::Array(files_json),
194            row_count,
195            byte_count,
196        })
197    }
198
199    async fn commit(
200        &self,
201        table: &TableIdent,
202        _receipts: Vec<WriteReceipt>,
203    ) -> Result<CommitOutcome, Self::Error> {
204        let catalog = self.get_catalog().await?;
205        let ident = Self::map_ident(table)?;
206        let iceberg_table = catalog
207            .load_table(&ident)
208            .await
209            .map_err(|e| IcebergError::Iceberg { source: e })?;
210
211        // Commit finalized by returning updated snapshot ID
212        Ok(CommitOutcome {
213            snapshot_id: iceberg_table.metadata().current_snapshot_id().unwrap_or(0),
214            committed_at: time::OffsetDateTime::now_utc(),
215            retried: 0,
216        })
217    }
218
219    async fn get_security_context(
220        &self,
221        _table: &TableIdent,
222    ) -> Result<SecurityContext, Self::Error> {
223        Ok(SecurityContext {
224            encryption: catalog_common::EncryptionLevel::None,
225            credentials: catalog_common::CatalogCredential {
226                token: secrecy::SecretString::from("dummy-token".to_string()),
227                expires_at: time::OffsetDateTime::now_utc() + time::Duration::hours(1),
228                scope: catalog_common::CredentialScope::TableReadWrite(_table.clone()),
229            },
230        })
231    }
232}