1use crate::sinks::dlq::client::S3DlqClient;
2use arrow_array::{Array, StringArray};
3use duckdb_engine::WindowTimer;
4use kinetic_buffers::BufferReceiver;
5use kinetic_config::model::DlqSinkConfig;
6use kinetic_core::EventBatch;
7use kinetic_core::encode::Encoder;
8use metrics::{Label, counter};
9use std::collections::VecDeque;
10use std::time::{Duration, Instant};
11use tracing::{debug, error, info};
12
/// Backoff parameters for re-attempting failed DLQ writes.
///
/// After `n` failed attempts the next retry waits `base_delay * 2^(n - 1)` (exponent
/// capped at 20), never more than `max_delay` — see `compute_backoff_delay`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct RetryConfig {
    /// Delay applied after the first failed attempt.
    base_delay: Duration,
    /// Upper bound on any single backoff delay.
    max_delay: Duration,
}
17
18use std::sync::Arc;
19
/// A failed DLQ write held in the retry queue until its backoff deadline passes.
struct PendingWrite {
    /// Pipeline name passed to `S3DlqClient::write_batch` on retry.
    pipeline_name: String,
    /// Stage passed to `S3DlqClient::write_batch` on retry.
    stage: String,
    /// Encoded payload, re-sent unchanged on every retry.
    payload: bytes::Bytes,
    /// Number of write attempts that have already failed for this payload.
    attempts: usize,
    /// Earliest instant at which the next retry may be made.
    next_retry_at: Instant,
}
27
/// Borrowed state shared by the flush/retry helpers of `DlqSinkTask::run_task`.
struct TaskState<'a> {
    /// S3 client used for every DLQ write.
    client: &'a S3DlqClient,
    /// Failed writes awaiting retry; new entries are appended at the back and
    /// capacity evictions (see `enqueue_pending_write`) take from the front.
    pending_writes: &'a mut VecDeque<PendingWrite>,
    /// Backoff parameters applied when a failed write is re-buffered.
    retry_config: &'a RetryConfig,
}
33
/// One payload to write for a single `(pipeline_name, stage)` pair.
struct WriteContext {
    /// Pipeline name passed to `S3DlqClient::write_batch`.
    pipeline_name: String,
    /// Stage passed to `S3DlqClient::write_batch`.
    stage: String,
    /// Encoded bytes to write.
    payload: bytes::Bytes,
}
39
40fn compute_backoff_delay(config: &RetryConfig, attempts: usize) -> Duration {
41 let exponent = attempts.saturating_sub(1).min(20);
42 let factor = 1_u32 << exponent;
43 let delay = config.base_delay.saturating_mul(factor);
44 std::cmp::min(delay, config.max_delay)
45}
46
/// Sink task that drains error batches from a buffer and writes them to S3 through
/// `S3DlqClient`.
pub struct DlqSinkTask {
    /// Source of incoming event batches.
    receiver: BufferReceiver,
    /// Sink configuration (region, bucket, prefix, batching, timeout, retry and
    /// encoding settings).
    config: DlqSinkConfig,
    /// Identifier used in log lines and as the `component_id` metric label.
    component_id: String,
    /// Encoder that serializes each batch before it is written.
    encoder: Arc<dyn Encoder>,
    /// Metric labels built once in `new` and attached to every counter.
    labels: Arc<[Label]>,
}
54
55use crate::topology::healthcheck::Healthcheck;
56use async_trait::async_trait;
57
58#[async_trait]
59impl Healthcheck for DlqSinkTask {
60 async fn check(&self) -> anyhow::Result<()> {
61 let client = S3DlqClient::new(
62 &self.config.region,
63 self.config.bucket.clone(),
64 self.config.prefix.clone(),
65 )
66 .await;
67
68 client.check_health().await.map_err(|e| {
69 anyhow::anyhow!(
70 "DLQ healthcheck failed for bucket '{}': {}",
71 self.config.bucket,
72 e
73 )
74 })
75 }
76}
77
78impl DlqSinkTask {
79 pub fn new(
80 receiver: BufferReceiver,
81 config: DlqSinkConfig,
82 component_id: String,
83 encoder: Arc<dyn Encoder>,
84 ) -> Self {
85 let labels: Arc<[Label]> = Arc::new([
86 Label::new("component_id", component_id.clone()),
87 Label::new("component_type", "sink"),
88 Label::new("component_kind", "dlq"),
89 ]);
90
91 Self {
92 receiver,
93 config,
94 component_id,
95 encoder,
96 labels,
97 }
98 }
99
100 pub async fn run_task(mut self) {
101 info!("Starting DLQ sink task: {}", self.component_id);
102
103 let client = S3DlqClient::new(
104 &self.config.region,
105 self.config.bucket.clone(),
106 self.config.prefix.clone(),
107 )
108 .await;
109
110 let mut window_timer = match WindowTimer::new(&self.config.timeout) {
111 Ok(t) => t,
112 Err(e) => {
113 error!(
114 "Invalid DLQ timeout {}, defaulting to 60s: {}",
115 self.config.timeout, e
116 );
117 WindowTimer::from_duration(std::time::Duration::from_secs(60))
118 }
119 };
120
121 let retry_config = RetryConfig {
122 base_delay: parse_duration(&self.config.retry_delay).unwrap_or_else(|e| {
123 error!(
124 "Invalid DLQ retry delay '{}', defaulting to 5s: {}",
125 self.config.retry_delay, e
126 );
127 Duration::from_secs(5)
128 }),
129 max_delay: parse_duration(&self.config.retry_max_delay).unwrap_or_else(|e| {
130 error!(
131 "Invalid DLQ retry max delay '{}', defaulting to 5m: {}",
132 self.config.retry_max_delay, e
133 );
134 Duration::from_secs(300)
135 }),
136 };
137
138 let mut current_batch = Vec::new();
139 let mut pending_writes: VecDeque<PendingWrite> = VecDeque::new();
140
141 loop {
142 tokio::select! {
143 biased;
144
145 _ = window_timer.tick() => {
146 let mut state = TaskState {
147 client: &client,
148 pending_writes: &mut pending_writes,
149 retry_config: &retry_config,
150 };
151 self.retry_pending_writes(&mut state).await;
152
153 if !current_batch.is_empty() {
154 self.flush_batches(&mut current_batch, &mut state).await;
155 }
156 }
157
158 maybe_batch = self.receiver.recv() => {
159 match maybe_batch {
160 Some(batch) => {
161 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
162 counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
163 current_batch.push(batch);
164 if current_batch.len() >= self.config.batch_size {
165 let mut state = TaskState {
166 client: &client,
167 pending_writes: &mut pending_writes,
168 retry_config: &retry_config,
169 };
170 self.flush_batches(&mut current_batch, &mut state).await;
171 window_timer.reset();
172 }
173 }
174 None => {
175 info!("DLQ sink task {} input channel closed", self.component_id);
176 let mut state = TaskState {
177 client: &client,
178 pending_writes: &mut pending_writes,
179 retry_config: &retry_config,
180 };
181 if !current_batch.is_empty() {
182 self.flush_batches(&mut current_batch, &mut state).await;
183 }
184 self.retry_pending_writes(&mut state).await;
185
186 if !pending_writes.is_empty() {
187 error!(
188 "DLQ sink '{}' shutting down with {} pending writes still buffered",
189 self.component_id,
190 pending_writes.len()
191 );
192 }
193 break;
194 }
195 }
196 }
197 }
198 }
199
200 info!("DLQ sink task {} shutting down", self.component_id);
201 }
202
203 async fn flush_batches(&self, batches: &mut Vec<EventBatch>, state: &mut TaskState<'_>) {
204 if batches.is_empty() {
205 return;
206 }
207
208 debug!(
209 "DLQ {} flushing {} event batches",
210 self.component_id,
211 batches.len()
212 );
213
214 let mut grouped_payloads: std::collections::HashMap<(String, String), Vec<u8>> =
215 std::collections::HashMap::new();
216
217 for mut batch in batches.drain(..) {
218 let context = extract_context(&batch);
219 let events = batch.num_rows();
220 match self.encoder.encode(&batch) {
221 Ok(data) => {
222 let data_len = data.len();
223 grouped_payloads
224 .entry(context)
225 .or_default()
226 .extend_from_slice(data.as_ref());
227
228 if let Some(token) = batch.ack_token.take() {
229 token.ack();
230 }
231 counter!("component_sent_events_total", self.labels.iter())
232 .increment(events as u64);
233 counter!("component_sent_event_bytes_total", self.labels.iter())
234 .increment(batch.estimated_size() as u64);
235 counter!("component_sent_network_bytes_total", self.labels.iter())
236 .increment(data_len as u64);
237 }
238 Err(e) => {
239 counter!("component_errors_total", self.labels.iter()).increment(1);
240 error!("Failed to encode error batch in DLQ: {}", e);
241 }
242 }
243 }
244
245 for ((pipeline_name, stage), payload) in grouped_payloads {
246 if payload.is_empty() {
247 continue;
248 }
249
250 self.attempt_or_buffer_write(
251 state,
252 WriteContext {
253 pipeline_name,
254 stage,
255 payload: bytes::Bytes::from(payload),
256 },
257 0,
258 )
259 .await;
260 }
261 }
262
263 async fn retry_pending_writes(&self, state: &mut TaskState<'_>) {
264 if state.pending_writes.is_empty() {
265 return;
266 }
267
268 let now = Instant::now();
269 let mut to_process = state.pending_writes.len();
270
271 while to_process > 0 {
272 to_process -= 1;
273
274 let Some(pending) = state.pending_writes.pop_front() else {
275 break;
276 };
277
278 if pending.next_retry_at > now {
279 state.pending_writes.push_back(pending);
280 continue;
281 }
282
283 self.attempt_or_buffer_write(
284 state,
285 WriteContext {
286 pipeline_name: pending.pipeline_name,
287 stage: pending.stage,
288 payload: pending.payload,
289 },
290 pending.attempts,
291 )
292 .await;
293 }
294 }
295
296 async fn attempt_or_buffer_write(
297 &self,
298 state: &mut TaskState<'_>,
299 context: WriteContext,
300 attempts: usize,
301 ) {
302 match state
303 .client
304 .write_batch(
305 &context.pipeline_name,
306 &context.stage,
307 context.payload.clone(),
308 &self.config.encoding,
309 )
310 .await
311 {
312 Ok(()) => {}
313 Err(e) => {
314 counter!("component_errors_total", self.labels.iter()).increment(1);
315 let next_attempts = attempts + 1;
316
317 if next_attempts > self.config.retry_max_attempts {
318 error!(
319 "Dropping DLQ write after {} attempts for pipeline '{}' stage '{}': {}",
320 next_attempts, context.pipeline_name, context.stage, e
321 );
322 return;
323 }
324
325 let delay = compute_backoff_delay(state.retry_config, next_attempts);
326 let pending = PendingWrite {
327 pipeline_name: context.pipeline_name.clone(),
328 stage: context.stage.clone(),
329 payload: context.payload.clone(),
330 attempts: next_attempts,
331 next_retry_at: Instant::now() + delay,
332 };
333
334 enqueue_pending_write(
335 state.pending_writes,
336 pending,
337 self.config.max_pending_writes,
338 );
339
340 error!(
341 "Buffered failed DLQ write attempt {} for retry (pending: {}): {}",
342 next_attempts,
343 state.pending_writes.len(),
344 e
345 );
346 }
347 }
348 }
349}
350
351fn enqueue_pending_write(
352 pending_writes: &mut VecDeque<PendingWrite>,
353 pending: PendingWrite,
354 max_pending_writes: usize,
355) {
356 if max_pending_writes == 0 {
357 return;
358 }
359
360 while pending_writes.len() >= max_pending_writes {
361 let _ = pending_writes.pop_front();
362 }
363
364 pending_writes.push_back(pending);
365}
366
367fn extract_context(batch: &EventBatch) -> (String, String) {
368 let pipeline_from_payload = batch
369 .payload
370 .column_by_name("pipeline_name")
371 .and_then(|column| column.as_any().downcast_ref::<StringArray>())
372 .and_then(|array| {
373 if array.is_empty() {
374 None
375 } else {
376 Some(array.value(0).to_string())
377 }
378 });
379
380 let stage_from_payload = batch
381 .payload
382 .column_by_name("stage")
383 .and_then(|column| column.as_any().downcast_ref::<StringArray>())
384 .and_then(|array| {
385 if array.is_empty() {
386 None
387 } else {
388 Some(array.value(0).to_string())
389 }
390 });
391
392 (
393 pipeline_from_payload.unwrap_or_else(|| batch.metadata.pipeline_id.clone()),
394 stage_from_payload.unwrap_or_else(|| batch.metadata.source_id.0.clone()),
395 )
396}
397
398fn parse_duration(s: &str) -> std::result::Result<Duration, String> {
399 kinetic_common::parse_duration(s).map_err(|e| e.to_string())
400}
401
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::{ArcEventMetadata, ComponentId, EventMetadata};
    use std::sync::Arc;

    /// Metadata whose ids differ from every payload value used below, so each test can
    /// tell whether `extract_context` read the payload or fell back to metadata.
    fn metadata() -> ArcEventMetadata {
        Arc::new(EventMetadata::new(
            "meta_pipeline",
            ComponentId("meta_source".to_string()),
        ))
    }

    /// Builds an `EventBatch` carrying `metadata()` from schema fields and matching columns.
    fn event_batch(fields: Vec<Field>, columns: Vec<arrow_array::ArrayRef>) -> EventBatch {
        let record = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
            .expect("record batch should build");
        EventBatch::new(record, metadata()).expect("failed to create event batch")
    }

    /// A retry-queue entry tagged `p{id}` / `s{id}` that is already due.
    fn pending(id: u8) -> PendingWrite {
        PendingWrite {
            pipeline_name: format!("p{id}"),
            stage: format!("s{id}"),
            payload: bytes::Bytes::from(vec![id]),
            attempts: 1,
            next_retry_at: Instant::now(),
        }
    }

    #[test]
    fn extract_context_uses_payload_columns_when_present() {
        let batch = event_batch(
            vec![
                Field::new("pipeline_name", DataType::Utf8, false),
                Field::new("stage", DataType::Utf8, false),
            ],
            vec![
                Arc::new(StringArray::from(vec!["payload_pipeline"])) as _,
                Arc::new(StringArray::from(vec!["payload_stage"])) as _,
            ],
        );

        assert_eq!(
            extract_context(&batch),
            ("payload_pipeline".to_string(), "payload_stage".to_string())
        );
    }

    #[test]
    fn extract_context_falls_back_to_metadata() {
        let batch = event_batch(
            vec![Field::new("value", DataType::Int32, false)],
            vec![Arc::new(Int32Array::from(vec![1])) as _],
        );

        let (pipeline, stage) = extract_context(&batch);
        assert_eq!(pipeline, "meta_pipeline");
        assert_eq!(stage, "meta_source");
    }

    #[test]
    fn parse_duration_supports_seconds_and_ms() {
        let cases = [
            ("5s", Duration::from_secs(5)),
            ("250ms", Duration::from_millis(250)),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_duration(input).expect("duration should parse"), expected);
        }
    }

    #[test]
    fn enqueue_pending_write_respects_capacity() {
        let mut queue = VecDeque::new();
        for id in 1..=3 {
            enqueue_pending_write(&mut queue, pending(id), 2);
        }

        let names: Vec<&str> = queue.iter().map(|p| p.pipeline_name.as_str()).collect();
        assert_eq!(names, ["p2", "p3"]);
    }

    #[test]
    fn compute_backoff_delay_grows_and_caps() {
        let retry_config = RetryConfig {
            base_delay: Duration::from_secs(2),
            max_delay: Duration::from_secs(30),
        };

        let expectations = [(1, 2), (2, 4), (3, 8), (10, 30)];
        for (attempts, secs) in expectations {
            assert_eq!(
                compute_backoff_delay(&retry_config, attempts),
                Duration::from_secs(secs),
                "attempts = {attempts}"
            );
        }
    }
}