1use crate::topology::Fanout;
4use crate::topology::running::{ComponentTask, RunningTopology};
5use kinetic_common::config::{SinkContext, SourceContext, TransformContext};
6use kinetic_config::model::Config;
7use kinetic_core::{ComponentId, OutputId, ShutdownCoordinator};
8use snafu::Snafu;
9use std::collections::{HashMap, HashSet};
10use tracing::error;
11
12#[derive(Debug, Snafu)]
13pub enum Error {
14 #[snafu(display("Component '{}' references unknown input '{}'", component, input))]
15 UnknownInput { component: String, input: String },
16
17 #[snafu(display("Cycle detected in pipeline topology"))]
18 CycleDetected,
19
20 #[snafu(display("Internal error: {}", message))]
21 InternalError { message: String },
22}
23
24pub type Result<T, E = Error> = std::result::Result<T, E>;
25
/// A reference to one output of an upstream component, as written in a
/// component's `inputs` list (`component` or `component.output`).
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct InputRef {
    /// Id of the upstream component.
    component: String,
    /// Name of the referenced output of that component.
    output_name: String,
}
32
33impl InputRef {
34 fn parse(input: &str) -> Self {
37 match input.split_once('.') {
38 Some((comp, out)) => Self {
39 component: comp.to_string(),
40 output_name: out.to_string(),
41 },
42 None => Self {
43 component: input.to_string(),
44 output_name: "default".to_string(),
45 },
46 }
47 }
48}
49
50pub struct TopologyContext {
52 pub transform_receivers: HashMap<String, kinetic_buffers::BufferReceiver>,
53 pub sink_receivers: HashMap<String, kinetic_buffers::BufferReceiver>,
54 pub shadow_archive_receiver: Option<kinetic_buffers::BufferReceiver>,
55 pub fanouts: HashMap<OutputId, Fanout>,
56}
57
58pub struct TopologyBuilder {
60 config: Config,
61 data_dir: std::path::PathBuf,
62}
63
64impl TopologyBuilder {
65 pub fn new(config: Config, data_dir: std::path::PathBuf) -> Self {
66 Self { config, data_dir }
67 }
68
69 pub fn build(&self) -> Result<TopologyContext> {
70 self.validate_references()?;
71 self.detect_cycles()?;
72
73 let mut fanouts: HashMap<OutputId, Fanout> = HashMap::new();
74 let mut transform_receivers = HashMap::new();
75 let mut sink_receivers = HashMap::new();
76 let mut component_senders = HashMap::new();
77
78 let mut shadow_archive_receiver = None;
81 if let Some(_archive_config) = &self.config.archive {
82 let id = "shadow_archive_sink".to_string();
83 let (tx, rx) =
85 kinetic_buffers::channel(1000, kinetic_buffers::WhenFull::Block, id.clone());
86 shadow_archive_receiver = Some(rx);
87 component_senders.insert(id, tx);
88 }
89
90 for id in self.config.transforms.keys() {
92 let (tx, rx) =
93 kinetic_buffers::channel(100, kinetic_buffers::WhenFull::Block, id.clone());
94 transform_receivers.insert(id.clone(), rx);
95 component_senders.insert(id.clone(), tx);
96 }
97 for (id, sink) in &self.config.sinks {
98 let (tx, rx) = match sink.buffer() {
99 kinetic_config::model::BufferConfig::Memory { max_batches } => {
100 kinetic_buffers::channel(
101 *max_batches,
102 kinetic_buffers::WhenFull::Block,
103 id.clone(),
104 )
105 }
106 kinetic_config::model::BufferConfig::Disk {
107 path,
108 max_size_bytes,
109 } => kinetic_buffers::disk::channel(path.clone(), id.clone(), *max_size_bytes)
110 .map_err(|e| Error::InternalError {
111 message: format!("Failed to create disk buffer for sink '{}': {}", id, e),
112 })?,
113 };
114 sink_receivers.insert(id.clone(), rx);
115 component_senders.insert(id.clone(), tx);
116 }
117
118 for id in self.config.sources.keys() {
119 let fanout = Fanout::new();
120
121 if let Some(shadow_tx) = component_senders.get("shadow_archive_sink") {
122 fanout.add_sender(shadow_tx.clone());
123 }
124
125 fanouts.insert(
126 OutputId {
127 component: ComponentId(id.clone()),
128 output_name: "default".to_string(),
129 },
130 fanout,
131 );
132 fanouts.insert(
133 OutputId {
134 component: ComponentId(id.clone()),
135 output_name: "error".to_string(),
136 },
137 Fanout::new(),
138 );
139 }
140 for (id, transform) in &self.config.transforms {
141 fanouts.insert(
142 OutputId {
143 component: ComponentId(id.clone()),
144 output_name: "default".to_string(),
145 },
146 Fanout::new(),
147 );
148 fanouts.insert(
149 OutputId {
150 component: ComponentId(id.clone()),
151 output_name: "error".to_string(),
152 },
153 Fanout::new(),
154 );
155 for output_name in transform.outputs().keys() {
156 fanouts.insert(
157 OutputId {
158 component: ComponentId(id.clone()),
159 output_name: output_name.clone(),
160 },
161 Fanout::new(),
162 );
163 }
164 }
165
166 let inputs = self
167 .config
168 .transforms
169 .iter()
170 .map(|(id, t)| (id, t.inputs()))
171 .chain(self.config.sinks.iter().map(|(id, s)| (id, s.inputs())));
172
173 for (component_id, inputs) in inputs {
174 for input_str in inputs {
175 let parsed = InputRef::parse(input_str);
176 let output_id = OutputId {
177 component: ComponentId(parsed.component),
178 output_name: parsed.output_name,
179 };
180 let fanout = fanouts
181 .get_mut(&output_id)
182 .ok_or_else(|| Error::UnknownInput {
183 component: component_id.clone(),
184 input: input_str.clone(),
185 })?;
186 let sender = component_senders
187 .get(component_id)
188 .ok_or_else(|| Error::InternalError {
189 message: format!("Sender missing for component: {}", component_id),
190 })?
191 .clone();
192 fanout.add_sender(sender);
193 }
194 }
195
196 Ok(TopologyContext {
197 transform_receivers,
198 sink_receivers,
199 shadow_archive_receiver,
200 fanouts,
201 })
202 }
203
204 pub async fn spawn(
205 self,
206 diff: Option<&kinetic_config::model::ConfigDiff>,
207 ) -> anyhow::Result<RunningTopology> {
208 let context = self.build()?;
209 self.spawn_with_context(context, diff).await
210 }
211
212 pub async fn spawn_with_context(
213 self,
214 mut context: TopologyContext,
215 diff: Option<&kinetic_config::model::ConfigDiff>,
216 ) -> anyhow::Result<RunningTopology> {
217 let mut healthchecks = Vec::new();
219 for (id, sink_config) in &self.config.sinks {
220 if diff.is_none_or(|d| d.sinks_added.contains(id) || d.sinks_changed.contains(id))
221 && let Some(hc) =
222 crate::topology::build_impl::build_sink_healthcheck(sink_config, id.clone())
223 .await?
224 {
225 healthchecks.push((id.clone(), hc));
226 }
227 }
228
229 for (id, source_config) in &self.config.sources {
230 if diff.is_none_or(|d| d.sources_added.contains(id) || d.sources_changed.contains(id))
231 && let Some(hc) = crate::topology::build_impl::build_source_healthcheck(
232 source_config,
233 id.clone(),
234 self.config.pipeline_id.clone(),
235 )
236 .await?
237 {
238 healthchecks.push((id.clone(), hc));
239 }
240 }
241
242 if !healthchecks.is_empty() {
243 crate::topology::healthcheck::run_all(healthchecks)
244 .await
245 .map_err(|e| Error::InternalError {
246 message: format!("Healthchecks failed: {}", e),
247 })?;
248 }
249
250 let mut sources = HashMap::new();
251 let mut transforms = HashMap::new();
252 let mut sinks = HashMap::new();
253
254 let result = self
256 .spawn_inner(
257 &mut context,
258 diff,
259 &mut sources,
260 &mut transforms,
261 &mut sinks,
262 )
263 .await;
264
265 if let Err(e) = result {
266 error!("Failed to spawn topology, cleaning up partial tasks: {}", e);
267 let partial = RunningTopology::new(
268 self.config.clone(),
269 self.data_dir.clone(),
270 sources,
271 transforms,
272 sinks,
273 context.fanouts,
274 );
275 partial.stop().await;
276 return Err(e);
277 }
278
279 Ok(RunningTopology::new(
280 self.config,
281 self.data_dir,
282 sources,
283 transforms,
284 sinks,
285 context.fanouts,
286 ))
287 }
288
289 async fn spawn_inner(
290 &self,
291 context: &mut TopologyContext,
292 diff: Option<&kinetic_config::model::ConfigDiff>,
293 sources: &mut HashMap<String, ComponentTask>,
294 transforms: &mut HashMap<String, ComponentTask>,
295 sinks: &mut HashMap<String, ComponentTask>,
296 ) -> anyhow::Result<()> {
297 if let Some(archive_config) = self.config.archive.clone()
299 && let Some(receiver) = context.shadow_archive_receiver.take()
300 {
301 let coordinator = ShutdownCoordinator::new();
302 let sink = crate::sinks::shadow_archive::ShadowArchiveSink::new(
303 archive_config,
304 receiver,
305 coordinator.register(),
306 self.config.pipeline_id.clone(),
307 );
308 let handle = sink.run();
309 sinks.insert(
310 "shadow_archive_sink".to_string(),
311 ComponentTask {
312 coordinator,
313 handle,
314 },
315 );
316 }
317
318 for (id, source_config) in &self.config.sources {
319 if diff.is_none_or(|d| d.sources_added.contains(id) || d.sources_changed.contains(id)) {
320 let output_id = OutputId {
321 component: ComponentId(id.clone()),
322 output_name: "default".to_string(),
323 };
324 let fanout = context
325 .fanouts
326 .get(&output_id)
327 .ok_or_else(|| anyhow::anyhow!("Default fanout missing for source {}", id))?;
328 let error_output_id = OutputId {
329 component: ComponentId(id.clone()),
330 output_name: "error".to_string(),
331 };
332 let error_fanout = context
333 .fanouts
334 .get(&error_output_id)
335 .ok_or_else(|| anyhow::anyhow!("Error fanout missing for source {}", id))?;
336
337 let coordinator = ShutdownCoordinator::new();
338 let cx = SourceContext {
339 id: ComponentId(id.clone()),
340 pipeline_id: self.config.pipeline_id.clone(),
341 out: fanout.clone_to_sender(id.clone()),
342 error_out: error_fanout.clone_to_sender(format!("{}_error", id)),
343 shutdown: coordinator.register(),
344 acknowledgements: Default::default(),
345 data_dir: Some(self.data_dir.join("sources").join(id)),
346 };
347 let handle = crate::topology::build_impl::build_source(source_config, cx)?;
348 sources.insert(
349 id.clone(),
350 ComponentTask {
351 coordinator,
352 handle,
353 },
354 );
355 }
356 }
357
358 for (id, transform_config) in &self.config.transforms {
359 if diff.is_none_or(|d| {
360 d.transforms_added.contains(id) || d.transforms_changed.contains(id)
361 }) {
362 let receiver = context
363 .transform_receivers
364 .remove(id)
365 .ok_or_else(|| anyhow::anyhow!("Receiver missing"))?;
366
367 let mut outs = HashMap::new();
368 for output_name in transform_config.outputs().keys() {
369 let output_id = OutputId {
370 component: ComponentId(id.clone()),
371 output_name: output_name.clone(),
372 };
373 let fanout = context
374 .fanouts
375 .get(&output_id)
376 .ok_or_else(|| anyhow::anyhow!("Fanout missing"))?;
377 outs.insert(
378 output_name.clone(),
379 fanout.clone_to_sender(format!("{}_{}", id, output_name)),
380 );
381 }
382 if !outs.contains_key("default") {
383 let output_id = OutputId {
384 component: ComponentId(id.clone()),
385 output_name: "default".to_string(),
386 };
387 if let Some(fanout) = context.fanouts.get(&output_id) {
388 outs.insert(
389 "default".to_string(),
390 fanout.clone_to_sender(format!("{}_default", id)),
391 );
392 }
393 }
394
395 let coordinator = ShutdownCoordinator::new();
396 let cx = TransformContext {
397 id: ComponentId(id.clone()),
398 pipeline_id: self.config.pipeline_id.clone(),
399 in_rx: receiver,
400 outs,
401 shutdown: coordinator.register(),
402 data_dir: Some(self.data_dir.join("transforms").join(id)),
403 };
404 let handle = crate::topology::build_impl::build_transform(transform_config, cx)?;
405 transforms.insert(
406 id.clone(),
407 ComponentTask {
408 coordinator,
409 handle,
410 },
411 );
412 }
413 }
414
415 for (id, sink_config) in &self.config.sinks {
416 if diff.is_none_or(|d| d.sinks_added.contains(id) || d.sinks_changed.contains(id)) {
417 let receiver = context
418 .sink_receivers
419 .remove(id)
420 .ok_or_else(|| anyhow::anyhow!("Receiver missing"))?;
421
422 let coordinator = ShutdownCoordinator::new();
423 let cx = SinkContext {
424 id: ComponentId(id.clone()),
425 in_rx: receiver,
426 shutdown: coordinator.register(),
427 acknowledgements: Default::default(),
428 data_dir: Some(self.data_dir.join("sinks").join(id)),
429 };
430 let handle = crate::topology::build_impl::build_sink(sink_config, cx)?;
431 sinks.insert(
432 id.clone(),
433 ComponentTask {
434 coordinator,
435 handle,
436 },
437 );
438 }
439 }
440
441 Ok(())
442 }
443
444 fn validate_references(&self) -> Result<()> {
445 let valid_components: HashSet<&String> = self
446 .config
447 .sources
448 .keys()
449 .chain(self.config.transforms.keys())
450 .collect();
451 for (id, transform) in &self.config.transforms {
452 self.check_inputs(id, transform.inputs(), &valid_components)?;
453 }
454 for (id, sink) in &self.config.sinks {
455 self.check_inputs(id, sink.inputs(), &valid_components)?;
456 }
457 Ok(())
458 }
459
460 fn check_inputs(
461 &self,
462 component_id: &str,
463 inputs: &[String],
464 valid_components: &HashSet<&String>,
465 ) -> Result<()> {
466 for input in inputs {
467 let parsed = InputRef::parse(input);
468 if !valid_components.contains(&parsed.component) {
469 return Err(Error::UnknownInput {
470 component: component_id.to_owned(),
471 input: input.clone(),
472 });
473 }
474 }
475 Ok(())
476 }
477
478 fn detect_cycles(&self) -> Result<()> {
479 let mut visited = HashSet::new();
480 let mut visiting = HashSet::new();
481 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
482 for (id, transform) in &self.config.transforms {
483 for input in transform.inputs() {
484 graph
485 .entry(InputRef::parse(input).component)
486 .or_default()
487 .push(id.clone());
488 }
489 }
490 for (id, sink) in &self.config.sinks {
491 for input in sink.inputs() {
492 graph
493 .entry(InputRef::parse(input).component)
494 .or_default()
495 .push(id.clone());
496 }
497 }
498 for node in graph.keys() {
499 if !visited.contains(node) && self.dfs_cycle(node, &graph, &mut visited, &mut visiting)
500 {
501 return Err(Error::CycleDetected);
502 }
503 }
504 Ok(())
505 }
506
507 fn dfs_cycle(
508 &self,
509 node: &str,
510 graph: &HashMap<String, Vec<String>>,
511 visited: &mut HashSet<String>,
512 visiting: &mut HashSet<String>,
513 ) -> bool {
514 visiting.insert(node.to_string());
515 if let Some(neighbors) = graph.get(node) {
516 for neighbor in neighbors {
517 if visiting.contains(neighbor) {
518 return true;
519 }
520 if !visited.contains(neighbor) && self.dfs_cycle(neighbor, graph, visited, visiting)
521 {
522 return true;
523 }
524 }
525 }
526 visiting.remove(node);
527 visited.insert(node.to_string());
528 false
529 }
530}