1use arrow_schema::Schema;
4use aws_common::config::{KinesisConfig, S3SinkConfig, S3SourceConfig, SqsConfig};
5use flight_common::config::FlightClientConfig;
6use gcp_common::config::{GcsSinkConfig, GcsSourceConfig, PubSubSinkConfig, PubSubSourceConfig};
7use kafka_common::{KafkaConsumerConfig, KafkaProducerConfig};
8use kinetic_common::config::{
9 SinkConfig, SinkContext, SourceConfig, SourceContext, TransformConfig, TransformContext,
10};
11use kinetic_core::encode::{
12 Decoder, DecoderConfig, Encoder, EncoderConfig, Result as EncodeResult,
13};
14use kinetic_doc_derive::{ComponentDoc, EnumDoc, FieldDoc};
15use kinetic_encoder_arrow::{ArrowIpcDecoderOptions, ArrowIpcEncoderOptions};
16use kinetic_encoder_avro::{AvroDecoderOptions, AvroEncoderOptions};
17use kinetic_encoder_json::{JsonDecoderOptions, JsonEncoderOptions};
18use kinetic_encoder_ocsf::{OcsfDecoderConfig, OcsfEncoderConfig};
19use otel_common::config::OtelServerConfig;
20use kinetic_encoder_parquet::{ParquetDecoderOptions, ParquetEncoderOptions};
21use kinetic_encoder_protobuf::{ProtobufDecoderOptions, ProtobufEncoderOptions};
22use serde::{Deserialize, Serialize};
23use std::collections::{HashMap, HashSet};
24use std::sync::Arc;
25use std::sync::LazyLock;
26use tokio::task::JoinHandle;
27
/// Serializable choice of output codec, used by the `encoding` field of sink wrappers.
///
/// Internally tagged by a `codec` key whose value is the snake_case variant name
/// (e.g. `"json"`, `"parquet"`). The remaining keys deserialize into the matching
/// encoder options type.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, EnumDoc)]
#[serde(tag = "codec", rename_all = "snake_case")]
pub enum EncoderConfigEnum {
    /// JSON encoder options.
    Json(JsonEncoderOptions),
    /// Arrow IPC encoder options.
    Arrow(ArrowIpcEncoderOptions),
    /// OCSF encoder configuration.
    Ocsf(OcsfEncoderConfig),
    /// Avro encoder options.
    Avro(AvroEncoderOptions),
    /// Parquet encoder options.
    Parquet(ParquetEncoderOptions),
    /// Protobuf encoder options.
    Protobuf(ProtobufEncoderOptions),
}
39
40impl EncoderConfig for EncoderConfigEnum {
41 fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
42 match self {
43 Self::Json(c) => c.build(),
44 Self::Arrow(c) => c.build(),
45 Self::Ocsf(c) => c.build(),
46 Self::Avro(c) => c.build(),
47 Self::Parquet(c) => c.build(),
48 Self::Protobuf(c) => c.build(),
49 }
50 }
51}
52
/// Serializable choice of input codec, used by the `decoding` field of source wrappers.
///
/// Internally tagged by a `codec` key whose value is the snake_case variant name
/// (e.g. `"json"`, `"avro"`). The remaining keys deserialize into the matching
/// decoder options type.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, EnumDoc)]
#[serde(tag = "codec", rename_all = "snake_case")]
pub enum DecoderConfigEnum {
    /// JSON decoder options.
    Json(JsonDecoderOptions),
    /// Arrow IPC decoder options.
    Arrow(ArrowIpcDecoderOptions),
    /// OCSF decoder configuration.
    Ocsf(OcsfDecoderConfig),
    /// Avro decoder options.
    Avro(AvroDecoderOptions),
    /// Parquet decoder options.
    Parquet(ParquetDecoderOptions),
    /// Protobuf decoder options.
    Protobuf(ProtobufDecoderOptions),
}
64
65impl DecoderConfig for DecoderConfigEnum {
66 fn build(&self, schema: Arc<Schema>) -> EncodeResult<Arc<dyn Decoder>> {
67 match self {
68 Self::Json(c) => c.build(schema),
69 Self::Arrow(c) => c.build(schema),
70 Self::Ocsf(c) => c.build(schema),
71 Self::Avro(c) => c.build(schema),
72 Self::Parquet(c) => c.build(schema),
73 Self::Protobuf(c) => c.build(schema),
74 }
75 }
76}
77
/// Top-level pipeline configuration: named sources, transforms and sinks, plus
/// optional archive storage.
///
/// Every field has a serde default, so an empty document deserializes. Use
/// `Config::validate` to enforce the minimal topology.
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq)]
pub struct Config {
    /// Pipeline identifier. When omitted during deserialization it is filled by
    /// `default_pipeline_id` (`"default"`). Note that `Config::default()` (derived)
    /// yields an empty string instead.
    #[serde(default = "default_pipeline_id")]
    pub pipeline_id: String,

    /// Optional archive storage settings; `None` when omitted.
    #[serde(default)]
    pub archive: Option<ArchiveStorageConfig>,

    /// Source components keyed by component name.
    #[serde(default)]
    pub sources: HashMap<String, SourceConfigEnum>,

    /// Transform components keyed by component name.
    #[serde(default)]
    pub transforms: HashMap<String, TransformConfigEnum>,

    /// Sink components keyed by component name.
    #[serde(default)]
    pub sinks: HashMap<String, SinkConfigEnum>,
}
104
/// Pipeline-level archive storage settings (the `archive` table of `Config`).
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct ArchiveStorageConfig {
    /// Storage location URL; required (example: `s3://my-archive-bucket/`).
    #[doc_field(required, example = "s3://my-archive-bucket/")]
    pub endpoint: String,
    /// Optional AWS credentials/settings for accessing `endpoint`.
    pub auth: Option<aws_common::config::AwsConfig>,
    /// Object layout name; defaults to `"hive-hourly"` via `default_archive_layout`.
    #[serde(default = "default_archive_layout")]
    #[doc_field(default = "hive-hourly")]
    pub layout: String,
    /// Retention in hours; defaults to 72 via `default_archive_ttl`.
    #[serde(default = "default_archive_ttl")]
    #[doc_field(default = "72")]
    pub ttl_hours: u64,
}
122
/// Serde default for `ArchiveStorageConfig::layout`.
fn default_archive_layout() -> String {
    String::from("hive-hourly")
}

/// Serde default for `ArchiveStorageConfig::ttl_hours`: three days, in hours.
fn default_archive_ttl() -> u64 {
    3 * 24
}

/// Serde default for `Config::pipeline_id`.
fn default_pipeline_id() -> String {
    "default".to_owned()
}
134
135impl Config {
136 pub fn validate(&self) -> anyhow::Result<()> {
138 if self.sources.is_empty() {
139 anyhow::bail!("Configuration must have at least one source.");
140 }
141 if self.sinks.is_empty() {
142 anyhow::bail!("Configuration must have at least one sink.");
143 }
144 Ok(())
145 }
146
147 pub fn diff(&self, other: &Self) -> ConfigDiff {
148 let mut diff = ConfigDiff::default();
149
150 for (name, source) in &self.sources {
152 if let Some(other_source) = other.sources.get(name) {
153 if source != other_source {
154 diff.sources_changed.insert(name.clone());
155 }
156 } else {
157 diff.sources_removed.insert(name.clone());
158 }
159 }
160 for name in other.sources.keys() {
161 if !self.sources.contains_key(name) {
162 diff.sources_added.insert(name.clone());
163 }
164 }
165
166 for (name, transform) in &self.transforms {
168 if let Some(other_transform) = other.transforms.get(name) {
169 if transform != other_transform {
170 diff.transforms_changed.insert(name.clone());
171 }
172 } else {
173 diff.transforms_removed.insert(name.clone());
174 }
175 }
176 for name in other.transforms.keys() {
177 if !self.transforms.contains_key(name) {
178 diff.transforms_added.insert(name.clone());
179 }
180 }
181
182 for (name, sink) in &self.sinks {
184 if let Some(other_sink) = other.sinks.get(name) {
185 if sink != other_sink {
186 diff.sinks_changed.insert(name.clone());
187 }
188 } else {
189 diff.sinks_removed.insert(name.clone());
190 }
191 }
192 for name in other.sinks.keys() {
193 if !self.sources.contains_key(name) {
194 diff.sinks_added.insert(name.clone());
195 }
196 }
197
198 diff
199 }
200}
201
/// Component-level difference between two `Config`s, as produced by `Config::diff`.
///
/// Each set holds component names, i.e. keys of the corresponding `Config` map.
#[derive(Debug, Default)]
pub struct ConfigDiff {
    pub sources_added: HashSet<String>,
    pub sources_removed: HashSet<String>,
    pub sources_changed: HashSet<String>,

    pub transforms_added: HashSet<String>,
    pub transforms_removed: HashSet<String>,
    pub transforms_changed: HashSet<String>,

    pub sinks_added: HashSet<String>,
    pub sinks_removed: HashSet<String>,
    pub sinks_changed: HashSet<String>,
}

impl ConfigDiff {
    /// Returns `true` when no source, transform or sink was added, removed or changed.
    pub fn is_empty(&self) -> bool {
        let all_sets = [
            &self.sources_added,
            &self.sources_removed,
            &self.sources_changed,
            &self.transforms_added,
            &self.transforms_removed,
            &self.transforms_changed,
            &self.sinks_added,
            &self.sinks_removed,
            &self.sinks_changed,
        ];
        all_sets.iter().all(|names| names.is_empty())
    }
}
234
/// All supported source component types.
///
/// Internally tagged by a `type` key whose value is the snake_case variant name
/// (e.g. `"kafka"`, `"aws_s3"`, `"opentelemetry"`). Most payloads are boxed; the
/// two internal sources carry empty structs and are stored inline.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SourceConfigEnum {
    Kafka(Box<KafkaConsumerConfigWrapped>),
    AwsS3(Box<S3SourceConfigWrapped>),
    AwsSqs(Box<SqsConfigWrapped>),
    AwsKinesis(Box<KinesisConfigWrapped>),
    GcpCloudStorage(Box<GcsSourceConfigWrapped>),
    GcpPubsub(Box<PubSubSourceConfigWrapped>),
    InternalMetrics(InternalMetricsConfig),
    InternalLogs(InternalLogsConfig),
    Opentelemetry(Box<OtelSourceConfigWrapped>),
}
251
252impl SourceConfig for SourceConfigEnum {
253 fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
254 match self {
255 Self::Kafka(c) => c.build(cx),
256 Self::AwsS3(c) => c.build(cx),
257 Self::AwsSqs(c) => SourceConfig::build(c.as_ref(), cx),
258 Self::AwsKinesis(c) => SourceConfig::build(c.as_ref(), cx),
259 Self::GcpCloudStorage(c) => c.build(cx),
260 Self::GcpPubsub(c) => c.build(cx),
261 Self::InternalMetrics(c) => c.build(cx),
262 Self::InternalLogs(c) => c.build(cx),
263 Self::Opentelemetry(c) => c.build(cx),
264 }
265 }
266}
267
/// `kafka` source: Kafka consumer settings plus an optional decoder.
// NOTE(review): the `SourceConfig` impl for this type builds from `config` only;
// `decoding` is not passed there — confirm it is consumed elsewhere.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "kafka")]
pub struct KafkaConsumerConfigWrapped {
    /// Optional payload decoder; `None` when omitted.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// Kafka consumer settings, flattened into the same table as `decoding`.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: KafkaConsumerConfig,
}
280
281impl SourceConfig for KafkaConsumerConfigWrapped {
282 fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
283 self.config.build(cx)
284 }
285}
286
/// `aws_s3` source: S3 source settings plus an optional decoder.
// NOTE(review): the `SourceConfig` impl for this type builds from `config` only;
// `decoding` is not passed there — confirm it is consumed elsewhere.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "aws_s3")]
pub struct S3SourceConfigWrapped {
    /// Optional payload decoder; `None` when omitted.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// S3 source settings, flattened into the same table as `decoding`.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: S3SourceConfig,
}
299
300impl SourceConfig for S3SourceConfigWrapped {
301 fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
302 self.config.build(cx)
303 }
304}
305
/// `aws_sqs` source: SQS settings plus an optional decoder.
// NOTE(review): the `SourceConfig` impl for this type builds from `config` only;
// `decoding` is not passed there — confirm it is consumed elsewhere.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "aws_sqs")]
pub struct SqsConfigWrapped {
    /// Optional payload decoder; `None` when omitted.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// SQS settings, flattened into the same table as `decoding`.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: SqsConfig,
}
318
319impl SourceConfig for SqsConfigWrapped {
320 fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
321 SourceConfig::build(&self.config, cx)
322 }
323}
324
/// AWS Kinesis component wrapper.
///
/// The same type is used by the `aws_kinesis` source and by the `aws_kinesis`
/// variant of `SinkConfigEnum`; it implements both `SourceConfig` and
/// `SinkConfig`. It carries the sink-style fields (`inputs`, `error_policy`,
/// `buffer`) as well as the source-style `decoding` field.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "aws_kinesis")]
pub struct KinesisConfigWrapped {
    /// Upstream component names; empty when omitted.
    #[serde(default)]
    #[doc_field(example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional payload decoder; `None` when omitted.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// Buffer settings; defaults to `BufferConfig::default()`.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Kinesis settings, flattened into the same table as the fields above.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: KinesisConfig,
}
348
349impl SourceConfig for KinesisConfigWrapped {
350 fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
351 SourceConfig::build(&self.config, cx)
352 }
353}
354
355impl SinkConfig for KinesisConfigWrapped {
356 fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
357 SinkConfig::build(&self.config, cx)
358 }
359}
360
/// `gcp_cloud_storage` source: GCS source settings plus an optional decoder.
// NOTE(review): the `SourceConfig` impl for this type builds from `config` only;
// `decoding` is not passed there — confirm it is consumed elsewhere.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "gcp_cloud_storage")]
pub struct GcsSourceConfigWrapped {
    /// Optional payload decoder; `None` when omitted.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// GCS source settings, flattened into the same table as `decoding`.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: GcsSourceConfig,
}
373
374impl SourceConfig for GcsSourceConfigWrapped {
375 fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
376 self.config.build(cx)
377 }
378}
379
/// `gcp_pubsub` source: Pub/Sub subscriber settings plus an optional decoder.
// NOTE(review): the `SourceConfig` impl for this type builds from `config` only;
// `decoding` is not passed there — confirm it is consumed elsewhere.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "gcp_pubsub")]
pub struct PubSubSourceConfigWrapped {
    /// Optional payload decoder; `None` when omitted.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// Pub/Sub source settings, flattened into the same table as `decoding`.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: PubSubSourceConfig,
}
392
393impl SourceConfig for PubSubSourceConfigWrapped {
394 fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
395 self.config.build(cx)
396 }
397}
398
/// `internal_metrics` source. It has no options; its runtime is built outside
/// this crate (see its `SourceConfig` impl).
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, ComponentDoc)]
#[component(type = "source", name = "internal_metrics")]
pub struct InternalMetricsConfig {}
403impl SourceConfig for InternalMetricsConfig {
404 fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
405 Err(anyhow::anyhow!(
406 "Internal metrics build must be implemented in the data plane"
407 ))
408 }
409}
410
/// `internal_logs` source. It has no options; the generic `SourceConfig::build`
/// path is rejected (see its impl).
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, ComponentDoc)]
#[component(type = "source", name = "internal_logs")]
pub struct InternalLogsConfig {}
415impl SourceConfig for InternalLogsConfig {
416 fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
417 anyhow::bail!("Internal logs source requires log_rx handle")
418 }
419}
420
/// `opentelemetry` source.
///
/// Unlike most source wrappers, `config` is *not* flattened: the server settings
/// live under a nested `config` key.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "opentelemetry")]
pub struct OtelSourceConfigWrapped {
    /// Source identifier; required.
    pub id: String,
    /// Optional payload decoder; `None` when omitted.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// OpenTelemetry server settings (nested, not flattened).
    pub config: OtelServerConfig,
}
433
434impl SourceConfig for OtelSourceConfigWrapped {
435 fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
436 Err(anyhow::anyhow!(
437 "OTel source build must be implemented in the data plane"
438 ))
439 }
440}
441
/// Per-component error handling policy, serialized in snake_case
/// (`drop_on_error`, `reroute_on_error`, `halt_on_error`).
// NOTE(review): the exact runtime semantics of each policy live in the data plane;
// the variant descriptions below are inferred from the names — confirm there.
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, Eq, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum ErrorPolicy {
    /// Default policy; presumably drops the failing data.
    #[default]
    DropOnError,
    /// Presumably reroutes failing data to an error output.
    RerouteOnError,
    /// Presumably halts the component on error.
    HaltOnError,
}
454
/// All supported transform component types.
///
/// Internally tagged by a `type` key whose value is the snake_case variant name
/// (`"filter"`, `"otel_aggregate"`, `"aggregate"`, `"sample"`, `"sql_map"`, `"map"`).
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TransformConfigEnum {
    Filter(FilterConfig),
    OtelAggregate(OtelAggregateConfigWrapped),
    Aggregate(AggregateConfigWrapped),
    Sample(SamplingConfigWrapped),
    SqlMap(SqlMapConfigWrapped),
    Map(MapTransformConfigWrapped),
}
468
469impl TransformConfig for TransformConfigEnum {
470 fn build(&self, cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
471 match self {
472 Self::Filter(c) => c.build(cx),
473 Self::OtelAggregate(c) => c.build(cx),
474 Self::Aggregate(c) => c.build(cx),
475 Self::Sample(c) => c.build(cx),
476 Self::SqlMap(c) => c.build(cx),
477 Self::Map(c) => c.build(cx),
478 }
479 }
480}
481
/// `filter` transform: applies a single `FilterCondition` to its inputs.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "filter")]
pub struct FilterConfig {
    /// Upstream component names; required.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// Condition to evaluate; required.
    #[doc_field(required)]
    pub condition: FilterCondition,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
}
497
498impl TransformConfig for FilterConfig {
499 fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
500 Err(anyhow::anyhow!(
501 "Filter build must be implemented in the data plane"
502 ))
503 }
504}
505
/// `otel_aggregate` transform: common transform fields plus flattened
/// `OtelAggregateConfig` settings.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "otel_aggregate")]
pub struct OtelAggregateConfigWrapped {
    /// Upstream component names; required.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Aggregation settings, flattened into the same table.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: OtelAggregateConfig,
}
522
523impl TransformConfig for OtelAggregateConfigWrapped {
524 fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
525 Err(anyhow::anyhow!(
526 "OtelAggregate build must be implemented in the data plane"
527 ))
528 }
529}
530
/// `aggregate` transform: common transform fields plus flattened
/// `AggregateConfig` settings.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "aggregate")]
pub struct AggregateConfigWrapped {
    /// Upstream component names; required.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Aggregation settings, flattened into the same table.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: AggregateConfig,
}
547
548impl TransformConfig for AggregateConfigWrapped {
549 fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
550 Err(anyhow::anyhow!(
551 "Aggregate build must be implemented in the data plane"
552 ))
553 }
554}
555
/// Settings for the `sql_map` transform.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct SqlMapConfig {
    /// SQL query to run; required (example: `SELECT * FROM payload`).
    #[doc_field(required, example = "SELECT * FROM payload")]
    pub query: String,
}
563
/// `sql_map` transform: common transform fields plus flattened `SqlMapConfig`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "sql_map")]
pub struct SqlMapConfigWrapped {
    /// Upstream component names; required.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Query settings, flattened into the same table.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: SqlMapConfig,
}
580
581impl TransformConfig for SqlMapConfigWrapped {
582 fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
583 Err(anyhow::anyhow!(
584 "SqlMap build must be implemented in the data plane"
585 ))
586 }
587}
588
/// Settings for the `map` transform, which references a versioned artifact.
///
/// The memory limit and timeout defaults come from the `default_wasm_*` helpers.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct MapTransformConfig {
    /// Artifact identifier; required (example: `artifact://transforms/enrich/3`).
    #[doc_field(required, example = "artifact://transforms/enrich/3")]
    pub artifact_id: String,
    /// Artifact version; defaults to 1.
    #[serde(default = "default_artifact_version")]
    #[doc_field(default = "1")]
    pub artifact_version: u64,
    /// Memory limit in bytes; defaults to 4194304 (4 MiB).
    #[serde(default = "default_wasm_memory_limit")]
    #[doc_field(default = "4194304")]
    pub memory_limit_bytes: usize,
    /// Timeout in milliseconds; defaults to 50.
    #[serde(default = "default_wasm_timeout_ms")]
    #[doc_field(default = "50")]
    pub timeout_ms: u64,
    /// Archive toggle; defaults to `ArchiveConfig::default()` (disabled).
    #[serde(default)]
    pub archive: ArchiveConfig,
    /// Named outputs mapped to target names; empty when omitted.
    #[serde(default)]
    #[doc_field(example = "{\"default\": \"output_1\", \"anomalies\": \"output_2\"}")]
    pub outputs: HashMap<String, String>,
}
615
/// Serde default for `MapTransformConfig::artifact_version`.
fn default_artifact_version() -> u64 {
    1
}

/// Serde default for `MapTransformConfig::memory_limit_bytes`: 4 MiB.
fn default_wasm_memory_limit() -> usize {
    4 << 20
}

/// Serde default for `MapTransformConfig::timeout_ms`.
fn default_wasm_timeout_ms() -> u64 {
    50
}
627
/// Per-transform archive toggle used by `MapTransformConfig`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default, FieldDoc)]
pub struct ArchiveConfig {
    /// Whether archiving is enabled; defaults to `false`.
    #[serde(default)]
    #[doc_field(default = "false")]
    pub enabled: bool,
}
636
/// `map` transform: common transform fields plus flattened `MapTransformConfig`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "map")]
pub struct MapTransformConfigWrapped {
    /// Upstream component names; required.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Artifact settings, flattened into the same table.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: MapTransformConfig,
}
653
654impl TransformConfig for MapTransformConfigWrapped {
655 fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
656 Err(anyhow::anyhow!(
657 "Map transform build must be implemented in the data plane"
658 ))
659 }
660}
661
/// Condition evaluated by the `filter` transform.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct FilterCondition {
    /// Whether matches are dropped or passed; serialized under the key `type`.
    #[serde(rename = "type")]
    pub kind: FilterType,
    /// Value the pattern is matched against (presumably a field name — confirm).
    pub source: String,
    /// Pattern to match; its syntax is interpreted by the data plane.
    pub pattern: String,
    /// Logging flag; defaults to `false`.
    #[serde(default)]
    pub log: bool,
}
676
/// Action of a `FilterCondition`, serialized as `"drop"` or `"pass"`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum FilterType {
    /// Drop matching data.
    Drop,
    /// Pass matching data.
    Pass,
}
686
/// Signal kind aggregated by `otel_aggregate`, serialized as `"metrics"` or `"traces"`.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum OtelAggregation {
    /// Aggregate metrics.
    Metrics,
    /// Aggregate traces.
    Traces,
}
696
/// Settings for the `otel_aggregate` transform.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct OtelAggregateConfig {
    /// Signal kind to aggregate; required.
    #[doc_field(required, example = "metrics")]
    pub aggregation: OtelAggregation,
    /// Window length as a duration string (e.g. `60s`); required.
    #[doc_field(required, example = "60s")]
    pub window_duration: String,
    /// Optional metric naming convention; `None` when omitted.
    #[serde(default)]
    #[doc_field(example = "prometheus")]
    pub naming_convention: Option<NamingConvention>,
    /// OTel schema version. Serde leaves this `None` when omitted; the documented
    /// "1.0" default is presumably applied downstream — confirm.
    #[serde(default)]
    #[doc_field(default = "1.0", example = "1.0")]
    pub otel_schema_version: Option<String>,
    /// Memory limit as a size string; when omitted, filled by `default_memory_limit`.
    #[serde(default = "default_memory_limit")]
    #[doc_field(default = "1GB", example = "8GB")]
    pub memory_limit: Option<String>,
    /// Optional temp directory; `None` when omitted.
    #[serde(default)]
    #[doc_field(example = "/var/tmp/kinetic")]
    pub temp_directory: Option<std::path::PathBuf>,
    /// Named outputs mapped to target names; empty when omitted.
    #[serde(default)]
    #[doc_field(example = "{\"errors\": \"error_branch\"}")]
    pub outputs: HashMap<String, String>,
}
727
/// Settings for the SQL-based `aggregate` transform.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct AggregateConfig {
    /// Aggregation SQL; required.
    #[doc_field(required, example = "SELECT count(*) AS total FROM events")]
    pub sql: String,
    /// Window length as a duration string (e.g. `60s`); required.
    #[doc_field(required, example = "60s")]
    pub window_duration: String,
    /// Memory limit as a size string; when omitted, filled by `default_memory_limit`.
    #[serde(default = "default_memory_limit")]
    #[doc_field(default = "1GB", example = "8GB")]
    pub memory_limit: Option<String>,
    /// Optional temp directory; `None` when omitted.
    #[serde(default)]
    #[doc_field(example = "/var/tmp/kinetic")]
    pub temp_directory: Option<std::path::PathBuf>,
    /// Named outputs mapped to target names; empty when omitted.
    #[serde(default)]
    #[doc_field(example = "{\"errors\": \"error_branch\"}")]
    pub outputs: HashMap<String, String>,
}
750
/// Serde default for the `memory_limit` field of `OtelAggregateConfig` and
/// `AggregateConfig`.
///
/// Both fields are documented with `#[doc_field(default = "1GB", example = "8GB")]`.
/// This value must match that documented default; it previously returned the
/// example value ("8GB") instead.
fn default_memory_limit() -> Option<String> {
    Some("1GB".to_string())
}
754
/// Aggregation kind, serialized in snake_case (`count`, `histogram`, `summary`,
/// `exponential_histogram`).
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum AggregationType {
    Count,
    Histogram,
    Summary,
    ExponentialHistogram,
}
768
769#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
773#[serde(tag = "type", rename_all = "snake_case")]
774pub enum SamplingMode {
775 Random {
777 #[doc_field(required, example = "0.1")]
779 rate: f64,
780 #[serde(default)]
782 #[doc_field(default = "entropy", example = "42")]
783 seed: Option<u64>,
784 },
785 Systematic {
787 #[doc_field(required, example = "100")]
789 interval: u64,
790 },
791 Hash {
793 #[doc_field(required, example = "trace_id")]
795 key: String,
796 #[doc_field(required, example = "10")]
798 rate: u8,
799 #[serde(default = "default_missing_key")]
801 #[doc_field(default = "drop", example = "keep")]
802 missing_key_behaviour: String,
803 },
804 Recordbatch {
806 #[doc_field(required, example = "0.1")]
808 rate: f64,
809 #[serde(default)]
811 #[doc_field(default = "entropy", example = "42")]
812 seed: Option<u64>,
813 },
814 Stratified {
816 #[doc_field(required, example = "severity")]
818 field: String,
819 #[doc_field(required, example = "{\"error\": 1.0, \"info\": 0.01}")]
821 strata: std::collections::HashMap<String, f64>,
822 #[doc_field(required, example = "0.1")]
824 default_rate: f64,
825 #[serde(default = "default_missing_key")]
827 #[doc_field(default = "keep", example = "drop")]
828 missing_field_behaviour: String,
829 #[serde(default)]
831 #[doc_field(default = "entropy", example = "42")]
832 seed: Option<u64>,
833 },
834 Adaptive {
836 #[doc_field(required, example = "0.1")]
838 base_rate: f64,
839 #[doc_field(required, example = "0.01")]
841 min_rate: f64,
842 #[doc_field(required, example = "1.0")]
844 max_rate: f64,
845 #[doc_field(required, example = "level")]
847 error_field: String,
848 #[doc_field(required, example = "error")]
850 error_value: String,
851 #[serde(default)]
853 #[doc_field(example = "duration_ms")]
854 latency_field: Option<String>,
855 #[serde(default = "default_latency_threshold")]
857 #[doc_field(default = "2000", example = "500")]
858 latency_threshold_ms: u64,
859 #[serde(default = "default_window_secs")]
861 #[doc_field(default = "60", example = "30")]
862 window_secs: u64,
863 #[serde(default = "default_sensitivity")]
865 #[doc_field(default = "2.0", example = "1.5")]
866 sensitivity: f64,
867 },
868 Reservoir {
870 #[doc_field(required, example = "1000")]
872 size: usize,
873 #[serde(default = "default_flush_interval")]
875 #[doc_field(default = "300", example = "60")]
876 flush_interval_secs: u64,
877 },
878 Tail {
880 #[doc_field(required, example = "trace_id")]
882 group_key: String,
883 #[serde(default = "default_decision_window")]
885 #[doc_field(default = "30", example = "60")]
886 decision_window_secs: u64,
887 #[serde(default = "default_max_groups")]
889 #[doc_field(default = "10000", example = "50000")]
890 max_groups: usize,
891 #[serde(default = "default_tail_decision")]
893 #[doc_field(default = "drop", example = "keep")]
894 default_decision: String,
895 #[doc_field(required)]
897 rules: Vec<TailRuleConfig>,
898 },
899}
900
/// Serde default for `SamplingMode::Hash::missing_key_behaviour`.
fn default_missing_key() -> String {
    String::from("drop")
}

/// Serde default for `SamplingMode::Adaptive::latency_threshold_ms`: two seconds.
fn default_latency_threshold() -> u64 {
    2 * 1_000
}

/// Serde default for `SamplingMode::Adaptive::window_secs`: one minute.
fn default_window_secs() -> u64 {
    60
}

/// Serde default for `SamplingMode::Adaptive::sensitivity`.
fn default_sensitivity() -> f64 {
    2.0
}

/// Serde default for `SamplingMode::Reservoir::flush_interval_secs`: five minutes.
fn default_flush_interval() -> u64 {
    5 * 60
}

/// Serde default for `SamplingMode::Tail::decision_window_secs`.
fn default_decision_window() -> u64 {
    30
}

/// Serde default for `SamplingMode::Tail::max_groups`.
fn default_max_groups() -> usize {
    10_000
}

/// Serde default for `SamplingMode::Tail::default_decision`.
fn default_tail_decision() -> String {
    String::from("drop")
}
932
/// One rule of `SamplingMode::Tail`. All fields are required strings whose
/// interpretation is left to the data plane.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct TailRuleConfig {
    /// Field to inspect (example: `attributes.http.status_code`).
    #[doc_field(required, example = "attributes.http.status_code")]
    pub field: String,
    /// Comparison operator as text (example: `>=`).
    #[doc_field(required, example = ">=")]
    pub operator: String,
    /// Comparison operand as text (example: `500`).
    #[doc_field(required, example = "500")]
    pub value: String,
    /// Decision when the rule matches (example: `keep`).
    #[doc_field(required, example = "keep")]
    pub decision: String,
}
949
/// `sample` transform: common transform fields plus a flattened `SamplingMode`
/// (whose own `type` tag selects the strategy).
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "sample")]
pub struct SamplingConfigWrapped {
    /// Upstream component names; required by serde (no default).
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    pub error_policy: ErrorPolicy,
    /// Routing flag; defaults to `false`.
    #[serde(default)]
    pub route_all: bool,
    /// Sampling strategy, flattened into the same table.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub mode: SamplingMode,
}
967
968impl TransformConfig for SamplingConfigWrapped {
969 fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
970 Err(anyhow::anyhow!(
971 "Sample build must be implemented in the data plane"
972 ))
973 }
974}
975
/// Metric naming convention for `otel_aggregate`, serialized in snake_case
/// (`preserve`, `standard`, `custom`); defaults to `Preserve`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Default, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum NamingConvention {
    /// Default convention.
    #[default]
    Preserve,
    Standard,
    Custom,
}
988
/// All supported sink component types.
///
/// Internally tagged by a `type` key whose value is the snake_case variant name
/// (e.g. `"kafka"`, `"s3"`, `"aws_kinesis"`, `"flight_sql"`, `"starrocks"`).
/// Every payload is boxed.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SinkConfigEnum {
    Kafka(Box<KafkaProducerConfigWrapped>),
    S3(Box<S3SinkConfigWrapped>),
    AwsKinesis(Box<KinesisConfigWrapped>),
    GcpCloudStorage(Box<GcsSinkConfigWrapped>),
    GcpPubsub(Box<PubSubSinkConfigWrapped>),
    FlightSql(Box<FlightSqlConfigWrapped>),
    Dlq(Box<DlqSinkConfigWrapped>),
    Iceberg(Box<IcebergSinkConfigWrapped>),
    Delta(Box<DeltaSinkConfigWrapped>),
    Starrocks(Box<StarRocksSinkConfigWrapped>),
}
1006
1007impl SinkConfig for SinkConfigEnum {
1008 fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1009 match self {
1010 Self::Kafka(c) => c.build(cx),
1011 Self::S3(c) => c.build(cx),
1012 Self::AwsKinesis(c) => SinkConfig::build(c.as_ref(), cx),
1013 Self::GcpCloudStorage(c) => c.build(cx),
1014 Self::GcpPubsub(c) => c.build(cx),
1015 Self::FlightSql(c) => c.build(cx),
1016 Self::Dlq(c) => c.build(cx),
1017 Self::Iceberg(c) => c.build(cx),
1018 Self::Delta(c) => c.build(cx),
1019 Self::Starrocks(c) => c.build(cx),
1020 }
1021 }
1022}
1023
/// `iceberg` sink settings.
///
/// The batch thresholds default via `default_max_rows`, `default_max_bytes` and
/// `default_max_age_ms`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "iceberg")]
pub struct IcebergSinkConfigWrapped {
    /// Upstream component names; required by serde (no default).
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    pub error_policy: ErrorPolicy,
    /// Optional encoder; `None` when omitted.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer settings; defaults to `BufferConfig::default()`.
    #[serde(default)]
    pub buffer: BufferConfig,

    /// Catalog endpoint; required.
    pub catalog_endpoint: String,
    /// Target table; required.
    pub table: String,
    /// Warehouse location; required.
    pub warehouse: String,

    /// Row threshold; defaults to 50 000.
    #[serde(default = "default_max_rows")]
    pub max_rows: u64,
    /// Byte threshold; defaults to 67 108 864 (64 MiB).
    #[serde(default = "default_max_bytes")]
    pub max_bytes: u64,
    /// Age threshold in milliseconds; defaults to 60 000.
    #[serde(default = "default_max_age_ms")]
    pub max_age_ms: u64,
}
1057
/// Serde default row threshold shared by the Iceberg, Delta and StarRocks sinks.
fn default_max_rows() -> u64 {
    50_000
}

/// Serde default byte threshold shared by the table sinks: 64 MiB.
fn default_max_bytes() -> u64 {
    64 * 1024 * 1024
}

/// Serde default age threshold shared by the table sinks: one minute, in ms.
fn default_max_age_ms() -> u64 {
    60 * 1_000
}
1067
1068impl SinkConfig for IcebergSinkConfigWrapped {
1069 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1070 Err(anyhow::anyhow!(
1071 "Iceberg build must be implemented in the data plane"
1072 ))
1073 }
1074}
1075
/// `delta` sink settings.
///
/// The batch thresholds default via `default_max_rows`, `default_max_bytes` and
/// `default_max_age_ms`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "delta")]
pub struct DeltaSinkConfigWrapped {
    /// Upstream component names; required by serde (no default).
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    pub error_policy: ErrorPolicy,
    /// Optional encoder; `None` when omitted.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer settings; defaults to `BufferConfig::default()`.
    #[serde(default)]
    pub buffer: BufferConfig,

    /// Warehouse location; required.
    pub warehouse: String,
    /// Target table; required.
    pub table: String,

    /// Row threshold; defaults to 50 000.
    #[serde(default = "default_max_rows")]
    pub max_rows: u64,
    /// Byte threshold; defaults to 67 108 864 (64 MiB).
    #[serde(default = "default_max_bytes")]
    pub max_bytes: u64,
    /// Age threshold in milliseconds; defaults to 60 000.
    #[serde(default = "default_max_age_ms")]
    pub max_age_ms: u64,
}
1107
1108impl SinkConfig for DeltaSinkConfigWrapped {
1109 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1110 Err(anyhow::anyhow!(
1111 "Delta build must be implemented in the data plane"
1112 ))
1113 }
1114}
1115
/// `starrocks` sink settings.
///
/// The batch thresholds default via `default_max_rows`, `default_max_bytes` and
/// `default_max_age_ms`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "starrocks")]
pub struct StarRocksSinkConfigWrapped {
    /// Upstream component names; required by serde (no default).
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    pub error_policy: ErrorPolicy,
    /// Optional encoder; `None` when omitted.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer settings; defaults to `BufferConfig::default()`.
    #[serde(default)]
    pub buffer: BufferConfig,

    /// Frontend HTTP address; required.
    pub fe_http: String,
    /// Optional Flight SQL address.
    pub flight_sql: Option<String>,
    /// Ingestion mode; defaults to `"auto"` via `default_ingestion_mode`.
    #[serde(default = "default_ingestion_mode")]
    pub ingestion_mode: String,
    /// Target database; required.
    pub database: String,
    /// Target table; required.
    pub table: String,

    /// Row threshold; defaults to 50 000.
    #[serde(default = "default_max_rows")]
    pub max_rows: u64,
    /// Byte threshold; defaults to 67 108 864 (64 MiB).
    #[serde(default = "default_max_bytes")]
    pub max_bytes: u64,
    /// Age threshold in milliseconds; defaults to 60 000.
    #[serde(default = "default_max_age_ms")]
    pub max_age_ms: u64,
}
1154
/// Serde default for `StarRocksSinkConfigWrapped::ingestion_mode`.
fn default_ingestion_mode() -> String {
    String::from("auto")
}
1158
1159impl SinkConfig for StarRocksSinkConfigWrapped {
1160 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1161 Err(anyhow::anyhow!(
1162 "StarRocks build must be implemented in the data plane"
1163 ))
1164 }
1165}
1166
/// Sink buffer settings.
///
/// Internally tagged by a `type` key (`"memory"` or `"disk"`). The `Default` impl
/// is `Memory` with `default_buffer_capacity()` batches.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BufferConfig {
    /// In-memory buffer.
    Memory {
        /// Batch capacity; defaults to 100.
        #[serde(default = "default_buffer_capacity")]
        #[doc_field(default = "100")]
        max_batches: usize,
    },
    /// On-disk buffer.
    Disk {
        /// Buffer location; required.
        path: std::path::PathBuf,
        /// Optional size cap in bytes; `None` when omitted.
        #[serde(default)]
        #[doc_field(example = "1073741824")]
        max_size_bytes: Option<u64>,
    },
}
1188
1189impl Default for BufferConfig {
1190 fn default() -> Self {
1191 Self::Memory {
1192 max_batches: default_buffer_capacity(),
1193 }
1194 }
1195}
1196
/// Default `max_batches` for `BufferConfig::Memory`, used both by serde and by
/// `BufferConfig::default()`.
fn default_buffer_capacity() -> usize {
    const DEFAULT_MAX_BATCHES: usize = 100;
    DEFAULT_MAX_BATCHES
}
1200
/// `kafka` sink: common sink fields plus flattened Kafka producer settings.
// NOTE(review): the `SinkConfig` impl for this type builds from `config` only;
// `inputs`, `error_policy`, `encoding` and `buffer` are not passed there — confirm
// they are consumed elsewhere.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "kafka")]
pub struct KafkaProducerConfigWrapped {
    /// Upstream component names; required.
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder; `None` when omitted.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer settings; defaults to `BufferConfig::default()`.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Kafka producer settings, flattened into the same table.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: KafkaProducerConfig,
}
1223
1224impl SinkConfig for KafkaProducerConfigWrapped {
1225 fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1226 self.config.build(cx)
1227 }
1228}
1229
/// `s3` sink: common sink fields plus flattened S3 sink settings.
// NOTE(review): the `SinkConfig` impl for this type builds from `config` only;
// the other fields are not passed there — confirm they are consumed elsewhere.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "s3")]
pub struct S3SinkConfigWrapped {
    /// Upstream component names; required.
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder; `None` when omitted.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer settings; defaults to `BufferConfig::default()`.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// S3 sink settings, flattened into the same table.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: S3SinkConfig,
}
1252
1253impl SinkConfig for S3SinkConfigWrapped {
1254 fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1255 self.config.build(cx)
1256 }
1257}
1258
/// `gcp_cloud_storage` sink: common sink fields plus flattened GCS sink settings.
// NOTE(review): the `SinkConfig` impl for this type builds from `config` only;
// the other fields are not passed there — confirm they are consumed elsewhere.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "gcp_cloud_storage")]
pub struct GcsSinkConfigWrapped {
    /// Upstream component names; required.
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Error handling policy; defaults to `ErrorPolicy::DropOnError`.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder; `None` when omitted.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer settings; defaults to `BufferConfig::default()`.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// GCS sink settings, flattened into the same table.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: GcsSinkConfig,
}
1281
1282impl SinkConfig for GcsSinkConfigWrapped {
1283 fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1284 self.config.build(cx)
1285 }
1286}
1287
/// Configuration for the `flight_sql` sink component: the fields shared by every
/// sink wrapper plus the flattened [`FlightClientConfig`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "flight_sql")]
pub struct FlightSqlConfigWrapped {
    /// Upstream component IDs (required; example `["my_transform"]`).
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Error-handling policy; `ErrorPolicy::default()` when omitted
    /// (documented default: `drop_on_error`).
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration; `None` when omitted.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer settings; `BufferConfig::default()` when omitted.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Flight client settings. Because of `#[serde(flatten)]` these keys sit at the
    /// same level as `inputs`/`buffer` rather than under a `config` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: FlightClientConfig,
}
1310
1311impl SinkConfig for FlightSqlConfigWrapped {
1312 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1313 Err(anyhow::anyhow!(
1314 "FlightSql build must be implemented in the data plane"
1315 ))
1316 }
1317}
1318
/// Configuration for the `gcp_pubsub` sink component: the fields shared by every
/// sink wrapper plus the flattened [`PubSubSinkConfig`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "gcp_pubsub")]
pub struct PubSubSinkConfigWrapped {
    /// Upstream component IDs (required; example `["my_transform"]`).
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Error-handling policy; `ErrorPolicy::default()` when omitted
    /// (documented default: `drop_on_error`).
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration; `None` when omitted.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer settings; `BufferConfig::default()` when omitted.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Pub/Sub sink settings. Because of `#[serde(flatten)]` these keys sit at the
    /// same level as `inputs`/`buffer` rather than under a `config` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: PubSubSinkConfig,
}
1341
1342impl SinkConfig for PubSubSinkConfigWrapped {
1343 fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1344 self.config.build(cx)
1345 }
1346}
1347
/// Configuration for the `dlq` sink component: the fields shared by every sink
/// wrapper plus the flattened [`DlqSinkConfig`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "dlq")]
pub struct DlqSinkConfigWrapped {
    /// Upstream component IDs (required; example `["my_source"]`).
    #[doc_field(required, example = "[\"my_source\"]")]
    pub inputs: Vec<String>,
    /// Error-handling policy; `ErrorPolicy::default()` when omitted
    /// (documented default: `drop_on_error`).
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration; `None` when omitted.
    // NOTE(review): the flattened `DlqSinkConfig` also declares a field named
    // `encoding` (a `String`, default "json"). serde's derive matches this outer
    // field by name before collecting leftover keys for the flattened struct, so an
    // `encoding` key is consumed here and `DlqSinkConfig::encoding` likely always
    // keeps its default. A plain string value (e.g. "json") would likely fail to
    // parse as the internally tagged `EncoderConfigEnum`, and serializing this
    // struct would emit `encoding` twice. Confirm which field is intended, then
    // rename or `#[serde(skip)]` the other.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer settings; `BufferConfig::default()` when omitted.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// DLQ destination settings. Because of `#[serde(flatten)]` these keys sit at
    /// the same level as `inputs`/`buffer` rather than under a `config` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: DlqSinkConfig,
}
1370
/// Settings specific to the `dlq` sink, flattened into `DlqSinkConfigWrapped`.
// NOTE(review): `bucket`/`region`/`prefix` suggest an object-store destination;
// confirm against the data-plane implementation. The duration-like fields are
// plain `String`s and are not parsed here; presumably the data plane parses them.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct DlqSinkConfig {
    /// Destination bucket name (required; example `my-kinetic-dlq`).
    #[doc_field(required, example = "my-kinetic-dlq")]
    pub bucket: String,
    /// Region of the bucket (required; example `us-east-1`).
    #[doc_field(required, example = "us-east-1")]
    pub region: String,
    /// Object key prefix (required; example `dlq/alerts/`).
    #[doc_field(required, example = "dlq/alerts/")]
    pub prefix: String,
    /// Batch size; defaults to 20 via `default_dlq_batch_size`.
    #[serde(default = "default_dlq_batch_size")]
    #[doc_field(default = "20")]
    pub batch_size: usize,
    /// Timeout as a duration string; defaults to `1m`.
    #[serde(default = "default_dlq_timeout")]
    #[doc_field(default = "1m")]
    pub timeout: String,
    /// Encoding name as a plain string; defaults to `json`.
    // NOTE(review): `DlqSinkConfigWrapped` also has an `encoding` field. With
    // `#[serde(flatten)]` the outer field claims the `encoding` key first, so this
    // value may never be populated from config. Confirm the intended owner.
    #[serde(default = "default_dlq_encoding")]
    #[doc_field(default = "json")]
    pub encoding: String,
    /// Retry delay as a duration string; defaults to `5s`.
    // presumably the initial retry backoff; confirm in the data plane.
    #[serde(default = "default_dlq_retry_delay")]
    #[doc_field(default = "5s")]
    pub retry_delay: String,
    /// Maximum retry delay as a duration string; defaults to `300s`.
    // presumably the upper bound on the retry backoff; confirm in the data plane.
    #[serde(default = "default_dlq_retry_max_delay")]
    #[doc_field(default = "300s")]
    pub retry_max_delay: String,
    /// Maximum number of retry attempts; defaults to 5.
    #[serde(default = "default_dlq_retry_max_attempts")]
    #[doc_field(default = "5")]
    pub retry_max_attempts: usize,
    /// Maximum number of pending writes; defaults to 1024.
    // presumably a cap on in-flight writes; confirm in the data plane.
    #[serde(default = "default_dlq_max_pending_writes")]
    #[doc_field(default = "1024")]
    pub max_pending_writes: usize,
}
1412
// Serde default providers for `DlqSinkConfig`. Each value must stay in sync with
// the matching `#[doc_field(default = "...")]` string on the struct field.

/// Default for `DlqSinkConfig::batch_size`.
fn default_dlq_batch_size() -> usize {
    20_usize
}

/// Default for `DlqSinkConfig::timeout` (duration string).
fn default_dlq_timeout() -> String {
    String::from("1m")
}

/// Default for `DlqSinkConfig::encoding`.
fn default_dlq_encoding() -> String {
    String::from("json")
}

/// Default for `DlqSinkConfig::retry_delay` (duration string).
fn default_dlq_retry_delay() -> String {
    String::from("5s")
}

/// Default for `DlqSinkConfig::retry_max_delay` (duration string).
fn default_dlq_retry_max_delay() -> String {
    String::from("300s")
}

/// Default for `DlqSinkConfig::retry_max_attempts`.
fn default_dlq_retry_max_attempts() -> usize {
    5_usize
}

/// Default for `DlqSinkConfig::max_pending_writes`.
fn default_dlq_max_pending_writes() -> usize {
    1024_usize
}
1440
1441impl SinkConfig for DlqSinkConfigWrapped {
1442 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1443 Err(anyhow::anyhow!(
1444 "DLQ sink build must be implemented in the data plane"
1445 ))
1446 }
1447}
1448
/// Shared empty output map. A `'static` reference can be returned for transforms
/// that declare no outputs, without allocating per call. `LazyLock` is needed
/// because `HashMap` construction with `RandomState` is not `const`.
static EMPTY_OUTPUTS: LazyLock<HashMap<String, String>> = LazyLock::new(HashMap::default);
1450
1451impl TransformConfigEnum {
1452 pub fn inputs(&self) -> &[String] {
1453 match self {
1454 Self::Filter(c) => &c.inputs,
1455 Self::OtelAggregate(c) => &c.inputs,
1456 Self::Aggregate(c) => &c.inputs,
1457 Self::SqlMap(c) => &c.inputs,
1458 Self::Sample(c) => &c.inputs,
1459 Self::Map(c) => &c.inputs,
1460 }
1461 }
1462
1463 pub fn error_policy(&self) -> &ErrorPolicy {
1464 match self {
1465 Self::Filter(c) => &c.error_policy,
1466 Self::OtelAggregate(c) => &c.error_policy,
1467 Self::Aggregate(c) => &c.error_policy,
1468 Self::SqlMap(c) => &c.error_policy,
1469 Self::Sample(c) => &c.error_policy,
1470 Self::Map(c) => &c.error_policy,
1471 }
1472 }
1473
1474 pub fn outputs(&self) -> &HashMap<String, String> {
1475 match self {
1476 Self::OtelAggregate(c) => &c.config.outputs,
1477 Self::Aggregate(c) => &c.config.outputs,
1478 Self::Map(c) => &c.config.outputs,
1479 _ => &EMPTY_OUTPUTS,
1480 }
1481 }
1482}
1483
1484impl SinkConfigEnum {
1485 pub fn inputs(&self) -> &[String] {
1486 match self {
1487 Self::Kafka(c) => &c.inputs,
1488 Self::S3(c) => &c.inputs,
1489 Self::AwsKinesis(c) => &c.inputs,
1490 Self::GcpCloudStorage(c) => &c.inputs,
1491 Self::GcpPubsub(c) => &c.inputs,
1492 Self::FlightSql(c) => &c.inputs,
1493 Self::Dlq(c) => &c.inputs,
1494 Self::Iceberg(c) => &c.inputs,
1495 Self::Delta(c) => &c.inputs,
1496 Self::Starrocks(c) => &c.inputs,
1497 }
1498 }
1499
1500 pub fn error_policy(&self) -> &ErrorPolicy {
1501 match self {
1502 Self::Kafka(c) => &c.error_policy,
1503 Self::S3(c) => &c.error_policy,
1504 Self::AwsKinesis(c) => &c.error_policy,
1505 Self::GcpCloudStorage(c) => &c.error_policy,
1506 Self::GcpPubsub(c) => &c.error_policy,
1507 Self::FlightSql(c) => &c.error_policy,
1508 Self::Dlq(c) => &c.error_policy,
1509 Self::Iceberg(c) => &c.error_policy,
1510 Self::Delta(c) => &c.error_policy,
1511 Self::Starrocks(c) => &c.error_policy,
1512 }
1513 }
1514
1515 pub fn buffer(&self) -> &BufferConfig {
1516 match self {
1517 Self::Kafka(c) => &c.buffer,
1518 Self::S3(c) => &c.buffer,
1519 Self::AwsKinesis(c) => &c.buffer,
1520 Self::GcpCloudStorage(c) => &c.buffer,
1521 Self::GcpPubsub(c) => &c.buffer,
1522 Self::FlightSql(c) => &c.buffer,
1523 Self::Dlq(c) => &c.buffer,
1524 Self::Iceberg(c) => &c.buffer,
1525 Self::Delta(c) => &c.buffer,
1526 Self::Starrocks(c) => &c.buffer,
1527 }
1528 }
1529}