Skip to main content

kinetic/sinks/
starrocks.rs

1//! StarRocks OLAP sink.
2use catalog_common::{CatalogProvider, TableIdent};
3use kinetic_buffers::BufferReceiver;
4use kinetic_core::{AckToken, ShutdownSignal};
5use metrics::{Label, counter, histogram};
6use starrocks_common::StarRocksCatalogProvider;
7use tracing::{error, info};
8
9use crate::topology::healthcheck::Healthcheck;
10use async_trait::async_trait;
11use kinetic_common::register;
12use kinetic_common::telemetry::EventDuration;
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15use std::time::Instant;
16
/// Sink that stages incoming batches into a StarRocks table and commits them
/// in groups bounded by row count, byte count and age.
pub struct StarRocksSink {
    // Used to stage (`write_batch`) and commit data, and by the healthcheck.
    provider: Arc<StarRocksCatalogProvider>,
    // Source of batches consumed by `run`.
    receiver: BufferReceiver,
    // Destination table for all writes and commits.
    table: TableIdent,
    // Identifier used in log messages and metric labels.
    component_id: String,
    // Signal that ends the `run` loop.
    shutdown: ShutdownSignal,
    // Stored but not read by the code in this file.
    _encoder: Arc<dyn Encoder>,
    // Metric labels shared by every counter/histogram this sink emits.
    labels: Arc<[Label]>,
    // Records per-batch processing time in `run`.
    event_duration: EventDuration,

    // Batching configuration
    // A commit is triggered once the staged row count reaches this value.
    max_rows: u64,
    // A commit is triggered once the staged byte count reaches this value.
    max_bytes: u64,
    // A commit is triggered once this much time has passed since the last flush.
    max_age: std::time::Duration,
}
32
33#[async_trait]
34impl Healthcheck for StarRocksSink {
35    async fn check(&self) -> anyhow::Result<()> {
36        self.provider
37            .get_table_spec(&self.table)
38            .await
39            .map_err(|e| {
40                anyhow::anyhow!(
41                    "StarRocks healthcheck failed for table {:?}: {}",
42                    self.table,
43                    e
44                )
45            })?;
46        Ok(())
47    }
48}
49
50impl StarRocksSink {
51    #[allow(clippy::too_many_arguments)]
52    pub fn new(
53        provider: StarRocksCatalogProvider,
54        table: TableIdent,
55        component_id: String,
56        receiver: BufferReceiver,
57        shutdown: ShutdownSignal,
58        _encoder: Arc<dyn Encoder>,
59        max_rows: u64,
60        max_bytes: u64,
61        max_age_ms: u64,
62    ) -> Self {
63        let labels: Arc<[Label]> = Arc::new([
64            Label::new("component_id", component_id.clone()),
65            Label::new("component_type", "sink"),
66            Label::new("component_kind", "starrocks"),
67            Label::new("database", table.catalog.clone()), // Mapping StarRocks db to 'catalog'
68            Label::new("table", table.name.clone()),
69        ]);
70        let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
71
72        Self {
73            provider: Arc::new(provider),
74            receiver,
75            table,
76            component_id,
77            shutdown,
78            _encoder,
79            labels,
80            event_duration,
81            max_rows,
82            max_bytes,
83            max_age: std::time::Duration::from_millis(max_age_ms),
84        }
85    }
86
87    pub async fn run(mut self) {
88        info!("Starting StarRocks sink task: {}", self.component_id);
89
90        let mut current_rows = 0;
91        let mut current_bytes = 0;
92        let mut last_flush = Instant::now();
93        let mut receipts = Vec::new();
94        let mut pending_tokens = Vec::new();
95
96        loop {
97            let timeout = self.max_age.saturating_sub(last_flush.elapsed());
98
99            tokio::select! {
100                _ = self.shutdown.recv() => {
101                    info!("StarRocks sink '{}' shutting down", self.component_id);
102                    if !receipts.is_empty() {
103                        let _ = self.flush(&mut receipts, &mut pending_tokens).await;
104                    }
105                    break;
106                }
107                batch = self.receiver.recv() => {
108                    let Some(batch) = batch else {
109                        break;
110                    };
111
112                    let start = Instant::now();
113                    counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
114
115                    // StarRocks write is atomic (Stream Load / Flight SQL)
116                    match self.provider.write_batch(&self.table, batch.payload.clone()).await {
117                        Ok(receipt) => {
118                            current_rows += receipt.row_count;
119                            current_bytes += receipt.byte_count;
120                            receipts.push(receipt);
121                            if let Some(token) = batch.ack_token {
122                                pending_tokens.push(token);
123                            }
124                        }
125                        Err(e) => {
126                            error!("Failed to write to StarRocks for sink '{}': {}", self.component_id, e);
127                            counter!("component_errors_total", self.labels.iter()).increment(1);
128                        }
129                    }
130
131                    if current_rows >= self.max_rows || current_bytes >= self.max_bytes || last_flush.elapsed() >= self.max_age {
132                        if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
133                             error!("Failed to commit to StarRocks for sink '{}': {}", self.component_id, e);
134                        }
135                        current_rows = 0;
136                        current_bytes = 0;
137                        last_flush = Instant::now();
138                    }
139
140                    self.event_duration.emit(start.elapsed());
141                }
142                _ = tokio::time::sleep(timeout) => {
143                    if !receipts.is_empty() {
144                         if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
145                             error!("Failed to commit to StarRocks (timeout) for sink '{}': {}", self.component_id, e);
146                        }
147                        current_rows = 0;
148                        current_bytes = 0;
149                    }
150                    last_flush = Instant::now();
151                }
152            }
153        }
154    }
155
156    async fn flush(
157        &self,
158        receipts: &mut Vec<catalog_common::WriteReceipt>,
159        tokens: &mut Vec<AckToken>,
160    ) -> Result<(), starrocks_common::StarRocksError> {
161        let start = Instant::now();
162        match self
163            .provider
164            .commit(&self.table, std::mem::take(receipts))
165            .await
166        {
167            Ok(_outcome) => {
168                let duration = start.elapsed();
169                histogram!("datalake_commit_duration_seconds", self.labels.iter())
170                    .record(duration.as_secs_f64());
171                counter!("datalake_commits_total", self.labels.iter()).increment(1);
172
173                for token in std::mem::take(tokens) {
174                    token.ack();
175                }
176                Ok(())
177            }
178            Err(e) => {
179                counter!("datalake_commits_total", self.labels.iter()).increment(1);
180                Err(e)
181            }
182        }
183    }
184}