Skip to main content

catalog_common/
lib.rs

1use arrow_array::RecordBatch;
2use arrow_schema::SchemaRef;
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7/// The core abstraction for data lake catalog backends.
8///
9/// Implementations MUST be Send + Sync and MUST NOT panic on invalid input.
10/// Errors MUST be returned via the backend-specific `snafu` error enum
11/// exposed through the associated `Error` type.
12#[async_trait]
13pub trait CatalogProvider: Send + Sync {
14    /// Backend-specific error type (snafu enum).
15    type Error: std::error::Error + Send + Sync + 'static;
16
17    /// Load a table's current spec (schema, partition spec, properties,
18    /// current snapshot pointer). MUST be idempotent and safe to call
19    /// concurrently.
20    async fn get_table_spec(&self, table: &TableIdent) -> Result<TableSpec, Self::Error>;
21
22    /// Write a batch of Arrow data to the backend's storage layer.
23    /// MUST NOT visibly commit until `commit` is called — for open table
24    /// formats, this produces staged data files; for StarRocks Stream Load
25    /// and Flight SQL, this is part of the commit itself.
26    async fn write_batch(
27        &self,
28        table: &TableIdent,
29        batch: RecordBatch,
30    ) -> Result<WriteReceipt, Self::Error>;
31
32    /// Commit previously written batches. For CAS-based backends (Iceberg,
33    /// Delta, Paimon), MUST be implemented with a retry loop.
34    /// For non-transactional backends (StarRocks), MAY be a no-op.
35    async fn commit(
36        &self,
37        table: &TableIdent,
38        receipts: Vec<WriteReceipt>,
39    ) -> Result<CommitOutcome, Self::Error>;
40
41    /// Return the security context (credentials, encryption metadata)
42    /// for the table. Implementations MUST fetch short-lived credentials
43    /// and MUST NOT cache beyond the credential's expiry.
44    async fn get_security_context(
45        &self,
46        table: &TableIdent,
47    ) -> Result<SecurityContext, Self::Error>;
48}
49
50/// Fully-qualified table identifier: catalog / namespace / table.
51/// All three components are realm-scoped.
52#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
53pub struct TableIdent {
54    pub catalog: String,
55    pub namespace: Vec<String>,
56    pub name: String,
57}
58
59/// Description of a table at a point in time.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct TableSpec {
62    pub schema: SchemaRef,
63    pub schema_spec: SchemaSpec,
64    pub partition_spec: PartitionSpec,
65    pub properties: HashMap<String, String>,
66    pub current_snapshot_id: Option<i64>,
67}
68
69/// Handle to staged data files awaiting commit.
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct WriteReceipt {
72    pub metadata: serde_json::Value, // Backend-specific opaque metadata (e.g. serialized DataFile)
73    pub row_count: u64,
74    pub byte_count: u64,
75}
76
77/// Result of a commit attempt.
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct CommitOutcome {
80    pub snapshot_id: i64,
81    #[serde(with = "time::serde::rfc3339")]
82    pub committed_at: time::OffsetDateTime,
83    pub retried: u32,
84}
85
86/// Encryption and credential metadata for a table.
87pub struct SecurityContext {
88    pub encryption: EncryptionLevel,
89    pub credentials: CatalogCredential,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub enum EncryptionLevel {
94    None,
95    StorageLevel {
96        kms_key_arn: String,
97    },
98    ClientSide {
99        key_metadata: HashMap<String, String>,
100    },
101}
102
103pub struct CatalogCredential {
104    pub token: secrecy::SecretString,
105    pub expires_at: time::OffsetDateTime,
106    pub scope: CredentialScope,
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub enum CredentialScope {
111    TableReadOnly(TableIdent),
112    TableReadWrite(TableIdent),
113    NamespaceReadOnly { namespace: Vec<String> },
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SchemaSpec {
118    pub schema_version: u64,
119    pub timestamp_ms: i64,
120    pub fields: Vec<FieldMetadata>,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct FieldMetadata {
125    pub id: i32,
126    pub name: String,
127    pub data_type: arrow_schema::DataType,
128    pub nullable: bool,
129    pub doc: Option<String>,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133#[non_exhaustive]
134pub enum PartitionTransform {
135    Identity,
136    Bucket(u32),
137    Truncate(u32),
138    Year,
139    Month,
140    Day,
141    Hour,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct PartitionField {
146    pub source_id: i32,
147    pub name: String,
148    pub transform: PartitionTransform,
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct PartitionSpec {
153    pub spec_id: i32,
154    pub fields: Vec<PartitionField>,
155}