Skip to main content

kinetic_common/
telemetry.rs

1//! Telemetry abstractions for Kinetic.
2
3use metrics::{Label, counter, histogram};
4use std::time::Duration;
5
6/// Macro to register a telemetry event.
7#[macro_export]
8macro_rules! register {
9    ($event:expr) => {
10        $event
11    };
12}
13
14/// Namespace for metrics.
15pub enum Namespace {
16    Component,
17    Buffer,
18    Topology,
19}
20
21impl Namespace {
22    pub fn as_str(&self) -> &'static str {
23        match self {
24            Self::Component => "component",
25            Self::Buffer => "buffer",
26            Self::Topology => "topology",
27        }
28    }
29}
30
31/// Trait for telemetry events.
32pub trait TelemetryEvent {
33    fn emit(&self);
34}
35
36/// EventDuration instruments how long it takes to process an event.
37#[derive(Debug, Clone)]
38pub struct EventDuration {
39    labels: Vec<Label>,
40}
41
42impl EventDuration {
43    pub fn new(component_id: String, component_type: &'static str) -> Self {
44        Self {
45            labels: vec![
46                Label::new("component_id", component_id),
47                Label::new("component_type", component_type),
48            ],
49        }
50    }
51
52    pub fn emit(&self, duration: Duration) {
53        histogram!("component_event_duration_seconds", self.labels.iter())
54            .record(duration.as_secs_f64());
55    }
56}
57
58/// BytesSent instruments how many bytes are sent downstream.
59#[derive(Debug, Clone)]
60pub struct BytesSent {
61    labels: Vec<Label>,
62}
63
64impl BytesSent {
65    pub fn new(component_id: String, component_type: &'static str, protocol: String) -> Self {
66        Self {
67            labels: vec![
68                Label::new("component_id", component_id),
69                Label::new("component_type", component_type),
70                Label::new("protocol", protocol),
71            ],
72        }
73    }
74
75    pub fn emit(&self, bytes: usize) {
76        counter!("component_sent_bytes_total", self.labels.iter()).increment(bytes as u64);
77    }
78}
79
80/// EventsSent instruments how many events are sent downstream.
81#[derive(Debug, Clone)]
82pub struct EventsSent {
83    labels: Vec<Label>,
84}
85
86impl EventsSent {
87    pub fn new(component_id: String, component_type: &'static str) -> Self {
88        Self {
89            labels: vec![
90                Label::new("component_id", component_id),
91                Label::new("component_type", component_type),
92            ],
93        }
94    }
95
96    pub fn emit(&self, count: usize, bytes: usize) {
97        counter!("component_sent_events_total", self.labels.iter()).increment(count as u64);
98        counter!("component_sent_event_bytes_total", self.labels.iter()).increment(bytes as u64);
99    }
100}