Skip to main content

kinetic/topology/
build_impl.rs

1//! Dispatch functions for building Kinetic components from configuration.
2
3use crate::sinks;
4use crate::sources;
5use crate::transforms::{self, Transform};
6use kinetic_buffers::{self, WhenFull};
7use kinetic_common::config::{
8    AcknowledgementsConfig, SinkContext, SourceContext, TransformContext,
9};
10use kinetic_config::model::{SinkConfigEnum, SourceConfigEnum, TransformConfigEnum};
11use kinetic_core::encode::{DecoderConfig, EncoderConfig};
12use kinetic_core::healthcheck::Healthcheck;
13use kinetic_core::metadata::ComponentId;
14use kinetic_core::shutdown::ShutdownCoordinator;
15use std::sync::Arc;
16use tokio::task::JoinHandle;
17use tracing::error;
18
19pub fn build_source(
20    config: &SourceConfigEnum,
21    cx: SourceContext,
22) -> anyhow::Result<JoinHandle<()>> {
23    let decoder = if let Some(decoder_config) = get_decoder_config(config) {
24        decoder_config.build(Arc::new(arrow_schema::Schema::empty()))?
25    } else {
26        kinetic_encoder_json::JsonDecoderOptions::default()
27            .build(Arc::new(arrow_schema::Schema::empty()))?
28    };
29
30    match config {
31        SourceConfigEnum::Kafka(c) => {
32            let task = sources::kafka::consumer::ConsumerTask::new(
33                &c.config,
34                cx.id.0.clone(),
35                cx.pipeline_id.clone(),
36                cx.out,
37                cx.error_out,
38                cx.shutdown,
39                decoder,
40            )
41            .map_err(|e| anyhow::anyhow!("Failed to create Kafka consumer: {}", e))?;
42            Ok(tokio::spawn(async move { task.run().await }))
43        }
44        SourceConfigEnum::AwsS3(c) => {
45            let task = sources::aws::s3::S3SourceTask::new(c.config.clone(), cx, decoder);
46            Ok(tokio::spawn(async move { task.run().await }))
47        }
48        SourceConfigEnum::AwsSqs(c) => {
49            let task = sources::aws::sqs::SqsSourceTask::new(
50                c.config.clone(),
51                cx.id.0.clone(),
52                cx.pipeline_id.clone(),
53                cx.out,
54                cx.shutdown,
55            );
56            Ok(tokio::spawn(async move { task.run().await }))
57        }
58        SourceConfigEnum::AwsKinesis(_c) => {
59            anyhow::bail!("Kinesis source not yet implemented in data plane")
60        }
61        SourceConfigEnum::GcpCloudStorage(c) => {
62            let task = sources::gcp::gcs::GcsSourceTask::new(c.config.clone(), cx, decoder);
63            Ok(tokio::spawn(async move { task.run().await }))
64        }
65        SourceConfigEnum::GcpPubsub(c) => {
66            let task = sources::gcp::pubsub::PubSubSourceTask::new(
67                c.config.clone(),
68                cx.id.0.clone(),
69                cx.pipeline_id.clone(),
70                cx.out,
71                cx.shutdown,
72            );
73            Ok(tokio::spawn(async move { task.run().await }))
74        }
75        SourceConfigEnum::InternalMetrics(_) => {
76            let task = sources::internal::metrics::InternalMetricsTask::new(
77                cx.id.0.clone(),
78                cx.pipeline_id.clone(),
79                cx.out,
80                cx.error_out,
81                60,
82                cx.shutdown,
83            );
84            Ok(tokio::spawn(async move { task.run().await }))
85        }
86        SourceConfigEnum::InternalLogs(_) => {
87            anyhow::bail!("Internal logs source requires log_rx handle")
88        }
89        SourceConfigEnum::Opentelemetry(c) => {
90            let task = sources::opentelemetry::server::OtelSourceServer::new(
91                c.config.clone(),
92                cx.id.0.clone(),
93                Some(cx.out),
94                None,
95                None,
96            );
97            Ok(tokio::spawn(async move {
98                if let Err(e) = task.run().await {
99                    error!("OTel source error: {}", e);
100                }
101            }))
102        }
103    }
104}
105
106pub async fn build_source_healthcheck(
107    config: &SourceConfigEnum,
108    id: String,
109    pipeline_id: String,
110) -> anyhow::Result<Option<Box<dyn Healthcheck>>> {
111    let decoder = if let Some(decoder_config) = get_decoder_config(config) {
112        decoder_config.build(Arc::new(arrow_schema::Schema::empty()))?
113    } else {
114        kinetic_encoder_json::JsonDecoderOptions::default()
115            .build(Arc::new(arrow_schema::Schema::empty()))?
116    };
117
118    let (out, _) = kinetic_buffers::channel(1, WhenFull::Block, format!("{}_hc_out", id));
119    let (error_out, _) = kinetic_buffers::channel(1, WhenFull::Block, format!("{}_hc_err", id));
120    let coordinator = ShutdownCoordinator::new();
121    let shutdown = coordinator.register();
122
123    let cx = SourceContext {
124        id: ComponentId(id.clone()),
125        pipeline_id,
126        out,
127        error_out,
128        shutdown,
129        acknowledgements: AcknowledgementsConfig::default(),
130        data_dir: None,
131    };
132
133    match config {
134        SourceConfigEnum::AwsS3(c) => Ok(Some(Box::new(sources::aws::s3::S3SourceTask::new(
135            c.config.clone(),
136            cx,
137            decoder,
138        )))),
139        SourceConfigEnum::GcpCloudStorage(c) => Ok(Some(Box::new(
140            sources::gcp::gcs::GcsSourceTask::new(c.config.clone(), cx, decoder),
141        ))),
142        SourceConfigEnum::GcpPubsub(c) => {
143            Ok(Some(Box::new(sources::gcp::pubsub::PubSubSourceTask::new(
144                c.config.clone(),
145                cx.id.0.clone(),
146                cx.pipeline_id.clone(),
147                cx.out,
148                cx.shutdown,
149            ))))
150        }
151        _ => Ok(None),
152    }
153}
154
155pub fn build_transform(
156    config: &TransformConfigEnum,
157    cx: TransformContext,
158) -> anyhow::Result<JoinHandle<()>> {
159    let transform: Box<dyn Transform> = match config {
160        TransformConfigEnum::Filter(c) => Box::new(transforms::filter::task::FilterTask::new(
161            cx.id.0.clone(),
162            cx.pipeline_id.clone(),
163            cx.in_rx,
164            cx.outs,
165            c.condition.clone(),
166            c.error_policy.clone(),
167        )),
168        TransformConfigEnum::OtelAggregate(c) => {
169            Box::new(transforms::otel_aggregate::task::OtelAggregateTask::new(
170                cx.id.0.clone(),
171                cx.pipeline_id.clone(),
172                cx.in_rx,
173                cx.outs,
174                c.config.clone(),
175                c.error_policy.clone(),
176            ))
177        }
178        TransformConfigEnum::Aggregate(c) => {
179            Box::new(transforms::aggregate::task::AggregateTask::new(
180                cx.id.0.clone(),
181                cx.pipeline_id.clone(),
182                cx.in_rx,
183                cx.outs,
184                c.config.clone(),
185                c.error_policy.clone(),
186            )?)
187        }
188        TransformConfigEnum::Sample(c) => Box::new(crate::transforms::sample::SampleTask::new(
189            cx.id.0.clone(),
190            cx.in_rx,
191            cx.outs,
192            c.mode.clone(),
193            c.error_policy.clone(),
194            c.route_all,
195        )),
196        TransformConfigEnum::SqlMap(c) => Box::new(transforms::sql_map::task::SqlMapTask::new(
197            cx.id.0.clone(),
198            cx.pipeline_id.clone(),
199            cx.in_rx,
200            cx.outs,
201            c.config.clone(),
202            c.error_policy.clone(),
203        )),
204        TransformConfigEnum::Map(_c) => {
205            return Err(anyhow::anyhow!(
206                "Map transform not yet fully implemented - WASM runtime integration pending"
207            ));
208        }
209    };
210
211    Ok(tokio::spawn(async move { transform.run().await }))
212}
213
214pub fn build_sink(config: &SinkConfigEnum, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
215    let encoder = if let Some(encoder_config) = get_encoder_config(config) {
216        encoder_config.build()?
217    } else {
218        kinetic_encoder_json::JsonEncoderOptions::default().build()?
219    };
220
221    match config {
222        SinkConfigEnum::Kafka(c) => {
223            let task = sinks::kafka::producer::ProducerTask::new(
224                &c.config,
225                cx.id.0.clone(),
226                cx.in_rx,
227                cx.shutdown,
228                encoder,
229            )
230            .map_err(|e| anyhow::anyhow!("Failed to create Kafka producer: {}", e))?;
231            Ok(tokio::spawn(async move { task.run().await }))
232        }
233        SinkConfigEnum::S3(c) => {
234            let task = sinks::aws::s3::S3SinkTask::new(
235                c.config.clone(),
236                cx.id.0.clone(),
237                cx.in_rx,
238                cx.shutdown,
239                encoder,
240            );
241            Ok(tokio::spawn(async move { task.run().await }))
242        }
243        SinkConfigEnum::AwsKinesis(_c) => {
244            anyhow::bail!("Kinesis sink not yet implemented in data plane")
245        }
246        SinkConfigEnum::GcpCloudStorage(c) => {
247            let task = sinks::gcp::gcs::GcsSinkTask::new(
248                c.config.clone(),
249                cx.id.0.clone(),
250                cx.in_rx,
251                cx.shutdown,
252                encoder,
253            );
254            Ok(tokio::spawn(async move { task.run().await }))
255        }
256        SinkConfigEnum::GcpPubsub(c) => {
257            let task = sinks::gcp::pubsub::PubSubSinkTask::new(
258                c.config.clone(),
259                cx.id.0.clone(),
260                cx.in_rx,
261                cx.shutdown,
262                encoder,
263            );
264            Ok(tokio::spawn(async move { task.run().await }))
265        }
266        SinkConfigEnum::FlightSql(c) => {
267            let task = sinks::flight::client::FlightSinkClient::new(
268                c.config.clone(),
269                cx.id.0.clone(),
270                cx.in_rx,
271            );
272            Ok(tokio::spawn(async move { task.run().await }))
273        }
274        SinkConfigEnum::Dlq(c) => {
275            let task = sinks::aws::s3::S3SinkTask::new(
276                aws_common::config::S3SinkConfig {
277                    auth: None,
278                    bucket: c.config.bucket.clone(),
279                    key_prefix: c.config.prefix.clone(),
280                    compression: None,
281                    encoding: Some(c.config.encoding.clone()),
282                    batch: aws_common::config::BatchConfig {
283                        max_size: Some(c.config.batch_size),
284                        timeout_secs: Some(60), // parse c.config.timeout
285                    },
286                },
287                cx.id.0.clone(),
288                cx.in_rx,
289                cx.shutdown,
290                encoder,
291            );
292            Ok(tokio::spawn(async move { task.run().await }))
293        }
294        SinkConfigEnum::Iceberg(c) => {
295            let provider = iceberg_common::IcebergCatalogProvider::new(c.catalog_endpoint.clone());
296            let parts: Vec<String> = c.table.split('.').map(|s| s.to_string()).collect();
297            let table = if parts.len() > 1 {
298                catalog_common::TableIdent {
299                    catalog: "default".to_string(),
300                    namespace: parts[..parts.len() - 1].to_vec(),
301                    name: parts.last().unwrap_or(&c.table).to_string(),
302                }
303            } else {
304                catalog_common::TableIdent {
305                    catalog: "default".to_string(),
306                    namespace: vec![],
307                    name: c.table.clone(),
308                }
309            };
310            let task = sinks::iceberg::IcebergSink::new(
311                provider,
312                table,
313                cx.id.0.clone(),
314                cx.in_rx,
315                cx.shutdown,
316                encoder,
317                c.max_rows,
318                c.max_bytes,
319                c.max_age_ms,
320            );
321            Ok(tokio::spawn(async move { task.run().await }))
322        }
323        SinkConfigEnum::Delta(c) => {
324            let provider = delta_common::DeltaCatalogProvider::new(c.warehouse.clone());
325            let table = catalog_common::TableIdent {
326                catalog: "default".to_string(),
327                namespace: vec![],
328                name: c.table.clone(),
329            };
330            let task = sinks::delta::DeltaSink::new(
331                provider,
332                table,
333                cx.id.0.clone(),
334                cx.in_rx,
335                cx.shutdown,
336                encoder,
337                c.max_rows,
338                c.max_bytes,
339                c.max_age_ms,
340            );
341            Ok(tokio::spawn(async move { task.run().await }))
342        }
343        SinkConfigEnum::Starrocks(c) => {
344            let provider = starrocks_common::StarRocksCatalogProvider::new(
345                c.fe_http.clone(),
346                c.flight_sql.clone(),
347                match c.ingestion_mode.as_str() {
348                    "flight_sql" => starrocks_common::IngestionMode::FlightSql,
349                    "stream_load" => starrocks_common::IngestionMode::StreamLoad,
350                    _ => starrocks_common::IngestionMode::Auto,
351                },
352            );
353            let table = catalog_common::TableIdent {
354                catalog: c.database.clone(),
355                namespace: vec![],
356                name: c.table.clone(),
357            };
358            let task = sinks::starrocks::StarRocksSink::new(
359                provider,
360                table,
361                cx.id.0.clone(),
362                cx.in_rx,
363                cx.shutdown,
364                encoder,
365                c.max_rows,
366                c.max_bytes,
367                c.max_age_ms,
368            );
369            Ok(tokio::spawn(async move { task.run().await }))
370        }
371    }
372}
373
374pub async fn build_sink_healthcheck(
375    config: &SinkConfigEnum,
376    id: String,
377) -> anyhow::Result<Option<Box<dyn Healthcheck>>> {
378    let encoder = if let Some(encoder_config) = get_encoder_config(config) {
379        encoder_config.build()?
380    } else {
381        kinetic_encoder_json::JsonEncoderOptions::default().build()?
382    };
383
384    let (_, in_rx) = kinetic_buffers::channel(1, WhenFull::Block, format!("{}_hc_in", id));
385    let coordinator = ShutdownCoordinator::new();
386    let shutdown = coordinator.register();
387
388    let cx = SinkContext {
389        id: ComponentId(id.clone()),
390        in_rx,
391        shutdown,
392        acknowledgements: AcknowledgementsConfig::default(),
393        data_dir: None,
394    };
395
396    match config {
397        SinkConfigEnum::S3(c) => Ok(Some(Box::new(sinks::aws::s3::S3SinkTask::new(
398            c.config.clone(),
399            cx.id.0.clone(),
400            cx.in_rx,
401            cx.shutdown,
402            encoder,
403        )))),
404        SinkConfigEnum::GcpCloudStorage(c) => {
405            Ok(Some(Box::new(sinks::gcp::gcs::GcsSinkTask::new(
406                c.config.clone(),
407                cx.id.0.clone(),
408                cx.in_rx,
409                cx.shutdown,
410                encoder,
411            ))))
412        }
413        SinkConfigEnum::GcpPubsub(c) => {
414            Ok(Some(Box::new(sinks::gcp::pubsub::PubSubSinkTask::new(
415                c.config.clone(),
416                cx.id.0.clone(),
417                cx.in_rx,
418                cx.shutdown,
419                encoder,
420            ))))
421        }
422        _ => Ok(None),
423    }
424}
425
426pub fn get_encoder_config(
427    config: &SinkConfigEnum,
428) -> Option<&kinetic_config::model::EncoderConfigEnum> {
429    match config {
430        SinkConfigEnum::Kafka(c) => c.encoding.as_ref(),
431        SinkConfigEnum::S3(c) => c.encoding.as_ref(),
432        SinkConfigEnum::AwsKinesis(_) => None,
433        SinkConfigEnum::GcpCloudStorage(c) => c.encoding.as_ref(),
434        SinkConfigEnum::GcpPubsub(c) => c.encoding.as_ref(),
435        SinkConfigEnum::FlightSql(c) => c.encoding.as_ref(),
436        SinkConfigEnum::Dlq(c) => c.encoding.as_ref(),
437        SinkConfigEnum::Iceberg(c) => c.encoding.as_ref(),
438        SinkConfigEnum::Delta(c) => c.encoding.as_ref(),
439        SinkConfigEnum::Starrocks(c) => c.encoding.as_ref(),
440    }
441}
442
443pub fn get_decoder_config(
444    config: &SourceConfigEnum,
445) -> Option<&kinetic_config::model::DecoderConfigEnum> {
446    match config {
447        SourceConfigEnum::Kafka(c) => c.decoding.as_ref(),
448        SourceConfigEnum::AwsS3(c) => c.decoding.as_ref(),
449        SourceConfigEnum::AwsSqs(c) => c.decoding.as_ref(),
450        SourceConfigEnum::AwsKinesis(c) => c.decoding.as_ref(),
451        SourceConfigEnum::GcpCloudStorage(c) => c.decoding.as_ref(),
452        SourceConfigEnum::GcpPubsub(c) => c.decoding.as_ref(),
453        SourceConfigEnum::Opentelemetry(c) => c.decoding.as_ref(),
454        _ => None,
455    }
456}