Skip to main content

kinetic_core/
event.rs

1//! Core event definitions wrapping Apache Arrow structures.
2
3use crate::Result;
4use crate::ack::AckToken;
5use crate::metadata::ArcEventMetadata;
6use arrow_array::RecordBatch;
7
8/// The core unit of data flowing through a Kinetic pipeline.
9///
10/// An `EventBatch` wraps an Apache Arrow `RecordBatch` along with
11/// metadata tracking its lineage, timing, and multi-tenant profile.
12#[derive(Clone, Debug)]
13pub struct EventBatch {
14    /// The actual data payload in columnar Arrow format.
15    pub payload: RecordBatch,
16
17    /// Metadata tracking the batch's journey through the pipeline.
18    pub metadata: ArcEventMetadata,
19
20    /// Optional token for end-to-end acknowledgement.
21    pub ack_token: Option<AckToken>,
22}
23
24impl PartialEq for EventBatch {
25    fn eq(&self, other: &Self) -> bool {
26        self.payload == other.payload
27            && self.metadata == other.metadata
28            && self.ack_token == other.ack_token
29    }
30}
31
32impl EventBatch {
33    /// Create a new `EventBatch` from a `RecordBatch` and metadata.
34    ///
35    /// This is a low-level constructor that assumes the payload is already correctly formatted.
36    pub fn new(payload: RecordBatch, metadata: ArcEventMetadata) -> Result<Self> {
37        Ok(Self {
38            payload,
39            metadata,
40            ack_token: None,
41        })
42    }
43
44    /// Create a new `EventBatch` and ensure it has a unique `_xid` column.
45    ///
46    /// This is temporarily mapped to `new` to delay xid creation until it is actually needed
47    /// (e.g., when routing to an error topic).
48    pub fn new_with_xid(payload: RecordBatch, metadata: ArcEventMetadata) -> Result<Self> {
49        Self::new(payload, metadata)
50    }
51
52    /// Create a new `EventBatch` with an acknowledgement token.
53    pub fn new_with_ack(
54        payload: RecordBatch,
55        metadata: ArcEventMetadata,
56        ack_token: AckToken,
57    ) -> Result<Self> {
58        let mut batch = Self::new(payload, metadata)?;
59        batch.ack_token = Some(ack_token);
60        Ok(batch)
61    }
62
63    /// Returns the number of rows (events) in this batch.
64    pub fn num_rows(&self) -> usize {
65        self.payload.num_rows()
66    }
67
68    /// Returns `true` if the batch contains no rows.
69    pub fn is_empty(&self) -> bool {
70        self.payload.num_rows() == 0
71    }
72
73    /// Returns the estimated in-memory size of the batch in bytes.
74    pub fn estimated_size(&self) -> usize {
75        self.payload.get_array_memory_size()
76    }
77}