1use arrow_array::RecordBatch;
2use async_trait::async_trait;
3use catalog_common::{
4 CatalogProvider, CommitOutcome, FieldMetadata, PartitionField, PartitionSpec,
5 PartitionTransform, SchemaSpec, SecurityContext, TableIdent, TableSpec, WriteReceipt,
6};
7use deltalake::protocol::SaveMode;
8use snafu::Snafu;
9use std::sync::Arc;
10use url::Url;
11
12#[derive(Debug, Snafu)]
13pub enum DeltaError {
14 #[snafu(display("Delta error: {source}"))]
15 Delta {
16 source: deltalake::errors::DeltaTableError,
17 },
18 #[snafu(display("Serialization error: {source}"))]
19 Serialization { source: serde_json::Error },
20 #[snafu(display("Table path invalid: {path}"))]
21 InvalidPath { path: String },
22 #[snafu(display("URL parse error: {source}"))]
23 UrlParse { source: url::ParseError },
24}
25
/// Catalog provider for Delta Lake tables that live beneath a common base URI.
pub struct DeltaCatalogProvider {
    /// Root location; table locations are formed by appending the table's
    /// namespace segments and name to this value.
    pub base_uri: String,
}
30
31impl DeltaCatalogProvider {
32 pub fn new(uri: String) -> Self {
33 Self { base_uri: uri }
34 }
35
36 fn resolve_table_path(&self, table: &TableIdent) -> String {
37 format!(
38 "{}/{}/{}",
39 self.base_uri,
40 table.namespace.join("/"),
41 table.name
42 )
43 }
44}
45
46#[async_trait]
47impl CatalogProvider for DeltaCatalogProvider {
48 type Error = DeltaError;
49
50 async fn get_table_spec(&self, table: &TableIdent) -> Result<TableSpec, Self::Error> {
51 let path = self.resolve_table_path(table);
52 let url = Url::parse(&path).map_err(|e| DeltaError::UrlParse { source: e })?;
53 let table_obj = deltalake::open_table(url)
54 .await
55 .map_err(|e| DeltaError::Delta { source: e })?;
56
57 let snapshot = table_obj
58 .snapshot()
59 .map_err(|e| DeltaError::Delta { source: e })?;
60 let metadata = snapshot.metadata();
61
62 let fields = snapshot
64 .schema()
65 .fields()
66 .map(|f| FieldMetadata {
67 id: 0,
68 name: f.name().to_string(),
69 data_type: arrow_schema::DataType::Utf8,
70 nullable: f.is_nullable(),
71 doc: None,
72 })
73 .collect();
74
75 let version = table_obj.version().unwrap_or(0);
76 let schema_spec = SchemaSpec {
77 schema_version: version as u64,
78 timestamp_ms: metadata.created_time().unwrap_or(0),
79 fields,
80 };
81
82 let partition_fields = metadata
83 .partition_columns()
84 .iter()
85 .map(|col| PartitionField {
86 source_id: 0,
87 name: col.clone(),
88 transform: PartitionTransform::Identity,
89 })
90 .collect();
91
92 Ok(TableSpec {
93 schema: Arc::new(arrow_schema::Schema::empty()),
94 schema_spec,
95 partition_spec: PartitionSpec {
96 spec_id: 0,
97 fields: partition_fields,
98 },
99 properties: metadata
100 .configuration()
101 .iter()
102 .map(|(k, v)| (k.clone(), v.clone()))
103 .collect(),
104 current_snapshot_id: Some(version as i64),
105 })
106 }
107
108 async fn write_batch(
109 &self,
110 table: &TableIdent,
111 batch: RecordBatch,
112 ) -> Result<WriteReceipt, Self::Error> {
113 let path = self.resolve_table_path(table);
114 let url = Url::parse(&path).map_err(|e| DeltaError::UrlParse { source: e })?;
115
116 let table_obj = deltalake::open_table(url)
117 .await
118 .map_err(|e| DeltaError::Delta { source: e })?;
119
120 let row_count = batch.num_rows() as u64;
121 let byte_count = batch.get_array_memory_size() as u64;
122
123 table_obj
124 .write(vec![batch])
125 .with_save_mode(SaveMode::Append)
126 .await
127 .map_err(|e| DeltaError::Delta { source: e })?;
128
129 Ok(WriteReceipt {
130 metadata: serde_json::json!([]), row_count,
132 byte_count,
133 })
134 }
135
136 async fn commit(
137 &self,
138 table: &TableIdent,
139 _receipts: Vec<WriteReceipt>,
140 ) -> Result<CommitOutcome, Self::Error> {
141 let path = self.resolve_table_path(table);
142 let url = Url::parse(&path).map_err(|e| DeltaError::UrlParse { source: e })?;
143 let table_obj = deltalake::open_table(url)
144 .await
145 .map_err(|e| DeltaError::Delta { source: e })?;
146
147 Ok(CommitOutcome {
148 snapshot_id: table_obj.version().unwrap_or(0) as i64,
149 committed_at: time::OffsetDateTime::now_utc(),
150 retried: 0,
151 })
152 }
153
154 async fn get_security_context(
155 &self,
156 _table: &TableIdent,
157 ) -> Result<SecurityContext, Self::Error> {
158 Err(DeltaError::Delta {
159 source: deltalake::errors::DeltaTableError::Generic("Auth logic pending".to_string()),
160 })
161 }
162}