Skip to main content

kinetic/sinks/
paimon.rs

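//! Apache Paimon data lake sink (bridge path).
//!
//! Writes each incoming batch to a Paimon table through `PaimonCatalogProvider`,
//! accumulates the resulting write receipts, and commits them together once the
//! configured row, byte, or age threshold is reached. Upstream ack tokens are
//! acknowledged after a successful commit.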
2use catalog_common::{CatalogProvider, TableIdent};
3use kinetic_buffers::BufferReceiver;
4use kinetic_core::{AckToken, ShutdownSignal};
5use metrics::{Label, counter, histogram};
6use paimon_common::PaimonCatalogProvider;
7use tracing::{error, info};
8
9use crate::topology::healthcheck::Healthcheck;
10use async_trait::async_trait;
11use kinetic_common::register;
12use kinetic_common::telemetry::EventDuration;
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15use std::time::Instant;
16
/// Sink that writes buffered batches to an Apache Paimon table through a
/// [`PaimonCatalogProvider`] and commits them in groups.
///
/// Each received batch is written immediately. The resulting write receipts are
/// accumulated and committed together once one of the batching thresholds below
/// is reached. Ack tokens are acknowledged from the successful-commit path.
pub struct PaimonSink {
    /// Bridge client used for the healthcheck, batch writes, and commits.
    provider: Arc<PaimonCatalogProvider>,
    /// Source of incoming batches.
    receiver: BufferReceiver,
    /// Target Paimon table.
    table: TableIdent,
    /// Component identifier used in log messages and metric labels.
    component_id: String,
    /// Signal that ends the `run` loop.
    shutdown: ShutdownSignal,
    /// Stored at construction but not read anywhere in this sink.
    _encoder: Arc<dyn Encoder>,
    /// Labels attached to every metric this sink emits.
    labels: Arc<[Label]>,
    /// Emits the processing time of each received batch.
    event_duration: EventDuration,

    // Batching configuration
    /// A commit is triggered once the rows written since the last commit reach this value.
    max_rows: u64,
    /// A commit is triggered once the bytes written since the last commit reach this value.
    max_bytes: u64,
    /// Upper bound on how long written-but-uncommitted data waits before a commit is attempted.
    max_age: std::time::Duration,
}
32
33#[async_trait]
34impl Healthcheck for PaimonSink {
35    async fn check(&self) -> anyhow::Result<()> {
36        self.provider
37            .get_table_spec(&self.table)
38            .await
39            .map_err(|e| {
40                anyhow::anyhow!(
41                    "Paimon bridge healthcheck failed for table {:?}: {}",
42                    self.table,
43                    e
44                )
45            })?;
46        Ok(())
47    }
48}
49
50impl PaimonSink {
51    #[allow(clippy::too_many_arguments)]
52    pub fn new(
53        provider: PaimonCatalogProvider,
54        table: TableIdent,
55        component_id: String,
56        receiver: BufferReceiver,
57        shutdown: ShutdownSignal,
58        _encoder: Arc<dyn Encoder>,
59        max_rows: u64,
60        max_bytes: u64,
61        max_age_ms: u64,
62    ) -> Self {
63        let labels: Arc<[Label]> = Arc::new([
64            Label::new("component_id", component_id.clone()),
65            Label::new("component_type", "sink"),
66            Label::new("component_kind", "paimon_bridge"),
67            Label::new("table", table.name.clone()),
68        ]);
69        let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
70
71        Self {
72            provider: Arc::new(provider),
73            receiver,
74            table,
75            component_id,
76            shutdown,
77            _encoder,
78            labels,
79            event_duration,
80            max_rows,
81            max_bytes,
82            max_age: std::time::Duration::from_millis(max_age_ms),
83        }
84    }
85
86    pub async fn run(mut self) {
87        info!("Starting Paimon bridge sink task: {}", self.component_id);
88
89        let mut current_rows = 0;
90        let mut current_bytes = 0;
91        let mut last_flush = Instant::now();
92        let mut receipts = Vec::new();
93        let mut pending_tokens = Vec::new();
94
95        loop {
96            let timeout = self.max_age.saturating_sub(last_flush.elapsed());
97
98            tokio::select! {
99                _ = self.shutdown.recv() => {
100                    info!("Paimon bridge sink '{}' shutting down", self.component_id);
101                    if !receipts.is_empty() {
102                        let _ = self.flush(&mut receipts, &mut pending_tokens).await;
103                    }
104                    break;
105                }
106                batch = self.receiver.recv() => {
107                    let Some(batch) = batch else {
108                        break;
109                    };
110
111                    let start = Instant::now();
112                    counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
113
114                    match self.provider.write_batch(&self.table, batch.payload.clone()).await {
115                        Ok(receipt) => {
116                            current_rows += receipt.row_count;
117                            current_bytes += receipt.byte_count;
118                            receipts.push(receipt);
119                            if let Some(token) = batch.ack_token {
120                                pending_tokens.push(token);
121                            }
122                        }
123                        Err(e) => {
124                            error!("Failed to write batch to Paimon bridge for sink '{}': {}", self.component_id, e);
125                            counter!("component_errors_total", self.labels.iter()).increment(1);
126                        }
127                    }
128
129                    if current_rows >= self.max_rows || current_bytes >= self.max_bytes || last_flush.elapsed() >= self.max_age {
130                        if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
131                             error!("Failed to commit to Paimon bridge for sink '{}': {}", self.component_id, e);
132                        }
133                        current_rows = 0;
134                        current_bytes = 0;
135                        last_flush = Instant::now();
136                    }
137
138                    self.event_duration.emit(start.elapsed());
139                }
140                _ = tokio::time::sleep(timeout) => {
141                    if !receipts.is_empty() {
142                         if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
143                             error!("Failed to commit to Paimon bridge (timeout) for sink '{}': {}", self.component_id, e);
144                        }
145                        current_rows = 0;
146                        current_bytes = 0;
147                    }
148                    last_flush = Instant::now();
149                }
150            }
151        }
152    }
153
154    async fn flush(
155        &self,
156        receipts: &mut Vec<catalog_common::WriteReceipt>,
157        tokens: &mut Vec<AckToken>,
158    ) -> Result<(), paimon_common::PaimonError> {
159        let start = Instant::now();
160        match self
161            .provider
162            .commit(&self.table, std::mem::take(receipts))
163            .await
164        {
165            Ok(_outcome) => {
166                let duration = start.elapsed();
167                histogram!("datalake_commit_duration_seconds", self.labels.iter())
168                    .record(duration.as_secs_f64());
169                counter!("datalake_commits_total", self.labels.iter()).increment(1);
170
171                for token in std::mem::take(tokens) {
172                    token.ack();
173                }
174                Ok(())
175            }
176            Err(e) => {
177                counter!("datalake_commits_total", self.labels.iter()).increment(1);
178                Err(e)
179            }
180        }
181    }
182}