kinetic_core/metadata.rs
1//! Event metadata structures for Kinetic pipelines.
2
3use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5use std::time::SystemTime;
6use uuid::Uuid;
7
8/// A globally unique identifier for a pipeline component (source, transform, sink).
9#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
10pub struct ComponentId(pub String);
11
12impl From<&str> for ComponentId {
13 fn from(s: &str) -> Self {
14 Self(s.to_string())
15 }
16}
17
18/// Identifies a specific named output from a component.
19/// Format is `component_id.output_name`.
20#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub struct OutputId {
22 /// The ID of the component producing the output.
23 pub component: ComponentId,
24 /// The specific named output (e.g., "logs", "metrics", "team_a.logs").
25 pub output_name: String,
26}
27
28impl std::fmt::Display for OutputId {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 write!(f, "{}.{}", self.component.0, self.output_name)
31 }
32}
33
34/// Core metadata attached to every batch of events flowing through the pipeline.
35///
36/// This is wrapped in an `Arc` when attached to an `EventBatch` to ensure
37/// cheap cloning during fanout operations.
38#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
39pub struct EventMetadata {
40 /// A globally unique identifier for this specific batch.
41 /// Used for end-to-end tracing of data flow.
42 pub batch_id: Uuid,
43
44 /// The ID of the pipeline this batch belongs to.
45 pub pipeline_id: String,
46
47 /// The time the data was originally generated at the source (if known).
48 /// For example, the S3 object last-modified time, or Kafka message timestamp.
49 pub source_time: Option<SystemTime>,
50
51 /// The time the data was ingested by Kinetic's source component.
52 pub ingestion_time: SystemTime,
53
54 /// The component that originally produced this batch.
55 pub source_id: ComponentId,
56
57 /// The specific output of the component that produced this batch.
58 /// This is `None` until the batch actually leaves a source or transform.
59 pub upstream_id: Option<OutputId>,
60
61 /// Optional tenant profile name. Used primarily by multi-tenant sources
62 /// like the OTLP source to tag which tenant generated the data.
63 pub tenant_profile: Option<String>,
64}
65
66impl EventMetadata {
67 /// Creates a new `EventMetadata` with a random UUID and current ingestion time.
68 pub fn new(pipeline_id: impl Into<String>, source_id: ComponentId) -> Self {
69 Self {
70 batch_id: Uuid::new_v4(),
71 pipeline_id: pipeline_id.into(),
72 source_time: None,
73 ingestion_time: SystemTime::now(),
74 source_id,
75 upstream_id: None,
76 tenant_profile: None,
77 }
78 }
79
80 /// Sets the source time (e.g., from external metadata).
81 pub fn with_source_time(mut self, time: SystemTime) -> Self {
82 self.source_time = Some(time);
83 self
84 }
85
86 /// Sets the tenant profile.
87 pub fn with_tenant_profile(mut self, profile: impl Into<String>) -> Self {
88 self.tenant_profile = Some(profile.into());
89 self
90 }
91
92 /// Sets the upstream output ID. This is typically called by a component
93 /// just before sending the batch to the next component.
94 pub fn with_upstream_id(mut self, upstream_id: OutputId) -> Self {
95 self.upstream_id = Some(upstream_id);
96 self
97 }
98}
99
100/// A thread-safe, cheaply cloneable wrapper around event metadata.
101pub type ArcEventMetadata = Arc<EventMetadata>;