1use kinetic_common::config::{SinkConfig, SinkContext, SourceConfig, SourceContext};
4use kinetic_doc_derive::{ComponentDoc, FieldDoc};
5use serde::{Deserialize, Serialize};
6use tokio::task::JoinHandle;
7
/// AWS connection and credential settings, embedded as `auth` by the AWS
/// component configurations in this file.
///
/// Every field is optional. `Debug` is implemented by hand (not derived) so
/// that `secret_access_key` and `session_token` are redacted.
#[derive(Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
pub struct AwsConfig {
    /// AWS region, e.g. `us-west-2`.
    #[doc_field(example = "us-west-2")]
    pub region: Option<String>,
    /// Endpoint URL override, e.g. `http://localhost:4566`.
    #[doc_field(example = "http://localhost:4566")]
    pub endpoint: Option<String>,
    /// Static access key ID.
    #[doc_field(example = "AKIAIOSFODNN7EXAMPLE")]
    pub access_key_id: Option<String>,
    /// Static secret access key. Redacted in `Debug` output.
    #[doc_field(secret, example = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")]
    pub secret_access_key: Option<String>,
    /// Session token. Redacted in `Debug` output.
    #[doc_field(secret)]
    pub session_token: Option<String>,
    /// ARN of an IAM role to assume, e.g.
    /// `arn:aws:iam::123456789012:role/kinetic-ingest`.
    #[doc_field(example = "arn:aws:iam::123456789012:role/kinetic-ingest")]
    pub assume_role: Option<String>,
    /// Session name, e.g. `kinetic-session`.
    // NOTE(review): presumably the role-session name used together with
    // `assume_role`; confirm against the data plane.
    #[doc_field(example = "kinetic-session")]
    pub session_name: Option<String>,
    /// External ID.
    // NOTE(review): presumably passed when assuming `assume_role`; confirm.
    pub external_id: Option<String>,
    /// Named profile, e.g. `default`.
    #[doc_field(example = "default")]
    pub profile: Option<String>,
    /// Path to a credentials file, e.g. `/home/kinetic/.aws/credentials`.
    #[doc_field(example = "/home/kinetic/.aws/credentials")]
    pub credentials_file: Option<String>,
    /// Instance metadata service (IMDS) client settings.
    #[serde(default)]
    pub imds: Option<ImdsConfig>,
    /// Load timeout in seconds.
    // The documented default (10) is not applied by serde: an omitted value
    // deserializes as `None`, so the consumer has to fall back to it.
    #[doc_field(default = "10")]
    pub load_timeout_secs: Option<u64>,
}
47
48impl std::fmt::Debug for AwsConfig {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("AwsConfig")
51 .field("region", &self.region)
52 .field("endpoint", &self.endpoint)
53 .field("access_key_id", &self.access_key_id)
54 .field(
55 "secret_access_key",
56 &self.secret_access_key.as_ref().map(|_| "***REDACTED***"),
57 )
58 .field(
59 "session_token",
60 &self.session_token.as_ref().map(|_| "***REDACTED***"),
61 )
62 .field("assume_role", &self.assume_role)
63 .field("session_name", &self.session_name)
64 .field("external_id", &self.external_id)
65 .field("profile", &self.profile)
66 .field("credentials_file", &self.credentials_file)
67 .field("imds", &self.imds)
68 .field("load_timeout_secs", &self.load_timeout_secs)
69 .finish()
70 }
71}
72
/// Instance metadata service (IMDS) client settings, used by `AwsConfig::imds`.
// The `default` values in `doc_field` are documentation only: no serde default
// is set, so omitted fields deserialize as `None` and must be applied by the
// consumer.
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
pub struct ImdsConfig {
    /// Connect timeout in seconds.
    #[doc_field(default = "1", example = "5")]
    pub connect_timeout_seconds: Option<u64>,
    /// Maximum number of attempts.
    #[doc_field(default = "3", example = "5")]
    pub max_attempts: Option<u32>,
    /// Read timeout in seconds.
    #[doc_field(default = "1", example = "5")]
    pub read_timeout_seconds: Option<u64>,
}
86
/// Settings for a "deferred" SQS queue, set through `SqsConfig::deferred`.
// NOTE(review): the deferral semantics are not visible in this file; confirm
// what `max_age_secs` controls against the data plane.
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
pub struct DeferredConfig {
    /// Maximum age in seconds.
    // Documented default only: an omitted value deserializes as `None`.
    #[doc_field(default = "3600", example = "86400")]
    pub max_age_secs: Option<u64>,
    /// URL of the deferred SQS queue.
    #[doc_field(
        required,
        example = "https://sqs.us-west-2.amazonaws.com/123456789012/deferred-queue"
    )]
    pub queue_url: String,
}
100
/// Configuration for reading messages from an Amazon SQS queue.
///
/// Used both as a source configuration (it implements `SourceConfig`) and as
/// the `sqs` section of `S3SourceConfig::EventStream`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct SqsConfig {
    /// AWS credentials and region settings.
    pub auth: Option<AwsConfig>,

    /// URL of the SQS queue.
    #[doc_field(
        required,
        example = "https://sqs.us-west-2.amazonaws.com/123456789012/my-queue"
    )]
    pub queue_url: String,

    /// Poll duration in seconds (defaults to 20).
    #[serde(default = "default_poll_secs")]
    #[doc_field(default = "20", example = "10")]
    pub poll_secs: u64,

    /// Maximum number of messages per receive request (defaults to 10).
    // NOTE(review): SQS only accepts 1-10 for this parameter; no range check
    // is performed in this file.
    #[serde(default = "default_max_number_of_messages")]
    #[doc_field(default = "10", example = "10")]
    pub max_number_of_messages: i32,

    /// Visibility timeout in seconds; unset by default.
    // `default_visibility_timeout_secs` returns `None`, which is what a plain
    // `#[serde(default)]` would produce for an `Option` as well.
    #[serde(default = "default_visibility_timeout_secs")]
    #[doc_field(example = "30")]
    pub visibility_timeout_secs: Option<i32>,

    /// Client concurrency (defaults to 1).
    #[serde(default = "default_client_concurrency")]
    #[doc_field(default = "1", example = "4")]
    pub client_concurrency: usize,

    /// Connect timeout in seconds.
    pub connect_timeout_seconds: Option<u64>,
    /// Read timeout in seconds.
    pub read_timeout_seconds: Option<u64>,
    /// Per-operation timeout in seconds.
    pub operation_timeout_seconds: Option<u64>,

    /// Whether to delete messages after processing (defaults to `true`).
    #[serde(default = "default_true")]
    #[doc_field(default = "true")]
    pub delete_message: bool,

    /// Whether to delete messages whose processing failed (defaults to `false`).
    #[serde(default = "default_false")]
    #[doc_field(default = "false")]
    pub delete_failed_message: bool,

    /// Optional deferred-queue settings.
    pub deferred: Option<DeferredConfig>,
}
154
155impl SourceConfig for SqsConfig {
156 fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
157 Err(anyhow::anyhow!(
158 "SQS source build must be implemented in the data plane"
159 ))
160 }
161}
162
/// Configuration for the `aws_s3` source.
///
/// Internally tagged on `mode` (`list` or `event_stream`). The documentation
/// metadata is written by hand in `S3SourceConfig::component_metadata`.
// NOTE(review): serde ignores unknown fields unless `deny_unknown_fields` is
// set (it is not here). For example, a top-level `auth` in `event_stream` mode
// is accepted and silently dropped; that mode only reads `sqs.auth`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum S3SourceConfig {
    /// `mode: list`: poll the bucket for objects every `interval_secs`.
    List {
        /// AWS credentials and region settings.
        auth: Option<AwsConfig>,
        /// S3 bucket to read objects from.
        bucket: String,
        /// Optional key prefix filter.
        prefix: Option<String>,
        /// Polling interval in seconds (defaults to 60).
        #[serde(default = "default_list_interval")]
        interval_secs: u64,
        /// Whether to delete objects after reading (defaults to `false`).
        #[serde(default = "default_false")]
        delete_after_read: bool,
    },
    /// `mode: event_stream`: consume S3 event notifications from SQS.
    EventStream {
        /// SQS settings, including their own `auth`.
        sqs: SqsConfig,
        /// S3 bucket to read objects from.
        bucket: String,
    },
}
194
195impl S3SourceConfig {
196 pub fn component_metadata() -> kinetic_doc_types::ComponentMetadata {
198 use kinetic_doc_types::{ComponentMetadata, FieldMetadata, HasFieldsMetadata};
199 let fields = vec![
200 FieldMetadata {
201 name: "mode".to_string(),
202 rust_type: "S3SourceMode".to_string(),
203 user_type: "string".to_string(),
204 required: true,
205 default: None,
206 example: Some("list".to_string()),
207 secret: false,
208 description:
209 "Source mode: `list` for polling or `event_stream` for SQS notifications."
210 .to_string(),
211 children: Vec::new(),
212 category: None,
213 enum_values: Vec::new(),
214 variants: Vec::new(),
215 reference: None,
216 reference_type: None,
217 },
218 FieldMetadata {
219 name: "bucket".to_string(),
220 rust_type: "String".to_string(),
221 user_type: "string".to_string(),
222 required: true,
223 default: None,
224 example: Some("my-data-bucket".to_string()),
225 secret: false,
226 description: "S3 bucket to read objects from.".to_string(),
227 children: Vec::new(),
228 category: None,
229 enum_values: Vec::new(),
230 variants: Vec::new(),
231 reference: None,
232 reference_type: None,
233 },
234 FieldMetadata {
235 name: "prefix".to_string(),
236 rust_type: "Option<String>".to_string(),
237 user_type: "string (optional)".to_string(),
238 required: false,
239 default: None,
240 example: Some("logs/2024/".to_string()),
241 secret: false,
242 description: "Key prefix filter (List mode only).".to_string(),
243 children: Vec::new(),
244 category: None,
245 enum_values: Vec::new(),
246 variants: Vec::new(),
247 reference: None,
248 reference_type: None,
249 },
250 FieldMetadata {
251 name: "interval_secs".to_string(),
252 rust_type: "u64".to_string(),
253 user_type: "unsigned integer".to_string(),
254 required: false,
255 default: Some("60".to_string()),
256 example: Some("300".to_string()),
257 secret: false,
258 description: "Polling interval in seconds (List mode only).".to_string(),
259 children: Vec::new(),
260 category: None,
261 enum_values: Vec::new(),
262 variants: Vec::new(),
263 reference: None,
264 reference_type: None,
265 },
266 FieldMetadata {
267 name: "delete_after_read".to_string(),
268 rust_type: "bool".to_string(),
269 user_type: "boolean".to_string(),
270 required: false,
271 default: Some("false".to_string()),
272 example: Some("true".to_string()),
273 secret: false,
274 description: "Whether to delete objects after reading (List mode only)."
275 .to_string(),
276 children: Vec::new(),
277 category: None,
278 enum_values: Vec::new(),
279 variants: Vec::new(),
280 reference: None,
281 reference_type: None,
282 },
283 FieldMetadata {
284 name: "auth".to_string(),
285 rust_type: "Option<AwsConfig>".to_string(),
286 user_type: "AwsConfig (optional)".to_string(),
287 required: false,
288 default: None,
289 example: None,
290 secret: false,
291 description: "AWS authentication and region settings.".to_string(),
292 children: <AwsConfig as HasFieldsMetadata>::fields_metadata(),
293 category: Some("Authentication".to_string()),
294 enum_values: Vec::new(),
295 variants: Vec::new(),
296 reference: None,
297 reference_type: None,
298 },
299 FieldMetadata {
300 name: "sqs".to_string(),
301 rust_type: "SqsConfig".to_string(),
302 user_type: "SqsConfig".to_string(),
303 required: false,
304 default: None,
305 example: None,
306 secret: false,
307 description: "SQS configuration for consuming S3 event notifications.".to_string(),
308 children: <SqsConfig as HasFieldsMetadata>::fields_metadata(),
309 category: Some("Event Stream".to_string()),
310 enum_values: Vec::new(),
311 variants: Vec::new(),
312 reference: None,
313 reference_type: None,
314 },
315 ];
316
317 ComponentMetadata {
318 component_type: "source".to_string(),
319 name: "aws_s3".to_string(),
320 status: "stable".to_string(),
321 description: "Reads events from Amazon S3 objects. Supports both list-based polling and SQS event notifications.".to_string(),
322 fields,
323 metrics: Vec::new(),
324 outputs: Vec::new(),
325 env_vars: Vec::new(),
326 permissions: Vec::new(),
327 }
328 }
329}
330
331impl kinetic_doc_types::HasFieldsMetadata for S3SourceConfig {
332 fn fields_metadata() -> Vec<kinetic_doc_types::FieldMetadata> {
333 Self::component_metadata().fields
334 }
335}
336
337impl SourceConfig for S3SourceConfig {
338 fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
339 Err(anyhow::anyhow!(
340 "S3 source build must be implemented in the data plane"
341 ))
342 }
343}
344
/// Configuration for the `aws_s3` sink.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "aws_s3")]
pub struct S3SinkConfig {
    /// AWS credentials and region settings.
    pub auth: Option<AwsConfig>,

    /// Destination bucket.
    #[doc_field(required, example = "my-data-lake")]
    pub bucket: String,

    /// Object key prefix, e.g. `logs/kinetic/`.
    #[doc_field(required, example = "logs/kinetic/")]
    pub key_prefix: String,

    /// Compression name, e.g. `gzip`.
    // NOTE(review): free-form string; accepted values are not validated here.
    #[doc_field(example = "gzip")]
    pub compression: Option<String>,

    /// Encoding name, e.g. `parquet`.
    // NOTE(review): free-form string; accepted values are not validated here.
    #[doc_field(example = "parquet")]
    pub encoding: Option<String>,

    /// Batching settings; all batch fields are unset when this is omitted.
    #[serde(default)]
    pub batch: BatchConfig,
}
372
373impl SinkConfig for S3SinkConfig {
374 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
375 Err(anyhow::anyhow!(
376 "S3 sink build must be implemented in the data plane"
377 ))
378 }
379}
380
/// Configuration for an Amazon Kinesis stream.
// NOTE(review): `#[component]` declares `type = "source"` only, although this
// type also implements `SinkConfig`; confirm whether sink docs are needed.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "aws_kinesis")]
pub struct KinesisConfig {
    /// AWS credentials and region settings.
    pub auth: Option<AwsConfig>,

    /// Name of the Kinesis stream.
    #[doc_field(required, example = "my-stream")]
    pub stream_name: String,
}
394
395impl SourceConfig for KinesisConfig {
396 fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
397 Err(anyhow::anyhow!(
398 "Kinesis source build must be implemented in the data plane"
399 ))
400 }
401}
402
403impl SinkConfig for KinesisConfig {
404 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
405 Err(anyhow::anyhow!(
406 "Kinesis sink build must be implemented in the data plane"
407 ))
408 }
409}
410
/// Batching settings (used, for example, by `S3SinkConfig::batch`).
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
pub struct BatchConfig {
    /// Maximum batch size.
    // NOTE(review): the unit (events vs. bytes) is not visible here; confirm.
    #[doc_field(example = "1000")]
    pub max_size: Option<usize>,
    /// Batch timeout in seconds.
    #[doc_field(example = "30")]
    pub timeout_secs: Option<u64>,
}
421
// Serde default helpers. They are referenced by name from
// `#[serde(default = "...")]` attributes, so their names must stay as they are.

/// Fallback for `SqsConfig::poll_secs`.
const DEFAULT_POLL_SECS: u64 = 20;
/// Fallback for `SqsConfig::max_number_of_messages`.
const DEFAULT_MAX_NUMBER_OF_MESSAGES: i32 = 10;
/// Fallback for `SqsConfig::client_concurrency`.
const DEFAULT_CLIENT_CONCURRENCY: usize = 1;
/// Fallback for `interval_secs` in `S3SourceConfig::List`.
const DEFAULT_LIST_INTERVAL_SECS: u64 = 60;

fn default_poll_secs() -> u64 {
    DEFAULT_POLL_SECS
}

fn default_max_number_of_messages() -> i32 {
    DEFAULT_MAX_NUMBER_OF_MESSAGES
}

/// Leaves `SqsConfig::visibility_timeout_secs` unset.
fn default_visibility_timeout_secs() -> Option<i32> {
    Option::None
}

fn default_client_concurrency() -> usize {
    DEFAULT_CLIENT_CONCURRENCY
}

/// Default for boolean options that are enabled unless turned off.
fn default_true() -> bool {
    true
}

/// Default for boolean options that are disabled unless turned on.
fn default_false() -> bool {
    false
}

fn default_list_interval() -> u64 {
    DEFAULT_LIST_INTERVAL_SECS
}
449
#[cfg(test)]
// `unwrap` is acceptable here: in a test, a panic is the failure signal.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_aws_config_deserialization() {
        let yaml = r#"
            region: us-west-2
            access_key_id: AKIA12345
            secret_access_key: secret
            assume_role: arn:aws:iam::123456789012:role/Role
            imds:
              connect_timeout_seconds: 5
        "#;
        let config: AwsConfig = serde_yml::from_str(yaml).unwrap();
        assert_eq!(config.region.unwrap(), "us-west-2");
        assert_eq!(config.access_key_id.unwrap(), "AKIA12345");
        assert_eq!(config.imds.unwrap().connect_timeout_seconds.unwrap(), 5);
    }

    /// Secrets must never appear in `Debug` output (for example when a
    /// config is logged).
    #[test]
    fn test_aws_config_debug_redacts_secrets() {
        let config = AwsConfig {
            access_key_id: Some("AKIA12345".to_string()),
            secret_access_key: Some("super-secret-key".to_string()),
            session_token: Some("super-secret-token".to_string()),
            ..Default::default()
        };
        let rendered = format!("{config:?}");
        assert!(!rendered.contains("super-secret-key"));
        assert!(!rendered.contains("super-secret-token"));
        assert!(rendered.contains(r#"secret_access_key: Some("***REDACTED***")"#));
        assert!(rendered.contains(r#"session_token: Some("***REDACTED***")"#));
        // Only the two secrets are redacted; the access key ID is printed as-is.
        assert!(rendered.contains("AKIA12345"));

        let rendered = format!("{:?}", AwsConfig::default());
        assert!(rendered.contains("secret_access_key: None"));
        assert!(rendered.contains("session_token: None"));
    }

    #[test]
    fn test_sqs_config_defaults() {
        let yaml = "queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue\n";
        let config: SqsConfig = serde_yml::from_str(yaml).unwrap();
        assert_eq!(config.poll_secs, 20);
        assert_eq!(config.max_number_of_messages, 10);
        assert_eq!(config.visibility_timeout_secs, None);
        assert_eq!(config.client_concurrency, 1);
        assert!(config.delete_message);
        assert!(!config.delete_failed_message);
        assert!(config.auth.is_none());
        assert!(config.deferred.is_none());
    }

    #[test]
    fn test_s3_source_config_list_mode() {
        let yaml = r#"
            mode: list
            bucket: my-bucket
            prefix: logs/
            interval_secs: 300
            auth:
              region: us-east-1
        "#;
        let config: S3SourceConfig = serde_yml::from_str(yaml).unwrap();
        match config {
            S3SourceConfig::List {
                bucket,
                interval_secs,
                delete_after_read,
                ..
            } => {
                assert_eq!(bucket, "my-bucket");
                assert_eq!(interval_secs, 300);
                assert!(!delete_after_read);
            }
            S3SourceConfig::EventStream { .. } => panic!("Expected List mode"),
        }
    }

    #[test]
    fn test_s3_source_config_list_mode_defaults() {
        let yaml = "mode: list\nbucket: my-bucket\n";
        let config: S3SourceConfig = serde_yml::from_str(yaml).unwrap();
        match config {
            S3SourceConfig::List {
                auth,
                prefix,
                interval_secs,
                delete_after_read,
                ..
            } => {
                assert!(auth.is_none());
                assert!(prefix.is_none());
                assert_eq!(interval_secs, 60);
                assert!(!delete_after_read);
            }
            S3SourceConfig::EventStream { .. } => panic!("Expected List mode"),
        }
    }

    #[test]
    fn test_s3_source_config_event_mode() {
        let yaml = r#"
            mode: event_stream
            bucket: my-bucket
            sqs:
              queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
              poll_secs: 10
        "#;
        let config: S3SourceConfig = serde_yml::from_str(yaml).unwrap();
        match config {
            S3SourceConfig::EventStream { bucket, sqs } => {
                assert_eq!(bucket, "my-bucket");
                assert_eq!(
                    sqs.queue_url,
                    "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
                );
                assert_eq!(sqs.poll_secs, 10);
            }
            S3SourceConfig::List { .. } => panic!("Expected EventStream mode"),
        }
    }

    /// The hand-written metadata must advertise the same defaults that serde
    /// actually applies.
    #[test]
    fn test_s3_source_metadata_matches_serde_defaults() {
        let metadata = S3SourceConfig::component_metadata();
        let field = |name: &str| {
            metadata
                .fields
                .iter()
                .find(|f| f.name == name)
                .unwrap_or_else(|| panic!("missing field metadata for `{name}`"))
        };
        assert_eq!(
            field("interval_secs").default.as_deref(),
            Some(default_list_interval().to_string().as_str())
        );
        assert_eq!(
            field("delete_after_read").default.as_deref(),
            Some(default_false().to_string().as_str())
        );
    }

    #[test]
    fn test_s3_sink_config_batch_defaults() {
        let yaml = "bucket: my-data-lake\nkey_prefix: logs/kinetic/\n";
        let config: S3SinkConfig = serde_yml::from_str(yaml).unwrap();
        assert_eq!(config.batch, BatchConfig::default());
        assert!(config.auth.is_none());
        assert!(config.compression.is_none());
        assert!(config.encoding.is_none());
    }
}
519}