1use catalog_common::{CatalogProvider, TableIdent};
3use kinetic_buffers::BufferReceiver;
4use kinetic_core::{AckToken, ShutdownSignal};
5use metrics::{Label, counter, histogram};
6use paimon_common::PaimonCatalogProvider;
7use tracing::{error, info};
8
9use crate::topology::healthcheck::Healthcheck;
10use async_trait::async_trait;
11use kinetic_common::register;
12use kinetic_common::telemetry::EventDuration;
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15use std::time::Instant;
16
17pub struct PaimonSink {
18 provider: Arc<PaimonCatalogProvider>,
19 receiver: BufferReceiver,
20 table: TableIdent,
21 component_id: String,
22 shutdown: ShutdownSignal,
23 _encoder: Arc<dyn Encoder>,
24 labels: Arc<[Label]>,
25 event_duration: EventDuration,
26
27 max_rows: u64,
29 max_bytes: u64,
30 max_age: std::time::Duration,
31}
32
33#[async_trait]
34impl Healthcheck for PaimonSink {
35 async fn check(&self) -> anyhow::Result<()> {
36 self.provider
37 .get_table_spec(&self.table)
38 .await
39 .map_err(|e| {
40 anyhow::anyhow!(
41 "Paimon bridge healthcheck failed for table {:?}: {}",
42 self.table,
43 e
44 )
45 })?;
46 Ok(())
47 }
48}
49
50impl PaimonSink {
51 #[allow(clippy::too_many_arguments)]
52 pub fn new(
53 provider: PaimonCatalogProvider,
54 table: TableIdent,
55 component_id: String,
56 receiver: BufferReceiver,
57 shutdown: ShutdownSignal,
58 _encoder: Arc<dyn Encoder>,
59 max_rows: u64,
60 max_bytes: u64,
61 max_age_ms: u64,
62 ) -> Self {
63 let labels: Arc<[Label]> = Arc::new([
64 Label::new("component_id", component_id.clone()),
65 Label::new("component_type", "sink"),
66 Label::new("component_kind", "paimon_bridge"),
67 Label::new("table", table.name.clone()),
68 ]);
69 let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
70
71 Self {
72 provider: Arc::new(provider),
73 receiver,
74 table,
75 component_id,
76 shutdown,
77 _encoder,
78 labels,
79 event_duration,
80 max_rows,
81 max_bytes,
82 max_age: std::time::Duration::from_millis(max_age_ms),
83 }
84 }
85
86 pub async fn run(mut self) {
87 info!("Starting Paimon bridge sink task: {}", self.component_id);
88
89 let mut current_rows = 0;
90 let mut current_bytes = 0;
91 let mut last_flush = Instant::now();
92 let mut receipts = Vec::new();
93 let mut pending_tokens = Vec::new();
94
95 loop {
96 let timeout = self.max_age.saturating_sub(last_flush.elapsed());
97
98 tokio::select! {
99 _ = self.shutdown.recv() => {
100 info!("Paimon bridge sink '{}' shutting down", self.component_id);
101 if !receipts.is_empty() {
102 let _ = self.flush(&mut receipts, &mut pending_tokens).await;
103 }
104 break;
105 }
106 batch = self.receiver.recv() => {
107 let Some(batch) = batch else {
108 break;
109 };
110
111 let start = Instant::now();
112 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
113
114 match self.provider.write_batch(&self.table, batch.payload.clone()).await {
115 Ok(receipt) => {
116 current_rows += receipt.row_count;
117 current_bytes += receipt.byte_count;
118 receipts.push(receipt);
119 if let Some(token) = batch.ack_token {
120 pending_tokens.push(token);
121 }
122 }
123 Err(e) => {
124 error!("Failed to write batch to Paimon bridge for sink '{}': {}", self.component_id, e);
125 counter!("component_errors_total", self.labels.iter()).increment(1);
126 }
127 }
128
129 if current_rows >= self.max_rows || current_bytes >= self.max_bytes || last_flush.elapsed() >= self.max_age {
130 if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
131 error!("Failed to commit to Paimon bridge for sink '{}': {}", self.component_id, e);
132 }
133 current_rows = 0;
134 current_bytes = 0;
135 last_flush = Instant::now();
136 }
137
138 self.event_duration.emit(start.elapsed());
139 }
140 _ = tokio::time::sleep(timeout) => {
141 if !receipts.is_empty() {
142 if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
143 error!("Failed to commit to Paimon bridge (timeout) for sink '{}': {}", self.component_id, e);
144 }
145 current_rows = 0;
146 current_bytes = 0;
147 }
148 last_flush = Instant::now();
149 }
150 }
151 }
152 }
153
154 async fn flush(
155 &self,
156 receipts: &mut Vec<catalog_common::WriteReceipt>,
157 tokens: &mut Vec<AckToken>,
158 ) -> Result<(), paimon_common::PaimonError> {
159 let start = Instant::now();
160 match self
161 .provider
162 .commit(&self.table, std::mem::take(receipts))
163 .await
164 {
165 Ok(_outcome) => {
166 let duration = start.elapsed();
167 histogram!("datalake_commit_duration_seconds", self.labels.iter())
168 .record(duration.as_secs_f64());
169 counter!("datalake_commits_total", self.labels.iter()).increment(1);
170
171 for token in std::mem::take(tokens) {
172 token.ack();
173 }
174 Ok(())
175 }
176 Err(e) => {
177 counter!("datalake_commits_total", self.labels.iter()).increment(1);
178 Err(e)
179 }
180 }
181 }
182}