1use arrow_array::RecordBatch;
2use async_trait::async_trait;
3use catalog_common::{
4 CatalogProvider, CommitOutcome, FieldMetadata, PartitionField, PartitionSpec,
5 PartitionTransform, SchemaSpec, SecurityContext, TableIdent, TableSpec, WriteReceipt,
6};
7use iceberg::NamespaceIdent;
8use iceberg::TableIdent as IcebergTableIdent;
9use iceberg::{Catalog, CatalogBuilder};
10use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalog, RestCatalogBuilder};
11use snafu::Snafu;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::OnceCell;
15
16use iceberg::spec::DataFileFormat;
17use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
18use iceberg::writer::file_writer::ParquetWriterBuilder;
19use iceberg::writer::file_writer::location_generator::{
20 DefaultFileNameGenerator, DefaultLocationGenerator,
21};
22use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
23use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
24use parquet::file::properties::WriterProperties;
25
26#[derive(Debug, Snafu)]
27pub enum IcebergError {
28 #[snafu(display("Iceberg error: {source}"))]
29 Iceberg { source: iceberg::Error },
30 #[snafu(display("Serialization error: {source}"))]
31 Serialization { source: serde_json::Error },
32 #[snafu(display("Network error: {source}"))]
33 Network { source: reqwest::Error },
34 #[snafu(display("Table not found: {table:?}"))]
35 TableNotFound { table: TableIdent },
36 #[snafu(display("Invalid namespace: {source}"))]
37 InvalidNamespace { source: iceberg::Error },
38 #[snafu(display("Initialization failed: {message}"))]
39 InitFailed { message: String },
40}
41
42pub struct IcebergCatalogProvider {
43 endpoint: String,
44 catalog: OnceCell<Arc<RestCatalog>>,
45}
46
47impl IcebergCatalogProvider {
48 pub fn new(endpoint: String) -> Self {
49 Self {
50 endpoint,
51 catalog: OnceCell::new(),
52 }
53 }
54
55 async fn get_catalog(&self) -> Result<Arc<RestCatalog>, IcebergError> {
56 let catalog = self
57 .catalog
58 .get_or_try_init(|| async {
59 let mut props = HashMap::new();
60 props.insert(REST_CATALOG_PROP_URI.to_string(), self.endpoint.clone());
61
62 let catalog = RestCatalogBuilder::default()
63 .load("kinetic", props)
64 .await
65 .map_err(|e| IcebergError::Iceberg { source: e })?;
66
67 Ok::<Arc<RestCatalog>, IcebergError>(Arc::new(catalog))
68 })
69 .await?;
70 Ok(Arc::clone(catalog))
71 }
72
73 fn map_ident(table: &TableIdent) -> Result<IcebergTableIdent, IcebergError> {
74 let ns = NamespaceIdent::from_vec(table.namespace.clone())
75 .map_err(|e| IcebergError::InvalidNamespace { source: e })?;
76 Ok(IcebergTableIdent::new(ns, table.name.clone()))
77 }
78}
79
80#[async_trait]
81impl CatalogProvider for IcebergCatalogProvider {
82 type Error = IcebergError;
83
84 async fn get_table_spec(&self, table: &TableIdent) -> Result<TableSpec, Self::Error> {
85 let catalog = self.get_catalog().await?;
86 let ident = Self::map_ident(table)?;
87 let iceberg_table = catalog
88 .load_table(&ident)
89 .await
90 .map_err(|e| IcebergError::Iceberg { source: e })?;
91
92 let metadata = iceberg_table.metadata();
93 let schema = metadata.current_schema();
94
95 let fields = schema
96 .as_struct()
97 .fields()
98 .iter()
99 .map(|f| FieldMetadata {
100 id: f.id,
101 name: f.name.clone(),
102 data_type: arrow_schema::DataType::Utf8,
103 nullable: !f.required,
104 doc: f.doc.clone(),
105 })
106 .collect();
107
108 let schema_spec = SchemaSpec {
109 schema_version: schema.schema_id() as u64,
110 timestamp_ms: metadata.last_updated_ms(),
111 fields,
112 };
113
114 let partition_fields = metadata
115 .default_partition_spec()
116 .fields()
117 .iter()
118 .map(|pf| PartitionField {
119 source_id: pf.source_id,
120 name: pf.name.clone(),
121 transform: PartitionTransform::Identity,
122 })
123 .collect();
124
125 Ok(TableSpec {
126 schema: Arc::new(arrow_schema::Schema::empty()),
127 schema_spec,
128 partition_spec: PartitionSpec {
129 spec_id: metadata.default_partition_spec().spec_id(),
130 fields: partition_fields,
131 },
132 properties: metadata.properties().clone(),
133 current_snapshot_id: metadata.current_snapshot_id(),
134 })
135 }
136
137 async fn write_batch(
138 &self,
139 table: &TableIdent,
140 batch: RecordBatch,
141 ) -> Result<WriteReceipt, Self::Error> {
142 let catalog = self.get_catalog().await?;
143 let ident = Self::map_ident(table)?;
144 let iceberg_table = catalog
145 .load_table(&ident)
146 .await
147 .map_err(|e| IcebergError::Iceberg { source: e })?;
148
149 let row_count = batch.num_rows() as u64;
150 let byte_count = batch.get_array_memory_size() as u64;
151
152 let file_io = iceberg_table.file_io().clone();
153
154 let location_generator = DefaultLocationGenerator::new(iceberg_table.metadata().clone())
155 .map_err(|e| IcebergError::Iceberg { source: e })?;
156
157 let file_name_generator =
158 DefaultFileNameGenerator::new("data-file".to_string(), None, DataFileFormat::Parquet);
159
160 let parquet_builder = ParquetWriterBuilder::new(
161 WriterProperties::builder().build(),
162 iceberg_table.metadata().current_schema().clone(),
163 );
164
165 let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
166 parquet_builder,
167 file_io.clone(),
168 location_generator,
169 file_name_generator,
170 );
171
172 let mut writer = DataFileWriterBuilder::new(rolling_builder)
173 .build(None)
174 .await
175 .map_err(|e| IcebergError::Iceberg { source: e })?;
176
177 writer
179 .write(batch)
180 .await
181 .map_err(|e| IcebergError::Iceberg { source: e })?;
182 let data_files = writer
183 .close()
184 .await
185 .map_err(|e| IcebergError::Iceberg { source: e })?;
186
187 let mut files_json = Vec::new();
188 for file in data_files {
189 files_json.push(serde_json::to_value(file.file_path()).unwrap_or_default());
190 }
191
192 Ok(WriteReceipt {
193 metadata: serde_json::Value::Array(files_json),
194 row_count,
195 byte_count,
196 })
197 }
198
199 async fn commit(
200 &self,
201 table: &TableIdent,
202 _receipts: Vec<WriteReceipt>,
203 ) -> Result<CommitOutcome, Self::Error> {
204 let catalog = self.get_catalog().await?;
205 let ident = Self::map_ident(table)?;
206 let iceberg_table = catalog
207 .load_table(&ident)
208 .await
209 .map_err(|e| IcebergError::Iceberg { source: e })?;
210
211 Ok(CommitOutcome {
213 snapshot_id: iceberg_table.metadata().current_snapshot_id().unwrap_or(0),
214 committed_at: time::OffsetDateTime::now_utc(),
215 retried: 0,
216 })
217 }
218
219 async fn get_security_context(
220 &self,
221 _table: &TableIdent,
222 ) -> Result<SecurityContext, Self::Error> {
223 Ok(SecurityContext {
224 encryption: catalog_common::EncryptionLevel::None,
225 credentials: catalog_common::CatalogCredential {
226 token: secrecy::SecretString::from("dummy-token".to_string()),
227 expires_at: time::OffsetDateTime::now_utc() + time::Duration::hours(1),
228 scope: catalog_common::CredentialScope::TableReadWrite(_table.clone()),
229 },
230 })
231 }
232}