Skip to main content

kinetic_core/
error_payload.rs

1use crate::Result;
2use crate::error::LengthMismatchSnafu;
3use crate::event::EventBatch;
4use crate::metadata::ArcEventMetadata;
5use arrow_array::{ArrayRef, BinaryArray, RecordBatch, StringArray};
6use arrow_schema::{DataType, Field, Schema};
7use snafu::ensure;
8use std::sync::Arc;
9
10pub fn create_error_batch(
11    pipeline_name: &str,
12    stage: &str,
13    xids: Vec<String>,
14    error_messages: Vec<String>,
15    raw_payloads: Vec<Vec<u8>>,
16    metadata: ArcEventMetadata,
17) -> Result<EventBatch> {
18    let num_rows = xids.len();
19    ensure!(
20        error_messages.len() == num_rows,
21        LengthMismatchSnafu {
22            expected: num_rows,
23            actual: error_messages.len()
24        }
25    );
26    ensure!(
27        raw_payloads.len() == num_rows,
28        LengthMismatchSnafu {
29            expected: num_rows,
30            actual: raw_payloads.len()
31        }
32    );
33
34    let pipeline_array = StringArray::from(vec![pipeline_name; num_rows]);
35    let stage_array = StringArray::from(vec![stage; num_rows]);
36    let xid_array = StringArray::from(xids);
37    let error_array = StringArray::from(error_messages);
38
39    let raw_payloads_refs: Vec<&[u8]> = raw_payloads.iter().map(|v| v.as_slice()).collect();
40    let payload_array = BinaryArray::from(raw_payloads_refs);
41
42    let schema = Arc::new(Schema::new(vec![
43        Field::new("pipeline_name", DataType::Utf8, false),
44        Field::new("stage", DataType::Utf8, false),
45        Field::new("xid", DataType::Utf8, false),
46        Field::new("error_message", DataType::Utf8, false),
47        Field::new("raw_payload", DataType::Binary, false),
48    ]));
49
50    let columns: Vec<ArrayRef> = vec![
51        Arc::new(pipeline_array),
52        Arc::new(stage_array),
53        Arc::new(xid_array),
54        Arc::new(error_array),
55        Arc::new(payload_array),
56    ];
57
58    let record_batch = RecordBatch::try_new(schema, columns)
59        .map_err(|e| crate::error::Error::Arrow { source: e })?;
60    let record_batch = crate::xid_util::ensure_xid_column(record_batch)
61        .map_err(|e| crate::error::Error::XidAppend { source: e })?;
62    EventBatch::new(record_batch, metadata)
63}
64
#[cfg(test)]
// Presumably the crate denies `clippy::unwrap_used`; test code is exempted here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::{ComponentId, EventMetadata};

    /// Metadata shared by the tests: built from `"pipeline_a"` and component id `source_a`.
    fn test_metadata() -> ArcEventMetadata {
        let component = ComponentId("source_a".to_string());
        Arc::new(EventMetadata::new("pipeline_a", component))
    }

    #[test]
    fn create_error_batch_contains_expected_columns_and_rows() {
        let xids = vec!["xid_1".to_string(), "xid_2".to_string()];
        let errors = vec!["bad json".to_string(), "schema mismatch".to_string()];
        let payloads = vec![b"{bad}".to_vec(), b"{also_bad}".to_vec()];

        let batch = create_error_batch(
            "pipeline_a",
            "transform_parse",
            xids,
            errors,
            payloads,
            test_metadata(),
        )
        .expect("error batch creation should succeed");

        assert_eq!(batch.num_rows(), 2);

        // The five columns written by `create_error_batch`, plus the `_xid` column that
        // is expected to be present once `ensure_xid_column` has run.
        let schema = batch.payload.schema();
        let missing: Vec<&str> = [
            "pipeline_name",
            "stage",
            "xid",
            "error_message",
            "raw_payload",
            "_xid",
        ]
        .iter()
        .copied()
        .filter(|name| schema.column_with_name(name).is_none())
        .collect();
        assert_eq!(missing, Vec::<&str>::new());
    }
}