Skip to main content

kinetic/topology/
builder.rs

1//! Builds and wires the DAG of pipeline components.
2
3use crate::topology::Fanout;
4use crate::topology::running::{ComponentTask, RunningTopology};
5use kinetic_common::config::{SinkContext, SourceContext, TransformContext};
6use kinetic_config::model::Config;
7use kinetic_core::{ComponentId, OutputId, ShutdownCoordinator};
8use snafu::Snafu;
9use std::collections::{HashMap, HashSet};
10use tracing::error;
11
12#[derive(Debug, Snafu)]
13pub enum Error {
14    #[snafu(display("Component '{}' references unknown input '{}'", component, input))]
15    UnknownInput { component: String, input: String },
16
17    #[snafu(display("Cycle detected in pipeline topology"))]
18    CycleDetected,
19
20    #[snafu(display("Internal error: {}", message))]
21    InternalError { message: String },
22}
23
24pub type Result<T, E = Error> = std::result::Result<T, E>;
25
/// An upstream output named by one entry of a component's `inputs` list.
///
/// `"name"` refers to the `default` output of component `name`, while `"name.out"` refers to
/// output `out` of component `name`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct InputRef {
    /// Id of the upstream component: the text before the first `.`.
    component: String,
    /// Output of that component: the text after the first `.`, or `"default"` when absent.
    output_name: String,
}

impl InputRef {
    /// Splits `input` at its first `.` into component and output name.
    ///
    /// Without a `.`, the whole string is the component and the output is `"default"`.
    /// No validation happens here; unknown components/outputs are rejected by the builder.
    fn parse(input: &str) -> Self {
        let (component, output_name) = input.split_once('.').unwrap_or((input, "default"));
        Self {
            component: component.to_owned(),
            output_name: output_name.to_owned(),
        }
    }
}
49
50/// Context for running topology components
51pub struct TopologyContext {
52    pub transform_receivers: HashMap<String, kinetic_buffers::BufferReceiver>,
53    pub sink_receivers: HashMap<String, kinetic_buffers::BufferReceiver>,
54    pub shadow_archive_receiver: Option<kinetic_buffers::BufferReceiver>,
55    pub fanouts: HashMap<OutputId, Fanout>,
56}
57
58/// The topology builder validates the configuration and wires the channels between components.
59pub struct TopologyBuilder {
60    config: Config,
61    data_dir: std::path::PathBuf,
62}
63
64impl TopologyBuilder {
65    pub fn new(config: Config, data_dir: std::path::PathBuf) -> Self {
66        Self { config, data_dir }
67    }
68
69    pub fn build(&self) -> Result<TopologyContext> {
70        self.validate_references()?;
71        self.detect_cycles()?;
72
73        let mut fanouts: HashMap<OutputId, Fanout> = HashMap::new();
74        let mut transform_receivers = HashMap::new();
75        let mut sink_receivers = HashMap::new();
76        let mut component_senders = HashMap::new();
77
78        // Initialize shadow archive sink if configured.
79        // This is an invisible sink that captures raw data from all sources.
80        let mut shadow_archive_receiver = None;
81        if let Some(_archive_config) = &self.config.archive {
82            let id = "shadow_archive_sink".to_string();
83            // Larger capacity for archive to minimize backpressure impact
84            let (tx, rx) =
85                kinetic_buffers::channel(1000, kinetic_buffers::WhenFull::Block, id.clone());
86            shadow_archive_receiver = Some(rx);
87            component_senders.insert(id, tx);
88        }
89
90        // Initialize channels for all transforms and sinks.
91        for id in self.config.transforms.keys() {
92            let (tx, rx) =
93                kinetic_buffers::channel(100, kinetic_buffers::WhenFull::Block, id.clone());
94            transform_receivers.insert(id.clone(), rx);
95            component_senders.insert(id.clone(), tx);
96        }
97        for (id, sink) in &self.config.sinks {
98            let (tx, rx) = match sink.buffer() {
99                kinetic_config::model::BufferConfig::Memory { max_batches } => {
100                    kinetic_buffers::channel(
101                        *max_batches,
102                        kinetic_buffers::WhenFull::Block,
103                        id.clone(),
104                    )
105                }
106                kinetic_config::model::BufferConfig::Disk {
107                    path,
108                    max_size_bytes,
109                } => kinetic_buffers::disk::channel(path.clone(), id.clone(), *max_size_bytes)
110                    .map_err(|e| Error::InternalError {
111                        message: format!("Failed to create disk buffer for sink '{}': {}", id, e),
112                    })?,
113            };
114            sink_receivers.insert(id.clone(), rx);
115            component_senders.insert(id.clone(), tx);
116        }
117
118        for id in self.config.sources.keys() {
119            let fanout = Fanout::new();
120
121            if let Some(shadow_tx) = component_senders.get("shadow_archive_sink") {
122                fanout.add_sender(shadow_tx.clone());
123            }
124
125            fanouts.insert(
126                OutputId {
127                    component: ComponentId(id.clone()),
128                    output_name: "default".to_string(),
129                },
130                fanout,
131            );
132            fanouts.insert(
133                OutputId {
134                    component: ComponentId(id.clone()),
135                    output_name: "error".to_string(),
136                },
137                Fanout::new(),
138            );
139        }
140        for (id, transform) in &self.config.transforms {
141            fanouts.insert(
142                OutputId {
143                    component: ComponentId(id.clone()),
144                    output_name: "default".to_string(),
145                },
146                Fanout::new(),
147            );
148            fanouts.insert(
149                OutputId {
150                    component: ComponentId(id.clone()),
151                    output_name: "error".to_string(),
152                },
153                Fanout::new(),
154            );
155            for output_name in transform.outputs().keys() {
156                fanouts.insert(
157                    OutputId {
158                        component: ComponentId(id.clone()),
159                        output_name: output_name.clone(),
160                    },
161                    Fanout::new(),
162                );
163            }
164        }
165
166        let inputs = self
167            .config
168            .transforms
169            .iter()
170            .map(|(id, t)| (id, t.inputs()))
171            .chain(self.config.sinks.iter().map(|(id, s)| (id, s.inputs())));
172
173        for (component_id, inputs) in inputs {
174            for input_str in inputs {
175                let parsed = InputRef::parse(input_str);
176                let output_id = OutputId {
177                    component: ComponentId(parsed.component),
178                    output_name: parsed.output_name,
179                };
180                let fanout = fanouts
181                    .get_mut(&output_id)
182                    .ok_or_else(|| Error::UnknownInput {
183                        component: component_id.clone(),
184                        input: input_str.clone(),
185                    })?;
186                let sender = component_senders
187                    .get(component_id)
188                    .ok_or_else(|| Error::InternalError {
189                        message: format!("Sender missing for component: {}", component_id),
190                    })?
191                    .clone();
192                fanout.add_sender(sender);
193            }
194        }
195
196        Ok(TopologyContext {
197            transform_receivers,
198            sink_receivers,
199            shadow_archive_receiver,
200            fanouts,
201        })
202    }
203
204    pub async fn spawn(
205        self,
206        diff: Option<&kinetic_config::model::ConfigDiff>,
207    ) -> anyhow::Result<RunningTopology> {
208        let context = self.build()?;
209        self.spawn_with_context(context, diff).await
210    }
211
212    pub async fn spawn_with_context(
213        self,
214        mut context: TopologyContext,
215        diff: Option<&kinetic_config::model::ConfigDiff>,
216    ) -> anyhow::Result<RunningTopology> {
217        // 1. Run healthchecks for NEW or CHANGED sinks and sources
218        let mut healthchecks = Vec::new();
219        for (id, sink_config) in &self.config.sinks {
220            if diff.is_none_or(|d| d.sinks_added.contains(id) || d.sinks_changed.contains(id))
221                && let Some(hc) =
222                    crate::topology::build_impl::build_sink_healthcheck(sink_config, id.clone())
223                        .await?
224            {
225                healthchecks.push((id.clone(), hc));
226            }
227        }
228
229        for (id, source_config) in &self.config.sources {
230            if diff.is_none_or(|d| d.sources_added.contains(id) || d.sources_changed.contains(id))
231                && let Some(hc) = crate::topology::build_impl::build_source_healthcheck(
232                    source_config,
233                    id.clone(),
234                    self.config.pipeline_id.clone(),
235                )
236                .await?
237            {
238                healthchecks.push((id.clone(), hc));
239            }
240        }
241
242        if !healthchecks.is_empty() {
243            crate::topology::healthcheck::run_all(healthchecks)
244                .await
245                .map_err(|e| Error::InternalError {
246                    message: format!("Healthchecks failed: {}", e),
247                })?;
248        }
249
250        let mut sources = HashMap::new();
251        let mut transforms = HashMap::new();
252        let mut sinks = HashMap::new();
253
254        // 2. Spawn components with error cleanup
255        let result = self
256            .spawn_inner(
257                &mut context,
258                diff,
259                &mut sources,
260                &mut transforms,
261                &mut sinks,
262            )
263            .await;
264
265        if let Err(e) = result {
266            error!("Failed to spawn topology, cleaning up partial tasks: {}", e);
267            let partial = RunningTopology::new(
268                self.config.clone(),
269                self.data_dir.clone(),
270                sources,
271                transforms,
272                sinks,
273                context.fanouts,
274            );
275            partial.stop().await;
276            return Err(e);
277        }
278
279        Ok(RunningTopology::new(
280            self.config,
281            self.data_dir,
282            sources,
283            transforms,
284            sinks,
285            context.fanouts,
286        ))
287    }
288
289    async fn spawn_inner(
290        &self,
291        context: &mut TopologyContext,
292        diff: Option<&kinetic_config::model::ConfigDiff>,
293        sources: &mut HashMap<String, ComponentTask>,
294        transforms: &mut HashMap<String, ComponentTask>,
295        sinks: &mut HashMap<String, ComponentTask>,
296    ) -> anyhow::Result<()> {
297        // Spawn shadow archive sink if configured
298        if let Some(archive_config) = self.config.archive.clone()
299            && let Some(receiver) = context.shadow_archive_receiver.take()
300        {
301            let coordinator = ShutdownCoordinator::new();
302            let sink = crate::sinks::shadow_archive::ShadowArchiveSink::new(
303                archive_config,
304                receiver,
305                coordinator.register(),
306                self.config.pipeline_id.clone(),
307            );
308            let handle = sink.run();
309            sinks.insert(
310                "shadow_archive_sink".to_string(),
311                ComponentTask {
312                    coordinator,
313                    handle,
314                },
315            );
316        }
317
318        for (id, source_config) in &self.config.sources {
319            if diff.is_none_or(|d| d.sources_added.contains(id) || d.sources_changed.contains(id)) {
320                let output_id = OutputId {
321                    component: ComponentId(id.clone()),
322                    output_name: "default".to_string(),
323                };
324                let fanout = context
325                    .fanouts
326                    .get(&output_id)
327                    .ok_or_else(|| anyhow::anyhow!("Default fanout missing for source {}", id))?;
328                let error_output_id = OutputId {
329                    component: ComponentId(id.clone()),
330                    output_name: "error".to_string(),
331                };
332                let error_fanout = context
333                    .fanouts
334                    .get(&error_output_id)
335                    .ok_or_else(|| anyhow::anyhow!("Error fanout missing for source {}", id))?;
336
337                let coordinator = ShutdownCoordinator::new();
338                let cx = SourceContext {
339                    id: ComponentId(id.clone()),
340                    pipeline_id: self.config.pipeline_id.clone(),
341                    out: fanout.clone_to_sender(id.clone()),
342                    error_out: error_fanout.clone_to_sender(format!("{}_error", id)),
343                    shutdown: coordinator.register(),
344                    acknowledgements: Default::default(),
345                    data_dir: Some(self.data_dir.join("sources").join(id)),
346                };
347                let handle = crate::topology::build_impl::build_source(source_config, cx)?;
348                sources.insert(
349                    id.clone(),
350                    ComponentTask {
351                        coordinator,
352                        handle,
353                    },
354                );
355            }
356        }
357
358        for (id, transform_config) in &self.config.transforms {
359            if diff.is_none_or(|d| {
360                d.transforms_added.contains(id) || d.transforms_changed.contains(id)
361            }) {
362                let receiver = context
363                    .transform_receivers
364                    .remove(id)
365                    .ok_or_else(|| anyhow::anyhow!("Receiver missing"))?;
366
367                let mut outs = HashMap::new();
368                for output_name in transform_config.outputs().keys() {
369                    let output_id = OutputId {
370                        component: ComponentId(id.clone()),
371                        output_name: output_name.clone(),
372                    };
373                    let fanout = context
374                        .fanouts
375                        .get(&output_id)
376                        .ok_or_else(|| anyhow::anyhow!("Fanout missing"))?;
377                    outs.insert(
378                        output_name.clone(),
379                        fanout.clone_to_sender(format!("{}_{}", id, output_name)),
380                    );
381                }
382                if !outs.contains_key("default") {
383                    let output_id = OutputId {
384                        component: ComponentId(id.clone()),
385                        output_name: "default".to_string(),
386                    };
387                    if let Some(fanout) = context.fanouts.get(&output_id) {
388                        outs.insert(
389                            "default".to_string(),
390                            fanout.clone_to_sender(format!("{}_default", id)),
391                        );
392                    }
393                }
394
395                let coordinator = ShutdownCoordinator::new();
396                let cx = TransformContext {
397                    id: ComponentId(id.clone()),
398                    pipeline_id: self.config.pipeline_id.clone(),
399                    in_rx: receiver,
400                    outs,
401                    shutdown: coordinator.register(),
402                    data_dir: Some(self.data_dir.join("transforms").join(id)),
403                };
404                let handle = crate::topology::build_impl::build_transform(transform_config, cx)?;
405                transforms.insert(
406                    id.clone(),
407                    ComponentTask {
408                        coordinator,
409                        handle,
410                    },
411                );
412            }
413        }
414
415        for (id, sink_config) in &self.config.sinks {
416            if diff.is_none_or(|d| d.sinks_added.contains(id) || d.sinks_changed.contains(id)) {
417                let receiver = context
418                    .sink_receivers
419                    .remove(id)
420                    .ok_or_else(|| anyhow::anyhow!("Receiver missing"))?;
421
422                let coordinator = ShutdownCoordinator::new();
423                let cx = SinkContext {
424                    id: ComponentId(id.clone()),
425                    in_rx: receiver,
426                    shutdown: coordinator.register(),
427                    acknowledgements: Default::default(),
428                    data_dir: Some(self.data_dir.join("sinks").join(id)),
429                };
430                let handle = crate::topology::build_impl::build_sink(sink_config, cx)?;
431                sinks.insert(
432                    id.clone(),
433                    ComponentTask {
434                        coordinator,
435                        handle,
436                    },
437                );
438            }
439        }
440
441        Ok(())
442    }
443
444    fn validate_references(&self) -> Result<()> {
445        let valid_components: HashSet<&String> = self
446            .config
447            .sources
448            .keys()
449            .chain(self.config.transforms.keys())
450            .collect();
451        for (id, transform) in &self.config.transforms {
452            self.check_inputs(id, transform.inputs(), &valid_components)?;
453        }
454        for (id, sink) in &self.config.sinks {
455            self.check_inputs(id, sink.inputs(), &valid_components)?;
456        }
457        Ok(())
458    }
459
460    fn check_inputs(
461        &self,
462        component_id: &str,
463        inputs: &[String],
464        valid_components: &HashSet<&String>,
465    ) -> Result<()> {
466        for input in inputs {
467            let parsed = InputRef::parse(input);
468            if !valid_components.contains(&parsed.component) {
469                return Err(Error::UnknownInput {
470                    component: component_id.to_owned(),
471                    input: input.clone(),
472                });
473            }
474        }
475        Ok(())
476    }
477
478    fn detect_cycles(&self) -> Result<()> {
479        let mut visited = HashSet::new();
480        let mut visiting = HashSet::new();
481        let mut graph: HashMap<String, Vec<String>> = HashMap::new();
482        for (id, transform) in &self.config.transforms {
483            for input in transform.inputs() {
484                graph
485                    .entry(InputRef::parse(input).component)
486                    .or_default()
487                    .push(id.clone());
488            }
489        }
490        for (id, sink) in &self.config.sinks {
491            for input in sink.inputs() {
492                graph
493                    .entry(InputRef::parse(input).component)
494                    .or_default()
495                    .push(id.clone());
496            }
497        }
498        for node in graph.keys() {
499            if !visited.contains(node) && self.dfs_cycle(node, &graph, &mut visited, &mut visiting)
500            {
501                return Err(Error::CycleDetected);
502            }
503        }
504        Ok(())
505    }
506
507    fn dfs_cycle(
508        &self,
509        node: &str,
510        graph: &HashMap<String, Vec<String>>,
511        visited: &mut HashSet<String>,
512        visiting: &mut HashSet<String>,
513    ) -> bool {
514        visiting.insert(node.to_string());
515        if let Some(neighbors) = graph.get(node) {
516            for neighbor in neighbors {
517                if visiting.contains(neighbor) {
518                    return true;
519                }
520                if !visited.contains(neighbor) && self.dfs_cycle(neighbor, graph, visited, visiting)
521                {
522                    return true;
523                }
524            }
525        }
526        visiting.remove(node);
527        visited.insert(node.to_string());
528        false
529    }
530}