Skip to main content

kinetic_encoder_parquet/
lib.rs

1//! Parquet encoder and decoder for Kinetic.
2
3#![deny(clippy::unwrap_used)]
4
5use arrow_schema::Schema;
6use bytes::Bytes;
7use kinetic_core::encode::{
8    Decoder, DecoderConfig, Encoder, EncoderConfig, Error as EncodeError, Result as EncodeResult,
9};
10use kinetic_core::{ArcEventMetadata, EventBatch};
11use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
12use parquet::arrow::arrow_writer::ArrowWriter;
13use parquet::basic::Compression;
14use parquet::file::properties::WriterProperties;
15use serde::{Deserialize, Serialize};
16use snafu::Snafu;
17use std::sync::Arc;
18
19#[derive(Debug, Snafu)]
20pub enum Error {
21    #[snafu(display("Parquet error: {}", source))]
22    Parquet {
23        source: parquet::errors::ParquetError,
24    },
25
26    #[snafu(display("Arrow error: {}", source))]
27    Arrow { source: arrow_schema::ArrowError },
28
29    #[snafu(display("Kinetic core error: {}", source))]
30    KineticCore { source: kinetic_core::Error },
31}
32
33impl From<Error> for EncodeError {
34    fn from(err: Error) -> Self {
35        match err {
36            Error::Parquet { source } => EncodeError::Encode {
37                source: Box::new(source),
38            },
39            Error::Arrow { source } => EncodeError::Encode {
40                source: Box::new(source),
41            },
42            Error::KineticCore { source } => EncodeError::Encode {
43                source: Box::new(source),
44            },
45        }
46    }
47}
48
49pub type Result<T, E = Error> = std::result::Result<T, E>;
50
51/// Supported Parquet compression codecs.
52#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
53#[serde(rename_all = "snake_case")]
54pub enum ParquetCompression {
55    #[default]
56    Lz4,
57    Snappy,
58    Zstd,
59    Gzip,
60    None,
61}
62
63impl From<ParquetCompression> for Compression {
64    fn from(c: ParquetCompression) -> Self {
65        match c {
66            ParquetCompression::Lz4 => Compression::LZ4,
67            ParquetCompression::Snappy => Compression::SNAPPY,
68            ParquetCompression::Zstd => Compression::ZSTD(Default::default()),
69            ParquetCompression::Gzip => Compression::GZIP(Default::default()),
70            ParquetCompression::None => Compression::UNCOMPRESSED,
71        }
72    }
73}
74
75/// Options for configuring Parquet encoding.
76#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
77pub struct ParquetEncoderOptions {
78    /// Compression algorithm for Parquet files.
79    #[serde(default)]
80    pub compression: ParquetCompression,
81}
82
83impl EncoderConfig for ParquetEncoderOptions {
84    fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
85        Ok(Arc::new(ParquetEncoder::new(self.clone())))
86    }
87}
88
89/// Parquet Encoder
90#[derive(Debug, Default)]
91pub struct ParquetEncoder {
92    options: ParquetEncoderOptions,
93}
94
95impl ParquetEncoder {
96    pub fn new(options: ParquetEncoderOptions) -> Self {
97        Self { options }
98    }
99}
100
101impl Encoder for ParquetEncoder {
102    fn encode(&self, batch: &EventBatch) -> EncodeResult<Bytes> {
103        let mut buffer = Vec::new();
104        let props = WriterProperties::builder()
105            .set_compression(self.options.compression.into())
106            .build();
107
108        {
109            let mut writer = ArrowWriter::try_new(&mut buffer, batch.payload.schema(), Some(props))
110                .map_err(|e| Error::Parquet { source: e })?;
111            writer
112                .write(&batch.payload)
113                .map_err(|e| Error::Parquet { source: e })?;
114            writer.close().map_err(|e| Error::Parquet { source: e })?;
115        }
116        Ok(Bytes::from(buffer))
117    }
118}
119/// Options for configuring Parquet decoding.
120#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
121pub struct ParquetDecoderOptions {
122    /// Maximum size of a single message in bytes.
123    pub max_size: Option<usize>,
124}
125
126impl Default for ParquetDecoderOptions {
127    fn default() -> Self {
128        Self {
129            max_size: Some(10 * 1024 * 1024), // 10MB default
130        }
131    }
132}
133
134impl DecoderConfig for ParquetDecoderOptions {
135    fn build(&self, _schema: Arc<Schema>) -> EncodeResult<Arc<dyn Decoder>> {
136        Ok(Arc::new(ParquetDecoder::new(self.clone())))
137    }
138}
139
140/// Parquet Decoder
141#[derive(Debug, Default)]
142pub struct ParquetDecoder {
143    options: ParquetDecoderOptions,
144}
145
146impl ParquetDecoder {
147    pub fn new(options: ParquetDecoderOptions) -> Self {
148        Self { options }
149    }
150}
151
152impl Decoder for ParquetDecoder {
153    fn decode(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
154        if let Some(limit) = self.options.max_size
155            && data.len() > limit
156        {
157            return Err(kinetic_core::encode::Error::MessageTooLarge {
158                size: data.len(),
159                limit,
160            });
161        }
162
163        let bytes = Bytes::copy_from_slice(data);
164        let builder = ParquetRecordBatchReaderBuilder::try_new(bytes)
165            .map_err(|e| Error::Parquet { source: e })?;
166
167        let mut reader = builder.build().map_err(|e| Error::Parquet { source: e })?;
168
169        if let Some(maybe_batch) = reader.next() {
170            let batch = maybe_batch.map_err(|e| Error::Arrow { source: e })?;
171            EventBatch::new_with_xid(batch, metadata)
172                .map_err(|e| Error::KineticCore { source: e })
173                .map_err(Into::<EncodeError>::into)
174        } else {
175            Err(EncodeError::Decode {
176                source: "No batches found in Parquet data".into(),
177            })
178        }
179    }
180}
181
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::metadata::{ComponentId, EventMetadata};

    /// Builds the three-row `(id, name)` record batch used as the round-trip fixture.
    fn sample_record_batch() -> RecordBatch {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ];

        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap()
    }

    /// Encoding and then decoding a batch preserves its row count and schema.
    #[test]
    fn test_parquet_encode_decode() {
        let metadata = Arc::new(EventMetadata::new("test", ComponentId::from("test-source")));
        let original = EventBatch::new_with_xid(sample_record_batch(), metadata.clone()).unwrap();
        let expected_schema = original.payload.schema();

        let encoded = ParquetEncoder::default().encode(&original).unwrap();
        let round_tripped = ParquetDecoder::default()
            .decode(&encoded, metadata)
            .unwrap();

        assert_eq!(round_tripped.payload.num_rows(), 3);
        assert_eq!(round_tripped.payload.schema(), expected_schema);
    }
}