Skip to main content

kinetic_config/
model.rs

1//! Configuration models for Kinetic pipelines.
2
3use arrow_schema::Schema;
4use aws_common::config::{KinesisConfig, S3SinkConfig, S3SourceConfig, SqsConfig};
5use flight_common::config::FlightClientConfig;
6use gcp_common::config::{GcsSinkConfig, GcsSourceConfig, PubSubSinkConfig, PubSubSourceConfig};
7use kafka_common::{KafkaConsumerConfig, KafkaProducerConfig};
8use kinetic_common::config::{
9    SinkConfig, SinkContext, SourceConfig, SourceContext, TransformConfig, TransformContext,
10};
11use kinetic_core::encode::{
12    Decoder, DecoderConfig, Encoder, EncoderConfig, Result as EncodeResult,
13};
14use kinetic_doc_derive::{ComponentDoc, EnumDoc, FieldDoc};
15use kinetic_encoder_arrow::{ArrowIpcDecoderOptions, ArrowIpcEncoderOptions};
16use kinetic_encoder_avro::{AvroDecoderOptions, AvroEncoderOptions};
17use kinetic_encoder_json::{JsonDecoderOptions, JsonEncoderOptions};
18use kinetic_encoder_ocsf::{OcsfDecoderConfig, OcsfEncoderConfig};
19use otel_common::config::OtelServerConfig;
20use kinetic_encoder_parquet::{ParquetDecoderOptions, ParquetEncoderOptions};
21use kinetic_encoder_protobuf::{ProtobufDecoderOptions, ProtobufEncoderOptions};
22use serde::{Deserialize, Serialize};
23use std::collections::{HashMap, HashSet};
24use std::sync::Arc;
25use std::sync::LazyLock;
26use tokio::task::JoinHandle;
27
/// Available encoder codecs for output serialization.
// Serde shape: internally tagged on the `codec` key, with variant names
// converted to snake_case (`json`, `arrow`, `ocsf`, `avro`, `parquet`,
// `protobuf`). The remaining keys are deserialized into the options type of
// the selected variant.
// (Plain `//` comments: this type derives `EnumDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, EnumDoc)]
#[serde(tag = "codec", rename_all = "snake_case")]
pub enum EncoderConfigEnum {
    Json(JsonEncoderOptions),
    Arrow(ArrowIpcEncoderOptions),
    Ocsf(OcsfEncoderConfig),
    Avro(AvroEncoderOptions),
    Parquet(ParquetEncoderOptions),
    Protobuf(ProtobufEncoderOptions),
}
39
40impl EncoderConfig for EncoderConfigEnum {
41    fn build(&self) -> EncodeResult<Arc<dyn Encoder>> {
42        match self {
43            Self::Json(c) => c.build(),
44            Self::Arrow(c) => c.build(),
45            Self::Ocsf(c) => c.build(),
46            Self::Avro(c) => c.build(),
47            Self::Parquet(c) => c.build(),
48            Self::Protobuf(c) => c.build(),
49        }
50    }
51}
52
/// Available decoder codecs for input deserialization.
// Serde shape: internally tagged on the `codec` key, with variant names
// converted to snake_case (`json`, `arrow`, `ocsf`, `avro`, `parquet`,
// `protobuf`). The remaining keys are deserialized into the options type of
// the selected variant.
// (Plain `//` comments: this type derives `EnumDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, EnumDoc)]
#[serde(tag = "codec", rename_all = "snake_case")]
pub enum DecoderConfigEnum {
    Json(JsonDecoderOptions),
    Arrow(ArrowIpcDecoderOptions),
    Ocsf(OcsfDecoderConfig),
    Avro(AvroDecoderOptions),
    Parquet(ParquetDecoderOptions),
    Protobuf(ProtobufDecoderOptions),
}
64
65impl DecoderConfig for DecoderConfigEnum {
66    fn build(&self, schema: Arc<Schema>) -> EncodeResult<Arc<dyn Decoder>> {
67        match self {
68            Self::Json(c) => c.build(schema),
69            Self::Arrow(c) => c.build(schema),
70            Self::Ocsf(c) => c.build(schema),
71            Self::Avro(c) => c.build(schema),
72            Self::Parquet(c) => c.build(schema),
73            Self::Protobuf(c) => c.build(schema),
74        }
75    }
76}
77
/// Root configuration for a Kinetic pipeline.
///
/// Defines all sources, transforms, and sinks in the pipeline topology,
/// along with global settings like the pipeline identifier.
///
/// Every field carries a serde default, so an empty document deserializes
/// successfully; whether the result is usable is decided separately by
/// [`Config::validate`].
///
/// NOTE(review): the derived `Default` leaves `pipeline_id` as an empty
/// string, while deserializing a document without `pipeline_id` yields the
/// value returned by `default_pipeline_id()`. Confirm the two are meant to
/// differ.
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq)]
pub struct Config {
    /// Unique identifier for this pipeline instance.
    ///
    /// Falls back to `default_pipeline_id()` when absent from the input.
    #[serde(default = "default_pipeline_id")]
    pub pipeline_id: String,

    /// Optional archival storage configuration for the automated shadow sink.
    ///
    /// `None` when the key is absent.
    #[serde(default)]
    pub archive: Option<ArchiveStorageConfig>,

    /// Map of source names to their configurations.
    ///
    /// Empty when the key is absent.
    #[serde(default)]
    pub sources: HashMap<String, SourceConfigEnum>,

    /// Map of transform names to their configurations.
    ///
    /// Empty when the key is absent.
    #[serde(default)]
    pub transforms: HashMap<String, TransformConfigEnum>,

    /// Map of sink names to their configurations.
    ///
    /// Empty when the key is absent.
    #[serde(default)]
    pub sinks: HashMap<String, SinkConfigEnum>,
}
104
/// Configuration for the automated source-level archival storage.
// Serde shape: `endpoint` has no default and must be present; `auth` is an
// `Option` and may be omitted; `layout` and `ttl_hours` fall back to
// `default_archive_layout()` ("hive-hourly") and `default_archive_ttl()` (72).
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct ArchiveStorageConfig {
    /// Archival storage endpoint (S3 URI or local path).
    #[doc_field(required, example = "s3://my-archive-bucket/")]
    pub endpoint: String,
    /// Optional AWS authentication configuration.
    pub auth: Option<aws_common::config::AwsConfig>,
    /// Predefined file layout strategy (e.g., 'hive-hourly', 'hive-daily').
    // NOTE(review): stored as a free-form string; no validation of the layout
    // name is visible in this type.
    #[serde(default = "default_archive_layout")]
    #[doc_field(default = "hive-hourly")]
    pub layout: String,
    /// Data retention period in hours.
    #[serde(default = "default_archive_ttl")]
    #[doc_field(default = "72")]
    pub ttl_hours: u64,
}
122
/// Serde default for `ArchiveStorageConfig::layout`.
fn default_archive_layout() -> String {
    String::from("hive-hourly")
}
126
/// Serde default for `ArchiveStorageConfig::ttl_hours`.
fn default_archive_ttl() -> u64 {
    const DEFAULT_TTL_HOURS: u64 = 72;
    DEFAULT_TTL_HOURS
}
130
/// Serde default for `Config::pipeline_id`.
fn default_pipeline_id() -> String {
    String::from("default")
}
134
135impl Config {
136    /// Validates the configuration.
137    pub fn validate(&self) -> anyhow::Result<()> {
138        if self.sources.is_empty() {
139            anyhow::bail!("Configuration must have at least one source.");
140        }
141        if self.sinks.is_empty() {
142            anyhow::bail!("Configuration must have at least one sink.");
143        }
144        Ok(())
145    }
146
147    pub fn diff(&self, other: &Self) -> ConfigDiff {
148        let mut diff = ConfigDiff::default();
149
150        // Sources
151        for (name, source) in &self.sources {
152            if let Some(other_source) = other.sources.get(name) {
153                if source != other_source {
154                    diff.sources_changed.insert(name.clone());
155                }
156            } else {
157                diff.sources_removed.insert(name.clone());
158            }
159        }
160        for name in other.sources.keys() {
161            if !self.sources.contains_key(name) {
162                diff.sources_added.insert(name.clone());
163            }
164        }
165
166        // Transforms
167        for (name, transform) in &self.transforms {
168            if let Some(other_transform) = other.transforms.get(name) {
169                if transform != other_transform {
170                    diff.transforms_changed.insert(name.clone());
171                }
172            } else {
173                diff.transforms_removed.insert(name.clone());
174            }
175        }
176        for name in other.transforms.keys() {
177            if !self.transforms.contains_key(name) {
178                diff.transforms_added.insert(name.clone());
179            }
180        }
181
182        // Sinks
183        for (name, sink) in &self.sinks {
184            if let Some(other_sink) = other.sinks.get(name) {
185                if sink != other_sink {
186                    diff.sinks_changed.insert(name.clone());
187                }
188            } else {
189                diff.sinks_removed.insert(name.clone());
190            }
191        }
192        for name in other.sinks.keys() {
193            if !self.sources.contains_key(name) {
194                diff.sinks_added.insert(name.clone());
195            }
196        }
197
198        diff
199    }
200}
201
/// Difference between two pipeline configurations.
///
/// Records, per component kind, the names that were added, removed, or
/// changed between configuration reloads. Used for hot-reload validation.
/// In `Config::diff(&self, other)`, "added" means present only in `other`
/// and "removed" means present only in `self`.
#[derive(Debug, Default)]
pub struct ConfigDiff {
    /// Source names present only in the newer configuration.
    pub sources_added: HashSet<String>,
    /// Source names present only in the older configuration.
    pub sources_removed: HashSet<String>,
    /// Source names present in both whose configuration differs.
    pub sources_changed: HashSet<String>,

    /// Transform names present only in the newer configuration.
    pub transforms_added: HashSet<String>,
    /// Transform names present only in the older configuration.
    pub transforms_removed: HashSet<String>,
    /// Transform names present in both whose configuration differs.
    pub transforms_changed: HashSet<String>,

    /// Sink names present only in the newer configuration.
    pub sinks_added: HashSet<String>,
    /// Sink names present only in the older configuration.
    pub sinks_removed: HashSet<String>,
    /// Sink names present in both whose configuration differs.
    pub sinks_changed: HashSet<String>,
}

impl ConfigDiff {
    /// Returns `true` when none of the nine name sets contains an entry,
    /// i.e. the two compared configurations have identical components.
    pub fn is_empty(&self) -> bool {
        [
            &self.sources_added,
            &self.sources_removed,
            &self.sources_changed,
            &self.transforms_added,
            &self.transforms_removed,
            &self.transforms_changed,
            &self.sinks_added,
            &self.sinks_removed,
            &self.sinks_changed,
        ]
        .iter()
        .all(|names| names.is_empty())
    }
}
234
/// Discriminated union of all source component types.
///
/// The `type` field in YAML selects the variant (e.g., `type: kafka`).
/// Variant names are converted to snake_case to form the tag value.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SourceConfigEnum {
    /// `type: kafka`
    Kafka(Box<KafkaConsumerConfigWrapped>),
    /// `type: aws_s3`
    AwsS3(Box<S3SourceConfigWrapped>),
    /// `type: aws_sqs`
    AwsSqs(Box<SqsConfigWrapped>),
    /// `type: aws_kinesis`
    AwsKinesis(Box<KinesisConfigWrapped>),
    /// `type: gcp_cloud_storage`
    GcpCloudStorage(Box<GcsSourceConfigWrapped>),
    /// `type: gcp_pubsub`
    GcpPubsub(Box<PubSubSourceConfigWrapped>),
    /// `type: internal_metrics` (payload is stored inline, not boxed)
    InternalMetrics(InternalMetricsConfig),
    /// `type: internal_logs` (payload is stored inline, not boxed)
    InternalLogs(InternalLogsConfig),
    /// `type: opentelemetry`
    Opentelemetry(Box<OtelSourceConfigWrapped>),
}
251
252impl SourceConfig for SourceConfigEnum {
253    fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
254        match self {
255            Self::Kafka(c) => c.build(cx),
256            Self::AwsS3(c) => c.build(cx),
257            Self::AwsSqs(c) => SourceConfig::build(c.as_ref(), cx),
258            Self::AwsKinesis(c) => SourceConfig::build(c.as_ref(), cx),
259            Self::GcpCloudStorage(c) => c.build(cx),
260            Self::GcpPubsub(c) => c.build(cx),
261            Self::InternalMetrics(c) => c.build(cx),
262            Self::InternalLogs(c) => c.build(cx),
263            Self::Opentelemetry(c) => c.build(cx),
264        }
265    }
266}
267
/// Reads events from one or more Apache Kafka topics using a consumer group.
// Serde shape: `decoding` is optional (`None` when absent); the fields of
// `KafkaConsumerConfig` are flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "kafka")]
pub struct KafkaConsumerConfigWrapped {
    /// Optional decoder configuration for input events.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// Inner Kafka-specific consumer configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: KafkaConsumerConfig,
}
280
281impl SourceConfig for KafkaConsumerConfigWrapped {
282    fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
283        self.config.build(cx)
284    }
285}
286
/// Reads events from Amazon S3 objects. Supports both list-based polling and SQS event notifications.
// Serde shape: `decoding` is optional (`None` when absent); the fields of
// `S3SourceConfig` are flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "aws_s3")]
pub struct S3SourceConfigWrapped {
    /// Optional decoder configuration for input events.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// Inner S3-specific source configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: S3SourceConfig,
}
299
300impl SourceConfig for S3SourceConfigWrapped {
301    fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
302        self.config.build(cx)
303    }
304}
305
/// Amazon SQS (Simple Queue Service) source configuration.
// Serde shape: `decoding` is optional (`None` when absent); the fields of
// `SqsConfig` are flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "aws_sqs")]
pub struct SqsConfigWrapped {
    /// Optional decoder configuration for input events.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// Inner SQS-specific source configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: SqsConfig,
}
318
319impl SourceConfig for SqsConfigWrapped {
320    fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
321        SourceConfig::build(&self.config, cx)
322    }
323}
324
/// Amazon Kinesis stream configuration.
// This wrapper is shared by the source and sink roles: the file implements
// both `SourceConfig` and `SinkConfig` for it, although the `component`
// attribute below registers it as `type = "source"` only.
// Serde shape: every wrapper field is optional (`inputs` defaults to an empty
// list, `error_policy` to `ErrorPolicy::default()`, `decoding` to `None`,
// `buffer` to `BufferConfig::default()`); the fields of `KinesisConfig` are
// flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "aws_kinesis")]
pub struct KinesisConfigWrapped {
    /// Upstream component names that feed events into this sink (when used as sink).
    #[serde(default)]
    #[doc_field(example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing (when used as sink).
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional decoder/encoder configuration for input/output events.
    // NOTE(review): the field type is `DecoderConfigEnum` only; no encoder
    // configuration is carried by this wrapper despite the wording above.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// Buffer configuration for event batching (when used as sink).
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Inner Kinesis-specific source configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: KinesisConfig,
}
348
349impl SourceConfig for KinesisConfigWrapped {
350    fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
351        SourceConfig::build(&self.config, cx)
352    }
353}
354
355impl SinkConfig for KinesisConfigWrapped {
356    fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
357        SinkConfig::build(&self.config, cx)
358    }
359}
360
/// Reads events from Google Cloud Storage objects. Supports both list-based polling and Pub/Sub notifications.
// Serde shape: `decoding` is optional (`None` when absent); the fields of
// `GcsSourceConfig` are flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "gcp_cloud_storage")]
pub struct GcsSourceConfigWrapped {
    /// Optional decoder configuration for input events.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// Inner GCS-specific source configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: GcsSourceConfig,
}
373
374impl SourceConfig for GcsSourceConfigWrapped {
375    fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
376        self.config.build(cx)
377    }
378}
379
/// Reads events from a Google Cloud Pub/Sub subscription.
// Serde shape: `decoding` is optional (`None` when absent); the fields of
// `PubSubSourceConfig` are flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "gcp_pubsub")]
pub struct PubSubSourceConfigWrapped {
    /// Optional decoder configuration for input events.
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// Inner Pub/Sub-specific source configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: PubSubSourceConfig,
}
392
393impl SourceConfig for PubSubSourceConfigWrapped {
394    fn build(&self, cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
395        self.config.build(cx)
396    }
397}
398
399/// Exposes Kinetic's own internal metrics as a source.
400#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, ComponentDoc)]
401#[component(type = "source", name = "internal_metrics")]
402pub struct InternalMetricsConfig {}
403impl SourceConfig for InternalMetricsConfig {
404    fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
405        Err(anyhow::anyhow!(
406            "Internal metrics build must be implemented in the data plane"
407        ))
408    }
409}
410
411/// Exposes Kinetic's own internal log events as a source.
412#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, ComponentDoc)]
413#[component(type = "source", name = "internal_logs")]
414pub struct InternalLogsConfig {}
415impl SourceConfig for InternalLogsConfig {
416    fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
417        anyhow::bail!("Internal logs source requires log_rx handle")
418    }
419}
420
/// Receives logs, metrics, and traces via OpenTelemetry gRPC/HTTP.
// Serde shape: `id` has no default and must be present; `decoding` is
// optional (`None` when absent). Unlike the other source wrappers in this
// file, `config` is NOT flattened, so the server settings are read from a
// nested `config` key.
// NOTE(review): confirm the missing `#[serde(flatten)]` is intentional.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "opentelemetry")]
pub struct OtelSourceConfigWrapped {
    /// Unique ID for this component within the pipeline.
    pub id: String,
    /// Optional decoder configuration for input events (usually not used for OTLP).
    #[serde(default)]
    pub decoding: Option<DecoderConfigEnum>,
    /// OpenTelemetry server specific configuration.
    pub config: OtelServerConfig,
}
433
434impl SourceConfig for OtelSourceConfigWrapped {
435    fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
436        Err(anyhow::anyhow!(
437            "OTel source build must be implemented in the data plane"
438        ))
439    }
440}
441
/// Policy for handling errors during event processing.
// Serde shape: variants are written in snake_case (`drop_on_error`,
// `reroute_on_error`, `halt_on_error`). `Default` yields `DropOnError`.
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, Eq, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum ErrorPolicy {
    /// Drop the event and continue processing (default).
    #[default]
    DropOnError,
    /// Route the event to the component's error output instead of dropping.
    RerouteOnError,
    /// Stop the pipeline immediately on the first error.
    HaltOnError,
}
454
/// Discriminated union of all transform component types.
///
/// The `type` field in YAML selects the variant (e.g., `type: filter`).
/// Variant names are converted to snake_case to form the tag value.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TransformConfigEnum {
    /// `type: filter`
    Filter(FilterConfig),
    /// `type: otel_aggregate`
    OtelAggregate(OtelAggregateConfigWrapped),
    /// `type: aggregate`
    Aggregate(AggregateConfigWrapped),
    /// `type: sample`
    Sample(SamplingConfigWrapped),
    /// `type: sql_map`
    SqlMap(SqlMapConfigWrapped),
    /// `type: map`
    Map(MapTransformConfigWrapped),
}
468
469impl TransformConfig for TransformConfigEnum {
470    fn build(&self, cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
471        match self {
472            Self::Filter(c) => c.build(cx),
473            Self::OtelAggregate(c) => c.build(cx),
474            Self::Aggregate(c) => c.build(cx),
475            Self::Sample(c) => c.build(cx),
476            Self::SqlMap(c) => c.build(cx),
477            Self::Map(c) => c.build(cx),
478        }
479    }
480}
481
/// Filters events based on a condition, either passing or dropping matches.
// Serde shape: `inputs` and `condition` have no default and must be present;
// `error_policy` falls back to `ErrorPolicy::default()`.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "filter")]
pub struct FilterConfig {
    /// List of upstream component names to receive events from.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// The filter condition to evaluate against each event.
    #[doc_field(required)]
    pub condition: FilterCondition,
    /// Policy for handling events that cause errors during filtering.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
}
497
498impl TransformConfig for FilterConfig {
499    fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
500        Err(anyhow::anyhow!(
501            "Filter build must be implemented in the data plane"
502        ))
503    }
504}
505
/// Wrapper for OTel-aware aggregate transform with common pipeline fields.
// Serde shape: `inputs` has no default and must be present; `error_policy`
// falls back to `ErrorPolicy::default()`; the fields of
// `OtelAggregateConfig` are flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "otel_aggregate")]
pub struct OtelAggregateConfigWrapped {
    /// List of upstream component names to receive events from.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// Policy for handling events that cause errors during aggregation.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// OTel aggregation-specific configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: OtelAggregateConfig,
}
522
523impl TransformConfig for OtelAggregateConfigWrapped {
524    fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
525        Err(anyhow::anyhow!(
526            "OtelAggregate build must be implemented in the data plane"
527        ))
528    }
529}
530
/// Wrapper for generic SQL aggregate transform with common pipeline fields.
// Serde shape: `inputs` has no default and must be present; `error_policy`
// falls back to `ErrorPolicy::default()`; the fields of `AggregateConfig`
// are flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "aggregate")]
pub struct AggregateConfigWrapped {
    /// List of upstream component names to receive events from.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// Policy for handling events that cause errors during aggregation.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// SQL aggregation-specific configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: AggregateConfig,
}
547
548impl TransformConfig for AggregateConfigWrapped {
549    fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
550        Err(anyhow::anyhow!(
551            "Aggregate build must be implemented in the data plane"
552        ))
553    }
554}
555
/// Configuration for generic SQL map transform
// Serde shape: `query` has no default and must be present.
// NOTE(review): the `doc_field` example selects from a table named `payload`;
// how that table is provided is not visible in this file section.
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct SqlMapConfig {
    /// SQL query to execute against the incoming payload.
    #[doc_field(required, example = "SELECT * FROM payload")]
    pub query: String,
}
563
/// Wrapper for SQL map transform with common pipeline fields.
// Serde shape: `inputs` has no default and must be present; `error_policy`
// falls back to `ErrorPolicy::default()`; the fields of `SqlMapConfig` are
// flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "sql_map")]
pub struct SqlMapConfigWrapped {
    /// List of upstream component names to receive events from.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// Policy for handling events that cause errors during transformation.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// SQL mapping configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: SqlMapConfig,
}
580
581impl TransformConfig for SqlMapConfigWrapped {
582    fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
583        Err(anyhow::anyhow!(
584            "SqlMap build must be implemented in the data plane"
585        ))
586    }
587}
588
/// Configuration for WASM-based map transform.
// Serde shape: `artifact_id` has no default and must be present. The other
// fields are optional: `artifact_version` -> `default_artifact_version()` (1),
// `memory_limit_bytes` -> `default_wasm_memory_limit()` (4 * 1024 * 1024),
// `timeout_ms` -> `default_wasm_timeout_ms()` (50),
// `archive` -> `ArchiveConfig::default()`, `outputs` -> empty map.
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct MapTransformConfig {
    /// WASM module artifact identifier from Nexus.
    #[doc_field(required, example = "artifact://transforms/enrich/3")]
    pub artifact_id: String,
    /// Artifact version to use.
    #[serde(default = "default_artifact_version")]
    #[doc_field(default = "1")]
    pub artifact_version: u64,
    /// Memory limit for WASM execution in bytes.
    #[serde(default = "default_wasm_memory_limit")]
    #[doc_field(default = "4194304")]
    pub memory_limit_bytes: usize,
    /// Timeout per batch in milliseconds.
    #[serde(default = "default_wasm_timeout_ms")]
    #[doc_field(default = "50")]
    pub timeout_ms: u64,
    /// Archive raw input before transform.
    #[serde(default)]
    pub archive: ArchiveConfig,
    /// Named outputs for routing.
    // NOTE(review): the example maps a label to a downstream name; the exact
    // meaning of key vs. value is not established in this file section.
    #[serde(default)]
    #[doc_field(example = "{\"default\": \"output_1\", \"anomalies\": \"output_2\"}")]
    pub outputs: HashMap<String, String>,
}
615
/// Serde default for `MapTransformConfig::artifact_version`.
fn default_artifact_version() -> u64 {
    const DEFAULT_ARTIFACT_VERSION: u64 = 1;
    DEFAULT_ARTIFACT_VERSION
}
619
/// Serde default for `MapTransformConfig::memory_limit_bytes`: 4 MiB.
fn default_wasm_memory_limit() -> usize {
    const BYTES_PER_MIB: usize = 1024 * 1024;
    4 * BYTES_PER_MIB
}
623
/// Serde default for `MapTransformConfig::timeout_ms`.
fn default_wasm_timeout_ms() -> u64 {
    const DEFAULT_TIMEOUT_MS: u64 = 50;
    DEFAULT_TIMEOUT_MS
}
627
/// Archive configuration for preserving raw input.
// Serde shape: `enabled` is optional and is `false` when absent; the derived
// `Default` produces the same value.
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default, FieldDoc)]
pub struct ArchiveConfig {
    /// Enable archiving of raw input.
    #[serde(default)]
    #[doc_field(default = "false")]
    pub enabled: bool,
}
636
/// Wrapper for WASM map transform with common pipeline fields.
// Serde shape: `inputs` has no default and must be present; `error_policy`
// falls back to `ErrorPolicy::default()`; the fields of `MapTransformConfig`
// are flattened into the same map level.
// (Plain `//` comments: this type derives `ComponentDoc`, which may read
// `///` text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "map")]
pub struct MapTransformConfigWrapped {
    /// List of upstream component names to receive events from.
    #[doc_field(required)]
    pub inputs: Vec<String>,
    /// Policy for handling events that cause errors during transformation.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// WASM transform configuration.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: MapTransformConfig,
}
653
654impl TransformConfig for MapTransformConfigWrapped {
655    fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
656        Err(anyhow::anyhow!(
657            "Map transform build must be implemented in the data plane"
658        ))
659    }
660}
661
/// Condition expression for the filter transform.
// Serde shape: the Rust field `kind` is read from / written to the key
// `type`; `source` and `pattern` have no default and must be present; `log`
// is `false` when absent.
// NOTE(review): how `pattern` is matched (literal, glob, regex) is not
// established in this file section.
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct FilterCondition {
    /// Type of filter operation (drop or pass).
    #[serde(rename = "type")]
    pub kind: FilterType,
    /// Field or path to evaluate against.
    pub source: String,
    /// Pattern or value to match.
    pub pattern: String,
    /// Whether to log filter decisions.
    #[serde(default)]
    pub log: bool,
}
676
/// Type of filter operation.
// Serde shape: variants are written in snake_case (`drop`, `pass`).
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum FilterType {
    /// Drop events that match the condition.
    Drop,
    /// Pass only events that match the condition.
    Pass,
}
686
/// Type of Otel aggregation to perform.
// Serde shape: variants are written in snake_case (`metrics`, `traces`).
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum OtelAggregation {
    /// Aggregate OTel metric signals.
    Metrics,
    /// Aggregate OTel trace signals.
    Traces,
}
696
/// Configuration for OTel-aware aggregate transform
// Serde shape: `aggregation` and `window_duration` have no default and must
// be present; all other fields are optional.
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct OtelAggregateConfig {
    /// Type of aggregation (metrics or traces).
    #[doc_field(required, example = "metrics")]
    pub aggregation: OtelAggregation,
    /// Window duration (e.g., "15s", "60s", "5m").
    // Stored as the raw string; it is not parsed in this type.
    #[doc_field(required, example = "60s")]
    pub window_duration: String,
    /// Naming convention for output metrics.
    #[serde(default)]
    #[doc_field(example = "prometheus")]
    pub naming_convention: Option<NamingConvention>,
    /// OTel schema version to use.
    // NOTE(review): the documented default is "1.0", but `#[serde(default)]`
    // yields `None` when the key is absent. Presumably "1.0" is applied by
    // the consumer of this value — confirm.
    #[serde(default)]
    #[doc_field(default = "1.0", example = "1.0")]
    pub otel_schema_version: Option<String>,
    /// DuckDB memory limit (e.g., "8GB").
    // NOTE(review): the documented default is "1GB", but the serde default
    // function `default_memory_limit()` returns `Some("8GB")`. One of the two
    // is stale — confirm which value is intended.
    #[serde(default = "default_memory_limit")]
    #[doc_field(default = "1GB", example = "8GB")]
    pub memory_limit: Option<String>,
    /// Directory for DuckDB temporary files (spill-to-disk).
    #[serde(default)]
    #[doc_field(example = "/var/tmp/kinetic")]
    pub temp_directory: Option<std::path::PathBuf>,
    /// Named output labels for forking the transform output.
    #[serde(default)]
    #[doc_field(example = "{\"errors\": \"error_branch\"}")]
    pub outputs: HashMap<String, String>,
}
727
/// Configuration for generic SQL aggregate transform
// Serde shape: `sql` and `window_duration` have no default and must be
// present; all other fields are optional.
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct AggregateConfig {
    /// SQL query to execute against the sealed DuckDB table.
    #[doc_field(required, example = "SELECT count(*) AS total FROM events")]
    pub sql: String,
    /// Window duration (e.g., "15s", "60s", "5m").
    // Stored as the raw string; it is not parsed in this type.
    #[doc_field(required, example = "60s")]
    pub window_duration: String,
    /// DuckDB memory limit (e.g., "8GB").
    // NOTE(review): the documented default is "1GB", but the serde default
    // function `default_memory_limit()` returns `Some("8GB")`. One of the two
    // is stale — confirm which value is intended.
    #[serde(default = "default_memory_limit")]
    #[doc_field(default = "1GB", example = "8GB")]
    pub memory_limit: Option<String>,
    /// Directory for DuckDB temporary files (spill-to-disk).
    #[serde(default)]
    #[doc_field(example = "/var/tmp/kinetic")]
    pub temp_directory: Option<std::path::PathBuf>,
    /// Named output labels for forking the transform output.
    #[serde(default)]
    #[doc_field(example = "{\"errors\": \"error_branch\"}")]
    pub outputs: HashMap<String, String>,
}
750
/// Serde default for the `memory_limit` field of `OtelAggregateConfig` and
/// `AggregateConfig`.
///
/// NOTE(review): both fields advertise `default = "1GB"` in their
/// `doc_field` attribute while this function returns "8GB"; the runtime
/// value is kept as-is here — confirm which one is intended.
fn default_memory_limit() -> Option<String> {
    let limit = String::from("8GB");
    Some(limit)
}
754
/// Type of aggregation for OTel metrics
// Serde shape: variants are written in snake_case (`count`, `histogram`,
// `summary`, `exponential_histogram`).
// NOTE(review): no use of this enum appears in the reviewed portion of the
// file; verify it is referenced elsewhere.
// (Plain `//` comments: this type derives `FieldDoc`, which may read `///`
// text — TODO confirm before converting these to doc comments.)
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum AggregationType {
    /// Count metric points
    Count,
    /// Histogram with explicit boundaries
    Histogram,
    /// Summary with quantiles
    Summary,
    /// Exponential histogram
    ExponentialHistogram,
}
768
769/// Sampling mode configuration.
770///
771/// The `type` field selects the sampling algorithm.
772#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
773#[serde(tag = "type", rename_all = "snake_case")]
774pub enum SamplingMode {
775    /// Retain each event independently with a fixed probability.
776    Random {
777        /// Probability of retaining each event. Must be in `(0.0, 1.0]`.
778        #[doc_field(required, example = "0.1")]
779        rate: f64,
780        /// Optional RNG seed for deterministic sampling.
781        #[serde(default)]
782        #[doc_field(default = "entropy", example = "42")]
783        seed: Option<u64>,
784    },
785    /// Select every Nth event from the stream.
786    Systematic {
787        /// Keep every Nth event. Must be `>= 1`.
788        #[doc_field(required, example = "100")]
789        interval: u64,
790    },
791    /// Hash a key field and retain events deterministically.
792    Hash {
793        /// Dot-separated field path to hash.
794        #[doc_field(required, example = "trace_id")]
795        key: String,
796        /// Percentage of hash space to retain. Must be in `[1, 100]`.
797        #[doc_field(required, example = "10")]
798        rate: u8,
799        /// Behaviour when the key field is absent.
800        #[serde(default = "default_missing_key")]
801        #[doc_field(default = "drop", example = "keep")]
802        missing_key_behaviour: String,
803    },
804    /// Accept or reject entire batches with a single decision.
805    Recordbatch {
806        /// Probability of accepting each batch. Must be in `(0.0, 1.0]`.
807        #[doc_field(required, example = "0.1")]
808        rate: f64,
809        /// Optional RNG seed for deterministic sampling.
810        #[serde(default)]
811        #[doc_field(default = "entropy", example = "42")]
812        seed: Option<u64>,
813    },
814    /// Apply per-stratum sampling rates based on a field value.
815    Stratified {
816        /// Dot-separated field path to stratify on.
817        #[doc_field(required, example = "severity")]
818        field: String,
819        /// Map of stratum value to sampling rate.
820        #[doc_field(required, example = "{\"error\": 1.0, \"info\": 0.01}")]
821        strata: std::collections::HashMap<String, f64>,
822        /// Sampling rate for values not in `strata`. Must be in `(0.0, 1.0]`.
823        #[doc_field(required, example = "0.1")]
824        default_rate: f64,
825        /// Behaviour when the stratum field is absent.
826        #[serde(default = "default_missing_key")]
827        #[doc_field(default = "keep", example = "drop")]
828        missing_field_behaviour: String,
829        /// Optional RNG seed.
830        #[serde(default)]
831        #[doc_field(default = "entropy", example = "42")]
832        seed: Option<u64>,
833    },
834    /// Dynamically adjust sampling rate based on error rate and latency.
835    Adaptive {
836        /// Starting sampling rate. Must be in `(0.0, 1.0]`.
837        #[doc_field(required, example = "0.1")]
838        base_rate: f64,
839        /// Minimum sampling rate floor.
840        #[doc_field(required, example = "0.01")]
841        min_rate: f64,
842        /// Maximum sampling rate ceiling.
843        #[doc_field(required, example = "1.0")]
844        max_rate: f64,
845        /// Dot-separated field path whose value indicates an error.
846        #[doc_field(required, example = "level")]
847        error_field: String,
848        /// Value of `error_field` that signals an error.
849        #[doc_field(required, example = "error")]
850        error_value: String,
851        /// Optional field path for latency values in ms.
852        #[serde(default)]
853        #[doc_field(example = "duration_ms")]
854        latency_field: Option<String>,
855        /// Latency threshold in ms above which the rate increases.
856        #[serde(default = "default_latency_threshold")]
857        #[doc_field(default = "2000", example = "500")]
858        latency_threshold_ms: u64,
859        /// Observation window duration in seconds.
860        #[serde(default = "default_window_secs")]
861        #[doc_field(default = "60", example = "30")]
862        window_secs: u64,
863        /// Rate adjustment speed multiplier.
864        #[serde(default = "default_sensitivity")]
865        #[doc_field(default = "2.0", example = "1.5")]
866        sensitivity: f64,
867    },
868    /// Maintain a fixed-size reservoir using Vitter's Algorithm R.
869    Reservoir {
870        /// Maximum number of events in the reservoir.
871        #[doc_field(required, example = "1000")]
872        size: usize,
873        /// Interval in seconds at which the reservoir is emitted.
874        #[serde(default = "default_flush_interval")]
875        #[doc_field(default = "300", example = "60")]
876        flush_interval_secs: u64,
877    },
878    /// Buffer events by group key and decide after a window.
879    Tail {
880        /// Dot-separated field path to group events by.
881        #[doc_field(required, example = "trace_id")]
882        group_key: String,
883        /// Seconds to buffer a group before deciding.
884        #[serde(default = "default_decision_window")]
885        #[doc_field(default = "30", example = "60")]
886        decision_window_secs: u64,
887        /// Maximum concurrent groups in memory.
888        #[serde(default = "default_max_groups")]
889        #[doc_field(default = "10000", example = "50000")]
890        max_groups: usize,
891        /// Decision when no rule matches.
892        #[serde(default = "default_tail_decision")]
893        #[doc_field(default = "drop", example = "keep")]
894        default_decision: String,
895        /// Ordered list of rules. First match wins.
896        #[doc_field(required)]
897        rules: Vec<TailRuleConfig>,
898    },
899}
900
/// Serde fallback for the "missing key/field" behaviour settings: `"drop"`.
//
// NOTE(review): the `Stratified` sampling variant wires `missing_field_behaviour` to this
// function while its `doc_field` attribute advertises a default of `"keep"` — confirm which
// value is intended.
fn default_missing_key() -> String {
    String::from("drop")
}
904
/// Serde fallback for `latency_threshold_ms`: 2000 milliseconds.
fn default_latency_threshold() -> u64 {
    const FALLBACK_LATENCY_THRESHOLD_MS: u64 = 2_000;
    FALLBACK_LATENCY_THRESHOLD_MS
}
908
/// Serde fallback for `window_secs`: a 60-second observation window.
fn default_window_secs() -> u64 {
    const FALLBACK_WINDOW_SECS: u64 = 60;
    FALLBACK_WINDOW_SECS
}
912
/// Serde fallback for `sensitivity`: a rate-adjustment multiplier of `2.0`.
fn default_sensitivity() -> f64 {
    const FALLBACK_SENSITIVITY: f64 = 2.0;
    FALLBACK_SENSITIVITY
}
916
/// Serde fallback for `flush_interval_secs`: 300 seconds.
fn default_flush_interval() -> u64 {
    const FALLBACK_FLUSH_INTERVAL_SECS: u64 = 300;
    FALLBACK_FLUSH_INTERVAL_SECS
}
920
/// Serde fallback for `decision_window_secs`: 30 seconds.
fn default_decision_window() -> u64 {
    const FALLBACK_DECISION_WINDOW_SECS: u64 = 30;
    FALLBACK_DECISION_WINDOW_SECS
}
924
/// Serde fallback for `max_groups`: 10,000 concurrent groups.
fn default_max_groups() -> usize {
    const FALLBACK_MAX_GROUPS: usize = 10_000;
    FALLBACK_MAX_GROUPS
}
928
/// Serde fallback for the tail sampler's `default_decision`: `"drop"`.
fn default_tail_decision() -> String {
    String::from("drop")
}
932
/// A single rule for tail sampling.
//
// Instances appear in the `rules` list of the `Tail` sampling mode, which documents the list
// as ordered with the first match winning.
//
// Every field is a plain `String`; this struct performs no validation of `operator` or
// `decision`. Presumably they are checked where the rule is evaluated — TODO confirm.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct TailRuleConfig {
    /// Dot-separated field path to evaluate.
    #[doc_field(required, example = "attributes.http.status_code")]
    pub field: String,
    /// Comparison operator: `==`, `!=`, `>`, `>=`, `<`, `<=`, `contains`.
    #[doc_field(required, example = ">=")]
    pub operator: String,
    /// Value to compare against.
    // Stored as text even for numeric comparisons (see the `"500"` example below).
    #[doc_field(required, example = "500")]
    pub value: String,
    /// Decision when this rule matches: `keep` or `drop`.
    #[doc_field(required, example = "keep")]
    pub decision: String,
}
949
/// Sampling transform with common pipeline envelope fields.
//
// Config-only type: the `TransformConfig::build` impl for this struct returns an error
// stating that the build must be implemented in the data plane.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "transform", name = "sample")]
pub struct SamplingConfigWrapped {
    /// List of upstream component names to receive events from.
    pub inputs: Vec<String>,
    /// Policy for handling errors during sampling.
    // `#[serde(default)]`: falls back to `ErrorPolicy::default()` when the key is omitted.
    #[serde(default)]
    pub error_policy: ErrorPolicy,
    /// When `true`, the `passthrough` output is active and receives all events.
    // `#[serde(default)]`: `false` when the key is omitted.
    #[serde(default)]
    pub route_all: bool,
    /// Sampling mode configuration.
    // `flatten` places the selected mode's keys at the same level as `inputs` and
    // `error_policy` instead of nesting them under a `mode` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub mode: SamplingMode,
}
967
968impl TransformConfig for SamplingConfigWrapped {
969    fn build(&self, _cx: TransformContext) -> anyhow::Result<JoinHandle<()>> {
970        Err(anyhow::anyhow!(
971            "Sample build must be implemented in the data plane"
972        ))
973    }
974}
975
/// Naming convention for output metric names.
//
// With `rename_all = "snake_case"` the variants are written in config as `preserve`,
// `standard` and `custom`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Default, FieldDoc)]
#[serde(rename_all = "snake_case")]
pub enum NamingConvention {
    /// Preserve the original metric name without modification.
    // Marked `#[default]`, so this is the value of `NamingConvention::default()`.
    #[default]
    Preserve,
    /// Append `_sum`, `_count`, `_bucket` suffixes per Prometheus convention.
    Standard,
    /// Use a custom template (not yet implemented).
    // The variant carries no payload, so there is currently nowhere to put a template.
    Custom,
}
988
/// Discriminated union of all sink component types.
///
/// The `type` field in YAML selects the variant (e.g., `type: kafka`).
///
/// Variant names are converted to snake case for the `type` tag; the tag for each variant is
/// listed on the variant itself.
//
// Every payload is boxed; presumably this keeps the enum itself small regardless of the
// size of the individual sink configs — TODO confirm.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SinkConfigEnum {
    /// `type: kafka`
    Kafka(Box<KafkaProducerConfigWrapped>),
    /// `type: s3`
    S3(Box<S3SinkConfigWrapped>),
    /// `type: aws_kinesis`
    AwsKinesis(Box<KinesisConfigWrapped>),
    /// `type: gcp_cloud_storage`
    GcpCloudStorage(Box<GcsSinkConfigWrapped>),
    /// `type: gcp_pubsub`
    GcpPubsub(Box<PubSubSinkConfigWrapped>),
    /// `type: flight_sql`
    FlightSql(Box<FlightSqlConfigWrapped>),
    /// `type: dlq`
    Dlq(Box<DlqSinkConfigWrapped>),
    /// `type: iceberg`
    Iceberg(Box<IcebergSinkConfigWrapped>),
    /// `type: delta`
    Delta(Box<DeltaSinkConfigWrapped>),
    /// `type: starrocks`
    Starrocks(Box<StarRocksSinkConfigWrapped>),
}
1006
1007impl SinkConfig for SinkConfigEnum {
1008    fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1009        match self {
1010            Self::Kafka(c) => c.build(cx),
1011            Self::S3(c) => c.build(cx),
1012            Self::AwsKinesis(c) => SinkConfig::build(c.as_ref(), cx),
1013            Self::GcpCloudStorage(c) => c.build(cx),
1014            Self::GcpPubsub(c) => c.build(cx),
1015            Self::FlightSql(c) => c.build(cx),
1016            Self::Dlq(c) => c.build(cx),
1017            Self::Iceberg(c) => c.build(cx),
1018            Self::Delta(c) => c.build(cx),
1019            Self::Starrocks(c) => c.build(cx),
1020        }
1021    }
1022}
1023
/// Writes events to Apache Iceberg tables.
//
// Config-only type: the `SinkConfig::build` impl for this struct returns an error stating
// that the build must be implemented in the data plane.
//
// NOTE(review): unlike the Kafka/S3/GCS/Pub/Sub/Flight SQL/DLQ wrappers in this file, the
// fields here carry no `#[doc_field(...)]` attributes — confirm the generated docs still
// show the required/default metadata.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "iceberg")]
pub struct IcebergSinkConfigWrapped {
    /// Upstream component names that feed events into this sink.
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing.
    #[serde(default)]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration for output events.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer configuration for event batching. Defaults to an in-memory buffer of 100 batches.
    #[serde(default)]
    pub buffer: BufferConfig,

    /// Iceberg catalog endpoint URL.
    pub catalog_endpoint: String,
    /// Fully qualified Iceberg table name (e.g., 'db.table').
    pub table: String,
    /// Base URI for the Iceberg warehouse storage.
    pub warehouse: String,

    /// Maximum number of rows per batch before flushing. Defaults to 50,000.
    #[serde(default = "default_max_rows")]
    pub max_rows: u64,
    /// Maximum byte size of a batch before flushing. Defaults to 67,108,864 (64 MiB).
    #[serde(default = "default_max_bytes")]
    pub max_bytes: u64,
    /// Maximum age of a batch in milliseconds before flushing. Defaults to 60,000.
    #[serde(default = "default_max_age_ms")]
    pub max_age_ms: u64,
}
1057
/// Serde fallback for `max_rows` on the Iceberg, Delta and StarRocks sinks: 50,000 rows.
fn default_max_rows() -> u64 {
    const FALLBACK_MAX_ROWS: u64 = 50_000;
    FALLBACK_MAX_ROWS
}
/// Serde fallback for `max_bytes` on the Iceberg, Delta and StarRocks sinks: 64 MiB.
fn default_max_bytes() -> u64 {
    // 64 MiB, spelled out so the unit is visible in the expression itself.
    64 * 1024 * 1024
}
/// Serde fallback for `max_age_ms` on the Iceberg, Delta and StarRocks sinks: 60,000 ms.
fn default_max_age_ms() -> u64 {
    const FALLBACK_MAX_AGE_MS: u64 = 60_000;
    FALLBACK_MAX_AGE_MS
}
1067
1068impl SinkConfig for IcebergSinkConfigWrapped {
1069    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1070        Err(anyhow::anyhow!(
1071            "Iceberg build must be implemented in the data plane"
1072        ))
1073    }
1074}
1075
/// Writes events to Delta Lake tables.
//
// Config-only type: the `SinkConfig::build` impl for this struct returns an error stating
// that the build must be implemented in the data plane.
//
// NOTE(review): unlike the Kafka/S3/GCS/Pub/Sub/Flight SQL/DLQ wrappers in this file, the
// fields here carry no `#[doc_field(...)]` attributes — confirm the generated docs still
// show the required/default metadata.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "delta")]
pub struct DeltaSinkConfigWrapped {
    /// Upstream component names that feed events into this sink.
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing.
    #[serde(default)]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration for output events.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer configuration for event batching. Defaults to an in-memory buffer of 100 batches.
    #[serde(default)]
    pub buffer: BufferConfig,

    /// Base URI for the Delta table warehouse storage.
    pub warehouse: String,
    /// Table name or path within the warehouse.
    pub table: String,

    /// Maximum number of rows per batch before flushing. Defaults to 50,000.
    #[serde(default = "default_max_rows")]
    pub max_rows: u64,
    /// Maximum byte size of a batch before flushing. Defaults to 67,108,864 (64 MiB).
    #[serde(default = "default_max_bytes")]
    pub max_bytes: u64,
    /// Maximum age of a batch in milliseconds before flushing. Defaults to 60,000.
    #[serde(default = "default_max_age_ms")]
    pub max_age_ms: u64,
}
1107
1108impl SinkConfig for DeltaSinkConfigWrapped {
1109    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1110        Err(anyhow::anyhow!(
1111            "Delta build must be implemented in the data plane"
1112        ))
1113    }
1114}
1115
/// Writes events to StarRocks OLAP tables.
//
// Config-only type: the `SinkConfig::build` impl for this struct returns an error stating
// that the build must be implemented in the data plane.
//
// NOTE(review): unlike the Kafka/S3/GCS/Pub/Sub/Flight SQL/DLQ wrappers in this file, the
// fields here carry no `#[doc_field(...)]` attributes — confirm the generated docs still
// show the required/default metadata.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "starrocks")]
pub struct StarRocksSinkConfigWrapped {
    /// Upstream component names that feed events into this sink.
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing.
    #[serde(default)]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration for output events.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer configuration for event batching. Defaults to an in-memory buffer of 100 batches.
    #[serde(default)]
    pub buffer: BufferConfig,

    /// StarRocks Frontend (FE) HTTP endpoint.
    pub fe_http: String,
    /// Optional StarRocks Flight SQL endpoint.
    // No `#[serde(default)]` here, unlike the other optional fields in this file; serde's
    // derive still treats a missing `Option` field as `None`.
    pub flight_sql: Option<String>,
    /// Ingestion mode: 'auto', 'stream_load', or 'flight_sql'. Defaults to 'auto'.
    // Kept as a free-form string; nothing in this struct rejects other values.
    #[serde(default = "default_ingestion_mode")]
    pub ingestion_mode: String,
    /// Target StarRocks database name.
    pub database: String,
    /// Target StarRocks table name.
    pub table: String,

    /// Maximum number of rows per batch before flushing. Defaults to 50,000.
    #[serde(default = "default_max_rows")]
    pub max_rows: u64,
    /// Maximum byte size of a batch before flushing. Defaults to 67,108,864 (64 MiB).
    #[serde(default = "default_max_bytes")]
    pub max_bytes: u64,
    /// Maximum age of a batch in milliseconds before flushing. Defaults to 60,000.
    #[serde(default = "default_max_age_ms")]
    pub max_age_ms: u64,
}
1154
/// Serde fallback for the StarRocks sink's `ingestion_mode`: `"auto"`.
fn default_ingestion_mode() -> String {
    String::from("auto")
}
1158
1159impl SinkConfig for StarRocksSinkConfigWrapped {
1160    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1161        Err(anyhow::anyhow!(
1162            "StarRocks build must be implemented in the data plane"
1163        ))
1164    }
1165}
1166
/// Buffer configuration for event batching between transforms and sinks.
//
// Internally tagged: the `type` key selects the variant, written as `memory` or `disk`
// because of `rename_all = "snake_case"`. The `Default` impl for this enum yields
// `Memory` with `max_batches` taken from `default_buffer_capacity()`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BufferConfig {
    /// Buffers events in memory.
    Memory {
        /// Maximum number of event batches to buffer in memory.
        #[serde(default = "default_buffer_capacity")]
        #[doc_field(default = "100")]
        max_batches: usize,
    },
    /// Buffers events to disk.
    Disk {
        /// Path to the disk buffer storage.
        // Required: there is no serde default for this field.
        path: std::path::PathBuf,
        /// Maximum total byte size of the disk buffer.
        // `None` when omitted; presumably that means "no explicit cap" — TODO confirm with
        // the code that consumes this value.
        #[serde(default)]
        #[doc_field(example = "1073741824")]
        max_size_bytes: Option<u64>,
    },
}
1188
1189impl Default for BufferConfig {
1190    fn default() -> Self {
1191        Self::Memory {
1192            max_batches: default_buffer_capacity(),
1193        }
1194    }
1195}
1196
/// Default `max_batches` for an in-memory buffer: 100 batches.
fn default_buffer_capacity() -> usize {
    const FALLBACK_MAX_BATCHES: usize = 100;
    FALLBACK_MAX_BATCHES
}
1200
/// Produces events to an Apache Kafka topic.
//
// Wrapper layout: the pipeline envelope fields (`inputs`, `error_policy`, `encoding`,
// `buffer`) sit next to the Kafka-specific settings, which are flattened in from
// `KafkaProducerConfig`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "kafka")]
pub struct KafkaProducerConfigWrapped {
    /// Upstream component names that feed events into this sink.
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration for output events.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer configuration for event batching. Defaults to an in-memory buffer of 100 batches.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Inner Kafka-specific producer configuration.
    // `flatten` places the inner config's keys at the same level as the envelope fields
    // instead of nesting them under a `config` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: KafkaProducerConfig,
}
1223
/// Sink construction for the Kafka wrapper: delegates to the inner producer config.
impl SinkConfig for KafkaProducerConfigWrapped {
    /// Forwards `cx` to `build` on the flattened `config` and returns its result unchanged.
    ///
    /// The envelope fields on this wrapper (`inputs`, `error_policy`, `encoding`, `buffer`)
    /// are not passed along by this call.
    fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
        self.config.build(cx)
    }
}
1229
/// Writes events to Amazon S3 as batched objects.
//
// Wrapper layout: the pipeline envelope fields (`inputs`, `error_policy`, `encoding`,
// `buffer`) sit next to the S3-specific settings, which are flattened in from
// `S3SinkConfig`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "s3")]
pub struct S3SinkConfigWrapped {
    /// Upstream component names that feed events into this sink.
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration for output events.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer configuration for event batching. Defaults to an in-memory buffer of 100 batches.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Inner S3-specific sink configuration.
    // `flatten` places the inner config's keys at the same level as the envelope fields
    // instead of nesting them under a `config` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: S3SinkConfig,
}
1252
/// Sink construction for the S3 wrapper: delegates to the inner S3 sink config.
impl SinkConfig for S3SinkConfigWrapped {
    /// Forwards `cx` to `build` on the flattened `config` and returns its result unchanged.
    ///
    /// The envelope fields on this wrapper (`inputs`, `error_policy`, `encoding`, `buffer`)
    /// are not passed along by this call.
    fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
        self.config.build(cx)
    }
}
1258
/// Writes events to Google Cloud Storage as batched objects.
//
// Wrapper layout: the pipeline envelope fields (`inputs`, `error_policy`, `encoding`,
// `buffer`) sit next to the GCS-specific settings, which are flattened in from
// `GcsSinkConfig`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "gcp_cloud_storage")]
pub struct GcsSinkConfigWrapped {
    /// Upstream component names that feed events into this sink.
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration for output events.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer configuration for event batching. Defaults to an in-memory buffer of 100 batches.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Inner GCS-specific sink configuration.
    // `flatten` places the inner config's keys at the same level as the envelope fields
    // instead of nesting them under a `config` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: GcsSinkConfig,
}
1281
/// Sink construction for the GCS wrapper: delegates to the inner GCS sink config.
impl SinkConfig for GcsSinkConfigWrapped {
    /// Forwards `cx` to `build` on the flattened `config` and returns its result unchanged.
    ///
    /// The envelope fields on this wrapper (`inputs`, `error_policy`, `encoding`, `buffer`)
    /// are not passed along by this call.
    fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
        self.config.build(cx)
    }
}
1287
/// Produces events to an Apache Arrow Flight SQL server.
//
// Wrapper layout: the pipeline envelope fields (`inputs`, `error_policy`, `encoding`,
// `buffer`) sit next to the Flight-specific settings, which are flattened in from
// `FlightClientConfig`.
//
// Config-only type: the `SinkConfig::build` impl for this struct returns an error stating
// that the build must be implemented in the data plane.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "flight_sql")]
pub struct FlightSqlConfigWrapped {
    /// Upstream component names that feed events into this sink.
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration for output events.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer configuration for event batching. Defaults to an in-memory buffer of 100 batches.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Inner Flight SQL-specific sink configuration.
    // `flatten` places the inner config's keys at the same level as the envelope fields
    // instead of nesting them under a `config` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: FlightClientConfig,
}
1310
1311impl SinkConfig for FlightSqlConfigWrapped {
1312    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1313        Err(anyhow::anyhow!(
1314            "FlightSql build must be implemented in the data plane"
1315        ))
1316    }
1317}
1318
/// Produces events to a Google Cloud Pub/Sub topic.
//
// Wrapper layout: the pipeline envelope fields (`inputs`, `error_policy`, `encoding`,
// `buffer`) sit next to the Pub/Sub-specific settings, which are flattened in from
// `PubSubSinkConfig`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "gcp_pubsub")]
pub struct PubSubSinkConfigWrapped {
    /// Upstream component names that feed events into this sink.
    #[doc_field(required, example = "[\"my_transform\"]")]
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration for output events.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer configuration for event batching. Defaults to an in-memory buffer of 100 batches.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Inner Pub/Sub-specific sink configuration.
    // `flatten` places the inner config's keys at the same level as the envelope fields
    // instead of nesting them under a `config` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: PubSubSinkConfig,
}
1341
/// Sink construction for the Pub/Sub wrapper: delegates to the inner Pub/Sub sink config.
impl SinkConfig for PubSubSinkConfigWrapped {
    /// Forwards `cx` to `build` on the flattened `config` and returns its result unchanged.
    ///
    /// The envelope fields on this wrapper (`inputs`, `error_policy`, `encoding`, `buffer`)
    /// are not passed along by this call.
    fn build(&self, cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
        self.config.build(cx)
    }
}
1347
/// Dead letter queue sink that writes failed events to S3.
//
// Wrapper layout: the pipeline envelope fields (`inputs`, `error_policy`, `encoding`,
// `buffer`) sit next to the DLQ-specific settings, which are flattened in from
// `DlqSinkConfig`.
//
// Config-only type: the `SinkConfig::build` impl for this struct returns an error stating
// that the build must be implemented in the data plane.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "dlq")]
pub struct DlqSinkConfigWrapped {
    /// Upstream component names that feed events into this sink.
    #[doc_field(required, example = "[\"my_source\"]")]
    pub inputs: Vec<String>,
    /// Policy for handling errors during event processing.
    #[serde(default)]
    #[doc_field(default = "drop_on_error")]
    pub error_policy: ErrorPolicy,
    /// Optional encoder configuration for output events.
    // NOTE(review): the flattened `DlqSinkConfig` below also declares an `encoding` field
    // (a `String`). With `#[serde(flatten)]` both fields map to the same `encoding` key, so
    // on deserialization the key is presumably consumed by this field and the inner one
    // always takes its default, while serialization would emit the key twice — confirm and
    // consider renaming one of them.
    #[serde(default)]
    pub encoding: Option<EncoderConfigEnum>,
    /// Buffer configuration for event batching. Defaults to an in-memory buffer of 100 batches.
    #[serde(default)]
    pub buffer: BufferConfig,
    /// Inner DLQ-specific sink configuration.
    // `flatten` places the inner config's keys at the same level as the envelope fields
    // instead of nesting them under a `config` key.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: DlqSinkConfig,
}
1370
/// DLQ-specific settings: S3 destination, batching, encoding and retry behaviour.
//
// Flattened into `DlqSinkConfigWrapped`, so these keys appear at the same level as that
// wrapper's `inputs`/`error_policy`/`encoding`/`buffer` keys.
//
// Durations (`timeout`, `retry_delay`, `retry_max_delay`) are stored as strings such as
// `"1m"` or `"5s"`; this struct does not parse them. Presumably they are parsed where the
// sink is built — TODO confirm the accepted format.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct DlqSinkConfig {
    /// S3 bucket for DLQ storage.
    #[doc_field(required, example = "my-kinetic-dlq")]
    pub bucket: String,
    /// AWS region for the DLQ bucket.
    #[doc_field(required, example = "us-east-1")]
    pub region: String,
    /// Key prefix for DLQ objects in S3.
    #[doc_field(required, example = "dlq/alerts/")]
    pub prefix: String,
    /// Number of events per batch write.
    #[serde(default = "default_dlq_batch_size")]
    #[doc_field(default = "20")]
    pub batch_size: usize,
    /// Maximum time before flushing a batch.
    #[serde(default = "default_dlq_timeout")]
    #[doc_field(default = "1m")]
    pub timeout: String,
    /// Encoding format for DLQ objects.
    // NOTE(review): `DlqSinkConfigWrapped` flattens this struct next to its own `encoding`
    // field (`Option<EncoderConfigEnum>`), so both map to the same `encoding` key — confirm
    // this field can actually be set from configuration.
    #[serde(default = "default_dlq_encoding")]
    #[doc_field(default = "json")]
    pub encoding: String,
    /// Initial retry delay for failed writes.
    #[serde(default = "default_dlq_retry_delay")]
    #[doc_field(default = "5s")]
    pub retry_delay: String,
    /// Maximum retry delay (exponential backoff cap).
    #[serde(default = "default_dlq_retry_max_delay")]
    #[doc_field(default = "300s")]
    pub retry_max_delay: String,
    /// Maximum number of retry attempts before giving up.
    #[serde(default = "default_dlq_retry_max_attempts")]
    #[doc_field(default = "5")]
    pub retry_max_attempts: usize,
    /// Maximum number of concurrent pending writes.
    #[serde(default = "default_dlq_max_pending_writes")]
    #[doc_field(default = "1024")]
    pub max_pending_writes: usize,
}
1412
/// Serde fallback for the DLQ sink's `batch_size`: 20 events per batch write.
fn default_dlq_batch_size() -> usize {
    const FALLBACK_DLQ_BATCH_SIZE: usize = 20;
    FALLBACK_DLQ_BATCH_SIZE
}
1416
/// Serde fallback for the DLQ sink's `timeout`: the string `"1m"`.
fn default_dlq_timeout() -> String {
    String::from("1m")
}
1420
/// Serde fallback for the DLQ sink's `encoding`: the string `"json"`.
fn default_dlq_encoding() -> String {
    String::from("json")
}
1424
/// Serde fallback for the DLQ sink's `retry_delay`: the string `"5s"`.
fn default_dlq_retry_delay() -> String {
    String::from("5s")
}
1428
/// Serde fallback for the DLQ sink's `retry_max_delay`: the string `"300s"`.
fn default_dlq_retry_max_delay() -> String {
    String::from("300s")
}
1432
/// Serde fallback for the DLQ sink's `retry_max_attempts`: 5 attempts.
fn default_dlq_retry_max_attempts() -> usize {
    const FALLBACK_DLQ_RETRY_MAX_ATTEMPTS: usize = 5;
    FALLBACK_DLQ_RETRY_MAX_ATTEMPTS
}
1436
/// Serde fallback for the DLQ sink's `max_pending_writes`: 1024 pending writes.
fn default_dlq_max_pending_writes() -> usize {
    const FALLBACK_DLQ_MAX_PENDING_WRITES: usize = 1_024;
    FALLBACK_DLQ_MAX_PENDING_WRITES
}
1440
1441impl SinkConfig for DlqSinkConfigWrapped {
1442    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
1443        Err(anyhow::anyhow!(
1444            "DLQ sink build must be implemented in the data plane"
1445        ))
1446    }
1447}
1448
/// Shared, lazily created empty map handed out by `TransformConfigEnum::outputs` for
/// transforms whose outputs are not read from their config.
static EMPTY_OUTPUTS: LazyLock<HashMap<String, String>> =
    LazyLock::new(HashMap::<String, String>::new);
1450
1451impl TransformConfigEnum {
1452    pub fn inputs(&self) -> &[String] {
1453        match self {
1454            Self::Filter(c) => &c.inputs,
1455            Self::OtelAggregate(c) => &c.inputs,
1456            Self::Aggregate(c) => &c.inputs,
1457            Self::SqlMap(c) => &c.inputs,
1458            Self::Sample(c) => &c.inputs,
1459            Self::Map(c) => &c.inputs,
1460        }
1461    }
1462
1463    pub fn error_policy(&self) -> &ErrorPolicy {
1464        match self {
1465            Self::Filter(c) => &c.error_policy,
1466            Self::OtelAggregate(c) => &c.error_policy,
1467            Self::Aggregate(c) => &c.error_policy,
1468            Self::SqlMap(c) => &c.error_policy,
1469            Self::Sample(c) => &c.error_policy,
1470            Self::Map(c) => &c.error_policy,
1471        }
1472    }
1473
1474    pub fn outputs(&self) -> &HashMap<String, String> {
1475        match self {
1476            Self::OtelAggregate(c) => &c.config.outputs,
1477            Self::Aggregate(c) => &c.config.outputs,
1478            Self::Map(c) => &c.config.outputs,
1479            _ => &EMPTY_OUTPUTS,
1480        }
1481    }
1482}
1483
1484impl SinkConfigEnum {
1485    pub fn inputs(&self) -> &[String] {
1486        match self {
1487            Self::Kafka(c) => &c.inputs,
1488            Self::S3(c) => &c.inputs,
1489            Self::AwsKinesis(c) => &c.inputs,
1490            Self::GcpCloudStorage(c) => &c.inputs,
1491            Self::GcpPubsub(c) => &c.inputs,
1492            Self::FlightSql(c) => &c.inputs,
1493            Self::Dlq(c) => &c.inputs,
1494            Self::Iceberg(c) => &c.inputs,
1495            Self::Delta(c) => &c.inputs,
1496            Self::Starrocks(c) => &c.inputs,
1497        }
1498    }
1499
1500    pub fn error_policy(&self) -> &ErrorPolicy {
1501        match self {
1502            Self::Kafka(c) => &c.error_policy,
1503            Self::S3(c) => &c.error_policy,
1504            Self::AwsKinesis(c) => &c.error_policy,
1505            Self::GcpCloudStorage(c) => &c.error_policy,
1506            Self::GcpPubsub(c) => &c.error_policy,
1507            Self::FlightSql(c) => &c.error_policy,
1508            Self::Dlq(c) => &c.error_policy,
1509            Self::Iceberg(c) => &c.error_policy,
1510            Self::Delta(c) => &c.error_policy,
1511            Self::Starrocks(c) => &c.error_policy,
1512        }
1513    }
1514
1515    pub fn buffer(&self) -> &BufferConfig {
1516        match self {
1517            Self::Kafka(c) => &c.buffer,
1518            Self::S3(c) => &c.buffer,
1519            Self::AwsKinesis(c) => &c.buffer,
1520            Self::GcpCloudStorage(c) => &c.buffer,
1521            Self::GcpPubsub(c) => &c.buffer,
1522            Self::FlightSql(c) => &c.buffer,
1523            Self::Dlq(c) => &c.buffer,
1524            Self::Iceberg(c) => &c.buffer,
1525            Self::Delta(c) => &c.buffer,
1526            Self::Starrocks(c) => &c.buffer,
1527        }
1528    }
1529}