kinetic/agent/
health_reporter.rs1use crate::agent::HealthSummary;
4use crate::agent::identity::HostIdentity;
5use crate::topology::RunningTopology;
6use anyhow::Result;
7use std::sync::Arc;
8use std::time::Instant;
9use sysinfo::{CpuRefreshKind, MemoryRefreshKind, System};
10use tokio::sync::Mutex;
11
12pub struct HealthReporter {
13 pub nexus_url: String,
14 pub identity: Arc<HostIdentity>,
15 pub sys: Mutex<System>,
16 pub client: reqwest::Client,
17 pub start_time: Instant,
18}
19
20impl HealthReporter {
21 pub fn new(nexus_url: String, identity: Arc<HostIdentity>) -> Self {
22 let mut sys = System::new_with_specifics(
23 sysinfo::RefreshKind::nothing()
24 .with_cpu(CpuRefreshKind::everything())
25 .with_memory(MemoryRefreshKind::everything()),
26 );
27 sys.refresh_all();
28
29 Self {
30 nexus_url,
31 identity,
32 sys: Mutex::new(sys),
33 client: reqwest::Client::new(),
34 start_time: Instant::now(),
35 }
36 }
37
38 pub async fn report(&self, topology: Arc<Mutex<Option<RunningTopology>>>) -> Result<()> {
39 let mut sys = self.sys.lock().await;
40 sys.refresh_all();
41
42 let cpu_usage = sys.global_cpu_usage() as f64;
43 let mem_usage = (sys.used_memory() as f64 / sys.total_memory() as f64) * 100.0;
44
45 let status = {
46 let guard = topology.lock().await;
47 if guard.is_some() {
48 "running".to_string()
49 } else {
50 "stopped".to_string()
51 }
52 };
53
54 let uptime_secs = self.start_time.elapsed().as_secs();
56
57 let summary = HealthSummary {
58 cpu_percent: cpu_usage,
59 memory_percent: mem_usage,
60 pipeline_status: status,
61 error_rate: 0.0, events_per_second: 0.0, uptime_secs,
64 };
65
66 tracing::debug!(
67 "Internal Health Summary: CPU: {:.2}%, Mem: {:.2}%, Status: {}",
68 summary.cpu_percent,
69 summary.memory_percent,
70 summary.pipeline_status
71 );
72
73 let url = format!("{}/fleet/health", self.nexus_url);
75 self.client
76 .post(&url)
77 .header("Authorization", format!("Bearer {}", self.identity.token))
78 .json(&summary)
79 .send()
80 .await?;
81
82 Ok(())
83 }
84}