1#![deny(clippy::unwrap_used)]
4
5use arrow_schema::Schema;
6use bytes::Bytes;
7use kinetic_core::encode::{
8 Decoder, DecoderConfig, Encoder, EncoderConfig, Error as EncodeError, Result as EncodeResult,
9};
10use kinetic_core::{ArcEventMetadata, EventBatch};
11use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
12use parquet::arrow::arrow_writer::ArrowWriter;
13use parquet::basic::Compression;
14use parquet::file::properties::WriterProperties;
15use serde::{Deserialize, Serialize};
16use snafu::Snafu;
17use std::sync::Arc;
18
19#[derive(Debug, Snafu)]
20pub enum Error {
21 #[snafu(display("Parquet error: {}", source))]
22 Parquet {
23 source: parquet::errors::ParquetError,
24 },
25
26 #[snafu(display("Arrow error: {}", source))]
27 Arrow { source: arrow_schema::ArrowError },
28
29 #[snafu(display("Kinetic core error: {}", source))]
30 KineticCore { source: kinetic_core::Error },
31}
32
33impl From<Error> for EncodeError {
34 fn from(err: Error) -> Self {
35 match err {
36 Error::Parquet { source } => EncodeError::Encode {
37 source: Box::new(source),
38 },
39 Error::Arrow { source } => EncodeError::Encode {
40 source: Box::new(source),
41 },
42 Error::KineticCore { source } => EncodeError::Encode {
43 source: Box::new(source),
44 },
45 }
46 }
47}
48
49pub type Result<T, E = Error> = std::result::Result<T, E>;
50
51#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
53#[serde(rename_all = "snake_case")]
54pub enum ParquetCompression {
55 #[default]
56 Lz4,
57 Snappy,
58 Zstd,
59 Gzip,
60 None,
61}
62
63impl From<ParquetCompression> for Compression {
64 fn from(c: ParquetCompression) -> Self {
65 match c {
66 ParquetCompression::Lz4 => Compression::LZ4,
67 ParquetCompression::Snappy => Compression::SNAPPY,
68 ParquetCompression::Zstd => Compression::ZSTD(Default::default()),
69 ParquetCompression::Gzip => Compression::GZIP(Default::default()),
70 ParquetCompression::None => Compression::UNCOMPRESSED,
71 }
72 }
73}
74
75#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
77pub struct ParquetEncoderOptions {
78 #[serde(default)]
80 pub compression: ParquetCompression,
81}
82
83impl EncoderConfig for ParquetEncoderOptions {
84 fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
85 Ok(Arc::new(ParquetEncoder::new(self.clone())))
86 }
87}
88
89#[derive(Debug, Default)]
91pub struct ParquetEncoder {
92 options: ParquetEncoderOptions,
93}
94
95impl ParquetEncoder {
96 pub fn new(options: ParquetEncoderOptions) -> Self {
97 Self { options }
98 }
99}
100
101impl Encoder for ParquetEncoder {
102 fn encode(&self, batch: &EventBatch) -> EncodeResult<Bytes> {
103 let mut buffer = Vec::new();
104 let props = WriterProperties::builder()
105 .set_compression(self.options.compression.into())
106 .build();
107
108 {
109 let mut writer = ArrowWriter::try_new(&mut buffer, batch.payload.schema(), Some(props))
110 .map_err(|e| Error::Parquet { source: e })?;
111 writer
112 .write(&batch.payload)
113 .map_err(|e| Error::Parquet { source: e })?;
114 writer.close().map_err(|e| Error::Parquet { source: e })?;
115 }
116 Ok(Bytes::from(buffer))
117 }
118}
119#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
121pub struct ParquetDecoderOptions {
122 pub max_size: Option<usize>,
124}
125
126impl Default for ParquetDecoderOptions {
127 fn default() -> Self {
128 Self {
129 max_size: Some(10 * 1024 * 1024), }
131 }
132}
133
134impl DecoderConfig for ParquetDecoderOptions {
135 fn build(&self, _schema: Arc<Schema>) -> EncodeResult<Arc<dyn Decoder>> {
136 Ok(Arc::new(ParquetDecoder::new(self.clone())))
137 }
138}
139
140#[derive(Debug, Default)]
142pub struct ParquetDecoder {
143 options: ParquetDecoderOptions,
144}
145
146impl ParquetDecoder {
147 pub fn new(options: ParquetDecoderOptions) -> Self {
148 Self { options }
149 }
150}
151
152impl Decoder for ParquetDecoder {
153 fn decode(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
154 if let Some(limit) = self.options.max_size
155 && data.len() > limit
156 {
157 return Err(kinetic_core::encode::Error::MessageTooLarge {
158 size: data.len(),
159 limit,
160 });
161 }
162
163 let bytes = Bytes::copy_from_slice(data);
164 let builder = ParquetRecordBatchReaderBuilder::try_new(bytes)
165 .map_err(|e| Error::Parquet { source: e })?;
166
167 let mut reader = builder.build().map_err(|e| Error::Parquet { source: e })?;
168
169 if let Some(maybe_batch) = reader.next() {
170 let batch = maybe_batch.map_err(|e| Error::Arrow { source: e })?;
171 EventBatch::new_with_xid(batch, metadata)
172 .map_err(|e| Error::KineticCore { source: e })
173 .map_err(Into::<EncodeError>::into)
174 } else {
175 Err(EncodeError::Decode {
176 source: "No batches found in Parquet data".into(),
177 })
178 }
179 }
180}
181
#[cfg(test)]
#[allow(clippy::unwrap_used)] // Panicking on failure is the desired behaviour in tests.
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::metadata::{ComponentId, EventMetadata};

    /// Encodes a three-row batch and checks that decoding restores the row
    /// count and schema.
    #[test]
    fn test_parquet_encode_decode() {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ];
        let schema = Arc::new(Schema::new(fields));

        let record_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        )
        .unwrap();

        let metadata = Arc::new(EventMetadata::new("test", ComponentId::from("test-source")));
        let original = EventBatch::new_with_xid(record_batch, metadata.clone()).unwrap();
        let expected_schema = original.payload.schema();

        let encoded = ParquetEncoder::default().encode(&original).unwrap();
        let round_tripped = ParquetDecoder::default()
            .decode(&encoded, metadata)
            .unwrap();

        assert_eq!(round_tripped.payload.num_rows(), 3);
        assert_eq!(round_tripped.payload.schema(), expected_schema);
    }
}