1use arrow_array::RecordBatch;
2use async_trait::async_trait;
3use catalog_common::{
4 CatalogProvider, CommitOutcome, SecurityContext, TableIdent, TableSpec, WriteReceipt,
5};
6use snafu::Snafu;
7use tokio::sync::OnceCell;
8use uuid::Uuid;
9
10#[derive(Debug, Snafu)]
11pub enum StarRocksError {
12 #[snafu(display("HTTP error: {source}"))]
13 Http { source: reqwest::Error },
14 #[snafu(display("Flight SQL error: {source}"))]
15 Flight { source: tonic::Status },
16 #[snafu(display("Serialization error: {source}"))]
17 Serialization { source: serde_json::Error },
18 #[snafu(display("Ingestion failed: {message}"))]
19 IngestionFailed { message: String },
20}
21
/// Strategy used to push record batches into StarRocks.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IngestionMode {
    /// Let the provider choose; it currently resolves to [`IngestionMode::StreamLoad`].
    Auto,
    /// Upload batches as JSON through the HTTP Stream Load endpoint.
    StreamLoad,
    /// Upload batches through Flight SQL (the write path is not implemented yet).
    FlightSql,
}
28
29pub struct StarRocksCatalogProvider {
30 pub fe_http_url: String,
31 pub flight_sql_url: Option<String>,
32 pub mode: IngestionMode,
33 client: reqwest::Client,
34 resolved_mode: OnceCell<IngestionMode>,
35}
36
37impl StarRocksCatalogProvider {
38 pub fn new(fe_http_url: String, flight_sql_url: Option<String>, mode: IngestionMode) -> Self {
39 Self {
40 fe_http_url,
41 flight_sql_url,
42 mode,
43 client: reqwest::Client::new(),
44 resolved_mode: OnceCell::new(),
45 }
46 }
47
48 async fn resolve_mode(&self) -> IngestionMode {
49 *self
50 .resolved_mode
51 .get_or_init(|| async {
52 match self.mode {
53 IngestionMode::Auto => {
54 IngestionMode::StreamLoad
56 }
57 m => m,
58 }
59 })
60 .await
61 }
62}
63
64#[async_trait]
65impl CatalogProvider for StarRocksCatalogProvider {
66 type Error = StarRocksError;
67
68 async fn get_table_spec(&self, _table: &TableIdent) -> Result<TableSpec, Self::Error> {
69 Err(StarRocksError::IngestionFailed {
70 message: "Table spec retrieval pending SQL metadata implementation".to_string(),
71 })
72 }
73
74 async fn write_batch(
75 &self,
76 table: &TableIdent,
77 batch: RecordBatch,
78 ) -> Result<WriteReceipt, Self::Error> {
79 let mode = self.resolve_mode().await;
80 let row_count = batch.num_rows() as u64;
81 let byte_count = batch.get_array_memory_size() as u64;
82
83 match mode {
84 IngestionMode::FlightSql => Err(StarRocksError::IngestionFailed {
85 message: "Flight SQL logic implementation pending".to_string(),
86 }),
87 _ => {
88 let label = format!("kinetic_{}_{}", table.name, Uuid::new_v4());
90 let url = format!(
91 "{}/api/{}/{}/_stream_load",
92 self.fe_http_url, table.catalog, table.name
93 );
94
95 let mut buffer = Vec::new();
97 {
98 let mut writer = arrow_json::LineDelimitedWriter::new(&mut buffer);
99 writer
100 .write(&batch)
101 .map_err(|e| StarRocksError::IngestionFailed {
102 message: e.to_string(),
103 })?;
104 writer
105 .finish()
106 .map_err(|e| StarRocksError::IngestionFailed {
107 message: e.to_string(),
108 })?;
109 }
110
111 let _response = self
112 .client
113 .put(&url)
114 .header("label", label.clone())
115 .header("format", "json")
116 .header("strip_outer_array", "true")
117 .body(buffer)
118 .send()
119 .await
120 .map_err(|e| StarRocksError::Http { source: e })?;
121
122 Ok(WriteReceipt {
123 metadata: serde_json::json!({ "mode": "stream_load", "label": label }),
124 row_count,
125 byte_count,
126 })
127 }
128 }
129 }
130
131 async fn commit(
132 &self,
133 _table: &TableIdent,
134 _receipts: Vec<WriteReceipt>,
135 ) -> Result<CommitOutcome, Self::Error> {
136 Ok(CommitOutcome {
138 snapshot_id: 0,
139 committed_at: time::OffsetDateTime::now_utc(),
140 retried: 0,
141 })
142 }
143
144 async fn get_security_context(
145 &self,
146 _table: &TableIdent,
147 ) -> Result<SecurityContext, Self::Error> {
148 Err(StarRocksError::IngestionFailed {
149 message: "Security context vending logic pending".to_string(),
150 })
151 }
152}