Skip to main content

kinetic_core/
encode.rs

//! `Encoder` and `Decoder` traits for serializing and deserializing an `EventBatch`.
2
3use crate::event::EventBatch;
4use crate::metadata::ArcEventMetadata;
5use bytes::Bytes;
6use snafu::Snafu;
7use std::sync::Arc;
8
9#[derive(Debug, Snafu)]
10pub enum Error {
11    #[snafu(display("Failed to encode batch: {}", source))]
12    Encode {
13        #[snafu(source(false))]
14        source: Box<dyn std::error::Error + Send + Sync + 'static>,
15    },
16
17    #[snafu(display("Failed to decode batch: {}", source))]
18    Decode {
19        #[snafu(source(false))]
20        source: Box<dyn std::error::Error + Send + Sync + 'static>,
21    },
22
23    #[snafu(display("Message size {} exceeds limit of {}", size, limit))]
24    MessageTooLarge { size: usize, limit: usize },
25}
26
27pub type Result<T, E = Error> = std::result::Result<T, E>;
28
29/// A trait for components that convert an `EventBatch` into a byte stream.
30pub trait Encoder: Send + Sync {
31    /// Encodes a single `EventBatch` into a single byte stream.
32    /// This is typically used for batch-oriented sinks like S3 or GCS.
33    fn encode(&self, batch: &EventBatch) -> Result<Bytes>;
34
35    /// Encodes an `EventBatch` into multiple independent byte streams (one per event).
36    /// This is typically used for message-oriented sinks like Kafka.
37    /// Default implementation just returns a single Vec with the output of `encode`.
38    fn encode_individual(&self, batch: &EventBatch) -> Result<Vec<Bytes>> {
39        self.encode(batch).map(|b| vec![b])
40    }
41}
42
43/// A trait for components that convert a byte stream into an `EventBatch`.
44pub trait Decoder: Send + Sync {
45    /// Decodes bytes into an `EventBatch`.
46    /// The caller must provide the starting metadata for the batch (e.g., source ID, ingestion time).
47    fn decode(&self, data: &[u8], metadata: ArcEventMetadata) -> Result<EventBatch>;
48}
49
50/// Configuration trait for building encoders.
51pub trait EncoderConfig: Send + Sync {
52    fn build(&self) -> Result<Arc<dyn Encoder>>;
53}
54
55/// Configuration trait for building decoders.
56pub trait DecoderConfig: Send + Sync {
57    /// Builds a decoder, given the expected Arrow schema.
58    fn build(&self, schema: Arc<arrow_schema::Schema>) -> Result<Arc<dyn Decoder>>;
59}