kinetic/sources/internal/
metrics.rs1use arrow_array::{Float64Array, RecordBatch, StringArray};
4use arrow_schema::{DataType, Field, Schema};
5use kinetic_buffers::BufferSender;
6use kinetic_core::{ComponentId, EventBatch, EventMetadata, ShutdownSignal};
7use std::sync::Arc;
8use sysinfo::System;
9use tokio::time::{Duration, interval};
10use tracing::{error, info};
11
12use metrics::{Label, counter};
13
14pub struct InternalMetricsTask {
15 sender: BufferSender,
16 #[allow(dead_code)]
17 error_sender: BufferSender,
18 component_id: String,
19 pipeline_id: String,
20 interval_secs: u64,
21 shutdown: ShutdownSignal,
22 labels: Arc<[Label]>,
23 sys: System,
24}
25
26impl InternalMetricsTask {
27 pub fn new(
28 component_id: String,
29 pipeline_id: String,
30 sender: BufferSender,
31 error_sender: BufferSender,
32 interval_secs: u64,
33 shutdown: ShutdownSignal,
34 ) -> Self {
35 let labels: Arc<[Label]> = Arc::new([
36 Label::new("component_id", component_id.clone()),
37 Label::new("component_type", "source"),
38 Label::new("component_kind", "internal_metrics"),
39 ]);
40
41 Self {
42 sender,
43 error_sender,
44 component_id,
45 pipeline_id,
46 interval_secs,
47 shutdown,
48 labels,
49 sys: System::new_all(),
50 }
51 }
52
53 pub async fn run(mut self) {
54 info!("Starting internal_metrics source: {}", self.component_id);
55
56 let mut ticker = interval(Duration::from_secs(self.interval_secs));
57 let metadata = Arc::new(EventMetadata::new(
58 self.pipeline_id.clone(),
59 ComponentId(self.component_id.clone()),
60 ));
61
62 let schema = Arc::new(Schema::new(vec![
63 Field::new("metric_name", DataType::Utf8, false),
64 Field::new("value", DataType::Float64, false),
65 Field::new("unit", DataType::Utf8, true),
66 ]));
67
68 loop {
69 tokio::select! {
70 _ = self.shutdown.recv() => {
71 info!("internal_metrics source '{}' received shutdown signal", self.component_id);
72 break;
73 }
74 _ = ticker.tick() => {
75 self.sys.refresh_all();
76
77 let mut names = Vec::new();
78 let mut values = Vec::new();
79 let mut units = Vec::new();
80
81 names.push("system_memory_used_bytes".to_string());
83 values.push(self.sys.used_memory() as f64);
84 units.push(Some("bytes".to_string()));
85
86 names.push("system_memory_total_bytes".to_string());
87 values.push(self.sys.total_memory() as f64);
88 units.push(Some("bytes".to_string()));
89
90 names.push("system_cpu_usage_percent".to_string());
92 values.push(self.sys.global_cpu_usage() as f64);
93 units.push(Some("percent".to_string()));
94
95 match sysinfo::get_current_pid() {
97 Ok(pid) => {
98 if let Some(process) = self.sys.process(pid) {
99 names.push("process_memory_usage_bytes".to_string());
100 values.push(process.memory() as f64);
101 units.push(Some("bytes".to_string()));
102
103 names.push("process_cpu_usage_percent".to_string());
104 values.push(process.cpu_usage() as f64);
105 units.push(Some("percent".to_string()));
106 }
107 }
108 Err(e) => {
109 tracing::warn!("Failed to get current PID: {}", e);
110 }
111 }
112
113 let record_batch =
114 match RecordBatch::try_new(
115 schema.clone(),
116 vec![
117 Arc::new(StringArray::from(names)),
118 Arc::new(Float64Array::from(values)),
119 Arc::new(StringArray::from(units)),
120 ]
121 )
122 {
123 Ok(rb) => rb,
124 Err(e) => {
125 error!("Failed to create RecordBatch for internal metrics: {}", e);
126 continue;
127 }
128 };
129
130 match EventBatch::new_with_xid(record_batch, metadata.clone()) {
131 Ok(batch) => {
132 let rows = batch.num_rows();
133 let bytes = batch.estimated_size();
134 counter!("component_received_events_total", self.labels.iter()).increment(rows as u64);
135 counter!("component_received_event_bytes_total", self.labels.iter()).increment(bytes as u64);
136
137 if let Err(e) = self.sender.send(batch).await {
138 error!("Failed to send internal metrics downstream: {:?}", e);
139 } else {
140 counter!("component_sent_events_total", self.labels.iter()).increment(rows as u64);
141 counter!("component_sent_event_bytes_total", self.labels.iter()).increment(bytes as u64);
142 }
143 }
144 Err(e) => {
145 error!("Failed to create EventBatch for internal metrics: {}", e);
146 }
147 }
148 }
149 }
150 }
151 }
152}