kinetic_encoder_avro/
lib.rs1#![deny(clippy::unwrap_used)]
4
5use arrow_avro::reader::ReaderBuilder;
6use arrow_avro::writer::AvroWriter;
7use arrow_schema::Schema;
8use bytes::Bytes;
9use kinetic_core::encode::{
10 Decoder, DecoderConfig, Encoder, EncoderConfig, Error as EncodeError, Result as EncodeResult,
11};
12use kinetic_core::{ArcEventMetadata, EventBatch};
13use serde::{Deserialize, Serialize};
14use snafu::Snafu;
15use std::io::Cursor;
16use std::sync::Arc;
17
18#[derive(Debug, Snafu)]
19pub enum Error {
20 #[snafu(display("Avro error: {}", source))]
21 Avro { source: arrow_schema::ArrowError },
22
23 #[snafu(display("Arrow error: {}", source))]
24 Arrow { source: arrow_schema::ArrowError },
25
26 #[snafu(display("Kinetic core error: {}", source))]
27 KineticCore { source: kinetic_core::Error },
28}
29
30impl From<Error> for EncodeError {
31 fn from(err: Error) -> Self {
32 match err {
33 Error::Avro { source } => EncodeError::Encode {
34 source: Box::new(source),
35 },
36 Error::Arrow { source } => EncodeError::Encode {
37 source: Box::new(source),
38 },
39 Error::KineticCore { source } => EncodeError::Encode {
40 source: Box::new(source),
41 },
42 }
43 }
44}
45
46pub type Result<T, E = Error> = std::result::Result<T, E>;
47
48#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
50pub struct AvroEncoderOptions {}
51
52impl EncoderConfig for AvroEncoderOptions {
53 fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
54 Ok(Arc::new(AvroEncoder::new(self.clone())))
55 }
56}
57
58#[derive(Debug, Default)]
60pub struct AvroEncoder {
61 _options: AvroEncoderOptions,
62}
63
64impl AvroEncoder {
65 pub fn new(options: AvroEncoderOptions) -> Self {
66 Self { _options: options }
67 }
68}
69
70impl Encoder for AvroEncoder {
71 fn encode(&self, batch: &EventBatch) -> EncodeResult<Bytes> {
72 let mut buffer = Vec::new();
73 {
74 let mut writer = AvroWriter::new(&mut buffer, (*batch.payload.schema()).clone())
75 .map_err(|e| Error::Avro { source: e.into() })?;
76 writer
77 .write(&batch.payload)
78 .map_err(|e| Error::Avro { source: e.into() })?;
79 writer
80 .finish()
81 .map_err(|e| Error::Avro { source: e.into() })?;
82 }
83 Ok(Bytes::from(buffer))
84 }
85}
86#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
88pub struct AvroDecoderOptions {
89 pub max_size: Option<usize>,
91}
92
93impl Default for AvroDecoderOptions {
94 fn default() -> Self {
95 Self {
96 max_size: Some(10 * 1024 * 1024), }
98 }
99}
100
101impl DecoderConfig for AvroDecoderOptions {
102 fn build(&self, _schema: Arc<Schema>) -> EncodeResult<Arc<dyn Decoder>> {
103 Ok(Arc::new(AvroDecoder::new(self.clone())))
104 }
105}
106
107#[derive(Debug, Default)]
109pub struct AvroDecoder {
110 options: AvroDecoderOptions,
111}
112
113impl AvroDecoder {
114 pub fn new(options: AvroDecoderOptions) -> Self {
115 Self { options }
116 }
117}
118
119impl Decoder for AvroDecoder {
120 fn decode(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
121 if let Some(limit) = self.options.max_size
122 && data.len() > limit
123 {
124 return Err(kinetic_core::encode::Error::MessageTooLarge {
125 size: data.len(),
126 limit,
127 });
128 }
129
130 let cursor = Cursor::new(data);
131 let mut reader = ReaderBuilder::new()
132 .build(cursor)
133 .map_err(|e| Error::Arrow { source: e })?;
134
135 if let Some(maybe_batch) = reader.next() {
136 let batch = maybe_batch.map_err(|e| Error::Arrow { source: e })?;
137 EventBatch::new_with_xid(batch, metadata)
138 .map_err(|e| Error::KineticCore { source: e })
139 .map_err(Into::<EncodeError>::into)
140 } else {
141 Err(EncodeError::Decode {
142 source: "No batches found in Avro data".into(),
143 })
144 }
145 }
146}
147
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::metadata::{ComponentId, EventMetadata};

    /// Encodes a three-row batch and decodes it again, checking that the row
    /// count and the schema come back unchanged.
    #[test]
    fn test_avro_encode_decode() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ];
        let record_batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();

        let metadata = Arc::new(EventMetadata::new("test", ComponentId::from("test-source")));
        let original = EventBatch::new_with_xid(record_batch, Arc::clone(&metadata)).unwrap();
        let expected_schema = original.payload.schema();

        let bytes = AvroEncoder::default().encode(&original).unwrap();
        let roundtripped = AvroDecoder::default().decode(&bytes, metadata).unwrap();

        assert_eq!(roundtripped.payload.num_rows(), 3);
        assert_eq!(roundtripped.payload.schema(), expected_schema);
    }
}