Skip to main content

kinetic/agent/
health_reporter.rs

1//! Health reporter for Kinetic Fleet Agent.
2
3use crate::agent::HealthSummary;
4use crate::agent::identity::HostIdentity;
5use crate::topology::RunningTopology;
6use anyhow::Result;
7use std::sync::Arc;
8use std::time::Instant;
9use sysinfo::{CpuRefreshKind, MemoryRefreshKind, System};
10use tokio::sync::Mutex;
11
/// Collects host health metrics and posts them to the Nexus fleet endpoint.
///
/// See [`HealthReporter::report`] for what is sampled and sent.
pub struct HealthReporter {
    /// Base URL of the Nexus service; `report` appends `/fleet/health` to it.
    pub nexus_url: String,
    /// Host identity; its `token` is sent as the bearer credential on each report.
    pub identity: Arc<HostIdentity>,
    /// `sysinfo` handle used to sample CPU and memory, guarded by an async mutex
    /// because sampling requires `&mut System` while `report` only has `&self`.
    pub sys: Mutex<System>,
    /// HTTP client used to POST health summaries.
    pub client: reqwest::Client,
    /// Instant captured when the reporter was constructed; the reported uptime is
    /// the time elapsed since this value.
    pub start_time: Instant,
}
19
20impl HealthReporter {
21    pub fn new(nexus_url: String, identity: Arc<HostIdentity>) -> Self {
22        let mut sys = System::new_with_specifics(
23            sysinfo::RefreshKind::nothing()
24                .with_cpu(CpuRefreshKind::everything())
25                .with_memory(MemoryRefreshKind::everything()),
26        );
27        sys.refresh_all();
28
29        Self {
30            nexus_url,
31            identity,
32            sys: Mutex::new(sys),
33            client: reqwest::Client::new(),
34            start_time: Instant::now(),
35        }
36    }
37
38    pub async fn report(&self, topology: Arc<Mutex<Option<RunningTopology>>>) -> Result<()> {
39        let mut sys = self.sys.lock().await;
40        sys.refresh_all();
41
42        let cpu_usage = sys.global_cpu_usage() as f64;
43        let mem_usage = (sys.used_memory() as f64 / sys.total_memory() as f64) * 100.0;
44
45        let status = {
46            let guard = topology.lock().await;
47            if guard.is_some() {
48                "running".to_string()
49            } else {
50                "stopped".to_string()
51            }
52        };
53
54        // BUG-3: Wire actual metrics (dummy placeholder for metrics registry for now, but uptime is real)
55        let uptime_secs = self.start_time.elapsed().as_secs();
56
57        let summary = HealthSummary {
58            cpu_percent: cpu_usage,
59            memory_percent: mem_usage,
60            pipeline_status: status,
61            error_rate: 0.0,        // TODO: Get from metrics
62            events_per_second: 0.0, // TODO: Get from metrics
63            uptime_secs,
64        };
65
66        tracing::debug!(
67            "Internal Health Summary: CPU: {:.2}%, Mem: {:.2}%, Status: {}",
68            summary.cpu_percent,
69            summary.memory_percent,
70            summary.pipeline_status
71        );
72
73        // BUG-8: Health reporter never sends data
74        let url = format!("{}/fleet/health", self.nexus_url);
75        self.client
76            .post(&url)
77            .header("Authorization", format!("Bearer {}", self.identity.token))
78            .json(&summary)
79            .send()
80            .await?;
81
82        Ok(())
83    }
84}