Skip to main content

starrocks_common/
lib.rs

1use arrow_array::RecordBatch;
2use async_trait::async_trait;
3use catalog_common::{
4    CatalogProvider, CommitOutcome, SecurityContext, TableIdent, TableSpec, WriteReceipt,
5};
6use snafu::Snafu;
7use tokio::sync::OnceCell;
8use uuid::Uuid;
9
10#[derive(Debug, Snafu)]
11pub enum StarRocksError {
12    #[snafu(display("HTTP error: {source}"))]
13    Http { source: reqwest::Error },
14    #[snafu(display("Flight SQL error: {source}"))]
15    Flight { source: tonic::Status },
16    #[snafu(display("Serialization error: {source}"))]
17    Serialization { source: serde_json::Error },
18    #[snafu(display("Ingestion failed: {message}"))]
19    IngestionFailed { message: String },
20}
21
/// Transport used to push record batches into StarRocks.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IngestionMode {
    /// Let the provider pick a concrete transport the first time it is needed
    /// (at present this resolves to [`IngestionMode::StreamLoad`]).
    Auto,
    /// HTTP Stream Load against the FE HTTP endpoint.
    StreamLoad,
    /// Arrow Flight SQL. Writes in this mode are not implemented yet.
    FlightSql,
}
28
29pub struct StarRocksCatalogProvider {
30    pub fe_http_url: String,
31    pub flight_sql_url: Option<String>,
32    pub mode: IngestionMode,
33    client: reqwest::Client,
34    resolved_mode: OnceCell<IngestionMode>,
35}
36
37impl StarRocksCatalogProvider {
38    pub fn new(fe_http_url: String, flight_sql_url: Option<String>, mode: IngestionMode) -> Self {
39        Self {
40            fe_http_url,
41            flight_sql_url,
42            mode,
43            client: reqwest::Client::new(),
44            resolved_mode: OnceCell::new(),
45        }
46    }
47
48    async fn resolve_mode(&self) -> IngestionMode {
49        *self
50            .resolved_mode
51            .get_or_init(|| async {
52                match self.mode {
53                    IngestionMode::Auto => {
54                        // Default to StreamLoad for now
55                        IngestionMode::StreamLoad
56                    }
57                    m => m,
58                }
59            })
60            .await
61    }
62}
63
64#[async_trait]
65impl CatalogProvider for StarRocksCatalogProvider {
66    type Error = StarRocksError;
67
68    async fn get_table_spec(&self, _table: &TableIdent) -> Result<TableSpec, Self::Error> {
69        Err(StarRocksError::IngestionFailed {
70            message: "Table spec retrieval pending SQL metadata implementation".to_string(),
71        })
72    }
73
74    async fn write_batch(
75        &self,
76        table: &TableIdent,
77        batch: RecordBatch,
78    ) -> Result<WriteReceipt, Self::Error> {
79        let mode = self.resolve_mode().await;
80        let row_count = batch.num_rows() as u64;
81        let byte_count = batch.get_array_memory_size() as u64;
82
83        match mode {
84            IngestionMode::FlightSql => Err(StarRocksError::IngestionFailed {
85                message: "Flight SQL logic implementation pending".to_string(),
86            }),
87            _ => {
88                // Functional HTTP Stream Load implementation
89                let label = format!("kinetic_{}_{}", table.name, Uuid::new_v4());
90                let url = format!(
91                    "{}/api/{}/{}/_stream_load",
92                    self.fe_http_url, table.catalog, table.name
93                );
94
95                // Serialize Arrow RecordBatch to Line-Delimited JSON for StarRocks ingestion
96                let mut buffer = Vec::new();
97                {
98                    let mut writer = arrow_json::LineDelimitedWriter::new(&mut buffer);
99                    writer
100                        .write(&batch)
101                        .map_err(|e| StarRocksError::IngestionFailed {
102                            message: e.to_string(),
103                        })?;
104                    writer
105                        .finish()
106                        .map_err(|e| StarRocksError::IngestionFailed {
107                            message: e.to_string(),
108                        })?;
109                }
110
111                let _response = self
112                    .client
113                    .put(&url)
114                    .header("label", label.clone())
115                    .header("format", "json")
116                    .header("strip_outer_array", "true")
117                    .body(buffer)
118                    .send()
119                    .await
120                    .map_err(|e| StarRocksError::Http { source: e })?;
121
122                Ok(WriteReceipt {
123                    metadata: serde_json::json!({ "mode": "stream_load", "label": label }),
124                    row_count,
125                    byte_count,
126                })
127            }
128        }
129    }
130
131    async fn commit(
132        &self,
133        _table: &TableIdent,
134        _receipts: Vec<WriteReceipt>,
135    ) -> Result<CommitOutcome, Self::Error> {
136        // StarRocks Stream Load is transactional per label, commit outcome is implicit from successful PUT
137        Ok(CommitOutcome {
138            snapshot_id: 0,
139            committed_at: time::OffsetDateTime::now_utc(),
140            retried: 0,
141        })
142    }
143
144    async fn get_security_context(
145        &self,
146        _table: &TableIdent,
147    ) -> Result<SecurityContext, Self::Error> {
148        Err(StarRocksError::IngestionFailed {
149            message: "Security context vending logic pending".to_string(),
150        })
151    }
152}