// kinetic/sinks/dlq/task.rs

use crate::sinks::dlq::client::S3DlqClient;
use arrow_array::{Array, StringArray};
use duckdb_engine::WindowTimer;
use kinetic_buffers::BufferReceiver;
use kinetic_config::model::DlqSinkConfig;
use kinetic_core::EventBatch;
use kinetic_core::encode::Encoder;
use metrics::{Label, counter};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use tracing::{debug, error, info};

/// Backoff bounds used when a failed DLQ write is rescheduled.
struct RetryConfig {
    /// Delay before the first retry; doubled for each further failed attempt.
    base_delay: Duration,
    /// Upper bound on the delay between two attempts.
    max_delay: Duration,
}

use std::sync::Arc;

20struct PendingWrite {
21    pipeline_name: String,
22    stage: String,
23    payload: bytes::Bytes,
24    attempts: usize,
25    next_retry_at: Instant,
26}
28struct TaskState<'a> {
29    client: &'a S3DlqClient,
30    pending_writes: &'a mut VecDeque<PendingWrite>,
31    retry_config: &'a RetryConfig,
32}
34struct WriteContext {
35    pipeline_name: String,
36    stage: String,
37    payload: bytes::Bytes,
38}
40fn compute_backoff_delay(config: &RetryConfig, attempts: usize) -> Duration {
41    let exponent = attempts.saturating_sub(1).min(20);
42    let factor = 1_u32 << exponent;
43    let delay = config.base_delay.saturating_mul(factor);
44    std::cmp::min(delay, config.max_delay)
45}
47pub struct DlqSinkTask {
48    receiver: BufferReceiver,
49    config: DlqSinkConfig,
50    component_id: String,
51    encoder: Arc<dyn Encoder>,
52    labels: Arc<[Label]>,
53}

use crate::topology::healthcheck::Healthcheck;
use async_trait::async_trait;

58#[async_trait]
59impl Healthcheck for DlqSinkTask {
60    async fn check(&self) -> anyhow::Result<()> {
61        let client = S3DlqClient::new(
62            &self.config.region,
63            self.config.bucket.clone(),
64            self.config.prefix.clone(),
65        )
66        .await;
67
68        client.check_health().await.map_err(|e| {
69            anyhow::anyhow!(
70                "DLQ healthcheck failed for bucket '{}': {}",
71                self.config.bucket,
72                e
73            )
74        })
75    }
76}
78impl DlqSinkTask {
79    pub fn new(
80        receiver: BufferReceiver,
81        config: DlqSinkConfig,
82        component_id: String,
83        encoder: Arc<dyn Encoder>,
84    ) -> Self {
85        let labels: Arc<[Label]> = Arc::new([
86            Label::new("component_id", component_id.clone()),
87            Label::new("component_type", "sink"),
88            Label::new("component_kind", "dlq"),
89        ]);
90
91        Self {
92            receiver,
93            config,
94            component_id,
95            encoder,
96            labels,
97        }
98    }
99
100    pub async fn run_task(mut self) {
101        info!("Starting DLQ sink task: {}", self.component_id);
102
103        let client = S3DlqClient::new(
104            &self.config.region,
105            self.config.bucket.clone(),
106            self.config.prefix.clone(),
107        )
108        .await;
109
110        let mut window_timer = match WindowTimer::new(&self.config.timeout) {
111            Ok(t) => t,
112            Err(e) => {
113                error!(
114                    "Invalid DLQ timeout {}, defaulting to 60s: {}",
115                    self.config.timeout, e
116                );
117                WindowTimer::from_duration(std::time::Duration::from_secs(60))
118            }
119        };
120
121        let retry_config = RetryConfig {
122            base_delay: parse_duration(&self.config.retry_delay).unwrap_or_else(|e| {
123                error!(
124                    "Invalid DLQ retry delay '{}', defaulting to 5s: {}",
125                    self.config.retry_delay, e
126                );
127                Duration::from_secs(5)
128            }),
129            max_delay: parse_duration(&self.config.retry_max_delay).unwrap_or_else(|e| {
130                error!(
131                    "Invalid DLQ retry max delay '{}', defaulting to 5m: {}",
132                    self.config.retry_max_delay, e
133                );
134                Duration::from_secs(300)
135            }),
136        };
137
138        let mut current_batch = Vec::new();
139        let mut pending_writes: VecDeque<PendingWrite> = VecDeque::new();
140
141        loop {
142            tokio::select! {
143                biased;
144
145                _ = window_timer.tick() => {
146                    let mut state = TaskState {
147                        client: &client,
148                        pending_writes: &mut pending_writes,
149                        retry_config: &retry_config,
150                    };
151                    self.retry_pending_writes(&mut state).await;
152
153                    if !current_batch.is_empty() {
154                        self.flush_batches(&mut current_batch, &mut state).await;
155                    }
156                }
157
158                maybe_batch = self.receiver.recv() => {
159                    match maybe_batch {
160                        Some(batch) => {
161                            counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
162                            counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
163                            current_batch.push(batch);
164                            if current_batch.len() >= self.config.batch_size {
165                                let mut state = TaskState {
166                                    client: &client,
167                                    pending_writes: &mut pending_writes,
168                                    retry_config: &retry_config,
169                                };
170                                self.flush_batches(&mut current_batch, &mut state).await;
171                                window_timer.reset();
172                            }
173                        }
174                        None => {
175                            info!("DLQ sink task {} input channel closed", self.component_id);
176                            let mut state = TaskState {
177                                client: &client,
178                                pending_writes: &mut pending_writes,
179                                retry_config: &retry_config,
180                            };
181                            if !current_batch.is_empty() {
182                                self.flush_batches(&mut current_batch, &mut state).await;
183                            }
184                            self.retry_pending_writes(&mut state).await;
185
186                            if !pending_writes.is_empty() {
187                                error!(
188                                    "DLQ sink '{}' shutting down with {} pending writes still buffered",
189                                    self.component_id,
190                                    pending_writes.len()
191                                );
192                            }
193                            break;
194                        }
195                    }
196                }
197            }
198        }
199
200        info!("DLQ sink task {} shutting down", self.component_id);
201    }
202
203    async fn flush_batches(&self, batches: &mut Vec<EventBatch>, state: &mut TaskState<'_>) {
204        if batches.is_empty() {
205            return;
206        }
207
208        debug!(
209            "DLQ {} flushing {} event batches",
210            self.component_id,
211            batches.len()
212        );
213
214        let mut grouped_payloads: std::collections::HashMap<(String, String), Vec<u8>> =
215            std::collections::HashMap::new();
216
217        for mut batch in batches.drain(..) {
218            let context = extract_context(&batch);
219            let events = batch.num_rows();
220            match self.encoder.encode(&batch) {
221                Ok(data) => {
222                    let data_len = data.len();
223                    grouped_payloads
224                        .entry(context)
225                        .or_default()
226                        .extend_from_slice(data.as_ref());
227
228                    if let Some(token) = batch.ack_token.take() {
229                        token.ack();
230                    }
231                    counter!("component_sent_events_total", self.labels.iter())
232                        .increment(events as u64);
233                    counter!("component_sent_event_bytes_total", self.labels.iter())
234                        .increment(batch.estimated_size() as u64);
235                    counter!("component_sent_network_bytes_total", self.labels.iter())
236                        .increment(data_len as u64);
237                }
238                Err(e) => {
239                    counter!("component_errors_total", self.labels.iter()).increment(1);
240                    error!("Failed to encode error batch in DLQ: {}", e);
241                }
242            }
243        }
244
245        for ((pipeline_name, stage), payload) in grouped_payloads {
246            if payload.is_empty() {
247                continue;
248            }
249
250            self.attempt_or_buffer_write(
251                state,
252                WriteContext {
253                    pipeline_name,
254                    stage,
255                    payload: bytes::Bytes::from(payload),
256                },
257                0,
258            )
259            .await;
260        }
261    }
262
263    async fn retry_pending_writes(&self, state: &mut TaskState<'_>) {
264        if state.pending_writes.is_empty() {
265            return;
266        }
267
268        let now = Instant::now();
269        let mut to_process = state.pending_writes.len();
270
271        while to_process > 0 {
272            to_process -= 1;
273
274            let Some(pending) = state.pending_writes.pop_front() else {
275                break;
276            };
277
278            if pending.next_retry_at > now {
279                state.pending_writes.push_back(pending);
280                continue;
281            }
282
283            self.attempt_or_buffer_write(
284                state,
285                WriteContext {
286                    pipeline_name: pending.pipeline_name,
287                    stage: pending.stage,
288                    payload: pending.payload,
289                },
290                pending.attempts,
291            )
292            .await;
293        }
294    }
295
296    async fn attempt_or_buffer_write(
297        &self,
298        state: &mut TaskState<'_>,
299        context: WriteContext,
300        attempts: usize,
301    ) {
302        match state
303            .client
304            .write_batch(
305                &context.pipeline_name,
306                &context.stage,
307                context.payload.clone(),
308                &self.config.encoding,
309            )
310            .await
311        {
312            Ok(()) => {}
313            Err(e) => {
314                counter!("component_errors_total", self.labels.iter()).increment(1);
315                let next_attempts = attempts + 1;
316
317                if next_attempts > self.config.retry_max_attempts {
318                    error!(
319                        "Dropping DLQ write after {} attempts for pipeline '{}' stage '{}': {}",
320                        next_attempts, context.pipeline_name, context.stage, e
321                    );
322                    return;
323                }
324
325                let delay = compute_backoff_delay(state.retry_config, next_attempts);
326                let pending = PendingWrite {
327                    pipeline_name: context.pipeline_name.clone(),
328                    stage: context.stage.clone(),
329                    payload: context.payload.clone(),
330                    attempts: next_attempts,
331                    next_retry_at: Instant::now() + delay,
332                };
333
334                enqueue_pending_write(
335                    state.pending_writes,
336                    pending,
337                    self.config.max_pending_writes,
338                );
339
340                error!(
341                    "Buffered failed DLQ write attempt {} for retry (pending: {}): {}",
342                    next_attempts,
343                    state.pending_writes.len(),
344                    e
345                );
346            }
347        }
348    }
349}
351fn enqueue_pending_write(
352    pending_writes: &mut VecDeque<PendingWrite>,
353    pending: PendingWrite,
354    max_pending_writes: usize,
355) {
356    if max_pending_writes == 0 {
357        return;
358    }
359
360    while pending_writes.len() >= max_pending_writes {
361        let _ = pending_writes.pop_front();
362    }
363
364    pending_writes.push_back(pending);
365}
367fn extract_context(batch: &EventBatch) -> (String, String) {
368    let pipeline_from_payload = batch
369        .payload
370        .column_by_name("pipeline_name")
371        .and_then(|column| column.as_any().downcast_ref::<StringArray>())
372        .and_then(|array| {
373            if array.is_empty() {
374                None
375            } else {
376                Some(array.value(0).to_string())
377            }
378        });
379
380    let stage_from_payload = batch
381        .payload
382        .column_by_name("stage")
383        .and_then(|column| column.as_any().downcast_ref::<StringArray>())
384        .and_then(|array| {
385            if array.is_empty() {
386                None
387            } else {
388                Some(array.value(0).to_string())
389            }
390        });
391
392    (
393        pipeline_from_payload.unwrap_or_else(|| batch.metadata.pipeline_id.clone()),
394        stage_from_payload.unwrap_or_else(|| batch.metadata.source_id.0.clone()),
395    )
396}
398fn parse_duration(s: &str) -> std::result::Result<Duration, String> {
399    kinetic_common::parse_duration(s).map_err(|e| e.to_string())
400}
#[cfg(test)]
// `expect` is used for test setup, so the unwrap/expect lints are relaxed here.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::{ArcEventMetadata, ComponentId, EventMetadata};
    use std::sync::Arc;

    /// Metadata whose pipeline and source ids differ from any payload value used
    /// below, so the tests can tell which source `extract_context` picked.
    fn metadata() -> ArcEventMetadata {
        Arc::new(EventMetadata::new(
            "meta_pipeline",
            ComponentId("meta_source".to_string()),
        ))
    }

    /// Builds a pending write that is due immediately and has failed once.
    fn pending_write(pipeline: &str, stage: &str, byte: u8) -> PendingWrite {
        PendingWrite {
            pipeline_name: pipeline.to_string(),
            stage: stage.to_string(),
            payload: bytes::Bytes::from(vec![byte]),
            attempts: 1,
            next_retry_at: Instant::now(),
        }
    }

    #[test]
    fn extract_context_uses_payload_columns_when_present() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("pipeline_name", DataType::Utf8, false),
            Field::new("stage", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["payload_pipeline"])) as _,
                Arc::new(StringArray::from(vec!["payload_stage"])) as _,
            ],
        )
        .expect("record batch should build");

        let event_batch = EventBatch::new(batch, metadata()).expect("failed to create event batch");
        let (pipeline, stage) = extract_context(&event_batch);

        assert_eq!(pipeline, "payload_pipeline");
        assert_eq!(stage, "payload_stage");
    }

    #[test]
    fn extract_context_falls_back_to_metadata() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int32,
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1])) as _])
            .expect("record batch should build");

        let event_batch = EventBatch::new(batch, metadata()).expect("failed to create event batch");
        let (pipeline, stage) = extract_context(&event_batch);

        assert_eq!(pipeline, "meta_pipeline");
        assert_eq!(stage, "meta_source");
    }

    #[test]
    fn parse_duration_supports_seconds_and_ms() {
        assert_eq!(
            parse_duration("5s").expect("duration should parse"),
            Duration::from_secs(5)
        );
        assert_eq!(
            parse_duration("250ms").expect("duration should parse"),
            Duration::from_millis(250)
        );
    }

    #[test]
    fn enqueue_pending_write_respects_capacity() {
        let mut queue = VecDeque::new();

        enqueue_pending_write(&mut queue, pending_write("p1", "s1", 1), 2);
        enqueue_pending_write(&mut queue, pending_write("p2", "s2", 2), 2);
        enqueue_pending_write(&mut queue, pending_write("p3", "s3", 3), 2);

        // The oldest entry (p1) is evicted to make room for p3.
        assert_eq!(queue.len(), 2);
        assert_eq!(
            queue
                .front()
                .expect("queue should not be empty")
                .pipeline_name,
            "p2"
        );
        assert_eq!(
            queue
                .back()
                .expect("queue should not be empty")
                .pipeline_name,
            "p3"
        );
    }

    #[test]
    fn enqueue_pending_write_discards_when_capacity_is_zero() {
        let mut queue = VecDeque::new();

        enqueue_pending_write(&mut queue, pending_write("p1", "s1", 1), 0);

        assert!(queue.is_empty());
    }

    #[test]
    fn compute_backoff_delay_grows_and_caps() {
        let retry_config = RetryConfig {
            base_delay: Duration::from_secs(2),
            max_delay: Duration::from_secs(30),
        };

        // Zero failed attempts is treated like the first one.
        assert_eq!(
            compute_backoff_delay(&retry_config, 0),
            Duration::from_secs(2)
        );
        assert_eq!(
            compute_backoff_delay(&retry_config, 1),
            Duration::from_secs(2)
        );
        assert_eq!(
            compute_backoff_delay(&retry_config, 2),
            Duration::from_secs(4)
        );
        assert_eq!(
            compute_backoff_delay(&retry_config, 3),
            Duration::from_secs(8)
        );
        assert_eq!(
            compute_backoff_delay(&retry_config, 10),
            Duration::from_secs(30)
        );
    }
}