kinetic_encoder_arrow/
lib.rs1#![deny(clippy::unwrap_used)]
4
5use arrow_ipc::reader::StreamReader;
6use arrow_ipc::writer::{IpcWriteOptions, StreamWriter};
7use bytes::Bytes;
8use kinetic_core::encode::{
9 Decoder, DecoderConfig, Encoder, EncoderConfig, Error as EncodeError, Result as EncodeResult,
10};
11use kinetic_core::{ArcEventMetadata, EventBatch};
12use serde::{Deserialize, Serialize};
13use snafu::Snafu;
14use std::sync::Arc;
15
16#[derive(Debug, Snafu)]
17pub enum Error {
18 #[snafu(display("Arrow IPC encoding error: {}", source))]
19 ArrowEncode { source: arrow_schema::ArrowError },
20
21 #[snafu(display("Arrow IPC decoding error: {}", source))]
22 ArrowDecode { source: arrow_schema::ArrowError },
23
24 #[snafu(display("Kinetic core error: {}", source))]
25 KineticCore { source: kinetic_core::Error },
26}
27
28impl From<Error> for EncodeError {
29 fn from(err: Error) -> Self {
30 match err {
31 Error::ArrowEncode { source } => EncodeError::Encode {
32 source: Box::new(source),
33 },
34 Error::ArrowDecode { source } => EncodeError::Decode {
35 source: Box::new(source),
36 },
37 Error::KineticCore { source } => EncodeError::Decode {
38 source: Box::new(source),
39 },
40 }
41 }
42}
43
44pub type Result<T, E = Error> = std::result::Result<T, E>;
45
46#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
48pub struct ArrowIpcEncoderOptions {
49 }
51
52impl EncoderConfig for ArrowIpcEncoderOptions {
53 fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
54 Ok(Arc::new(ArrowIpcEncoder::new(self.clone())))
55 }
56}
57
58#[derive(Debug, Default)]
60pub struct ArrowIpcEncoder {
61 _options: ArrowIpcEncoderOptions,
62}
63
64impl ArrowIpcEncoder {
65 pub fn new(options: ArrowIpcEncoderOptions) -> Self {
66 Self { _options: options }
67 }
68
69 fn encode_inner(&self, batch: &EventBatch) -> Result<Bytes> {
71 let mut buf = Vec::new();
72 {
73 let ipc_options = IpcWriteOptions::default();
75 let mut writer =
76 StreamWriter::try_new_with_options(&mut buf, &batch.payload.schema(), ipc_options)
77 .map_err(|e| Error::ArrowEncode { source: e })?;
78 writer
79 .write(&batch.payload)
80 .map_err(|e| Error::ArrowEncode { source: e })?;
81 writer
82 .finish()
83 .map_err(|e| Error::ArrowEncode { source: e })?;
84 }
85 Ok(Bytes::from(buf))
86 }
87}
88
89impl Encoder for ArrowIpcEncoder {
90 fn encode(&self, batch: &EventBatch) -> EncodeResult<Bytes> {
91 self.encode_inner(batch).map_err(Into::into)
92 }
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
97pub struct ArrowIpcDecoderOptions {
98 pub max_size: Option<usize>,
100}
101
102impl Default for ArrowIpcDecoderOptions {
103 fn default() -> Self {
104 Self {
105 max_size: Some(10 * 1024 * 1024), }
107 }
108}
109
110impl DecoderConfig for ArrowIpcDecoderOptions {
111 fn build(&self, _schema: Arc<arrow_schema::Schema>) -> EncodeResult<Arc<dyn Decoder>> {
112 Ok(Arc::new(ArrowIpcDecoder::new(self.clone())))
113 }
114}
115
116#[derive(Debug, Default)]
118pub struct ArrowIpcDecoder {
119 options: ArrowIpcDecoderOptions,
120}
121
122impl ArrowIpcDecoder {
123 pub fn new(options: ArrowIpcDecoderOptions) -> Self {
124 Self { options }
125 }
126
127 fn decode_inner(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
128 if let Some(limit) = self.options.max_size
129 && data.len() > limit
130 {
131 return Err(EncodeError::MessageTooLarge {
132 size: data.len(),
133 limit,
134 });
135 }
136
137 let mut reader = StreamReader::try_new(std::io::Cursor::new(data), None)
138 .map_err(|e| Error::ArrowDecode { source: e })?;
139
140 let record_batch = reader
141 .next()
142 .transpose()
143 .map_err(|e| Error::ArrowDecode { source: e })?;
144
145 match record_batch {
146 Some(batch) => EventBatch::new_with_xid(batch, metadata)
147 .map_err(|e| Error::KineticCore { source: e }),
148 None => EventBatch::new_with_xid(
149 arrow_array::RecordBatch::new_empty(reader.schema()),
150 metadata,
151 )
152 .map_err(|e| Error::KineticCore { source: e }),
153 }
154 .map_err(Into::into)
155 }
156}
157
158impl Decoder for ArrowIpcDecoder {
159 fn decode(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
160 self.decode_inner(data, metadata)
161 }
162}
163
#[cfg(test)]
// Tests panic on unexpected failures by design.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::{ComponentId, EventMetadata};
    use std::sync::Arc;

    fn dummy_metadata() -> ArcEventMetadata {
        Arc::new(EventMetadata::new("test", ComponentId("test".to_string())))
    }

    /// Encodes a single non-nullable `Int32` column `a` holding `[1, 2, 3]`.
    fn encode_sample() -> Bytes {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let array = Int32Array::from(vec![1, 2, 3]);
        let rb = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap();
        let batch = EventBatch::new(rb, dummy_metadata()).expect("failed to create batch");
        ArrowIpcEncoder::default().encode(&batch).unwrap()
    }

    #[test]
    fn test_ipc_encode_decode() {
        let encoded = encode_sample();

        let decoder = ArrowIpcDecoder::new(ArrowIpcDecoderOptions::default());
        let decoded_batch = decoder.decode(&encoded, dummy_metadata()).unwrap();

        assert_eq!(decoded_batch.num_rows(), 3);
        let col = decoded_batch
            .payload
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int32Array>()
            .unwrap();
        assert_eq!(col.value(0), 1);
        assert_eq!(col.value(2), 3);
    }

    #[test]
    fn test_decode_rejects_oversized_message() {
        let encoded = encode_sample();
        let decoder = ArrowIpcDecoder::new(ArrowIpcDecoderOptions { max_size: Some(1) });

        let Err(err) = decoder.decode(&encoded, dummy_metadata()) else {
            panic!("decoding a message longer than max_size should fail");
        };
        assert!(matches!(
            err,
            EncodeError::MessageTooLarge { size, limit: 1, .. } if size == encoded.len()
        ));
    }

    #[test]
    fn test_decode_size_limit_is_inclusive() {
        let encoded = encode_sample();
        let decoder = ArrowIpcDecoder::new(ArrowIpcDecoderOptions {
            max_size: Some(encoded.len()),
        });

        let decoded = decoder.decode(&encoded, dummy_metadata()).unwrap();
        assert_eq!(decoded.num_rows(), 3);
    }

    #[test]
    fn test_decode_without_size_limit() {
        let encoded = encode_sample();
        let decoder = ArrowIpcDecoder::new(ArrowIpcDecoderOptions { max_size: None });

        let decoded = decoder.decode(&encoded, dummy_metadata()).unwrap();
        assert_eq!(decoded.num_rows(), 3);
    }
}