Skip to main content

kinetic_encoder_avro/
lib.rs

1//! Avro encoder and decoder for Kinetic.
2
3#![deny(clippy::unwrap_used)]
4
5use arrow_avro::reader::ReaderBuilder;
6use arrow_avro::writer::AvroWriter;
7use arrow_schema::Schema;
8use bytes::Bytes;
9use kinetic_core::encode::{
10    Decoder, DecoderConfig, Encoder, EncoderConfig, Error as EncodeError, Result as EncodeResult,
11};
12use kinetic_core::{ArcEventMetadata, EventBatch};
13use serde::{Deserialize, Serialize};
14use snafu::Snafu;
15use std::io::Cursor;
16use std::sync::Arc;
17
18#[derive(Debug, Snafu)]
19pub enum Error {
20    #[snafu(display("Avro error: {}", source))]
21    Avro { source: arrow_schema::ArrowError },
22
23    #[snafu(display("Arrow error: {}", source))]
24    Arrow { source: arrow_schema::ArrowError },
25
26    #[snafu(display("Kinetic core error: {}", source))]
27    KineticCore { source: kinetic_core::Error },
28}
29
30impl From<Error> for EncodeError {
31    fn from(err: Error) -> Self {
32        match err {
33            Error::Avro { source } => EncodeError::Encode {
34                source: Box::new(source),
35            },
36            Error::Arrow { source } => EncodeError::Encode {
37                source: Box::new(source),
38            },
39            Error::KineticCore { source } => EncodeError::Encode {
40                source: Box::new(source),
41            },
42        }
43    }
44}
45
46pub type Result<T, E = Error> = std::result::Result<T, E>;
47
48/// Options for configuring Avro encoding.
49#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
50pub struct AvroEncoderOptions {}
51
52impl EncoderConfig for AvroEncoderOptions {
53    fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
54        Ok(Arc::new(AvroEncoder::new(self.clone())))
55    }
56}
57
58/// Avro Encoder
59#[derive(Debug, Default)]
60pub struct AvroEncoder {
61    _options: AvroEncoderOptions,
62}
63
64impl AvroEncoder {
65    pub fn new(options: AvroEncoderOptions) -> Self {
66        Self { _options: options }
67    }
68}
69
70impl Encoder for AvroEncoder {
71    fn encode(&self, batch: &EventBatch) -> EncodeResult<Bytes> {
72        let mut buffer = Vec::new();
73        {
74            let mut writer = AvroWriter::new(&mut buffer, (*batch.payload.schema()).clone())
75                .map_err(|e| Error::Avro { source: e.into() })?;
76            writer
77                .write(&batch.payload)
78                .map_err(|e| Error::Avro { source: e.into() })?;
79            writer
80                .finish()
81                .map_err(|e| Error::Avro { source: e.into() })?;
82        }
83        Ok(Bytes::from(buffer))
84    }
85}
86/// Options for configuring Avro decoding.
87#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
88pub struct AvroDecoderOptions {
89    /// Maximum size of a single message in bytes.
90    pub max_size: Option<usize>,
91}
92
93impl Default for AvroDecoderOptions {
94    fn default() -> Self {
95        Self {
96            max_size: Some(10 * 1024 * 1024), // 10MB default
97        }
98    }
99}
100
101impl DecoderConfig for AvroDecoderOptions {
102    fn build(&self, _schema: Arc<Schema>) -> EncodeResult<Arc<dyn Decoder>> {
103        Ok(Arc::new(AvroDecoder::new(self.clone())))
104    }
105}
106
107/// Avro Decoder
108#[derive(Debug, Default)]
109pub struct AvroDecoder {
110    options: AvroDecoderOptions,
111}
112
113impl AvroDecoder {
114    pub fn new(options: AvroDecoderOptions) -> Self {
115        Self { options }
116    }
117}
118
119impl Decoder for AvroDecoder {
120    fn decode(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
121        if let Some(limit) = self.options.max_size
122            && data.len() > limit
123        {
124            return Err(kinetic_core::encode::Error::MessageTooLarge {
125                size: data.len(),
126                limit,
127            });
128        }
129
130        let cursor = Cursor::new(data);
131        let mut reader = ReaderBuilder::new()
132            .build(cursor)
133            .map_err(|e| Error::Arrow { source: e })?;
134
135        if let Some(maybe_batch) = reader.next() {
136            let batch = maybe_batch.map_err(|e| Error::Arrow { source: e })?;
137            EventBatch::new_with_xid(batch, metadata)
138                .map_err(|e| Error::KineticCore { source: e })
139                .map_err(Into::<EncodeError>::into)
140        } else {
141            Err(EncodeError::Decode {
142                source: "No batches found in Avro data".into(),
143            })
144        }
145    }
146}
147
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::metadata::{ComponentId, EventMetadata};

    /// Encodes a three-row, two-column batch and decodes it again, checking
    /// that the row count and schema survive the round trip.
    #[test]
    fn test_avro_encode_decode() {
        // Input: non-nullable `id: Int32` and `name: Utf8` columns.
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ];
        let schema = Arc::new(Schema::new(fields));

        let record_batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();

        let metadata = Arc::new(EventMetadata::new("test", ComponentId::from("test-source")));
        let original = EventBatch::new_with_xid(record_batch, metadata.clone()).unwrap();
        // Compare against the event batch's schema rather than the raw input schema.
        let expected_schema = original.payload.schema();

        let encoded = AvroEncoder::default().encode(&original).unwrap();
        let round_tripped = AvroDecoder::default().decode(&encoded, metadata).unwrap();

        assert_eq!(round_tripped.payload.num_rows(), 3);
        assert_eq!(round_tripped.payload.schema(), expected_schema);
    }
}