kinetic_core/
error_payload.rs1use crate::Result;
2use crate::error::LengthMismatchSnafu;
3use crate::event::EventBatch;
4use crate::metadata::ArcEventMetadata;
5use arrow_array::{ArrayRef, BinaryArray, RecordBatch, StringArray};
6use arrow_schema::{DataType, Field, Schema};
7use snafu::ensure;
8use std::sync::Arc;
9
10pub fn create_error_batch(
11 pipeline_name: &str,
12 stage: &str,
13 xids: Vec<String>,
14 error_messages: Vec<String>,
15 raw_payloads: Vec<Vec<u8>>,
16 metadata: ArcEventMetadata,
17) -> Result<EventBatch> {
18 let num_rows = xids.len();
19 ensure!(
20 error_messages.len() == num_rows,
21 LengthMismatchSnafu {
22 expected: num_rows,
23 actual: error_messages.len()
24 }
25 );
26 ensure!(
27 raw_payloads.len() == num_rows,
28 LengthMismatchSnafu {
29 expected: num_rows,
30 actual: raw_payloads.len()
31 }
32 );
33
34 let pipeline_array = StringArray::from(vec![pipeline_name; num_rows]);
35 let stage_array = StringArray::from(vec![stage; num_rows]);
36 let xid_array = StringArray::from(xids);
37 let error_array = StringArray::from(error_messages);
38
39 let raw_payloads_refs: Vec<&[u8]> = raw_payloads.iter().map(|v| v.as_slice()).collect();
40 let payload_array = BinaryArray::from(raw_payloads_refs);
41
42 let schema = Arc::new(Schema::new(vec![
43 Field::new("pipeline_name", DataType::Utf8, false),
44 Field::new("stage", DataType::Utf8, false),
45 Field::new("xid", DataType::Utf8, false),
46 Field::new("error_message", DataType::Utf8, false),
47 Field::new("raw_payload", DataType::Binary, false),
48 ]));
49
50 let columns: Vec<ArrayRef> = vec![
51 Arc::new(pipeline_array),
52 Arc::new(stage_array),
53 Arc::new(xid_array),
54 Arc::new(error_array),
55 Arc::new(payload_array),
56 ];
57
58 let record_batch = RecordBatch::try_new(schema, columns)
59 .map_err(|e| crate::error::Error::Arrow { source: e })?;
60 let record_batch = crate::xid_util::ensure_xid_column(record_batch)
61 .map_err(|e| crate::error::Error::XidAppend { source: e })?;
62 EventBatch::new(record_batch, metadata)
63}
64
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::{ComponentId, EventMetadata};

    /// Metadata fixture for pipeline `pipeline_a` and component `source_a`.
    fn test_metadata() -> ArcEventMetadata {
        let component = ComponentId("source_a".to_string());
        Arc::new(EventMetadata::new("pipeline_a", component))
    }

    /// A two-row error batch reports two rows and exposes every declared column
    /// plus the `_xid` column.
    #[test]
    fn create_error_batch_contains_expected_columns_and_rows() {
        let xids = vec!["xid_1".to_string(), "xid_2".to_string()];
        let messages = vec!["bad json".to_string(), "schema mismatch".to_string()];
        let payloads = vec![b"{bad}".to_vec(), b"{also_bad}".to_vec()];

        let batch = create_error_batch(
            "pipeline_a",
            "transform_parse",
            xids,
            messages,
            payloads,
            test_metadata(),
        )
        .expect("error batch creation should succeed");

        assert_eq!(batch.num_rows(), 2);

        let schema = batch.payload.schema();
        let expected_columns = [
            "pipeline_name",
            "stage",
            "xid",
            "error_message",
            "raw_payload",
            "_xid",
        ];
        for column in expected_columns {
            assert!(schema.column_with_name(column).is_some());
        }
    }
}