kinetic/agent/
config_poller.rs1use crate::agent::HealthSummary;
4use crate::agent::identity::HostIdentity;
5use crate::topology::RunningTopology;
6use anyhow::{Context, Result};
7use serde::{Deserialize, Serialize};
8use sha2::Digest;
9use std::sync::Arc;
10use std::time::Instant;
11use sysinfo::{CpuRefreshKind, MemoryRefreshKind, System};
12use tokio::sync::Mutex;
13use tracing::{info, warn};
14
15pub struct ConfigPoller {
16 pub nexus_url: String,
17 pub identity: Arc<HostIdentity>,
18 pub client: reqwest::Client,
19 pub current_generation: Mutex<u64>,
20 pub sys: Mutex<System>,
21 pub start_time: Instant,
22}
23
24#[derive(Serialize)]
25struct SyncRequest {
26 #[serde(rename = "hostId")]
27 pub host_id: String,
28 #[serde(rename = "realmId")]
29 pub realm_id: String,
30 #[serde(rename = "configGeneration")]
31 pub config_generation: u64,
32 #[serde(rename = "healthSummary")]
33 pub health_summary: HealthSummary,
34}
35
36#[derive(Deserialize)]
37struct SyncResponse {
38 pub status: String, #[serde(rename = "targetGeneration")]
40 pub target_generation: Option<u64>,
41 #[serde(rename = "applyNow")]
42 pub apply_now: Option<bool>,
43 pub _checksum: Option<String>,
44}
45
46#[derive(Deserialize)]
47struct ConfigResponse {
48 pub config: String,
49 pub _generation: u64,
50 pub _checksum: String,
51}
52
53impl ConfigPoller {
54 pub fn new(nexus_url: String, identity: Arc<HostIdentity>, _interval: u64) -> Self {
55 let mut sys = System::new_with_specifics(
56 sysinfo::RefreshKind::nothing()
57 .with_cpu(CpuRefreshKind::everything())
58 .with_memory(MemoryRefreshKind::everything()),
59 );
60 sys.refresh_all();
61
62 Self {
63 nexus_url,
64 identity,
65 client: reqwest::Client::new(),
66 current_generation: Mutex::new(0),
67 sys: Mutex::new(sys),
68 start_time: Instant::now(),
69 }
70 }
71
72 pub async fn sync(&self, topology: Arc<Mutex<Option<RunningTopology>>>) -> Result<()> {
73 let current_generation = *self.current_generation.lock().await;
74
75 let mut sys = self.sys.lock().await;
76 sys.refresh_all();
77
78 let cpu_usage = sys.global_cpu_usage() as f64;
79 let mem_usage = (sys.used_memory() as f64 / sys.total_memory() as f64) * 100.0;
80
81 let status = {
82 let guard = topology.lock().await;
83 if guard.is_some() {
84 "running".to_string()
85 } else {
86 "stopped".to_string()
87 }
88 };
89
90 let uptime_secs = self.start_time.elapsed().as_secs();
92
93 let req = SyncRequest {
94 host_id: self.identity.host_id.clone(),
95 realm_id: self.identity.realm_id.clone(),
96 config_generation: current_generation,
97 health_summary: HealthSummary {
98 cpu_percent: cpu_usage,
99 memory_percent: mem_usage,
100 pipeline_status: status,
101 error_rate: 0.0,
102 events_per_second: 0.0,
103 uptime_secs,
104 },
105 };
106
107 let url = format!("{}/fleet/sync", self.nexus_url);
108 let resp = self
109 .client
110 .post(&url)
111 .header("Authorization", format!("Bearer {}", self.identity.token))
112 .json(&req)
113 .send()
114 .await?
115 .json::<SyncResponse>()
116 .await?;
117
118 if resp.status == "UPDATE_AVAILABLE" || resp.status == "UPDATE_REQUIRED" {
119 if resp.apply_now.unwrap_or(false) {
120 if let Some(r#gen) = resp.target_generation {
121 self.apply_update(r#gen, topology).await?;
122 }
123 } else {
124 info!(
125 "Update available (generation {}), but apply_now is false",
126 resp.target_generation.unwrap_or(0)
127 );
128 }
129 }
130
131 Ok(())
132 }
133
134 async fn apply_update(
135 &self,
136 r#gen: u64,
137 topology: Arc<Mutex<Option<RunningTopology>>>,
138 ) -> Result<()> {
139 info!("Fetching new configuration generation: {}", r#gen);
140
141 let url = format!("{}/fleet/config/{}", self.nexus_url, r#gen);
142 let resp = self
143 .client
144 .get(&url)
145 .header("Authorization", format!("Bearer {}", self.identity.token))
146 .send()
147 .await?
148 .json::<ConfigResponse>()
149 .await?;
150
151 let computed_checksum = hex::encode(sha2::Sha256::digest(resp.config.as_bytes()));
153 if computed_checksum != resp._checksum {
154 anyhow::bail!(
155 "Configuration checksum mismatch. Expected {}, got {}",
156 resp._checksum,
157 computed_checksum
158 );
159 }
160
161 let config_str = kinetic_config::interpolate_string(&resp.config)
162 .map_err(|e| anyhow::anyhow!("Interpolation failed: {}", e))?;
163 let new_config: kinetic_config::Config =
164 serde_yml::from_str(&config_str).context("Failed to parse fetched configuration")?;
165
166 new_config.validate()?;
171
172 let mut guard = topology.lock().await;
173 if let Some(running) = guard.as_mut() {
174 info!("Reloading topology with new configuration...");
175 running.reload(new_config).await?;
176 } else {
177 warn!("No running topology to reload");
178 }
179
180 *self.current_generation.lock().await = r#gen;
181
182 Ok(())
183 }
184}