kinetic/topology/
running.rs1use crate::topology::{Fanout, TopologyBuilder};
4use kinetic_config::model::Config;
5use kinetic_core::{OutputId, ShutdownCoordinator, ShutdownReason};
6use std::collections::HashMap;
7use tokio::task::JoinHandle;
8use tracing::info;
9
10pub struct ComponentTask {
12 pub coordinator: ShutdownCoordinator,
13 pub handle: JoinHandle<()>,
14}
15
16pub struct RunningTopology {
18 config: Config,
19 data_dir: std::path::PathBuf,
20 pub sources: HashMap<String, ComponentTask>,
21 pub transforms: HashMap<String, ComponentTask>,
22 pub sinks: HashMap<String, ComponentTask>,
23 pub fanouts: HashMap<OutputId, Fanout>,
24}
25
26impl RunningTopology {
27 pub fn new(
29 config: Config,
30 data_dir: std::path::PathBuf,
31 sources: HashMap<String, ComponentTask>,
32 transforms: HashMap<String, ComponentTask>,
33 sinks: HashMap<String, ComponentTask>,
34 fanouts: HashMap<OutputId, Fanout>,
35 ) -> Self {
36 Self {
37 config,
38 data_dir,
39 sources,
40 transforms,
41 sinks,
42 fanouts,
43 }
44 }
45
46 pub async fn stop(mut self) {
48 info!("Initiating graceful shutdown of all components...");
49
50 for task in self.sources.values() {
53 task.coordinator.broadcast(ShutdownReason::Graceful);
54 }
55 for task in self.transforms.values() {
56 task.coordinator.broadcast(ShutdownReason::Graceful);
57 }
58 for task in self.sinks.values() {
59 task.coordinator.broadcast(ShutdownReason::Graceful);
60 }
61
62 for (name, task) in self.sources.drain() {
64 let _ = task.handle.await;
65 info!("Source '{}' stopped", name);
66 }
67
68 for (name, task) in self.transforms.drain() {
69 let _ = task.handle.await;
70 info!("Transform '{}' stopped", name);
71 }
72
73 for (name, task) in self.sinks.drain() {
74 let _ = task.handle.await;
75 info!("Sink '{}' stopped", name);
76 }
77
78 info!("Shutdown complete.");
79 }
80
81 pub async fn reload(&mut self, new_config: Config) -> anyhow::Result<()> {
83 let diff = self.config.diff(&new_config);
84 if diff.is_empty() {
85 info!("No changes detected in configuration reload.");
86 return Ok(());
87 }
88
89 info!(
90 "Reloading topology with granular changes: added={}, removed={}, changed={}",
91 diff.sources_added.len() + diff.transforms_added.len() + diff.sinks_added.len(),
92 diff.sources_removed.len() + diff.transforms_removed.len() + diff.sinks_removed.len(),
93 diff.sources_changed.len() + diff.transforms_changed.len() + diff.sinks_changed.len()
94 );
95
96 let builder = TopologyBuilder::new(new_config.clone(), self.data_dir.clone());
97 let new_context = builder.build().map_err(|e| anyhow::anyhow!(e))?;
98
99 for (output_id, new_fanout) in &new_context.fanouts {
103 if let Some(existing_fanout) = self.fanouts.get(output_id) {
104 existing_fanout.replace_senders(new_fanout.get_senders());
105 } else {
106 self.fanouts.insert(output_id.clone(), new_fanout.clone());
108 }
109 }
110
111 for name in diff.sources_removed.iter().chain(&diff.sources_changed) {
113 if let Some(task) = self.sources.remove(name) {
114 task.coordinator.broadcast(ShutdownReason::Reload);
115 let _ = task.handle.await;
116 }
117 }
118 for name in diff
119 .transforms_removed
120 .iter()
121 .chain(&diff.transforms_changed)
122 {
123 if let Some(task) = self.transforms.remove(name) {
124 task.coordinator.broadcast(ShutdownReason::Reload);
125 let _ = task.handle.await;
126 }
127 }
128 for name in diff.sinks_removed.iter().chain(&diff.sinks_changed) {
129 if let Some(task) = self.sinks.remove(name) {
130 task.coordinator.broadcast(ShutdownReason::Reload);
131 let _ = task.handle.await;
132 }
133 }
134
135 let new_running = builder.spawn_with_context(new_context, Some(&diff)).await?;
137
138 for (name, task) in new_running.sources {
140 if diff.sources_added.contains(&name) || diff.sources_changed.contains(&name) {
141 self.sources.insert(name, task);
142 }
143 }
144 for (name, task) in new_running.transforms {
145 if diff.transforms_added.contains(&name) || diff.transforms_changed.contains(&name) {
146 self.transforms.insert(name, task);
147 }
148 }
149 for (name, task) in new_running.sinks {
150 if diff.sinks_added.contains(&name) || diff.sinks_changed.contains(&name) {
151 self.sinks.insert(name, task);
152 }
153 }
154
155 self.fanouts = new_running.fanouts;
157
158 self.config = new_config;
159 info!("Granular topology reload completed.");
160 Ok(())
161 }
162}