Skip to main content

kinetic_core/
metadata.rs

1//! Event metadata structures for Kinetic pipelines.
2
3use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5use std::time::SystemTime;
6use uuid::Uuid;
7
8/// A globally unique identifier for a pipeline component (source, transform, sink).
9#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
10pub struct ComponentId(pub String);
11
12impl From<&str> for ComponentId {
13    fn from(s: &str) -> Self {
14        Self(s.to_string())
15    }
16}
17
18/// Identifies a specific named output from a component.
19/// Format is `component_id.output_name`.
20#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub struct OutputId {
22    /// The ID of the component producing the output.
23    pub component: ComponentId,
24    /// The specific named output (e.g., "logs", "metrics", "team_a.logs").
25    pub output_name: String,
26}
27
28impl std::fmt::Display for OutputId {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        write!(f, "{}.{}", self.component.0, self.output_name)
31    }
32}
33
34/// Core metadata attached to every batch of events flowing through the pipeline.
35///
36/// This is wrapped in an `Arc` when attached to an `EventBatch` to ensure
37/// cheap cloning during fanout operations.
38#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
39pub struct EventMetadata {
40    /// A globally unique identifier for this specific batch.
41    /// Used for end-to-end tracing of data flow.
42    pub batch_id: Uuid,
43
44    /// The ID of the pipeline this batch belongs to.
45    pub pipeline_id: String,
46
47    /// The time the data was originally generated at the source (if known).
48    /// For example, the S3 object last-modified time, or Kafka message timestamp.
49    pub source_time: Option<SystemTime>,
50
51    /// The time the data was ingested by Kinetic's source component.
52    pub ingestion_time: SystemTime,
53
54    /// The component that originally produced this batch.
55    pub source_id: ComponentId,
56
57    /// The specific output of the component that produced this batch.
58    /// This is `None` until the batch actually leaves a source or transform.
59    pub upstream_id: Option<OutputId>,
60
61    /// Optional tenant profile name. Used primarily by multi-tenant sources
62    /// like the OTLP source to tag which tenant generated the data.
63    pub tenant_profile: Option<String>,
64}
65
66impl EventMetadata {
67    /// Creates a new `EventMetadata` with a random UUID and current ingestion time.
68    pub fn new(pipeline_id: impl Into<String>, source_id: ComponentId) -> Self {
69        Self {
70            batch_id: Uuid::new_v4(),
71            pipeline_id: pipeline_id.into(),
72            source_time: None,
73            ingestion_time: SystemTime::now(),
74            source_id,
75            upstream_id: None,
76            tenant_profile: None,
77        }
78    }
79
80    /// Sets the source time (e.g., from external metadata).
81    pub fn with_source_time(mut self, time: SystemTime) -> Self {
82        self.source_time = Some(time);
83        self
84    }
85
86    /// Sets the tenant profile.
87    pub fn with_tenant_profile(mut self, profile: impl Into<String>) -> Self {
88        self.tenant_profile = Some(profile.into());
89        self
90    }
91
92    /// Sets the upstream output ID. This is typically called by a component
93    /// just before sending the batch to the next component.
94    pub fn with_upstream_id(mut self, upstream_id: OutputId) -> Self {
95        self.upstream_id = Some(upstream_id);
96        self
97    }
98}
99
/// A thread-safe, cheaply cloneable handle to an [`EventMetadata`].
///
/// This is an alias for `Arc<EventMetadata>`: cloning the handle bumps the
/// atomic reference count rather than copying the metadata itself.
pub type ArcEventMetadata = Arc<EventMetadata>;