1use catalog_common::{CatalogProvider, TableIdent};
3use iceberg_common::{IcebergCatalogProvider, IcebergError};
4use kinetic_buffers::BufferReceiver;
5use kinetic_core::{AckToken, ShutdownSignal};
6use metrics::{Label, counter, histogram};
7use tracing::{error, info};
8
9use crate::topology::healthcheck::Healthcheck;
10use async_trait::async_trait;
11use kinetic_common::register;
12use kinetic_common::telemetry::EventDuration;
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15use std::time::Instant;
16
17pub struct IcebergSink {
18 provider: Arc<IcebergCatalogProvider>,
19 receiver: BufferReceiver,
20 table: TableIdent,
21 component_id: String,
22 shutdown: ShutdownSignal,
23 _encoder: Arc<dyn Encoder>,
24 labels: Arc<[Label]>,
25 event_duration: EventDuration,
26
27 max_rows: u64,
29 max_bytes: u64,
30 max_age: std::time::Duration,
31}
32
33#[async_trait]
34impl Healthcheck for IcebergSink {
35 async fn check(&self) -> anyhow::Result<()> {
36 self.provider
37 .get_table_spec(&self.table)
38 .await
39 .map_err(|e| {
40 anyhow::anyhow!(
41 "Iceberg catalog healthcheck failed for table {:?}: {}",
42 self.table,
43 e
44 )
45 })?;
46 Ok(())
47 }
48}
49
50impl IcebergSink {
51 #[allow(clippy::too_many_arguments)]
52 pub fn new(
53 provider: IcebergCatalogProvider,
54 table: TableIdent,
55 component_id: String,
56 receiver: BufferReceiver,
57 shutdown: ShutdownSignal,
58 _encoder: Arc<dyn Encoder>,
59 max_rows: u64,
60 max_bytes: u64,
61 max_age_ms: u64,
62 ) -> Self {
63 let labels: Arc<[Label]> = Arc::new([
64 Label::new("component_id", component_id.clone()),
65 Label::new("component_type", "sink"),
66 Label::new("component_kind", "iceberg"),
67 Label::new(
68 "table",
69 format!("{}.{}", table.namespace.join("."), table.name),
70 ),
71 ]);
72 let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
73
74 Self {
75 provider: Arc::new(provider),
76 receiver,
77 table,
78 component_id,
79 shutdown,
80 _encoder,
81 labels,
82 event_duration,
83 max_rows,
84 max_bytes,
85 max_age: std::time::Duration::from_millis(max_age_ms),
86 }
87 }
88
89 pub async fn run(mut self) {
90 info!("Starting Iceberg sink task: {}", self.component_id);
91
92 let mut current_rows = 0;
93 let mut current_bytes = 0;
94 let mut last_flush = Instant::now();
95 let mut receipts = Vec::new();
96 let mut pending_tokens = Vec::new();
97
98 loop {
99 let timeout = self.max_age.saturating_sub(last_flush.elapsed());
100
101 tokio::select! {
102 _ = self.shutdown.recv() => {
103 info!("Iceberg sink '{}' shutting down", self.component_id);
104 if !receipts.is_empty() {
105 let _ = self.flush(&mut receipts, &mut pending_tokens).await;
106 }
107 break;
108 }
109 batch = self.receiver.recv() => {
110 let Some(batch) = batch else {
111 break;
112 };
113
114 let start = Instant::now();
115 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
116
117 match self.provider.write_batch(&self.table, batch.payload.clone()).await {
118 Ok(receipt) => {
119 current_rows += receipt.row_count;
120 current_bytes += receipt.byte_count;
121 receipts.push(receipt);
122 if let Some(token) = batch.ack_token {
123 pending_tokens.push(token);
124 }
125 }
126 Err(e) => {
127 error!("Failed to write batch to Iceberg for sink '{}': {}", self.component_id, e);
128 counter!("component_errors_total", self.labels.iter()).increment(1);
129 }
130 }
131
132 if current_rows >= self.max_rows || current_bytes >= self.max_bytes || last_flush.elapsed() >= self.max_age {
133 if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
134 error!("Failed to commit to Iceberg for sink '{}': {}", self.component_id, e);
135 }
136 current_rows = 0;
137 current_bytes = 0;
138 last_flush = Instant::now();
139 }
140
141 self.event_duration.emit(start.elapsed());
142 }
143 _ = tokio::time::sleep(timeout) => {
144 if !receipts.is_empty() {
145 if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
146 error!("Failed to commit to Iceberg (timeout) for sink '{}': {}", self.component_id, e);
147 }
148 current_rows = 0;
149 current_bytes = 0;
150 }
151 last_flush = Instant::now();
152 }
153 }
154 }
155 }
156
157 async fn flush(
158 &self,
159 receipts: &mut Vec<catalog_common::WriteReceipt>,
160 tokens: &mut Vec<AckToken>,
161 ) -> Result<(), IcebergError> {
162 let start = Instant::now();
163 match self
164 .provider
165 .commit(&self.table, std::mem::take(receipts))
166 .await
167 {
168 Ok(_outcome) => {
169 let duration = start.elapsed();
170 histogram!("datalake_commit_duration_seconds", self.labels.iter())
171 .record(duration.as_secs_f64());
172 counter!("datalake_commits_total", self.labels.iter()).increment(1);
173
174 for token in std::mem::take(tokens) {
175 token.ack();
176 }
177 Ok(())
178 }
179 Err(e) => {
180 counter!("datalake_commits_total", self.labels.iter()).increment(1);
181 Err(e)
182 }
183 }
184 }
185}