Skip to main content

delta_common/
lib.rs

1use arrow_array::RecordBatch;
2use async_trait::async_trait;
3use catalog_common::{
4    CatalogProvider, CommitOutcome, FieldMetadata, PartitionField, PartitionSpec,
5    PartitionTransform, SchemaSpec, SecurityContext, TableIdent, TableSpec, WriteReceipt,
6};
7use deltalake::protocol::SaveMode;
8use snafu::Snafu;
9use std::sync::Arc;
10use url::Url;
11
12#[derive(Debug, Snafu)]
13pub enum DeltaError {
14    #[snafu(display("Delta error: {source}"))]
15    Delta {
16        source: deltalake::errors::DeltaTableError,
17    },
18    #[snafu(display("Serialization error: {source}"))]
19    Serialization { source: serde_json::Error },
20    #[snafu(display("Table path invalid: {path}"))]
21    InvalidPath { path: String },
22    #[snafu(display("URL parse error: {source}"))]
23    UrlParse { source: url::ParseError },
24}
25
/// Catalog provider backed by Delta Lake tables stored under a common base URI.
///
/// Each table lives at `<base_uri>/<namespace segments...>/<table name>`.
#[derive(Debug, Clone)]
pub struct DeltaCatalogProvider {
    /// Base URI for the Delta table storage (e.g. `s3://bucket/lake` or
    /// `file:///data/lake`); it must form a valid absolute URL once a table path
    /// is appended.
    pub base_uri: String,
}
30
31impl DeltaCatalogProvider {
32    pub fn new(uri: String) -> Self {
33        Self { base_uri: uri }
34    }
35
36    fn resolve_table_path(&self, table: &TableIdent) -> String {
37        format!(
38            "{}/{}/{}",
39            self.base_uri,
40            table.namespace.join("/"),
41            table.name
42        )
43    }
44}
45
46#[async_trait]
47impl CatalogProvider for DeltaCatalogProvider {
48    type Error = DeltaError;
49
50    async fn get_table_spec(&self, table: &TableIdent) -> Result<TableSpec, Self::Error> {
51        let path = self.resolve_table_path(table);
52        let url = Url::parse(&path).map_err(|e| DeltaError::UrlParse { source: e })?;
53        let table_obj = deltalake::open_table(url)
54            .await
55            .map_err(|e| DeltaError::Delta { source: e })?;
56
57        let snapshot = table_obj
58            .snapshot()
59            .map_err(|e| DeltaError::Delta { source: e })?;
60        let metadata = snapshot.metadata();
61
62        // Map Delta schema to our SchemaSpec
63        let fields = snapshot
64            .schema()
65            .fields()
66            .map(|f| FieldMetadata {
67                id: 0,
68                name: f.name().to_string(),
69                data_type: arrow_schema::DataType::Utf8,
70                nullable: f.is_nullable(),
71                doc: None,
72            })
73            .collect();
74
75        let version = table_obj.version().unwrap_or(0);
76        let schema_spec = SchemaSpec {
77            schema_version: version as u64,
78            timestamp_ms: metadata.created_time().unwrap_or(0),
79            fields,
80        };
81
82        let partition_fields = metadata
83            .partition_columns()
84            .iter()
85            .map(|col| PartitionField {
86                source_id: 0,
87                name: col.clone(),
88                transform: PartitionTransform::Identity,
89            })
90            .collect();
91
92        Ok(TableSpec {
93            schema: Arc::new(arrow_schema::Schema::empty()),
94            schema_spec,
95            partition_spec: PartitionSpec {
96                spec_id: 0,
97                fields: partition_fields,
98            },
99            properties: metadata
100                .configuration()
101                .iter()
102                .map(|(k, v)| (k.clone(), v.clone()))
103                .collect(),
104            current_snapshot_id: Some(version as i64),
105        })
106    }
107
108    async fn write_batch(
109        &self,
110        table: &TableIdent,
111        batch: RecordBatch,
112    ) -> Result<WriteReceipt, Self::Error> {
113        let path = self.resolve_table_path(table);
114        let url = Url::parse(&path).map_err(|e| DeltaError::UrlParse { source: e })?;
115
116        let table_obj = deltalake::open_table(url)
117            .await
118            .map_err(|e| DeltaError::Delta { source: e })?;
119
120        let row_count = batch.num_rows() as u64;
121        let byte_count = batch.get_array_memory_size() as u64;
122
123        table_obj
124            .write(vec![batch])
125            .with_save_mode(SaveMode::Append)
126            .await
127            .map_err(|e| DeltaError::Delta { source: e })?;
128
129        Ok(WriteReceipt {
130            metadata: serde_json::json!([]), // Managed by delta-rs
131            row_count,
132            byte_count,
133        })
134    }
135
136    async fn commit(
137        &self,
138        table: &TableIdent,
139        _receipts: Vec<WriteReceipt>,
140    ) -> Result<CommitOutcome, Self::Error> {
141        let path = self.resolve_table_path(table);
142        let url = Url::parse(&path).map_err(|e| DeltaError::UrlParse { source: e })?;
143        let table_obj = deltalake::open_table(url)
144            .await
145            .map_err(|e| DeltaError::Delta { source: e })?;
146
147        Ok(CommitOutcome {
148            snapshot_id: table_obj.version().unwrap_or(0) as i64,
149            committed_at: time::OffsetDateTime::now_utc(),
150            retried: 0,
151        })
152    }
153
154    async fn get_security_context(
155        &self,
156        _table: &TableIdent,
157    ) -> Result<SecurityContext, Self::Error> {
158        Err(DeltaError::Delta {
159            source: deltalake::errors::DeltaTableError::Generic("Auth logic pending".to_string()),
160        })
161    }
162}