Skip to main content

kinetic/topology/
running.rs

1//! Management of a running topology, including graceful shutdown and hot-reload.
2
3use crate::topology::{Fanout, TopologyBuilder};
4use kinetic_config::model::Config;
5use kinetic_core::{OutputId, ShutdownCoordinator, ShutdownReason};
6use std::collections::HashMap;
7use tokio::task::JoinHandle;
8use tracing::info;
9
/// A handle to a running component task.
///
/// Pairs the shutdown signal for one component with the join handle of the
/// task that runs it, so the owner can first request a stop and then wait for
/// the task to finish.
pub struct ComponentTask {
    /// Used to broadcast a `ShutdownReason` to the component.
    pub coordinator: ShutdownCoordinator,
    /// Join handle of the spawned task; awaited to wait for the component to finish.
    pub handle: JoinHandle<()>,
}
15
/// A handle to a running topology.
///
/// Owns the task handles of every spawned source, transform and sink, plus the
/// fanouts that connect component outputs to their downstream destinations.
pub struct RunningTopology {
    /// Configuration the topology is currently running; diffed against the
    /// incoming configuration on reload and replaced once a reload succeeds.
    config: Config,
    /// Directory handed to `TopologyBuilder::new` when the topology is rebuilt on reload.
    data_dir: std::path::PathBuf,
    /// Running source tasks, keyed by component name.
    pub sources: HashMap<String, ComponentTask>,
    /// Running transform tasks, keyed by component name.
    pub transforms: HashMap<String, ComponentTask>,
    /// Running sink tasks, keyed by component name.
    pub sinks: HashMap<String, ComponentTask>,
    /// Fanouts tracked for this topology, keyed by the output they belong to.
    pub fanouts: HashMap<OutputId, Fanout>,
}
25
26impl RunningTopology {
27    /// Create a new `RunningTopology` from a set of spawned tasks.
28    pub fn new(
29        config: Config,
30        data_dir: std::path::PathBuf,
31        sources: HashMap<String, ComponentTask>,
32        transforms: HashMap<String, ComponentTask>,
33        sinks: HashMap<String, ComponentTask>,
34        fanouts: HashMap<OutputId, Fanout>,
35    ) -> Self {
36        Self {
37            config,
38            data_dir,
39            sources,
40            transforms,
41            sinks,
42            fanouts,
43        }
44    }
45
46    /// Triggers a graceful shutdown and waits for all components to drain.
47    pub async fn stop(mut self) {
48        info!("Initiating graceful shutdown of all components...");
49
50        // 1. Broadcast shutdown to all components first.
51        // This ensures everyone knows to stop accepting new work and start draining.
52        for task in self.sources.values() {
53            task.coordinator.broadcast(ShutdownReason::Graceful);
54        }
55        for task in self.transforms.values() {
56            task.coordinator.broadcast(ShutdownReason::Graceful);
57        }
58        for task in self.sinks.values() {
59            task.coordinator.broadcast(ShutdownReason::Graceful);
60        }
61
62        // 2. Await them in order (Sources -> Transforms -> Sinks) to ensure data flows through.
63        for (name, task) in self.sources.drain() {
64            let _ = task.handle.await;
65            info!("Source '{}' stopped", name);
66        }
67
68        for (name, task) in self.transforms.drain() {
69            let _ = task.handle.await;
70            info!("Transform '{}' stopped", name);
71        }
72
73        for (name, task) in self.sinks.drain() {
74            let _ = task.handle.await;
75            info!("Sink '{}' stopped", name);
76        }
77
78        info!("Shutdown complete.");
79    }
80
81    /// Reloads the topology with a new configuration using granular diffing.
82    pub async fn reload(&mut self, new_config: Config) -> anyhow::Result<()> {
83        let diff = self.config.diff(&new_config);
84        if diff.is_empty() {
85            info!("No changes detected in configuration reload.");
86            return Ok(());
87        }
88
89        info!(
90            "Reloading topology with granular changes: added={}, removed={}, changed={}",
91            diff.sources_added.len() + diff.transforms_added.len() + diff.sinks_added.len(),
92            diff.sources_removed.len() + diff.transforms_removed.len() + diff.sinks_removed.len(),
93            diff.sources_changed.len() + diff.transforms_changed.len() + diff.sinks_changed.len()
94        );
95
96        let builder = TopologyBuilder::new(new_config.clone(), self.data_dir.clone());
97        let new_context = builder.build().map_err(|e| anyhow::anyhow!(e))?;
98
99        // 1. Update existing Fanouts with new destinations.
100        // This ensures that existing components (which are still running and have a handle to their Fanout)
101        // can see the new downstream destinations that were wired in the new context.
102        for (output_id, new_fanout) in &new_context.fanouts {
103            if let Some(existing_fanout) = self.fanouts.get(output_id) {
104                existing_fanout.replace_senders(new_fanout.get_senders());
105            } else {
106                // This is a new output from an existing component (shouldn't happen for sources, maybe for transforms)
107                self.fanouts.insert(output_id.clone(), new_fanout.clone());
108            }
109        }
110
111        // 2. Stop removed and changed components
112        for name in diff.sources_removed.iter().chain(&diff.sources_changed) {
113            if let Some(task) = self.sources.remove(name) {
114                task.coordinator.broadcast(ShutdownReason::Reload);
115                let _ = task.handle.await;
116            }
117        }
118        for name in diff
119            .transforms_removed
120            .iter()
121            .chain(&diff.transforms_changed)
122        {
123            if let Some(task) = self.transforms.remove(name) {
124                task.coordinator.broadcast(ShutdownReason::Reload);
125                let _ = task.handle.await;
126            }
127        }
128        for name in diff.sinks_removed.iter().chain(&diff.sinks_changed) {
129            if let Some(task) = self.sinks.remove(name) {
130                task.coordinator.broadcast(ShutdownReason::Reload);
131                let _ = task.handle.await;
132            }
133        }
134
135        // 3. Spawn added and changed components
136        let new_running = builder.spawn_with_context(new_context, Some(&diff)).await?;
137
138        // Merge the new tasks into the existing ones
139        for (name, task) in new_running.sources {
140            if diff.sources_added.contains(&name) || diff.sources_changed.contains(&name) {
141                self.sources.insert(name, task);
142            }
143        }
144        for (name, task) in new_running.transforms {
145            if diff.transforms_added.contains(&name) || diff.transforms_changed.contains(&name) {
146                self.transforms.insert(name, task);
147            }
148        }
149        for (name, task) in new_running.sinks {
150            if diff.sinks_added.contains(&name) || diff.sinks_changed.contains(&name) {
151                self.sinks.insert(name, task);
152            }
153        }
154
155        // Update the set of fanouts we track
156        self.fanouts = new_running.fanouts;
157
158        self.config = new_config;
159        info!("Granular topology reload completed.");
160        Ok(())
161    }
162}