Skip to main content

kinetic_encoder_json/
lib.rs

//! JSON encoder and decoder for Kinetic using sonic-rs and arrow-json.
2
3#![deny(clippy::unwrap_used)]
4
5use arrow_json::{ReaderBuilder, WriterBuilder};
6use bytes::{BufMut, Bytes, BytesMut};
7use kinetic_core::encode::{
8    Decoder, DecoderConfig, Encoder, EncoderConfig, Error as EncodeError, Result as EncodeResult,
9};
10use kinetic_core::{ArcEventMetadata, EventBatch};
11use serde::{Deserialize, Serialize};
12use snafu::Snafu;
13use sonic_rs::{JsonContainerTrait, JsonValueMutTrait};
14use std::io::Cursor;
15use std::sync::Arc;
16
17#[derive(Debug, Snafu)]
18pub enum Error {
19    #[snafu(display("Arrow JSON encoding error: {}", source))]
20    ArrowEncode { source: arrow_schema::ArrowError },
21
22    #[snafu(display("Arrow JSON decoding error: {}", source))]
23    ArrowDecode { source: arrow_schema::ArrowError },
24
25    #[snafu(display("Kinetic core error: {}", source))]
26    KineticCore { source: kinetic_core::Error },
27
28    #[snafu(display("Failed to parse JSON using sonic-rs: {}", source))]
29    SonicParse { source: sonic_rs::Error },
30}
31
32impl From<Error> for EncodeError {
33    fn from(err: Error) -> Self {
34        match err {
35            Error::ArrowEncode { source } => EncodeError::Encode {
36                source: Box::new(source),
37            },
38            Error::ArrowDecode { source } => EncodeError::Decode {
39                source: Box::new(source),
40            },
41            Error::KineticCore { source } => EncodeError::Decode {
42                source: Box::new(source),
43            },
44            Error::SonicParse { source } => EncodeError::Decode {
45                source: Box::new(source),
46            },
47        }
48    }
49}
50
51pub type Result<T, E = Error> = std::result::Result<T, E>;
52
53/// Options for configuring JSON encoding.
54#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
55pub struct JsonEncoderOptions {
56    /// If true, encode each batch as a JSON array of objects.
57    /// If false, encode as NDJSON (newline-delimited JSON).
58    pub as_array: bool,
59}
60
61impl EncoderConfig for JsonEncoderOptions {
62    fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
63        Ok(Arc::new(JsonEncoder::new(self.clone())))
64    }
65}
66
67/// JSON Encoder
68#[derive(Debug, Default)]
69pub struct JsonEncoder {
70    options: JsonEncoderOptions,
71}
72
73impl JsonEncoder {
74    pub fn new(options: JsonEncoderOptions) -> Self {
75        Self { options }
76    }
77
78    /// Encodes an `EventBatch` into JSON bytes.
79    fn encode_inner(&self, batch: &EventBatch) -> Result<Bytes> {
80        let buf = BytesMut::new().writer();
81
82        if self.options.as_array {
83            let mut writer = WriterBuilder::new().build::<_, arrow_json::writer::JsonArray>(buf);
84            writer
85                .write(&batch.payload)
86                .map_err(|e| Error::ArrowEncode { source: e })?;
87            writer
88                .finish()
89                .map_err(|e| Error::ArrowEncode { source: e })?;
90            Ok(writer.into_inner().into_inner().freeze())
91        } else {
92            let mut writer =
93                WriterBuilder::new().build::<_, arrow_json::writer::LineDelimited>(buf);
94            writer
95                .write(&batch.payload)
96                .map_err(|e| Error::ArrowEncode { source: e })?;
97            writer
98                .finish()
99                .map_err(|e| Error::ArrowEncode { source: e })?;
100            Ok(writer.into_inner().into_inner().freeze())
101        }
102    }
103}
104
105impl Encoder for JsonEncoder {
106    fn encode(&self, batch: &EventBatch) -> EncodeResult<Bytes> {
107        self.encode_inner(batch).map_err(Into::into)
108    }
109
110    fn encode_individual(&self, batch: &EventBatch) -> EncodeResult<Vec<Bytes>> {
111        if self.options.as_array {
112            // If configured as an array, we still want individual objects for Kafka.
113            // We use the slow path here as it's a rare configuration for message-oriented sinks.
114            let mut results = Vec::with_capacity(batch.num_rows());
115            for i in 0..batch.num_rows() {
116                let row = batch.payload.slice(i, 1);
117                let buf = BytesMut::new().writer();
118                let mut writer =
119                    WriterBuilder::new().build::<_, arrow_json::writer::LineDelimited>(buf);
120                writer
121                    .write(&row)
122                    .map_err(|e| Error::ArrowEncode { source: e })?;
123                writer
124                    .finish()
125                    .map_err(|e| Error::ArrowEncode { source: e })?;
126                results.push(writer.into_inner().into_inner().freeze());
127            }
128            Ok(results)
129        } else {
130            // Fast path: encode once to NDJSON and split by newline.
131            let all_bytes = self.encode_inner(batch).map_err(EncodeError::from)?;
132            let mut results = Vec::with_capacity(batch.num_rows());
133            let mut start = 0;
134            for (i, &b) in all_bytes.iter().enumerate() {
135                if b == b'\n' {
136                    results.push(all_bytes.slice(start..i + 1));
137                    start = i + 1;
138                }
139            }
140            // Handle last line if not newline terminated (though arrow-json usually terminates it)
141            if start < all_bytes.len() {
142                results.push(all_bytes.slice(start..all_bytes.len()));
143            }
144            Ok(results)
145        }
146    }
147}
148
149/// Options for configuring JSON decoding.
150#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
151pub struct JsonDecoderOptions {
152    /// Batch size for parsing.
153    pub batch_size: usize,
154    /// Maximum size of a single message in bytes.
155    pub max_size: Option<usize>,
156}
157
158impl Default for JsonDecoderOptions {
159    fn default() -> Self {
160        Self {
161            batch_size: 1024,
162            max_size: Some(10 * 1024 * 1024), // 10MB default
163        }
164    }
165}
166
167impl DecoderConfig for JsonDecoderOptions {
168    fn build(&self, schema: Arc<arrow_schema::Schema>) -> EncodeResult<Arc<dyn Decoder>> {
169        Ok(Arc::new(JsonDecoder::new(self.clone(), schema)))
170    }
171}
172
173/// JSON Decoder
174#[derive(Debug)]
175pub struct JsonDecoder {
176    options: JsonDecoderOptions,
177    schema: Arc<arrow_schema::Schema>,
178}
179
180impl JsonDecoder {
181    pub fn new(options: JsonDecoderOptions, schema: Arc<arrow_schema::Schema>) -> Self {
182        Self { options, schema }
183    }
184
185    /// Decodes JSON bytes into an `EventBatch`.
186    fn decode_inner(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
187        if let Some(limit) = self.options.max_size
188            && data.len() > limit
189        {
190            return Err(EncodeError::MessageTooLarge {
191                size: data.len(),
192                limit,
193            });
194        }
195
196        // Check if we need to handle _unmapped
197        if self.schema.column_with_name("_unmapped").is_some() {
198            return self.decode_with_unmapped(data, metadata);
199        }
200
201        let cursor = Cursor::new(data);
202        let mut reader = ReaderBuilder::new(self.schema.clone())
203            .with_batch_size(self.options.batch_size)
204            .build(cursor)
205            .map_err(|e| Error::ArrowDecode { source: e })?;
206
207        let record_batch = reader
208            .next()
209            .transpose()
210            .map_err(|e| Error::ArrowDecode { source: e })?;
211
212        match record_batch {
213            Some(batch) => EventBatch::new_with_xid(batch, metadata)
214                .map_err(|e| Error::KineticCore { source: e }),
215            None => EventBatch::new_with_xid(
216                arrow_array::RecordBatch::new_empty(self.schema.clone()),
217                metadata,
218            )
219            .map_err(|e| Error::KineticCore { source: e }),
220        }
221        .map_err(Into::into)
222    }
223
224    fn decode_with_unmapped(
225        &self,
226        data: &[u8],
227        metadata: ArcEventMetadata,
228    ) -> EncodeResult<EventBatch> {
229        // Parse using sonic-rs to handle extra fields
230        let mut rows = Vec::new();
231        // Try to parse as array first, then as NDJSON
232        if let Ok(val) = sonic_rs::from_slice::<sonic_rs::Value>(data) {
233            if let Some(arr) = val.as_array() {
234                for v in arr {
235                    rows.push(self.process_unmapped_row(v)?);
236                }
237            } else {
238                rows.push(self.process_unmapped_row(&val)?);
239            }
240        } else {
241            // NDJSON
242            let reader = std::io::BufReader::new(data);
243            use std::io::BufRead;
244            for line in reader.lines() {
245                let line = line.map_err(|e| EncodeError::Decode {
246                    source: Box::new(e),
247                })?;
248                if line.trim().is_empty() {
249                    continue;
250                }
251                let val = sonic_rs::from_str::<sonic_rs::Value>(&line).map_err(|e| {
252                    EncodeError::Decode {
253                        source: Box::new(e),
254                    }
255                })?;
256                rows.push(self.process_unmapped_row(&val)?);
257            }
258        }
259
260        if rows.is_empty() {
261            return EventBatch::new_with_xid(
262                arrow_array::RecordBatch::new_empty(self.schema.clone()),
263                metadata,
264            )
265            .map_err(|e| EncodeError::Decode {
266                source: Box::new(e),
267            });
268        }
269
270        // Pass sonic_rs::Value directly to arrow_json::Decoder::serialize
271        let mut decoder = arrow_json::ReaderBuilder::new(self.schema.clone())
272            .build_decoder()
273            .map_err(|e| EncodeError::Decode {
274                source: Box::new(e),
275            })?;
276
277        decoder.serialize(&rows).map_err(|e| EncodeError::Decode {
278            source: Box::new(e),
279        })?;
280
281        let batch = decoder
282            .flush()
283            .map_err(|e| EncodeError::Decode {
284                source: Box::new(e),
285            })?
286            .unwrap_or_else(|| arrow_array::RecordBatch::new_empty(self.schema.clone()));
287
288        EventBatch::new_with_xid(batch, metadata).map_err(|e| EncodeError::Decode {
289            source: Box::new(e),
290        })
291    }
292
293    fn process_unmapped_row(&self, val: &sonic_rs::Value) -> EncodeResult<sonic_rs::Value> {
294        let mut row = val.clone();
295        if let Some(obj) = row.as_object_mut() {
296            let mut unmapped = sonic_rs::json!({});
297            let unmapped_obj = unmapped
298                .as_object_mut()
299                .ok_or_else(|| EncodeError::Decode {
300                    source: "Failed to create unmapped object".into(),
301                })?;
302
303            // Collect all fields not in schema
304            let mut keys_to_remove = Vec::new();
305            for (k, v) in obj.iter() {
306                if k == "_unmapped" {
307                    continue;
308                }
309                if self.schema.column_with_name(k).is_none() {
310                    unmapped_obj.insert(k, v.clone());
311                    keys_to_remove.push(k.to_owned());
312                }
313            }
314
315            for k in keys_to_remove {
316                obj.remove(&k);
317            }
318
319            if !unmapped_obj.is_empty() {
320                let unmapped_str =
321                    sonic_rs::to_string(&unmapped).map_err(|e| EncodeError::Decode {
322                        source: Box::new(e),
323                    })?;
324                obj.insert("_unmapped", sonic_rs::Value::from(unmapped_str.as_str()));
325            }
326        }
327        Ok(row)
328    }
329}
330
331impl Decoder for JsonDecoder {
332    fn decode(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
333        self.decode_inner(data, metadata)
334    }
335}
336
#[cfg(test)]
// Tests may unwrap freely; the crate-level lint targets production code.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::{ComponentId, EventMetadata};

    /// Placeholder metadata shared by the tests below.
    fn dummy_metadata() -> ArcEventMetadata {
        let component = ComponentId("test".to_string());
        Arc::new(EventMetadata::new("test", component))
    }

    /// Default (NDJSON) encoding emits every row value plus the `_xid` column.
    #[test]
    fn test_json_encode_ndjson() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let values = Int32Array::from(vec![1, 2, 3]);
        let record_batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();
        let record_batch = kinetic_core::xid_util::ensure_xid_column(record_batch).unwrap();
        let batch = EventBatch::new_with_xid(record_batch, dummy_metadata())
            .expect("failed to create batch");

        let encoded = JsonEncoder::default().encode(&batch).unwrap();
        let text = std::str::from_utf8(encoded.as_ref()).unwrap();

        for expected in ["\"a\":1", "\"a\":2", "\"a\":3", "\"_xid\":"] {
            assert!(text.contains(expected));
        }
    }

    /// Three NDJSON lines decode into three rows with the expected values.
    #[test]
    fn test_json_decode() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let decoder = JsonDecoder::new(JsonDecoderOptions::default(), schema);

        let input = b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n";
        let batch = decoder.decode(input, dummy_metadata()).unwrap();
        assert_eq!(batch.num_rows(), 3);

        let column = batch
            .payload
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(column.value(0), 1);
        assert_eq!(column.value(2), 3);
    }
}