1#![deny(clippy::unwrap_used)]
4
5use arrow_json::{ReaderBuilder, WriterBuilder};
6use bytes::{BufMut, Bytes, BytesMut};
7use kinetic_core::encode::{
8 Decoder, DecoderConfig, Encoder, EncoderConfig, Error as EncodeError, Result as EncodeResult,
9};
10use kinetic_core::{ArcEventMetadata, EventBatch};
11use serde::{Deserialize, Serialize};
12use snafu::Snafu;
13use sonic_rs::{JsonContainerTrait, JsonValueMutTrait};
14use std::io::Cursor;
15use std::sync::Arc;
16
/// Errors raised by the JSON encoder and decoder defined in this module.
///
/// Every variant keeps the underlying failure as its `source`.
#[derive(Debug, Snafu)]
pub enum Error {
    /// The Arrow JSON writer failed while serializing a record batch.
    #[snafu(display("Arrow JSON encoding error: {}", source))]
    ArrowEncode { source: arrow_schema::ArrowError },

    /// The Arrow JSON reader failed while building or reading a record batch.
    #[snafu(display("Arrow JSON decoding error: {}", source))]
    ArrowDecode { source: arrow_schema::ArrowError },

    /// A `kinetic_core` operation (such as wrapping a record batch into an
    /// `EventBatch`) failed.
    #[snafu(display("Kinetic core error: {}", source))]
    KineticCore { source: kinetic_core::Error },

    /// sonic-rs could not parse the JSON input.
    #[snafu(display("Failed to parse JSON using sonic-rs: {}", source))]
    SonicParse { source: sonic_rs::Error },
}
31
32impl From<Error> for EncodeError {
33 fn from(err: Error) -> Self {
34 match err {
35 Error::ArrowEncode { source } => EncodeError::Encode {
36 source: Box::new(source),
37 },
38 Error::ArrowDecode { source } => EncodeError::Decode {
39 source: Box::new(source),
40 },
41 Error::KineticCore { source } => EncodeError::Decode {
42 source: Box::new(source),
43 },
44 Error::SonicParse { source } => EncodeError::Decode {
45 source: Box::new(source),
46 },
47 }
48 }
49}
50
/// Module-local result alias whose error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
52
/// Options controlling how [`JsonEncoder`] lays out its output.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct JsonEncoderOptions {
    /// When `true`, a whole batch is written with Arrow's JSON-array writer;
    /// when `false` (the derived default), with the line-delimited writer.
    pub as_array: bool,
}
60
61impl EncoderConfig for JsonEncoderOptions {
62 fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
63 Ok(Arc::new(JsonEncoder::new(self.clone())))
64 }
65}
66
/// Encoder that serializes an `EventBatch` payload to JSON through `arrow_json`.
#[derive(Debug, Default)]
pub struct JsonEncoder {
    /// Output layout settings supplied at construction time.
    options: JsonEncoderOptions,
}
72
73impl JsonEncoder {
74 pub fn new(options: JsonEncoderOptions) -> Self {
75 Self { options }
76 }
77
78 fn encode_inner(&self, batch: &EventBatch) -> Result<Bytes> {
80 let buf = BytesMut::new().writer();
81
82 if self.options.as_array {
83 let mut writer = WriterBuilder::new().build::<_, arrow_json::writer::JsonArray>(buf);
84 writer
85 .write(&batch.payload)
86 .map_err(|e| Error::ArrowEncode { source: e })?;
87 writer
88 .finish()
89 .map_err(|e| Error::ArrowEncode { source: e })?;
90 Ok(writer.into_inner().into_inner().freeze())
91 } else {
92 let mut writer =
93 WriterBuilder::new().build::<_, arrow_json::writer::LineDelimited>(buf);
94 writer
95 .write(&batch.payload)
96 .map_err(|e| Error::ArrowEncode { source: e })?;
97 writer
98 .finish()
99 .map_err(|e| Error::ArrowEncode { source: e })?;
100 Ok(writer.into_inner().into_inner().freeze())
101 }
102 }
103}
104
105impl Encoder for JsonEncoder {
106 fn encode(&self, batch: &EventBatch) -> EncodeResult<Bytes> {
107 self.encode_inner(batch).map_err(Into::into)
108 }
109
110 fn encode_individual(&self, batch: &EventBatch) -> EncodeResult<Vec<Bytes>> {
111 if self.options.as_array {
112 let mut results = Vec::with_capacity(batch.num_rows());
115 for i in 0..batch.num_rows() {
116 let row = batch.payload.slice(i, 1);
117 let buf = BytesMut::new().writer();
118 let mut writer =
119 WriterBuilder::new().build::<_, arrow_json::writer::LineDelimited>(buf);
120 writer
121 .write(&row)
122 .map_err(|e| Error::ArrowEncode { source: e })?;
123 writer
124 .finish()
125 .map_err(|e| Error::ArrowEncode { source: e })?;
126 results.push(writer.into_inner().into_inner().freeze());
127 }
128 Ok(results)
129 } else {
130 let all_bytes = self.encode_inner(batch).map_err(EncodeError::from)?;
132 let mut results = Vec::with_capacity(batch.num_rows());
133 let mut start = 0;
134 for (i, &b) in all_bytes.iter().enumerate() {
135 if b == b'\n' {
136 results.push(all_bytes.slice(start..i + 1));
137 start = i + 1;
138 }
139 }
140 if start < all_bytes.len() {
142 results.push(all_bytes.slice(start..all_bytes.len()));
143 }
144 Ok(results)
145 }
146 }
147}
148
/// Options controlling how [`JsonDecoder`] reads its input.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonDecoderOptions {
    /// Batch size (in rows) handed to the Arrow JSON reader.
    pub batch_size: usize,
    /// Maximum accepted input length in bytes; longer inputs are rejected with
    /// `MessageTooLarge`. `None` disables the check.
    pub max_size: Option<usize>,
}
157
158impl Default for JsonDecoderOptions {
159 fn default() -> Self {
160 Self {
161 batch_size: 1024,
162 max_size: Some(10 * 1024 * 1024), }
164 }
165}
166
167impl DecoderConfig for JsonDecoderOptions {
168 fn build(&self, schema: Arc<arrow_schema::Schema>) -> EncodeResult<Arc<dyn Decoder>> {
169 Ok(Arc::new(JsonDecoder::new(self.clone(), schema)))
170 }
171}
172
/// Decoder that parses JSON input into an `EventBatch` through `arrow_json`.
#[derive(Debug)]
pub struct JsonDecoder {
    /// Batch size and input size limit applied while decoding.
    options: JsonDecoderOptions,
    /// Arrow schema the input is decoded against. A column named `_unmapped`
    /// switches decoding to the path that collects fields missing from the schema.
    schema: Arc<arrow_schema::Schema>,
}
179
180impl JsonDecoder {
181 pub fn new(options: JsonDecoderOptions, schema: Arc<arrow_schema::Schema>) -> Self {
182 Self { options, schema }
183 }
184
185 fn decode_inner(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
187 if let Some(limit) = self.options.max_size
188 && data.len() > limit
189 {
190 return Err(EncodeError::MessageTooLarge {
191 size: data.len(),
192 limit,
193 });
194 }
195
196 if self.schema.column_with_name("_unmapped").is_some() {
198 return self.decode_with_unmapped(data, metadata);
199 }
200
201 let cursor = Cursor::new(data);
202 let mut reader = ReaderBuilder::new(self.schema.clone())
203 .with_batch_size(self.options.batch_size)
204 .build(cursor)
205 .map_err(|e| Error::ArrowDecode { source: e })?;
206
207 let record_batch = reader
208 .next()
209 .transpose()
210 .map_err(|e| Error::ArrowDecode { source: e })?;
211
212 match record_batch {
213 Some(batch) => EventBatch::new_with_xid(batch, metadata)
214 .map_err(|e| Error::KineticCore { source: e }),
215 None => EventBatch::new_with_xid(
216 arrow_array::RecordBatch::new_empty(self.schema.clone()),
217 metadata,
218 )
219 .map_err(|e| Error::KineticCore { source: e }),
220 }
221 .map_err(Into::into)
222 }
223
224 fn decode_with_unmapped(
225 &self,
226 data: &[u8],
227 metadata: ArcEventMetadata,
228 ) -> EncodeResult<EventBatch> {
229 let mut rows = Vec::new();
231 if let Ok(val) = sonic_rs::from_slice::<sonic_rs::Value>(data) {
233 if let Some(arr) = val.as_array() {
234 for v in arr {
235 rows.push(self.process_unmapped_row(v)?);
236 }
237 } else {
238 rows.push(self.process_unmapped_row(&val)?);
239 }
240 } else {
241 let reader = std::io::BufReader::new(data);
243 use std::io::BufRead;
244 for line in reader.lines() {
245 let line = line.map_err(|e| EncodeError::Decode {
246 source: Box::new(e),
247 })?;
248 if line.trim().is_empty() {
249 continue;
250 }
251 let val = sonic_rs::from_str::<sonic_rs::Value>(&line).map_err(|e| {
252 EncodeError::Decode {
253 source: Box::new(e),
254 }
255 })?;
256 rows.push(self.process_unmapped_row(&val)?);
257 }
258 }
259
260 if rows.is_empty() {
261 return EventBatch::new_with_xid(
262 arrow_array::RecordBatch::new_empty(self.schema.clone()),
263 metadata,
264 )
265 .map_err(|e| EncodeError::Decode {
266 source: Box::new(e),
267 });
268 }
269
270 let mut decoder = arrow_json::ReaderBuilder::new(self.schema.clone())
272 .build_decoder()
273 .map_err(|e| EncodeError::Decode {
274 source: Box::new(e),
275 })?;
276
277 decoder.serialize(&rows).map_err(|e| EncodeError::Decode {
278 source: Box::new(e),
279 })?;
280
281 let batch = decoder
282 .flush()
283 .map_err(|e| EncodeError::Decode {
284 source: Box::new(e),
285 })?
286 .unwrap_or_else(|| arrow_array::RecordBatch::new_empty(self.schema.clone()));
287
288 EventBatch::new_with_xid(batch, metadata).map_err(|e| EncodeError::Decode {
289 source: Box::new(e),
290 })
291 }
292
293 fn process_unmapped_row(&self, val: &sonic_rs::Value) -> EncodeResult<sonic_rs::Value> {
294 let mut row = val.clone();
295 if let Some(obj) = row.as_object_mut() {
296 let mut unmapped = sonic_rs::json!({});
297 let unmapped_obj = unmapped
298 .as_object_mut()
299 .ok_or_else(|| EncodeError::Decode {
300 source: "Failed to create unmapped object".into(),
301 })?;
302
303 let mut keys_to_remove = Vec::new();
305 for (k, v) in obj.iter() {
306 if k == "_unmapped" {
307 continue;
308 }
309 if self.schema.column_with_name(k).is_none() {
310 unmapped_obj.insert(k, v.clone());
311 keys_to_remove.push(k.to_owned());
312 }
313 }
314
315 for k in keys_to_remove {
316 obj.remove(&k);
317 }
318
319 if !unmapped_obj.is_empty() {
320 let unmapped_str =
321 sonic_rs::to_string(&unmapped).map_err(|e| EncodeError::Decode {
322 source: Box::new(e),
323 })?;
324 obj.insert("_unmapped", sonic_rs::Value::from(unmapped_str.as_str()));
325 }
326 }
327 Ok(row)
328 }
329}
330
331impl Decoder for JsonDecoder {
332 fn decode(&self, data: &[u8], metadata: ArcEventMetadata) -> EncodeResult<EventBatch> {
333 self.decode_inner(data, metadata)
334 }
335}
336
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::{ComponentId, EventMetadata};

    /// Placeholder metadata shared by the tests in this module.
    fn dummy_metadata() -> ArcEventMetadata {
        let component = ComponentId("test".to_string());
        Arc::new(EventMetadata::new("test", component))
    }

    /// Schema with a single non-nullable `Int32` column named `a`.
    fn int_schema() -> Arc<Schema> {
        let column = Field::new("a", DataType::Int32, false);
        Arc::new(Schema::new(vec![column]))
    }

    /// Line-delimited output must contain every row value and the `_xid` column.
    #[test]
    fn test_json_encode_ndjson() {
        let values: arrow_array::ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let record_batch = RecordBatch::try_new(int_schema(), vec![values]).unwrap();
        let record_batch = kinetic_core::xid_util::ensure_xid_column(record_batch).unwrap();
        let batch = EventBatch::new_with_xid(record_batch, dummy_metadata())
            .expect("failed to create batch");

        let encoded = JsonEncoder::default().encode(&batch).unwrap();
        let text = std::str::from_utf8(encoded.as_ref()).unwrap();

        for fragment in ["\"a\":1", "\"a\":2", "\"a\":3", "\"_xid\":"] {
            assert!(text.contains(fragment));
        }
    }

    /// Three line-delimited objects decode into three rows with the right values.
    #[test]
    fn test_json_decode() {
        let decoder = JsonDecoder::new(JsonDecoderOptions::default(), int_schema());

        let input = b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n";
        let batch = decoder.decode(input, dummy_metadata()).unwrap();
        assert_eq!(batch.num_rows(), 3);

        let first_column = batch.payload.column(0);
        let ints = first_column
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(ints.value(0), 1);
        assert_eq!(ints.value(2), 3);
    }
}
385}