Skip to main content

kinetic_encoder_arrow/
lib.rs

1//! Arrow IPC encoder and decoder for Kinetic.
2
3#![deny(clippy::unwrap_used)]
4
5use arrow_ipc::reader::StreamReader;
6use arrow_ipc::writer::{IpcWriteOptions, StreamWriter};
7use bytes::Bytes;
8use kinetic_core::encode::{
9    Decoder, DecoderConfig, Encoder, EncoderConfig, Error as EncodeError, Result as EncodeResult,
10};
11use kinetic_core::{ArcEventMetadata, EventBatch};
12use serde::{Deserialize, Serialize};
13use snafu::Snafu;
14use std::sync::Arc;
15
/// Errors produced by the Arrow IPC encoder and decoder in this crate.
///
/// These are converted into [`EncodeError`] (via the `From` impl below) when they cross
/// the `Encoder` / `Decoder` trait boundary.
#[derive(Debug, Snafu)]
pub enum Error {
    /// Writing a record batch as an Arrow IPC stream failed.
    #[snafu(display("Arrow IPC encoding error: {}", source))]
    ArrowEncode { source: arrow_schema::ArrowError },

    /// Reading an Arrow IPC stream failed.
    #[snafu(display("Arrow IPC decoding error: {}", source))]
    ArrowDecode { source: arrow_schema::ArrowError },

    /// Constructing an `EventBatch` from the decoded record batch failed.
    #[snafu(display("Kinetic core error: {}", source))]
    KineticCore { source: kinetic_core::Error },
}
27
28impl From<Error> for EncodeError {
29    fn from(err: Error) -> Self {
30        match err {
31            Error::ArrowEncode { source } => EncodeError::Encode {
32                source: Box::new(source),
33            },
34            Error::ArrowDecode { source } => EncodeError::Decode {
35                source: Box::new(source),
36            },
37            Error::KineticCore { source } => EncodeError::Decode {
38                source: Box::new(source),
39            },
40        }
41    }
42}
43
/// Crate-local result type whose error parameter defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
45
/// Options for configuring Arrow IPC encoding.
///
/// This type currently has no fields; the encoder always writes with
/// `IpcWriteOptions::default()`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct ArrowIpcEncoderOptions {
    // We could add IpcWriteOptions fields here if needed
}
51
52impl EncoderConfig for ArrowIpcEncoderOptions {
53    fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
54        Ok(Arc::new(ArrowIpcEncoder::new(self.clone())))
55    }
56}
57
/// Arrow IPC Encoder
///
/// Serializes the payload of each [`EventBatch`] as a complete Arrow IPC stream
/// containing that single record batch.
#[derive(Debug, Default)]
pub struct ArrowIpcEncoder {
    // Kept for future use; the options type has no fields yet, so nothing reads it.
    _options: ArrowIpcEncoderOptions,
}
63
64impl ArrowIpcEncoder {
65    pub fn new(options: ArrowIpcEncoderOptions) -> Self {
66        Self { _options: options }
67    }
68
69    /// Encodes an `EventBatch` into Arrow IPC Stream bytes.
70    fn encode_inner(&self, batch: &EventBatch) -> Result<Bytes> {
71        let mut buf = Vec::new();
72        {
73            // Default options for now
74            let ipc_options = IpcWriteOptions::default();
75            let mut writer =
76                StreamWriter::try_new_with_options(&mut buf, &batch.payload.schema(), ipc_options)
77                    .map_err(|e| Error::ArrowEncode { source: e })?;
78            writer
79                .write(&batch.payload)
80                .map_err(|e| Error::ArrowEncode { source: e })?;
81            writer
82                .finish()
83                .map_err(|e| Error::ArrowEncode { source: e })?;
84        }
85        Ok(Bytes::from(buf))
86    }
87}
88
89impl Encoder for ArrowIpcEncoder {
90    fn encode(&self, batch: &EventBatch) -> EncodeResult<Bytes> {
91        self.encode_inner(batch).map_err(Into::into)
92    }
93}
94
95/// Options for configuring Arrow IPC decoding.
96#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
97pub struct ArrowIpcDecoderOptions {
98    /// Maximum size of a single message in bytes.
99    pub max_size: Option<usize>,
100}
101
102impl Default for ArrowIpcDecoderOptions {
103    fn default() -> Self {
104        Self {
105            max_size: Some(10 * 1024 * 1024), // 10MB default
106        }
107    }
108}
109
110impl DecoderConfig for ArrowIpcDecoderOptions {
111    fn build(&self, _schema: Arc<arrow_schema::Schema>) -> EncodeResult<Arc<dyn Decoder>> {
112        Ok(Arc::new(ArrowIpcDecoder::new(self.clone())))
113    }
114}
115
/// Arrow IPC Decoder
///
/// Turns Arrow IPC stream bytes back into [`EventBatch`]es, rejecting input larger
/// than the configured `max_size`.
#[derive(Debug, Default)]
pub struct ArrowIpcDecoder {
    options: ArrowIpcDecoderOptions,
}
121
122impl ArrowIpcDecoder {
123    pub fn new(options: ArrowIpcDecoderOptions) -> Self {
124        Self { options }
125    }
126
127    fn decode_inner(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
128        if let Some(limit) = self.options.max_size
129            && data.len() > limit
130        {
131            return Err(EncodeError::MessageTooLarge {
132                size: data.len(),
133                limit,
134            });
135        }
136
137        let mut reader = StreamReader::try_new(std::io::Cursor::new(data), None)
138            .map_err(|e| Error::ArrowDecode { source: e })?;
139
140        let record_batch = reader
141            .next()
142            .transpose()
143            .map_err(|e| Error::ArrowDecode { source: e })?;
144
145        match record_batch {
146            Some(batch) => EventBatch::new_with_xid(batch, metadata)
147                .map_err(|e| Error::KineticCore { source: e }),
148            None => EventBatch::new_with_xid(
149                arrow_array::RecordBatch::new_empty(reader.schema()),
150                metadata,
151            )
152            .map_err(|e| Error::KineticCore { source: e }),
153        }
154        .map_err(Into::into)
155    }
156}
157
158impl Decoder for ArrowIpcDecoder {
159    fn decode(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
160        self.decode_inner(data, metadata)
161    }
162}
163
#[cfg(test)]
// Tests should panic on unexpected failures; the crate-level `deny(clippy::unwrap_used)`
// is aimed at library code, not test code.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::{ComponentId, EventMetadata};
    use std::sync::Arc;

    fn dummy_metadata() -> ArcEventMetadata {
        Arc::new(EventMetadata::new("test", ComponentId("test".to_string())))
    }

    /// Schema with a single non-nullable `Int32` column named `a`.
    fn int_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
    }

    /// Encodes a batch whose column `a` holds `[1, 2, 3]`.
    fn encoded_sample() -> Bytes {
        let array = Int32Array::from(vec![1, 2, 3]);
        let rb = RecordBatch::try_new(int_schema(), vec![Arc::new(array)]).unwrap();
        let batch = EventBatch::new(rb, dummy_metadata()).expect("failed to create batch");
        ArrowIpcEncoder::default().encode(&batch).unwrap()
    }

    #[test]
    fn test_ipc_encode_decode() {
        let encoded = encoded_sample();

        let decoder = ArrowIpcDecoder::new(ArrowIpcDecoderOptions::default());
        let decoded_batch = decoder.decode(&encoded, dummy_metadata()).unwrap();

        assert_eq!(decoded_batch.num_rows(), 3);
        let col = decoded_batch
            .payload
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int32Array>()
            .unwrap();
        assert_eq!(col.value(0), 1);
        assert_eq!(col.value(2), 3);
    }

    #[test]
    fn test_decode_rejects_oversized_message() {
        let encoded = encoded_sample();
        let limit = encoded.len() - 1;
        let decoder = ArrowIpcDecoder::new(ArrowIpcDecoderOptions {
            max_size: Some(limit),
        });

        match decoder.decode(&encoded, dummy_metadata()) {
            Err(EncodeError::MessageTooLarge {
                size,
                limit: reported,
            }) => {
                assert_eq!(size, encoded.len());
                assert_eq!(reported, limit);
            }
            Err(_) => panic!("expected MessageTooLarge, got a different error"),
            Ok(_) => panic!("expected MessageTooLarge, but decoding succeeded"),
        }
    }

    #[test]
    fn test_decode_without_size_limit() {
        let encoded = encoded_sample();
        let decoder = ArrowIpcDecoder::new(ArrowIpcDecoderOptions { max_size: None });
        let decoded = decoder.decode(&encoded, dummy_metadata()).unwrap();
        assert_eq!(decoded.num_rows(), 3);
    }

    #[test]
    fn test_decode_stream_without_batches() {
        let schema = int_schema();
        let mut buf = Vec::new();
        {
            let mut writer = StreamWriter::try_new(&mut buf, &schema).unwrap();
            writer.finish().unwrap();
        }

        let decoder = ArrowIpcDecoder::default();
        let decoded = decoder.decode(&buf, dummy_metadata()).unwrap();

        assert_eq!(decoded.num_rows(), 0);
        assert_eq!(decoded.payload.schema().field(0).name(), "a");
    }
}