kinetic_core/event.rs
1//! Core event definitions wrapping Apache Arrow structures.
2
3use crate::Result;
4use crate::ack::AckToken;
5use crate::metadata::ArcEventMetadata;
6use arrow_array::RecordBatch;
7
8/// The core unit of data flowing through a Kinetic pipeline.
9///
10/// An `EventBatch` wraps an Apache Arrow `RecordBatch` along with
11/// metadata tracking its lineage, timing, and multi-tenant profile.
12#[derive(Clone, Debug)]
13pub struct EventBatch {
14 /// The actual data payload in columnar Arrow format.
15 pub payload: RecordBatch,
16
17 /// Metadata tracking the batch's journey through the pipeline.
18 pub metadata: ArcEventMetadata,
19
20 /// Optional token for end-to-end acknowledgement.
21 pub ack_token: Option<AckToken>,
22}
23
24impl PartialEq for EventBatch {
25 fn eq(&self, other: &Self) -> bool {
26 self.payload == other.payload
27 && self.metadata == other.metadata
28 && self.ack_token == other.ack_token
29 }
30}
31
32impl EventBatch {
33 /// Create a new `EventBatch` from a `RecordBatch` and metadata.
34 ///
35 /// This is a low-level constructor that assumes the payload is already correctly formatted.
36 pub fn new(payload: RecordBatch, metadata: ArcEventMetadata) -> Result<Self> {
37 Ok(Self {
38 payload,
39 metadata,
40 ack_token: None,
41 })
42 }
43
44 /// Create a new `EventBatch` and ensure it has a unique `_xid` column.
45 ///
46 /// This is temporarily mapped to `new` to delay xid creation until it is actually needed
47 /// (e.g., when routing to an error topic).
48 pub fn new_with_xid(payload: RecordBatch, metadata: ArcEventMetadata) -> Result<Self> {
49 Self::new(payload, metadata)
50 }
51
52 /// Create a new `EventBatch` with an acknowledgement token.
53 pub fn new_with_ack(
54 payload: RecordBatch,
55 metadata: ArcEventMetadata,
56 ack_token: AckToken,
57 ) -> Result<Self> {
58 let mut batch = Self::new(payload, metadata)?;
59 batch.ack_token = Some(ack_token);
60 Ok(batch)
61 }
62
63 /// Returns the number of rows (events) in this batch.
64 pub fn num_rows(&self) -> usize {
65 self.payload.num_rows()
66 }
67
68 /// Returns `true` if the batch contains no rows.
69 pub fn is_empty(&self) -> bool {
70 self.payload.num_rows() == 0
71 }
72
73 /// Returns the estimated in-memory size of the batch in bytes.
74 pub fn estimated_size(&self) -> usize {
75 self.payload.get_array_memory_size()
76 }
77}