kinetic/sinks/
starrocks.rs1use catalog_common::{CatalogProvider, TableIdent};
3use kinetic_buffers::BufferReceiver;
4use kinetic_core::{AckToken, ShutdownSignal};
5use metrics::{Label, counter, histogram};
6use starrocks_common::StarRocksCatalogProvider;
7use tracing::{error, info};
8
9use crate::topology::healthcheck::Healthcheck;
10use async_trait::async_trait;
11use kinetic_common::register;
12use kinetic_common::telemetry::EventDuration;
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15use std::time::Instant;
16
17pub struct StarRocksSink {
18 provider: Arc<StarRocksCatalogProvider>,
19 receiver: BufferReceiver,
20 table: TableIdent,
21 component_id: String,
22 shutdown: ShutdownSignal,
23 _encoder: Arc<dyn Encoder>,
24 labels: Arc<[Label]>,
25 event_duration: EventDuration,
26
27 max_rows: u64,
29 max_bytes: u64,
30 max_age: std::time::Duration,
31}
32
33#[async_trait]
34impl Healthcheck for StarRocksSink {
35 async fn check(&self) -> anyhow::Result<()> {
36 self.provider
37 .get_table_spec(&self.table)
38 .await
39 .map_err(|e| {
40 anyhow::anyhow!(
41 "StarRocks healthcheck failed for table {:?}: {}",
42 self.table,
43 e
44 )
45 })?;
46 Ok(())
47 }
48}
49
50impl StarRocksSink {
51 #[allow(clippy::too_many_arguments)]
52 pub fn new(
53 provider: StarRocksCatalogProvider,
54 table: TableIdent,
55 component_id: String,
56 receiver: BufferReceiver,
57 shutdown: ShutdownSignal,
58 _encoder: Arc<dyn Encoder>,
59 max_rows: u64,
60 max_bytes: u64,
61 max_age_ms: u64,
62 ) -> Self {
63 let labels: Arc<[Label]> = Arc::new([
64 Label::new("component_id", component_id.clone()),
65 Label::new("component_type", "sink"),
66 Label::new("component_kind", "starrocks"),
67 Label::new("database", table.catalog.clone()), Label::new("table", table.name.clone()),
69 ]);
70 let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
71
72 Self {
73 provider: Arc::new(provider),
74 receiver,
75 table,
76 component_id,
77 shutdown,
78 _encoder,
79 labels,
80 event_duration,
81 max_rows,
82 max_bytes,
83 max_age: std::time::Duration::from_millis(max_age_ms),
84 }
85 }
86
87 pub async fn run(mut self) {
88 info!("Starting StarRocks sink task: {}", self.component_id);
89
90 let mut current_rows = 0;
91 let mut current_bytes = 0;
92 let mut last_flush = Instant::now();
93 let mut receipts = Vec::new();
94 let mut pending_tokens = Vec::new();
95
96 loop {
97 let timeout = self.max_age.saturating_sub(last_flush.elapsed());
98
99 tokio::select! {
100 _ = self.shutdown.recv() => {
101 info!("StarRocks sink '{}' shutting down", self.component_id);
102 if !receipts.is_empty() {
103 let _ = self.flush(&mut receipts, &mut pending_tokens).await;
104 }
105 break;
106 }
107 batch = self.receiver.recv() => {
108 let Some(batch) = batch else {
109 break;
110 };
111
112 let start = Instant::now();
113 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
114
115 match self.provider.write_batch(&self.table, batch.payload.clone()).await {
117 Ok(receipt) => {
118 current_rows += receipt.row_count;
119 current_bytes += receipt.byte_count;
120 receipts.push(receipt);
121 if let Some(token) = batch.ack_token {
122 pending_tokens.push(token);
123 }
124 }
125 Err(e) => {
126 error!("Failed to write to StarRocks for sink '{}': {}", self.component_id, e);
127 counter!("component_errors_total", self.labels.iter()).increment(1);
128 }
129 }
130
131 if current_rows >= self.max_rows || current_bytes >= self.max_bytes || last_flush.elapsed() >= self.max_age {
132 if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
133 error!("Failed to commit to StarRocks for sink '{}': {}", self.component_id, e);
134 }
135 current_rows = 0;
136 current_bytes = 0;
137 last_flush = Instant::now();
138 }
139
140 self.event_duration.emit(start.elapsed());
141 }
142 _ = tokio::time::sleep(timeout) => {
143 if !receipts.is_empty() {
144 if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
145 error!("Failed to commit to StarRocks (timeout) for sink '{}': {}", self.component_id, e);
146 }
147 current_rows = 0;
148 current_bytes = 0;
149 }
150 last_flush = Instant::now();
151 }
152 }
153 }
154 }
155
156 async fn flush(
157 &self,
158 receipts: &mut Vec<catalog_common::WriteReceipt>,
159 tokens: &mut Vec<AckToken>,
160 ) -> Result<(), starrocks_common::StarRocksError> {
161 let start = Instant::now();
162 match self
163 .provider
164 .commit(&self.table, std::mem::take(receipts))
165 .await
166 {
167 Ok(_outcome) => {
168 let duration = start.elapsed();
169 histogram!("datalake_commit_duration_seconds", self.labels.iter())
170 .record(duration.as_secs_f64());
171 counter!("datalake_commits_total", self.labels.iter()).increment(1);
172
173 for token in std::mem::take(tokens) {
174 token.ack();
175 }
176 Ok(())
177 }
178 Err(e) => {
179 counter!("datalake_commits_total", self.labels.iter()).increment(1);
180 Err(e)
181 }
182 }
183 }
184}