kinetic/transforms/
util.rs1use kinetic_buffers::BufferSender;
2use kinetic_config::model::ErrorPolicy;
3use kinetic_core::{EventBatch, create_error_batch, xid_util::batch_xids};
4use std::collections::HashMap;
5use tracing::error;
6
7pub struct TransformErrorHandler {
8 pub component_id: String,
9 pub error_policy: ErrorPolicy,
10}
11
12impl TransformErrorHandler {
13 pub fn new(component_id: String, error_policy: ErrorPolicy) -> Self {
14 Self {
15 component_id,
16 error_policy,
17 }
18 }
19
20 pub async fn handle_error(
21 &self,
22 senders: &HashMap<String, BufferSender>,
23 error_message: String,
24 batch: Option<&EventBatch>,
25 ) -> bool {
26 match self.error_policy {
27 ErrorPolicy::DropOnError => {
28 error!(
29 "{} error dropped by policy: {}",
30 self.component_id, error_message
31 );
32 true
33 }
34 ErrorPolicy::RerouteOnError => {
35 error!(
36 "{} rerouting errored batch: {}",
37 self.component_id, error_message
38 );
39 if let Some(err_sender) = senders.get("error")
40 && let Some(batch) = batch
41 && let Some(error_batch) = self.create_error_event_batch(batch, &error_message)
42 && let Err(e) = err_sender.send(error_batch).await
43 {
44 error!(
45 "Component {} failed to send error batch: {:?}",
46 self.component_id, e
47 );
48 }
49 true
50 }
51 ErrorPolicy::HaltOnError => {
52 error!(
53 "{} halting due to error policy: {}",
54 self.component_id, error_message
55 );
56 if let Some(err_sender) = senders.get("error")
57 && let Some(batch) = batch
58 && let Some(error_batch) = self.create_error_event_batch(batch, &error_message)
59 && let Err(e) = err_sender.send(error_batch).await
60 {
61 error!(
62 "Component {} failed to send error batch: {:?}",
63 self.component_id, e
64 );
65 }
66 false
67 }
68 }
69 }
70
71 fn create_error_event_batch(
72 &self,
73 batch: &EventBatch,
74 error_message: &str,
75 ) -> Option<EventBatch> {
76 let xids = batch_xids(&batch.payload);
77 let row_count = xids.len();
78 if row_count == 0 {
79 return None;
80 }
81
82 let error_messages = vec![error_message.to_string(); row_count];
83 let raw_payloads = vec![Vec::new(); row_count];
84
85 match create_error_batch(
86 &batch.metadata.pipeline_id,
87 &self.component_id,
88 xids,
89 error_messages,
90 raw_payloads,
91 batch.metadata.clone(),
92 ) {
93 Ok(eb) => Some(eb),
94 Err(e) => {
95 error!("Failed to create error batch: {}", e);
96 None
97 }
98 }
99 }
100}