Skip to main content

kinetic/transforms/
util.rs

1use kinetic_buffers::BufferSender;
2use kinetic_config::model::ErrorPolicy;
3use kinetic_core::{EventBatch, create_error_batch, xid_util::batch_xids};
4use std::collections::HashMap;
5use tracing::error;
6
7pub struct TransformErrorHandler {
8    pub component_id: String,
9    pub error_policy: ErrorPolicy,
10}
11
12impl TransformErrorHandler {
13    pub fn new(component_id: String, error_policy: ErrorPolicy) -> Self {
14        Self {
15            component_id,
16            error_policy,
17        }
18    }
19
20    pub async fn handle_error(
21        &self,
22        senders: &HashMap<String, BufferSender>,
23        error_message: String,
24        batch: Option<&EventBatch>,
25    ) -> bool {
26        match self.error_policy {
27            ErrorPolicy::DropOnError => {
28                error!(
29                    "{} error dropped by policy: {}",
30                    self.component_id, error_message
31                );
32                true
33            }
34            ErrorPolicy::RerouteOnError => {
35                error!(
36                    "{} rerouting errored batch: {}",
37                    self.component_id, error_message
38                );
39                if let Some(err_sender) = senders.get("error")
40                    && let Some(batch) = batch
41                    && let Some(error_batch) = self.create_error_event_batch(batch, &error_message)
42                    && let Err(e) = err_sender.send(error_batch).await
43                {
44                    error!(
45                        "Component {} failed to send error batch: {:?}",
46                        self.component_id, e
47                    );
48                }
49                true
50            }
51            ErrorPolicy::HaltOnError => {
52                error!(
53                    "{} halting due to error policy: {}",
54                    self.component_id, error_message
55                );
56                if let Some(err_sender) = senders.get("error")
57                    && let Some(batch) = batch
58                    && let Some(error_batch) = self.create_error_event_batch(batch, &error_message)
59                    && let Err(e) = err_sender.send(error_batch).await
60                {
61                    error!(
62                        "Component {} failed to send error batch: {:?}",
63                        self.component_id, e
64                    );
65                }
66                false
67            }
68        }
69    }
70
71    fn create_error_event_batch(
72        &self,
73        batch: &EventBatch,
74        error_message: &str,
75    ) -> Option<EventBatch> {
76        let xids = batch_xids(&batch.payload);
77        let row_count = xids.len();
78        if row_count == 0 {
79            return None;
80        }
81
82        let error_messages = vec![error_message.to_string(); row_count];
83        let raw_payloads = vec![Vec::new(); row_count];
84
85        match create_error_batch(
86            &batch.metadata.pipeline_id,
87            &self.component_id,
88            xids,
89            error_messages,
90            raw_payloads,
91            batch.metadata.clone(),
92        ) {
93            Ok(eb) => Some(eb),
94            Err(e) => {
95                error!("Failed to create error batch: {}", e);
96                None
97            }
98        }
99    }
100}