1use crate::sinks;
4use crate::sources;
5use crate::transforms::{self, Transform};
6use kinetic_buffers::{self, WhenFull};
7use kinetic_common::config::{
8 AcknowledgementsConfig, SinkContext, SourceContext, TransformContext,
9};
10use kinetic_config::model::{SinkConfigEnum, SourceConfigEnum, TransformConfigEnum};
11use kinetic_core::encode::{DecoderConfig, EncoderConfig};
12use kinetic_core::healthcheck::Healthcheck;
13use kinetic_core::metadata::ComponentId;
14use kinetic_core::shutdown::ShutdownCoordinator;
15use std::sync::Arc;
16use tokio::task::JoinHandle;
17use tracing::error;
18
19pub fn build_source(
20 config: &SourceConfigEnum,
21 cx: SourceContext,
22) -> anyhow::Result<JoinHandle<()>> {
23 let decoder = if let Some(decoder_config) = get_decoder_config(config) {
24 decoder_config.build(Arc::new(arrow_schema::Schema::empty()))?
25 } else {
26 kinetic_encoder_json::JsonDecoderOptions::default()
27 .build(Arc::new(arrow_schema::Schema::empty()))?
28 };
29
30 match config {
31 SourceConfigEnum::Kafka(c) => {
32 let task = sources::kafka::consumer::ConsumerTask::new(
33 &c.config,
34 cx.id.0.clone(),
35 cx.pipeline_id.clone(),
36 cx.out,
37 cx.error_out,
38 cx.shutdown,
39 decoder,
40 )
41 .map_err(|e| anyhow::anyhow!("Failed to create Kafka consumer: {}", e))?;
42 Ok(tokio::spawn(async move { task.run().await }))
43 }
44 SourceConfigEnum::AwsS3(c) => {
45 let task = sources::aws::s3::S3SourceTask::new(c.config.clone(), cx, decoder);
46 Ok(tokio::spawn(async move { task.run().await }))
47 }
48 SourceConfigEnum::AwsSqs(c) => {
49 let task = sources::aws::sqs::SqsSourceTask::new(
50 c.config.clone(),
51 cx.id.0.clone(),
52 cx.pipeline_id.clone(),
53 cx.out,
54 cx.shutdown,
55 );
56 Ok(tokio::spawn(async move { task.run().await }))
57 }
58 SourceConfigEnum::AwsKinesis(_c) => {
59 anyhow::bail!("Kinesis source not yet implemented in data plane")
60 }
61 SourceConfigEnum::GcpCloudStorage(c) => {
62 let task = sources::gcp::gcs::GcsSourceTask::new(c.config.clone(), cx, decoder);
63 Ok(tokio::spawn(async move { task.run().await }))
64 }
65 SourceConfigEnum::GcpPubsub(c) => {
66 let task = sources::gcp::pubsub::PubSubSourceTask::new(
67 c.config.clone(),
68 cx.id.0.clone(),
69 cx.pipeline_id.clone(),
70 cx.out,
71 cx.shutdown,
72 );
73 Ok(tokio::spawn(async move { task.run().await }))
74 }
75 SourceConfigEnum::InternalMetrics(_) => {
76 let task = sources::internal::metrics::InternalMetricsTask::new(
77 cx.id.0.clone(),
78 cx.pipeline_id.clone(),
79 cx.out,
80 cx.error_out,
81 60,
82 cx.shutdown,
83 );
84 Ok(tokio::spawn(async move { task.run().await }))
85 }
86 SourceConfigEnum::InternalLogs(_) => {
87 anyhow::bail!("Internal logs source requires log_rx handle")
88 }
89 SourceConfigEnum::Opentelemetry(c) => {
90 let task = sources::opentelemetry::server::OtelSourceServer::new(
91 c.config.clone(),
92 cx.id.0.clone(),
93 Some(cx.out),
94 None,
95 None,
96 );
97 Ok(tokio::spawn(async move {
98 if let Err(e) = task.run().await {
99 error!("OTel source error: {}", e);
100 }
101 }))
102 }
103 }
104}
105
106pub async fn build_source_healthcheck(
107 config: &SourceConfigEnum,
108 id: String,
109 pipeline_id: String,
110) -> anyhow::Result<Option<Box<dyn Healthcheck>>> {
111 let decoder = if let Some(decoder_config) = get_decoder_config(config) {
112 decoder_config.build(Arc::new(arrow_schema::Schema::empty()))?
113 } else {
114 kinetic_encoder_json::JsonDecoderOptions::default()
115 .build(Arc::new(arrow_schema::Schema::empty()))?
116 };
117
118 let (out, _) = kinetic_buffers::channel(1, WhenFull::Block, format!("{}_hc_out", id));
119 let (error_out, _) = kinetic_buffers::channel(1, WhenFull::Block, format!("{}_hc_err", id));
120 let coordinator = ShutdownCoordinator::new();
121 let shutdown = coordinator.register();
122
123 let cx = SourceContext {
124 id: ComponentId(id.clone()),
125 pipeline_id,
126 out,
127 error_out,
128 shutdown,
129 acknowledgements: AcknowledgementsConfig::default(),
130 data_dir: None,
131 };
132
133 match config {
134 SourceConfigEnum::AwsS3(c) => Ok(Some(Box::new(sources::aws::s3::S3SourceTask::new(
135 c.config.clone(),
136 cx,
137 decoder,
138 )))),
139 SourceConfigEnum::GcpCloudStorage(c) => Ok(Some(Box::new(
140 sources::gcp::gcs::GcsSourceTask::new(c.config.clone(), cx, decoder),
141 ))),
142 SourceConfigEnum::GcpPubsub(c) => {
143 Ok(Some(Box::new(sources::gcp::pubsub::PubSubSourceTask::new(
144 c.config.clone(),
145 cx.id.0.clone(),
146 cx.pipeline_id.clone(),
147 cx.out,
148 cx.shutdown,
149 ))))
150 }
151 _ => Ok(None),
152 }
153}
154
155pub fn build_transform(
156 config: &TransformConfigEnum,
157 cx: TransformContext,
158) -> anyhow::Result<JoinHandle<()>> {
159 let transform: Box<dyn Transform> = match config {
160 TransformConfigEnum::Filter(c) => Box::new(transforms::filter::task::FilterTask::new(
161 cx.id.0.clone(),
162 cx.pipeline_id.clone(),
163 cx.in_rx,
164 cx.outs,
165 c.condition.clone(),
166 c.error_policy.clone(),
167 )),
168 TransformConfigEnum::OtelAggregate(c) => {
169 Box::new(transforms::otel_aggregate::task::OtelAggregateTask::new(
170 cx.id.0.clone(),
171 cx.pipeline_id.clone(),
172 cx.in_rx,
173 cx.outs,
174 c.config.clone(),
175 c.error_policy.clone(),
176 ))
177 }
178 TransformConfigEnum::Aggregate(c) => {
179 Box::new(transforms::aggregate::task::AggregateTask::new(
180 cx.id.0.clone(),
181 cx.pipeline_id.clone(),
182 cx.in_rx,
183 cx.outs,
184 c.config.clone(),
185 c.error_policy.clone(),
186 )?)
187 }
188 TransformConfigEnum::Sample(c) => Box::new(crate::transforms::sample::SampleTask::new(
189 cx.id.0.clone(),
190 cx.in_rx,
191 cx.outs,
192 c.mode.clone(),
193 c.error_policy.clone(),
194 c.route_all,
195 )),
196 TransformConfigEnum::SqlMap(c) => Box::new(transforms::sql_map::task::SqlMapTask::new(
197 cx.id.0.clone(),
198 cx.pipeline_id.clone(),
199 cx.in_rx,
200 cx.outs,
201 c.config.clone(),
202 c.error_policy.clone(),
203 )),
204 TransformConfigEnum::Map(_c) => {
205 return Err(anyhow::anyhow!(
206 "Map transform not yet fully implemented - WASM runtime integration pending"
207 ));
208 }
209 };
210
211 Ok(tokio::spawn(async move { transform.run().await }))
212}
213
214pub fn build_sink(config: &SinkConfigEnum, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
215 let encoder = if let Some(encoder_config) = get_encoder_config(config) {
216 encoder_config.build()?
217 } else {
218 kinetic_encoder_json::JsonEncoderOptions::default().build()?
219 };
220
221 match config {
222 SinkConfigEnum::Kafka(c) => {
223 let task = sinks::kafka::producer::ProducerTask::new(
224 &c.config,
225 cx.id.0.clone(),
226 cx.in_rx,
227 cx.shutdown,
228 encoder,
229 )
230 .map_err(|e| anyhow::anyhow!("Failed to create Kafka producer: {}", e))?;
231 Ok(tokio::spawn(async move { task.run().await }))
232 }
233 SinkConfigEnum::S3(c) => {
234 let task = sinks::aws::s3::S3SinkTask::new(
235 c.config.clone(),
236 cx.id.0.clone(),
237 cx.in_rx,
238 cx.shutdown,
239 encoder,
240 );
241 Ok(tokio::spawn(async move { task.run().await }))
242 }
243 SinkConfigEnum::AwsKinesis(_c) => {
244 anyhow::bail!("Kinesis sink not yet implemented in data plane")
245 }
246 SinkConfigEnum::GcpCloudStorage(c) => {
247 let task = sinks::gcp::gcs::GcsSinkTask::new(
248 c.config.clone(),
249 cx.id.0.clone(),
250 cx.in_rx,
251 cx.shutdown,
252 encoder,
253 );
254 Ok(tokio::spawn(async move { task.run().await }))
255 }
256 SinkConfigEnum::GcpPubsub(c) => {
257 let task = sinks::gcp::pubsub::PubSubSinkTask::new(
258 c.config.clone(),
259 cx.id.0.clone(),
260 cx.in_rx,
261 cx.shutdown,
262 encoder,
263 );
264 Ok(tokio::spawn(async move { task.run().await }))
265 }
266 SinkConfigEnum::FlightSql(c) => {
267 let task = sinks::flight::client::FlightSinkClient::new(
268 c.config.clone(),
269 cx.id.0.clone(),
270 cx.in_rx,
271 );
272 Ok(tokio::spawn(async move { task.run().await }))
273 }
274 SinkConfigEnum::Dlq(c) => {
275 let task = sinks::aws::s3::S3SinkTask::new(
276 aws_common::config::S3SinkConfig {
277 auth: None,
278 bucket: c.config.bucket.clone(),
279 key_prefix: c.config.prefix.clone(),
280 compression: None,
281 encoding: Some(c.config.encoding.clone()),
282 batch: aws_common::config::BatchConfig {
283 max_size: Some(c.config.batch_size),
284 timeout_secs: Some(60), },
286 },
287 cx.id.0.clone(),
288 cx.in_rx,
289 cx.shutdown,
290 encoder,
291 );
292 Ok(tokio::spawn(async move { task.run().await }))
293 }
294 SinkConfigEnum::Iceberg(c) => {
295 let provider = iceberg_common::IcebergCatalogProvider::new(c.catalog_endpoint.clone());
296 let parts: Vec<String> = c.table.split('.').map(|s| s.to_string()).collect();
297 let table = if parts.len() > 1 {
298 catalog_common::TableIdent {
299 catalog: "default".to_string(),
300 namespace: parts[..parts.len() - 1].to_vec(),
301 name: parts.last().unwrap_or(&c.table).to_string(),
302 }
303 } else {
304 catalog_common::TableIdent {
305 catalog: "default".to_string(),
306 namespace: vec![],
307 name: c.table.clone(),
308 }
309 };
310 let task = sinks::iceberg::IcebergSink::new(
311 provider,
312 table,
313 cx.id.0.clone(),
314 cx.in_rx,
315 cx.shutdown,
316 encoder,
317 c.max_rows,
318 c.max_bytes,
319 c.max_age_ms,
320 );
321 Ok(tokio::spawn(async move { task.run().await }))
322 }
323 SinkConfigEnum::Delta(c) => {
324 let provider = delta_common::DeltaCatalogProvider::new(c.warehouse.clone());
325 let table = catalog_common::TableIdent {
326 catalog: "default".to_string(),
327 namespace: vec![],
328 name: c.table.clone(),
329 };
330 let task = sinks::delta::DeltaSink::new(
331 provider,
332 table,
333 cx.id.0.clone(),
334 cx.in_rx,
335 cx.shutdown,
336 encoder,
337 c.max_rows,
338 c.max_bytes,
339 c.max_age_ms,
340 );
341 Ok(tokio::spawn(async move { task.run().await }))
342 }
343 SinkConfigEnum::Starrocks(c) => {
344 let provider = starrocks_common::StarRocksCatalogProvider::new(
345 c.fe_http.clone(),
346 c.flight_sql.clone(),
347 match c.ingestion_mode.as_str() {
348 "flight_sql" => starrocks_common::IngestionMode::FlightSql,
349 "stream_load" => starrocks_common::IngestionMode::StreamLoad,
350 _ => starrocks_common::IngestionMode::Auto,
351 },
352 );
353 let table = catalog_common::TableIdent {
354 catalog: c.database.clone(),
355 namespace: vec![],
356 name: c.table.clone(),
357 };
358 let task = sinks::starrocks::StarRocksSink::new(
359 provider,
360 table,
361 cx.id.0.clone(),
362 cx.in_rx,
363 cx.shutdown,
364 encoder,
365 c.max_rows,
366 c.max_bytes,
367 c.max_age_ms,
368 );
369 Ok(tokio::spawn(async move { task.run().await }))
370 }
371 }
372}
373
374pub async fn build_sink_healthcheck(
375 config: &SinkConfigEnum,
376 id: String,
377) -> anyhow::Result<Option<Box<dyn Healthcheck>>> {
378 let encoder = if let Some(encoder_config) = get_encoder_config(config) {
379 encoder_config.build()?
380 } else {
381 kinetic_encoder_json::JsonEncoderOptions::default().build()?
382 };
383
384 let (_, in_rx) = kinetic_buffers::channel(1, WhenFull::Block, format!("{}_hc_in", id));
385 let coordinator = ShutdownCoordinator::new();
386 let shutdown = coordinator.register();
387
388 let cx = SinkContext {
389 id: ComponentId(id.clone()),
390 in_rx,
391 shutdown,
392 acknowledgements: AcknowledgementsConfig::default(),
393 data_dir: None,
394 };
395
396 match config {
397 SinkConfigEnum::S3(c) => Ok(Some(Box::new(sinks::aws::s3::S3SinkTask::new(
398 c.config.clone(),
399 cx.id.0.clone(),
400 cx.in_rx,
401 cx.shutdown,
402 encoder,
403 )))),
404 SinkConfigEnum::GcpCloudStorage(c) => {
405 Ok(Some(Box::new(sinks::gcp::gcs::GcsSinkTask::new(
406 c.config.clone(),
407 cx.id.0.clone(),
408 cx.in_rx,
409 cx.shutdown,
410 encoder,
411 ))))
412 }
413 SinkConfigEnum::GcpPubsub(c) => {
414 Ok(Some(Box::new(sinks::gcp::pubsub::PubSubSinkTask::new(
415 c.config.clone(),
416 cx.id.0.clone(),
417 cx.in_rx,
418 cx.shutdown,
419 encoder,
420 ))))
421 }
422 _ => Ok(None),
423 }
424}
425
426pub fn get_encoder_config(
427 config: &SinkConfigEnum,
428) -> Option<&kinetic_config::model::EncoderConfigEnum> {
429 match config {
430 SinkConfigEnum::Kafka(c) => c.encoding.as_ref(),
431 SinkConfigEnum::S3(c) => c.encoding.as_ref(),
432 SinkConfigEnum::AwsKinesis(_) => None,
433 SinkConfigEnum::GcpCloudStorage(c) => c.encoding.as_ref(),
434 SinkConfigEnum::GcpPubsub(c) => c.encoding.as_ref(),
435 SinkConfigEnum::FlightSql(c) => c.encoding.as_ref(),
436 SinkConfigEnum::Dlq(c) => c.encoding.as_ref(),
437 SinkConfigEnum::Iceberg(c) => c.encoding.as_ref(),
438 SinkConfigEnum::Delta(c) => c.encoding.as_ref(),
439 SinkConfigEnum::Starrocks(c) => c.encoding.as_ref(),
440 }
441}
442
443pub fn get_decoder_config(
444 config: &SourceConfigEnum,
445) -> Option<&kinetic_config::model::DecoderConfigEnum> {
446 match config {
447 SourceConfigEnum::Kafka(c) => c.decoding.as_ref(),
448 SourceConfigEnum::AwsS3(c) => c.decoding.as_ref(),
449 SourceConfigEnum::AwsSqs(c) => c.decoding.as_ref(),
450 SourceConfigEnum::AwsKinesis(c) => c.decoding.as_ref(),
451 SourceConfigEnum::GcpCloudStorage(c) => c.decoding.as_ref(),
452 SourceConfigEnum::GcpPubsub(c) => c.decoding.as_ref(),
453 SourceConfigEnum::Opentelemetry(c) => c.decoding.as_ref(),
454 _ => None,
455 }
456}