Skip to main content

kinetic/agent/
mod.rs

1//! Kinetic Fleet Agent for Nexus integration.
2
3pub mod config_poller;
4pub mod health_reporter;
5pub mod identity;
6
7use crate::topology::RunningTopology;
8use anyhow::Result;
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use tokio::sync::Mutex;
12use tracing::{error, info};
13
14#[derive(Serialize, Deserialize)]
15pub struct HealthSummary {
16    pub cpu_percent: f64,
17    pub memory_percent: f64,
18    pub pipeline_status: String,
19    pub error_rate: f64,
20    pub events_per_second: f64,
21    pub uptime_secs: u64,
22}
23
24pub struct FleetAgent {
25    pub nexus_url: String,
26    pub identity: Arc<identity::HostIdentity>,
27    pub poll_interval_secs: u64,
28    pub data_dir: std::path::PathBuf,
29}
30
31impl FleetAgent {
32    pub fn new(
33        nexus_url: String,
34        token: Option<String>,
35        poll_interval_secs: u64,
36        data_dir: std::path::PathBuf,
37    ) -> Result<Self> {
38        let identity = identity::HostIdentity::load_or_enroll(token)?;
39        Ok(Self {
40            nexus_url,
41            identity: Arc::new(identity),
42            poll_interval_secs,
43            data_dir,
44        })
45    }
46
47    pub async fn run(&self, initial_topology: Option<RunningTopology>) -> Result<()> {
48        info!("Starting Fleet Agent polling loop...");
49
50        let topology = Arc::new(Mutex::new(initial_topology));
51        let poller = config_poller::ConfigPoller::new(
52            self.nexus_url.clone(),
53            self.identity.clone(),
54            self.poll_interval_secs,
55        );
56
57        let reporter =
58            health_reporter::HealthReporter::new(self.nexus_url.clone(), self.identity.clone());
59
60        let mut poll_interval =
61            tokio::time::interval(std::time::Duration::from_secs(self.poll_interval_secs));
62
63        loop {
64            tokio::select! {
65                _ = poll_interval.tick() => {
66                    // 1. Sync state and check for config updates
67                    match poller.sync(topology.clone()).await {
68                        Ok(_) => info!("Fleet sync successful"),
69                        Err(e) => error!("Fleet sync failed: {}", e),
70                    }
71
72                    // 2. Report health
73                    match reporter.report(topology.clone()).await {
74                        Ok(_) => info!("Health report sent"),
75                        Err(e) => error!("Health report failed: {}", e),
76                    }
77                }
78                _ = tokio::signal::ctrl_c() => {
79                    info!("Fleet Agent shutting down...");
80                    break;
81                }
82            }
83        }
84
85        Ok(())
86    }
87}