Skip to main content

kinetic/sinks/
delta.rs

//! Delta Lake sink: writes incoming batches to a Delta table and commits them in groups bounded by row count, byte size, and age.
2use catalog_common::{CatalogProvider, TableIdent};
3use delta_common::{DeltaCatalogProvider, DeltaError};
4use kinetic_buffers::BufferReceiver;
5use kinetic_core::{AckToken, ShutdownSignal};
6use metrics::{Label, counter, histogram};
7use tracing::{error, info};
8
9use crate::topology::healthcheck::Healthcheck;
10use async_trait::async_trait;
11use kinetic_common::register;
12use kinetic_common::telemetry::EventDuration;
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15use std::time::Instant;
16
/// Sink that writes incoming batches to a Delta Lake table.
///
/// Each batch is written as it arrives. The resulting write receipts are buffered and committed
/// together once any one of the batching limits below is reached.
pub struct DeltaSink {
    /// Catalog provider used to look up, write to, and commit the destination table.
    provider: Arc<DeltaCatalogProvider>,
    /// Buffer the sink reads incoming batches from.
    receiver: BufferReceiver,
    /// Destination table.
    table: TableIdent,
    /// Component identifier used in log messages and metric labels.
    component_id: String,
    /// Signal that stops the sink's run loop.
    shutdown: ShutdownSignal,
    /// Encoder handed to the constructor; not read by any code visible in this file.
    _encoder: Arc<dyn Encoder>,
    /// Labels attached to every metric this sink emits.
    labels: Arc<[Label]>,
    /// Records how long each received batch takes to process.
    event_duration: EventDuration,

    // Batching configuration: a commit is attempted as soon as any one limit is reached.
    /// Commit once at least this many written-but-uncommitted rows are buffered.
    max_rows: u64,
    /// Commit once at least this many written-but-uncommitted bytes are buffered.
    max_bytes: u64,
    /// Upper bound on the time between commit attempts while writes are buffered.
    max_age: std::time::Duration,
}
32
33#[async_trait]
34impl Healthcheck for DeltaSink {
35    async fn check(&self) -> anyhow::Result<()> {
36        self.provider
37            .get_table_spec(&self.table)
38            .await
39            .map_err(|e| {
40                anyhow::anyhow!(
41                    "Delta catalog healthcheck failed for table {:?}: {}",
42                    self.table,
43                    e
44                )
45            })?;
46        Ok(())
47    }
48}
49
50impl DeltaSink {
51    #[allow(clippy::too_many_arguments)]
52    pub fn new(
53        provider: DeltaCatalogProvider,
54        table: TableIdent,
55        component_id: String,
56        receiver: BufferReceiver,
57        shutdown: ShutdownSignal,
58        _encoder: Arc<dyn Encoder>,
59        max_rows: u64,
60        max_bytes: u64,
61        max_age_ms: u64,
62    ) -> Self {
63        let labels: Arc<[Label]> = Arc::new([
64            Label::new("component_id", component_id.clone()),
65            Label::new("component_type", "sink"),
66            Label::new("component_kind", "delta"),
67            Label::new(
68                "table",
69                format!("{}.{}", table.namespace.join("."), table.name),
70            ),
71        ]);
72        let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
73
74        Self {
75            provider: Arc::new(provider),
76            receiver,
77            table,
78            component_id,
79            shutdown,
80            _encoder,
81            labels,
82            event_duration,
83            max_rows,
84            max_bytes,
85            max_age: std::time::Duration::from_millis(max_age_ms),
86        }
87    }
88
89    pub async fn run(mut self) {
90        info!("Starting Delta sink task: {}", self.component_id);
91
92        let mut current_rows = 0;
93        let mut current_bytes = 0;
94        let mut last_flush = Instant::now();
95        let mut receipts = Vec::new();
96        let mut pending_tokens = Vec::new();
97
98        loop {
99            let timeout = self.max_age.saturating_sub(last_flush.elapsed());
100
101            tokio::select! {
102                _ = self.shutdown.recv() => {
103                    info!("Delta sink '{}' shutting down", self.component_id);
104                    if !receipts.is_empty() {
105                        let _ = self.flush(&mut receipts, &mut pending_tokens).await;
106                    }
107                    break;
108                }
109                batch = self.receiver.recv() => {
110                    let Some(batch) = batch else {
111                        break;
112                    };
113
114                    let start = Instant::now();
115                    counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
116
117                    match self.provider.write_batch(&self.table, batch.payload.clone()).await {
118                        Ok(receipt) => {
119                            current_rows += receipt.row_count;
120                            current_bytes += receipt.byte_count;
121                            receipts.push(receipt);
122                            if let Some(token) = batch.ack_token {
123                                pending_tokens.push(token);
124                            }
125                        }
126                        Err(e) => {
127                            error!("Failed to write batch to Delta for sink '{}': {}", self.component_id, e);
128                            counter!("component_errors_total", self.labels.iter()).increment(1);
129                        }
130                    }
131
132                    if current_rows >= self.max_rows || current_bytes >= self.max_bytes || last_flush.elapsed() >= self.max_age {
133                        if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
134                             error!("Failed to commit to Delta for sink '{}': {}", self.component_id, e);
135                        }
136                        current_rows = 0;
137                        current_bytes = 0;
138                        last_flush = Instant::now();
139                    }
140
141                    self.event_duration.emit(start.elapsed());
142                }
143                _ = tokio::time::sleep(timeout) => {
144                    if !receipts.is_empty() {
145                         if let Err(e) = self.flush(&mut receipts, &mut pending_tokens).await {
146                             error!("Failed to commit to Delta (timeout) for sink '{}': {}", self.component_id, e);
147                        }
148                        current_rows = 0;
149                        current_bytes = 0;
150                    }
151                    last_flush = Instant::now();
152                }
153            }
154        }
155    }
156
157    async fn flush(
158        &self,
159        receipts: &mut Vec<catalog_common::WriteReceipt>,
160        tokens: &mut Vec<AckToken>,
161    ) -> Result<(), DeltaError> {
162        let start = Instant::now();
163        match self
164            .provider
165            .commit(&self.table, std::mem::take(receipts))
166            .await
167        {
168            Ok(_outcome) => {
169                let duration = start.elapsed();
170                histogram!("datalake_commit_duration_seconds", self.labels.iter())
171                    .record(duration.as_secs_f64());
172                counter!("datalake_commits_total", self.labels.iter()).increment(1);
173
174                for token in std::mem::take(tokens) {
175                    token.ack();
176                }
177                Ok(())
178            }
179            Err(e) => {
180                counter!("datalake_commits_total", self.labels.iter()).increment(1);
181                Err(e)
182            }
183        }
184    }
185}