// aws_common/config.rs

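//! Configuration for AWS components.
//!
//! Defines the shared AWS credential/region settings and the SQS, S3, and
//! Kinesis component configurations that embed them.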
2
3use kinetic_common::config::{SinkConfig, SinkContext, SourceConfig, SourceContext};
4use kinetic_doc_derive::{ComponentDoc, FieldDoc};
5use serde::{Deserialize, Serialize};
6use tokio::task::JoinHandle;
7
8/// Shared AWS authentication and region configuration.
9#[derive(Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
10pub struct AwsConfig {
11    /// AWS region to operate in.
12    #[doc_field(example = "us-west-2")]
13    pub region: Option<String>,
14    /// Custom AWS endpoint URL (for LocalStack, MinIO, etc.).
15    #[doc_field(example = "http://localhost:4566")]
16    pub endpoint: Option<String>,
17    /// AWS access key ID for static credentials.
18    #[doc_field(example = "AKIAIOSFODNN7EXAMPLE")]
19    pub access_key_id: Option<String>,
20    /// AWS secret access key for static credentials.
21    #[doc_field(secret, example = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")]
22    pub secret_access_key: Option<String>,
23    /// AWS session token for temporary credentials.
24    #[doc_field(secret)]
25    pub session_token: Option<String>,
26    /// IAM role ARN to assume.
27    #[doc_field(example = "arn:aws:iam::123456789012:role/kinetic-ingest")]
28    pub assume_role: Option<String>,
29    /// Session name when assuming a role.
30    #[doc_field(example = "kinetic-session")]
31    pub session_name: Option<String>,
32    /// External ID for cross-account role assumption.
33    pub external_id: Option<String>,
34    /// AWS profile name from shared credentials file.
35    #[doc_field(example = "default")]
36    pub profile: Option<String>,
37    /// Path to an AWS credentials file.
38    #[doc_field(example = "/home/kinetic/.aws/credentials")]
39    pub credentials_file: Option<String>,
40    /// Instance Metadata Service (IMDS) configuration.
41    #[serde(default)]
42    pub imds: Option<ImdsConfig>,
43    /// Timeout in seconds for loading AWS credentials.
44    #[doc_field(default = "10")]
45    pub load_timeout_secs: Option<u64>,
46}
47
48impl std::fmt::Debug for AwsConfig {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("AwsConfig")
51            .field("region", &self.region)
52            .field("endpoint", &self.endpoint)
53            .field("access_key_id", &self.access_key_id)
54            .field(
55                "secret_access_key",
56                &self.secret_access_key.as_ref().map(|_| "***REDACTED***"),
57            )
58            .field(
59                "session_token",
60                &self.session_token.as_ref().map(|_| "***REDACTED***"),
61            )
62            .field("assume_role", &self.assume_role)
63            .field("session_name", &self.session_name)
64            .field("external_id", &self.external_id)
65            .field("profile", &self.profile)
66            .field("credentials_file", &self.credentials_file)
67            .field("imds", &self.imds)
68            .field("load_timeout_secs", &self.load_timeout_secs)
69            .finish()
70    }
71}
72
73/// EC2 Instance Metadata Service (IMDS) configuration.
74#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
75pub struct ImdsConfig {
76    /// Timeout in seconds for IMDS connection.
77    #[doc_field(default = "1", example = "5")]
78    pub connect_timeout_seconds: Option<u64>,
79    /// Maximum number of IMDS retry attempts.
80    #[doc_field(default = "3", example = "5")]
81    pub max_attempts: Option<u32>,
82    /// Timeout in seconds for IMDS read operations.
83    #[doc_field(default = "1", example = "5")]
84    pub read_timeout_seconds: Option<u64>,
85}
86
87/// Deferred message configuration for SQS-based sources.
88#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
89pub struct DeferredConfig {
90    /// Maximum age in seconds for deferred messages.
91    #[doc_field(default = "3600", example = "86400")]
92    pub max_age_secs: Option<u64>,
93    /// SQS queue URL for deferred message processing.
94    #[doc_field(
95        required,
96        example = "https://sqs.us-west-2.amazonaws.com/123456789012/deferred-queue"
97    )]
98    pub queue_url: String,
99}
100
101/// Amazon SQS (Simple Queue Service) source configuration.
102#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
103pub struct SqsConfig {
104    /// AWS authentication and region settings.
105    pub auth: Option<AwsConfig>,
106
107    /// SQS queue URL to consume from.
108    #[doc_field(
109        required,
110        example = "https://sqs.us-west-2.amazonaws.com/123456789012/my-queue"
111    )]
112    pub queue_url: String,
113
114    /// Polling interval in seconds.
115    #[serde(default = "default_poll_secs")]
116    #[doc_field(default = "20", example = "10")]
117    pub poll_secs: u64,
118
119    /// Maximum number of messages per poll request.
120    #[serde(default = "default_max_number_of_messages")]
121    #[doc_field(default = "10", example = "10")]
122    pub max_number_of_messages: i32,
123
124    /// Visibility timeout in seconds for received messages.
125    #[serde(default = "default_visibility_timeout_secs")]
126    #[doc_field(example = "30")]
127    pub visibility_timeout_secs: Option<i32>,
128
129    /// Number of concurrent SQS client workers.
130    #[serde(default = "default_client_concurrency")]
131    #[doc_field(default = "1", example = "4")]
132    pub client_concurrency: usize,
133
134    /// Connection timeout in seconds.
135    pub connect_timeout_seconds: Option<u64>,
136    /// Read timeout in seconds.
137    pub read_timeout_seconds: Option<u64>,
138    /// Operation timeout in seconds.
139    pub operation_timeout_seconds: Option<u64>,
140
141    /// Whether to delete messages after successful processing.
142    #[serde(default = "default_true")]
143    #[doc_field(default = "true")]
144    pub delete_message: bool,
145
146    /// Whether to delete messages that failed processing.
147    #[serde(default = "default_false")]
148    #[doc_field(default = "false")]
149    pub delete_failed_message: bool,
150
151    /// Deferred message configuration.
152    pub deferred: Option<DeferredConfig>,
153}
154
155impl SourceConfig for SqsConfig {
156    fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
157        Err(anyhow::anyhow!(
158            "SQS source build must be implemented in the data plane"
159        ))
160    }
161}
162
163/// Amazon S3 source configuration.
164///
165/// Supports two modes:
166/// - `List`: Periodically lists objects in a bucket/prefix.
167/// - `EventStream`: Consumes S3 event notifications from an SQS queue.
168#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
169#[serde(tag = "mode", rename_all = "snake_case")]
170pub enum S3SourceConfig {
171    /// List mode: periodically polls for new objects.
172    List {
173        /// AWS authentication and region settings.
174        auth: Option<AwsConfig>,
175        /// S3 bucket to list objects from.
176        bucket: String,
177        /// Optional key prefix filter.
178        prefix: Option<String>,
179        /// Polling interval in seconds.
180        #[serde(default = "default_list_interval")]
181        interval_secs: u64,
182        /// Whether to delete S3 objects after they have been successfully processed.
183        #[serde(default = "default_false")]
184        delete_after_read: bool,
185    },
186    /// Event stream mode: consumes S3 notifications from SQS.
187    EventStream {
188        /// SQS configuration for consuming S3 event notifications.
189        sqs: SqsConfig,
190        /// S3 bucket to read objects from.
191        bucket: String,
192    },
193}
194
195impl S3SourceConfig {
196    /// Returns component metadata for documentation generation.
197    pub fn component_metadata() -> kinetic_doc_types::ComponentMetadata {
198        use kinetic_doc_types::{ComponentMetadata, FieldMetadata, HasFieldsMetadata};
199        let fields = vec![
200            FieldMetadata {
201                name: "mode".to_string(),
202                rust_type: "S3SourceMode".to_string(),
203                user_type: "string".to_string(),
204                required: true,
205                default: None,
206                example: Some("list".to_string()),
207                secret: false,
208                description:
209                    "Source mode: `list` for polling or `event_stream` for SQS notifications."
210                        .to_string(),
211                children: Vec::new(),
212                category: None,
213                enum_values: Vec::new(),
214                variants: Vec::new(),
215                reference: None,
216                reference_type: None,
217            },
218            FieldMetadata {
219                name: "bucket".to_string(),
220                rust_type: "String".to_string(),
221                user_type: "string".to_string(),
222                required: true,
223                default: None,
224                example: Some("my-data-bucket".to_string()),
225                secret: false,
226                description: "S3 bucket to read objects from.".to_string(),
227                children: Vec::new(),
228                category: None,
229                enum_values: Vec::new(),
230                variants: Vec::new(),
231                reference: None,
232                reference_type: None,
233            },
234            FieldMetadata {
235                name: "prefix".to_string(),
236                rust_type: "Option<String>".to_string(),
237                user_type: "string (optional)".to_string(),
238                required: false,
239                default: None,
240                example: Some("logs/2024/".to_string()),
241                secret: false,
242                description: "Key prefix filter (List mode only).".to_string(),
243                children: Vec::new(),
244                category: None,
245                enum_values: Vec::new(),
246                variants: Vec::new(),
247                reference: None,
248                reference_type: None,
249            },
250            FieldMetadata {
251                name: "interval_secs".to_string(),
252                rust_type: "u64".to_string(),
253                user_type: "unsigned integer".to_string(),
254                required: false,
255                default: Some("60".to_string()),
256                example: Some("300".to_string()),
257                secret: false,
258                description: "Polling interval in seconds (List mode only).".to_string(),
259                children: Vec::new(),
260                category: None,
261                enum_values: Vec::new(),
262                variants: Vec::new(),
263                reference: None,
264                reference_type: None,
265            },
266            FieldMetadata {
267                name: "delete_after_read".to_string(),
268                rust_type: "bool".to_string(),
269                user_type: "boolean".to_string(),
270                required: false,
271                default: Some("false".to_string()),
272                example: Some("true".to_string()),
273                secret: false,
274                description: "Whether to delete objects after reading (List mode only)."
275                    .to_string(),
276                children: Vec::new(),
277                category: None,
278                enum_values: Vec::new(),
279                variants: Vec::new(),
280                reference: None,
281                reference_type: None,
282            },
283            FieldMetadata {
284                name: "auth".to_string(),
285                rust_type: "Option<AwsConfig>".to_string(),
286                user_type: "AwsConfig (optional)".to_string(),
287                required: false,
288                default: None,
289                example: None,
290                secret: false,
291                description: "AWS authentication and region settings.".to_string(),
292                children: <AwsConfig as HasFieldsMetadata>::fields_metadata(),
293                category: Some("Authentication".to_string()),
294                enum_values: Vec::new(),
295                variants: Vec::new(),
296                reference: None,
297                reference_type: None,
298            },
299            FieldMetadata {
300                name: "sqs".to_string(),
301                rust_type: "SqsConfig".to_string(),
302                user_type: "SqsConfig".to_string(),
303                required: false,
304                default: None,
305                example: None,
306                secret: false,
307                description: "SQS configuration for consuming S3 event notifications.".to_string(),
308                children: <SqsConfig as HasFieldsMetadata>::fields_metadata(),
309                category: Some("Event Stream".to_string()),
310                enum_values: Vec::new(),
311                variants: Vec::new(),
312                reference: None,
313                reference_type: None,
314            },
315        ];
316
317        ComponentMetadata {
318            component_type: "source".to_string(),
319            name: "aws_s3".to_string(),
320            status: "stable".to_string(),
321            description: "Reads events from Amazon S3 objects. Supports both list-based polling and SQS event notifications.".to_string(),
322            fields,
323            metrics: Vec::new(),
324            outputs: Vec::new(),
325            env_vars: Vec::new(),
326            permissions: Vec::new(),
327        }
328    }
329}
330
331impl kinetic_doc_types::HasFieldsMetadata for S3SourceConfig {
332    fn fields_metadata() -> Vec<kinetic_doc_types::FieldMetadata> {
333        Self::component_metadata().fields
334    }
335}
336
337impl SourceConfig for S3SourceConfig {
338    fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
339        Err(anyhow::anyhow!(
340            "S3 source build must be implemented in the data plane"
341        ))
342    }
343}
344
345/// Writes events to Amazon S3 as batched objects.
346#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
347#[component(type = "sink", name = "aws_s3")]
348pub struct S3SinkConfig {
349    /// AWS authentication and region settings.
350    pub auth: Option<AwsConfig>,
351
352    /// S3 bucket name.
353    #[doc_field(required, example = "my-data-lake")]
354    pub bucket: String,
355
356    /// Key prefix for S3 objects.
357    #[doc_field(required, example = "logs/kinetic/")]
358    pub key_prefix: String,
359
360    /// Compression algorithm for S3 objects.
361    #[doc_field(example = "gzip")]
362    pub compression: Option<String>,
363
364    /// Encoding format for the S3 object (e.g. 'json' or 'parquet').
365    #[doc_field(example = "parquet")]
366    pub encoding: Option<String>,
367
368    /// Batching configuration for S3 writes.
369    #[serde(default)]
370    pub batch: BatchConfig,
371}
372
373impl SinkConfig for S3SinkConfig {
374    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
375        Err(anyhow::anyhow!(
376            "S3 sink build must be implemented in the data plane"
377        ))
378    }
379}
380
381/// Amazon Kinesis stream configuration.
382///
383/// Can be used as either a source (consumer) or sink (producer).
384#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
385#[component(type = "source", name = "aws_kinesis")]
386pub struct KinesisConfig {
387    /// AWS authentication and region settings.
388    pub auth: Option<AwsConfig>,
389
390    /// Kinesis stream name.
391    #[doc_field(required, example = "my-stream")]
392    pub stream_name: String,
393}
394
395impl SourceConfig for KinesisConfig {
396    fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
397        Err(anyhow::anyhow!(
398            "Kinesis source build must be implemented in the data plane"
399        ))
400    }
401}
402
403impl SinkConfig for KinesisConfig {
404    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
405        Err(anyhow::anyhow!(
406            "Kinesis sink build must be implemented in the data plane"
407        ))
408    }
409}
410
411/// Batching configuration for sink output.
412#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
413pub struct BatchConfig {
414    /// Maximum number of events in a batch before flushing.
415    #[doc_field(example = "1000")]
416    pub max_size: Option<usize>,
417    /// Maximum time in seconds before flushing a batch.
418    #[doc_field(example = "30")]
419    pub timeout_secs: Option<u64>,
420}
421
/// Serde default for `SqsConfig::poll_secs`: 20 seconds.
fn default_poll_secs() -> u64 {
    20
}
425
/// Serde default for `SqsConfig::max_number_of_messages`: 10 messages per poll.
fn default_max_number_of_messages() -> i32 {
    10
}
429
/// Serde default for `SqsConfig::visibility_timeout_secs`: unset (`None`).
fn default_visibility_timeout_secs() -> Option<i32> {
    None
}
433
/// Serde default for `SqsConfig::client_concurrency`: a single worker.
fn default_client_concurrency() -> usize {
    1
}
437
/// Serde default for boolean options that are on unless disabled
/// (used by `SqsConfig::delete_message`).
fn default_true() -> bool {
    true
}
441
/// Serde default for boolean options that are off unless enabled
/// (used by `SqsConfig::delete_failed_message` and the `List` variant's
/// `delete_after_read`).
fn default_false() -> bool {
    false
}
445
/// Serde default for the `List` variant's `interval_secs`: 60 seconds.
fn default_list_interval() -> u64 {
    60
}
449
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is itself the test failure.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Static credentials, role, and nested IMDS settings all deserialize.
    #[test]
    fn test_aws_config_deserialization() {
        let yaml = r#"
            region: us-west-2
            access_key_id: AKIA12345
            secret_access_key: secret
            assume_role: arn:aws:iam::123456789012:role/Role
            imds:
              connect_timeout_seconds: 5
        "#;
        let config: AwsConfig = serde_yml::from_str(yaml).unwrap();
        assert_eq!(config.region.as_deref(), Some("us-west-2"));
        assert_eq!(config.access_key_id.as_deref(), Some("AKIA12345"));
        assert_eq!(config.secret_access_key.as_deref(), Some("secret"));
        assert_eq!(
            config.assume_role.as_deref(),
            Some("arn:aws:iam::123456789012:role/Role")
        );
        assert_eq!(config.imds.unwrap().connect_timeout_seconds.unwrap(), 5);
    }

    /// The hand-written `Debug` impl must never print secret values.
    #[test]
    fn test_aws_config_debug_redacts_secrets() {
        let config = AwsConfig {
            secret_access_key: Some("hunter2-secret".to_string()),
            session_token: Some("tok-12345".to_string()),
            ..Default::default()
        };
        let rendered = format!("{:?}", config);
        assert!(rendered.contains("***REDACTED***"));
        assert!(!rendered.contains("hunter2-secret"));
        assert!(!rendered.contains("tok-12345"));
    }

    /// `mode: list` selects the `List` variant and applies its serde defaults.
    #[test]
    fn test_s3_source_config_list_mode() {
        let yaml = r#"
            mode: list
            bucket: my-bucket
            prefix: logs/
            interval_secs: 300
            auth:
              region: us-east-1
        "#;
        let config: S3SourceConfig = serde_yml::from_str(yaml).unwrap();
        match config {
            S3SourceConfig::List {
                auth,
                bucket,
                prefix,
                interval_secs,
                delete_after_read,
            } => {
                assert_eq!(bucket, "my-bucket");
                assert_eq!(prefix.as_deref(), Some("logs/"));
                assert_eq!(interval_secs, 300);
                // Not present in the YAML, so the serde default applies.
                assert!(!delete_after_read);
                assert_eq!(auth.unwrap().region.as_deref(), Some("us-east-1"));
            }
            _ => panic!("Expected List mode"),
        }
    }

    /// `mode: event_stream` selects the `EventStream` variant with nested SQS settings.
    #[test]
    fn test_s3_source_config_event_mode() {
        let yaml = r#"
            mode: event_stream
            bucket: my-bucket
            sqs:
              queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
              poll_secs: 10
        "#;
        let config: S3SourceConfig = serde_yml::from_str(yaml).unwrap();
        match config {
            S3SourceConfig::EventStream { bucket, sqs } => {
                assert_eq!(bucket, "my-bucket");
                assert_eq!(
                    sqs.queue_url,
                    "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
                );
                assert_eq!(sqs.poll_secs, 10);
                // Omitted keys fall back to the serde defaults.
                assert_eq!(sqs.max_number_of_messages, 10);
                assert!(sqs.delete_message);
                assert!(!sqs.delete_failed_message);
            }
            _ => panic!("Expected EventStream mode"),
        }
    }
}
519}