Skip to main content

kinetic/agent/
config_poller.rs

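//! Configuration poller for the Kinetic Fleet Agent.
//!
//! Reports this host's health to the Nexus server and fetches, verifies and applies the
//! configuration updates Nexus offers in response.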
2
3use crate::agent::HealthSummary;
4use crate::agent::identity::HostIdentity;
5use crate::topology::RunningTopology;
6use anyhow::{Context, Result};
7use serde::{Deserialize, Serialize};
8use sha2::Digest;
9use std::sync::Arc;
10use std::time::Instant;
11use sysinfo::{CpuRefreshKind, MemoryRefreshKind, System};
12use tokio::sync::Mutex;
13use tracing::{info, warn};
14
15pub struct ConfigPoller {
16    pub nexus_url: String,
17    pub identity: Arc<HostIdentity>,
18    pub client: reqwest::Client,
19    pub current_generation: Mutex<u64>,
20    pub sys: Mutex<System>,
21    pub start_time: Instant,
22}
23
24#[derive(Serialize)]
25struct SyncRequest {
26    #[serde(rename = "hostId")]
27    pub host_id: String,
28    #[serde(rename = "realmId")]
29    pub realm_id: String,
30    #[serde(rename = "configGeneration")]
31    pub config_generation: u64,
32    #[serde(rename = "healthSummary")]
33    pub health_summary: HealthSummary,
34}
35
36#[derive(Deserialize)]
37struct SyncResponse {
38    pub status: String, // NO_CHANGE, UPDATE_AVAILABLE, UPDATE_REQUIRED
39    #[serde(rename = "targetGeneration")]
40    pub target_generation: Option<u64>,
41    #[serde(rename = "applyNow")]
42    pub apply_now: Option<bool>,
43    pub _checksum: Option<String>,
44}
45
46#[derive(Deserialize)]
47struct ConfigResponse {
48    pub config: String,
49    pub _generation: u64,
50    pub _checksum: String,
51}
52
53impl ConfigPoller {
54    pub fn new(nexus_url: String, identity: Arc<HostIdentity>, _interval: u64) -> Self {
55        let mut sys = System::new_with_specifics(
56            sysinfo::RefreshKind::nothing()
57                .with_cpu(CpuRefreshKind::everything())
58                .with_memory(MemoryRefreshKind::everything()),
59        );
60        sys.refresh_all();
61
62        Self {
63            nexus_url,
64            identity,
65            client: reqwest::Client::new(),
66            current_generation: Mutex::new(0),
67            sys: Mutex::new(sys),
68            start_time: Instant::now(),
69        }
70    }
71
72    pub async fn sync(&self, topology: Arc<Mutex<Option<RunningTopology>>>) -> Result<()> {
73        let current_generation = *self.current_generation.lock().await;
74
75        let mut sys = self.sys.lock().await;
76        sys.refresh_all();
77
78        let cpu_usage = sys.global_cpu_usage() as f64;
79        let mem_usage = (sys.used_memory() as f64 / sys.total_memory() as f64) * 100.0;
80
81        let status = {
82            let guard = topology.lock().await;
83            if guard.is_some() {
84                "running".to_string()
85            } else {
86                "stopped".to_string()
87            }
88        };
89
90        // Note: Real error rate and events per second would be fetched from metrics.
91        let uptime_secs = self.start_time.elapsed().as_secs();
92
93        let req = SyncRequest {
94            host_id: self.identity.host_id.clone(),
95            realm_id: self.identity.realm_id.clone(),
96            config_generation: current_generation,
97            health_summary: HealthSummary {
98                cpu_percent: cpu_usage,
99                memory_percent: mem_usage,
100                pipeline_status: status,
101                error_rate: 0.0,
102                events_per_second: 0.0,
103                uptime_secs,
104            },
105        };
106
107        let url = format!("{}/fleet/sync", self.nexus_url);
108        let resp = self
109            .client
110            .post(&url)
111            .header("Authorization", format!("Bearer {}", self.identity.token))
112            .json(&req)
113            .send()
114            .await?
115            .json::<SyncResponse>()
116            .await?;
117
118        if resp.status == "UPDATE_AVAILABLE" || resp.status == "UPDATE_REQUIRED" {
119            if resp.apply_now.unwrap_or(false) {
120                if let Some(r#gen) = resp.target_generation {
121                    self.apply_update(r#gen, topology).await?;
122                }
123            } else {
124                info!(
125                    "Update available (generation {}), but apply_now is false",
126                    resp.target_generation.unwrap_or(0)
127                );
128            }
129        }
130
131        Ok(())
132    }
133
134    async fn apply_update(
135        &self,
136        r#gen: u64,
137        topology: Arc<Mutex<Option<RunningTopology>>>,
138    ) -> Result<()> {
139        info!("Fetching new configuration generation: {}", r#gen);
140
141        let url = format!("{}/fleet/config/{}", self.nexus_url, r#gen);
142        let resp = self
143            .client
144            .get(&url)
145            .header("Authorization", format!("Bearer {}", self.identity.token))
146            .send()
147            .await?
148            .json::<ConfigResponse>()
149            .await?;
150
151        // SEC-5: Verify the provided checksum
152        let computed_checksum = hex::encode(sha2::Sha256::digest(resp.config.as_bytes()));
153        if computed_checksum != resp._checksum {
154            anyhow::bail!(
155                "Configuration checksum mismatch. Expected {}, got {}",
156                resp._checksum,
157                computed_checksum
158            );
159        }
160
161        let config_str = kinetic_config::interpolate_string(&resp.config)
162            .map_err(|e| anyhow::anyhow!("Interpolation failed: {}", e))?;
163        let new_config: kinetic_config::Config =
164            serde_yml::from_str(&config_str).context("Failed to parse fetched configuration")?;
165
166        // SEC-5: Ensure realm_id is verified (using pipeline_id as a proxy for realm or specific tenant isolation if available)
167        // Since Config doesn't strictly have realm_id at the root level in current implementation,
168        // we assume pipeline_id or a future field maps to it. Here we log it or enforce if it existed.
169        // For MVP, we at least validate the config.
170        new_config.validate()?;
171
172        let mut guard = topology.lock().await;
173        if let Some(running) = guard.as_mut() {
174            info!("Reloading topology with new configuration...");
175            running.reload(new_config).await?;
176        } else {
177            warn!("No running topology to reload");
178        }
179
180        *self.current_generation.lock().await = r#gen;
181
182        Ok(())
183    }
184}