Skip to main content

kinetic/sources/internal/
metrics.rs

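//! Internal metrics source that periodically samples host and process resource usage
//! (memory and CPU) and emits it downstream as Arrow record batches, while recording
//! component throughput counters through the `metrics` crate.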
2
3use arrow_array::{Float64Array, RecordBatch, StringArray};
4use arrow_schema::{DataType, Field, Schema};
5use kinetic_buffers::BufferSender;
6use kinetic_core::{ComponentId, EventBatch, EventMetadata, ShutdownSignal};
7use std::sync::Arc;
8use sysinfo::System;
9use tokio::time::{Duration, interval};
10use tracing::{error, info};
11
12use metrics::{Label, counter};
13
14pub struct InternalMetricsTask {
15    sender: BufferSender,
16    #[allow(dead_code)]
17    error_sender: BufferSender,
18    component_id: String,
19    pipeline_id: String,
20    interval_secs: u64,
21    shutdown: ShutdownSignal,
22    labels: Arc<[Label]>,
23    sys: System,
24}
25
26impl InternalMetricsTask {
27    pub fn new(
28        component_id: String,
29        pipeline_id: String,
30        sender: BufferSender,
31        error_sender: BufferSender,
32        interval_secs: u64,
33        shutdown: ShutdownSignal,
34    ) -> Self {
35        let labels: Arc<[Label]> = Arc::new([
36            Label::new("component_id", component_id.clone()),
37            Label::new("component_type", "source"),
38            Label::new("component_kind", "internal_metrics"),
39        ]);
40
41        Self {
42            sender,
43            error_sender,
44            component_id,
45            pipeline_id,
46            interval_secs,
47            shutdown,
48            labels,
49            sys: System::new_all(),
50        }
51    }
52
53    pub async fn run(mut self) {
54        info!("Starting internal_metrics source: {}", self.component_id);
55
56        let mut ticker = interval(Duration::from_secs(self.interval_secs));
57        let metadata = Arc::new(EventMetadata::new(
58            self.pipeline_id.clone(),
59            ComponentId(self.component_id.clone()),
60        ));
61
62        let schema = Arc::new(Schema::new(vec![
63            Field::new("metric_name", DataType::Utf8, false),
64            Field::new("value", DataType::Float64, false),
65            Field::new("unit", DataType::Utf8, true),
66        ]));
67
68        loop {
69            tokio::select! {
70                _ = self.shutdown.recv() => {
71                    info!("internal_metrics source '{}' received shutdown signal", self.component_id);
72                    break;
73                }
74                _ = ticker.tick() => {
75                    self.sys.refresh_all();
76
77                    let mut names = Vec::new();
78                    let mut values = Vec::new();
79                    let mut units = Vec::new();
80
81                    // Memory usage
82                    names.push("system_memory_used_bytes".to_string());
83                    values.push(self.sys.used_memory() as f64);
84                    units.push(Some("bytes".to_string()));
85
86                    names.push("system_memory_total_bytes".to_string());
87                    values.push(self.sys.total_memory() as f64);
88                    units.push(Some("bytes".to_string()));
89
90                    // CPU usage
91                    names.push("system_cpu_usage_percent".to_string());
92                    values.push(self.sys.global_cpu_usage() as f64);
93                    units.push(Some("percent".to_string()));
94
95                    // Process info
96                    match sysinfo::get_current_pid() {
97                        Ok(pid) => {
98                            if let Some(process) = self.sys.process(pid) {
99                                names.push("process_memory_usage_bytes".to_string());
100                                values.push(process.memory() as f64);
101                                units.push(Some("bytes".to_string()));
102
103                                names.push("process_cpu_usage_percent".to_string());
104                                values.push(process.cpu_usage() as f64);
105                                units.push(Some("percent".to_string()));
106                            }
107                        }
108                        Err(e) => {
109                            tracing::warn!("Failed to get current PID: {}", e);
110                        }
111                    }
112
113                    let record_batch =
114                        match RecordBatch::try_new(
115                            schema.clone(),
116                            vec![
117                                Arc::new(StringArray::from(names)),
118                                Arc::new(Float64Array::from(values)),
119                                Arc::new(StringArray::from(units)),
120                            ]
121                        )
122                        {
123                            Ok(rb) => rb,
124                            Err(e) => {
125                                error!("Failed to create RecordBatch for internal metrics: {}", e);
126                                continue;
127                            }
128                        };
129
130                    match EventBatch::new_with_xid(record_batch, metadata.clone()) {
131                        Ok(batch) => {
132                            let rows = batch.num_rows();
133                            let bytes = batch.estimated_size();
134                            counter!("component_received_events_total", self.labels.iter()).increment(rows as u64);
135                            counter!("component_received_event_bytes_total", self.labels.iter()).increment(bytes as u64);
136
137                            if let Err(e) = self.sender.send(batch).await {
138                                error!("Failed to send internal metrics downstream: {:?}", e);
139                            } else {
140                                counter!("component_sent_events_total", self.labels.iter()).increment(rows as u64);
141                                counter!("component_sent_event_bytes_total", self.labels.iter()).increment(bytes as u64);
142                            }
143                        }
144                        Err(e) => {
145                            error!("Failed to create EventBatch for internal metrics: {}", e);
146                        }
147                    }
148                }
149            }
150        }
151    }
152}