Skip to main content

gcp_common/
config.rs

1//! Configuration for Google Cloud Platform (GCP) components.
2
3use kinetic_common::config::{SinkConfig, SinkContext, SourceConfig, SourceContext};
4use kinetic_doc_derive::{ComponentDoc, FieldDoc};
5use serde::{Deserialize, Serialize};
6use tokio::task::JoinHandle;
7
/// Shared GCP authentication and project configuration.
//
// `Debug` is deliberately absent from the derive list: this file provides a
// hand-written `Debug` impl for `GcpConfig` that masks `credentials_json`.
// `Serialize` is still derived and no field carries a serde skip attribute.
// NOTE(review): that suggests serialized output contains the raw key material
// even though debug output does not — confirm this is intended.
#[derive(Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
pub struct GcpConfig {
    /// GCP Project ID.
    // NOTE(review): flagged `required` for the generated docs, yet the type is
    // `Option<String>`, so nothing visible in this file rejects a missing
    // value — confirm where (or whether) the requirement is enforced.
    #[doc_field(required, example = "my-gcp-project")]
    pub project_id: Option<String>,

    /// Path to a Service Account JSON key file.
    /// If not provided, Application Default Credentials (ADC) will be used.
    // The fallback described above is not implemented in this file; this struct
    // only carries the value.
    #[doc_field(example = "/secrets/gcp-sa.json")]
    pub credentials_file: Option<String>,

    /// Raw Service Account JSON key content.
    /// If not provided, credentials_file or ADC will be used.
    // Marked `secret` for the doc generator and replaced by a placeholder in the
    // manual `Debug` impl. The precedence between this field and
    // `credentials_file` when both are set is not decided here.
    #[doc_field(secret)]
    pub credentials_json: Option<String>,
}
25
26impl std::fmt::Debug for GcpConfig {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("GcpConfig")
29            .field("project_id", &self.project_id)
30            .field("credentials_file", &self.credentials_file)
31            .field(
32                "credentials_json",
33                &self.credentials_json.as_ref().map(|_| "***REDACTED***"),
34            )
35            .finish()
36    }
37}
38
/// Google Cloud Storage (GCS) source configuration.
///
/// Supports two modes:
/// - `List`: Periodically lists objects in a bucket/prefix.
/// - `EventStream`: Consumes GCS notifications from a Pub/Sub topic.
//
// Serde representation: internally tagged on the `mode` key, with variant names
// converted to snake_case, so the accepted tag values are `list` and
// `event_stream`.
//
// This enum does not derive its documentation metadata; the
// `component_metadata` function in this file lists the variants and fields by
// hand. Any field added, removed, or renamed here has to be mirrored there.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum GcsSourceConfig {
    /// List mode: periodically polls for new objects.
    List {
        /// GCP authentication and project settings.
        auth: Option<GcpConfig>,
        /// GCS bucket to list objects from.
        bucket: String,
        /// Optional key prefix filter.
        prefix: Option<String>,
        /// Polling interval in seconds.
        // When the key is omitted, serde calls `default_list_interval` (defined
        // at the bottom of this file) to fill the value in.
        #[serde(default = "default_list_interval")]
        interval_secs: u64,
        /// Whether to delete objects from GCS after they have been successfully processed.
        /// If true, this source will not keep a persistent state of processed objects.
        // Omitted key deserializes to `false` via `bool::default()`.
        #[serde(default)]
        delete_after_read: bool,
    },
    /// Event stream mode: consumes GCS notifications from Pub/Sub.
    EventStream {
        /// Pub/Sub configuration for consuming GCS event notifications.
        // Unlike `List`, this variant has no `auth` field of its own; the only
        // credentials it carries are the ones nested inside `PubSubConfig`.
        pubsub: PubSubConfig,
        /// GCS bucket to read objects from.
        bucket: String,
        /// Whether to listen for OBJECT_METADATA_UPDATE events.
        /// By default, only OBJECT_FINALIZE events are processed.
        // Omitted key deserializes to `false` via `bool::default()`.
        #[serde(default)]
        include_metadata_updates: bool,
    },
}
75
76impl GcsSourceConfig {
77    /// Returns component metadata for documentation generation.
78    pub fn component_metadata() -> kinetic_doc_types::ComponentMetadata {
79        use kinetic_doc_types::{
80            ComponentMetadata, FieldMetadata, HasFieldsMetadata, VariantMetadata,
81        };
82
83        let variants = vec![
84            VariantMetadata {
85                name: "list".to_string(),
86                description: "List mode: periodically polls for new objects.".to_string(),
87                fields: vec![
88                    FieldMetadata {
89                        name: "auth".to_string(),
90                        rust_type: "Option<GcpConfig>".to_string(),
91                        user_type: "GcpConfig (optional)".to_string(),
92                        required: false,
93                        default: None,
94                        example: None,
95                        secret: false,
96                        description: "GCP authentication and project settings.".to_string(),
97                        children: <GcpConfig as HasFieldsMetadata>::fields_metadata(),
98                        category: Some("Authentication".to_string()),
99                        enum_values: Vec::new(),
100                        variants: Vec::new(),
101                        reference: None,
102                        reference_type: None,
103                    },
104                    FieldMetadata {
105                        name: "bucket".to_string(),
106                        rust_type: "String".to_string(),
107                        user_type: "string".to_string(),
108                        required: true,
109                        default: None,
110                        example: Some("my-gcs-bucket".to_string()),
111                        secret: false,
112                        description: "GCS bucket to list objects from.".to_string(),
113                        children: Vec::new(),
114                        category: None,
115                        enum_values: Vec::new(),
116                        variants: Vec::new(),
117                        reference: None,
118                        reference_type: None,
119                    },
120                    FieldMetadata {
121                        name: "prefix".to_string(),
122                        rust_type: "Option<String>".to_string(),
123                        user_type: "string (optional)".to_string(),
124                        required: false,
125                        default: None,
126                        example: Some("logs/".to_string()),
127                        secret: false,
128                        description: "Key prefix filter.".to_string(),
129                        children: Vec::new(),
130                        category: None,
131                        enum_values: Vec::new(),
132                        variants: Vec::new(),
133                        reference: None,
134                        reference_type: None,
135                    },
136                    FieldMetadata {
137                        name: "interval_secs".to_string(),
138                        rust_type: "u64".to_string(),
139                        user_type: "unsigned integer".to_string(),
140                        required: false,
141                        default: Some("60".to_string()),
142                        example: Some("300".to_string()),
143                        secret: false,
144                        description: "Polling interval in seconds.".to_string(),
145                        children: Vec::new(),
146                        category: None,
147                        enum_values: Vec::new(),
148                        variants: Vec::new(),
149                        reference: None,
150                        reference_type: None,
151                    },
152                    FieldMetadata {
153                        name: "delete_after_read".to_string(),
154                        rust_type: "bool".to_string(),
155                        user_type: "boolean".to_string(),
156                        required: false,
157                        default: Some("false".to_string()),
158                        example: Some("true".to_string()),
159                        secret: false,
160                        description: "Whether to delete objects from GCS after they have been successfully processed.".to_string(),
161                        children: Vec::new(),
162                        category: None,
163                        enum_values: Vec::new(),
164                        variants: Vec::new(),
165                        reference: None,
166                        reference_type: None,
167                    },
168                ],
169                example: Some("mode: list\nbucket: my-gcs-bucket\ninterval_secs: 60".to_string()),
170            },
171            VariantMetadata {
172                name: "event_stream".to_string(),
173                description: "Event stream mode: consumes GCS notifications from Pub/Sub.".to_string(),
174                fields: vec![
175                    FieldMetadata {
176                        name: "pubsub".to_string(),
177                        rust_type: "PubSubConfig".to_string(),
178                        user_type: "PubSubConfig".to_string(),
179                        required: true,
180                        default: None,
181                        example: None,
182                        secret: false,
183                        description: "Pub/Sub configuration for consuming GCS event notifications.".to_string(),
184                        children: <PubSubConfig as HasFieldsMetadata>::fields_metadata(),
185                        category: Some("Event Stream".to_string()),
186                        enum_values: Vec::new(),
187                        variants: Vec::new(),
188                        reference: None,
189                        reference_type: None,
190                    },
191                    FieldMetadata {
192                        name: "bucket".to_string(),
193                        rust_type: "String".to_string(),
194                        user_type: "string".to_string(),
195                        required: true,
196                        default: None,
197                        example: Some("my-gcs-bucket".to_string()),
198                        secret: false,
199                        description: "GCS bucket to read objects from.".to_string(),
200                        children: Vec::new(),
201                        category: None,
202                        enum_values: Vec::new(),
203                        variants: Vec::new(),
204                        reference: None,
205                        reference_type: None,
206                    },
207                    FieldMetadata {
208                        name: "include_metadata_updates".to_string(),
209                        rust_type: "bool".to_string(),
210                        user_type: "boolean".to_string(),
211                        required: false,
212                        default: Some("false".to_string()),
213                        example: Some("true".to_string()),
214                        secret: false,
215                        description: "Whether to listen for OBJECT_METADATA_UPDATE events.".to_string(),
216                        children: Vec::new(),
217                        category: None,
218                        enum_values: Vec::new(),
219                        variants: Vec::new(),
220                        reference: None,
221                        reference_type: None,
222                    },
223                ],
224                example: Some("mode: event_stream\nbucket: my-gcs-bucket\npubsub:\n  topic: my-topic\n  subscription: my-sub".to_string()),
225            }
226        ];
227
228        let fields = vec![FieldMetadata {
229            name: "mode".to_string(),
230            rust_type: "GcsSourceMode".to_string(),
231            user_type: "string".to_string(),
232            required: true,
233            default: None,
234            example: Some("list".to_string()),
235            secret: false,
236            description:
237                "Source mode: `list` for polling or `event_stream` for Pub/Sub notifications."
238                    .to_string(),
239            children: Vec::new(),
240            category: None,
241            enum_values: Vec::new(),
242            variants,
243            reference: None,
244            reference_type: None,
245        }];
246
247        ComponentMetadata {
248            component_type: "source".to_string(),
249            name: "gcp_cloud_storage".to_string(),
250            status: "stable".to_string(),
251            description: "Reads events from Google Cloud Storage objects. Supports both list-based polling and Pub/Sub notifications.".to_string(),
252            fields,
253            metrics: Vec::new(),
254            outputs: Vec::new(),
255            env_vars: Vec::new(),
256            permissions: Vec::new(),
257        }
258    }
259}
260
261impl kinetic_doc_types::HasFieldsMetadata for GcsSourceConfig {
262    fn fields_metadata() -> Vec<kinetic_doc_types::FieldMetadata> {
263        Self::component_metadata().fields
264    }
265}
266
267impl SourceConfig for GcsSourceConfig {
268    fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
269        Err(anyhow::anyhow!(
270            "GCS source build must be implemented in the data plane"
271        ))
272    }
273}
274
/// Google Cloud Storage (GCS) sink configuration.
//
// Documentation metadata for this sink is generated by the `ComponentDoc`
// derive from the `#[component(...)]` and `#[doc_field(...)]` attributes below.
// It is registered under the same name (`gcp_cloud_storage`) as the GCS source,
// distinguished by `type = "sink"`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "gcp_cloud_storage")]
pub struct GcsSinkConfig {
    /// GCP authentication and project settings.
    pub auth: Option<GcpConfig>,

    /// GCS bucket name.
    #[doc_field(required, example = "my-data-lake")]
    pub bucket: String,

    /// Key prefix for GCS objects.
    // Non-optional `String` with no serde default, so the key must be present
    // in the input for deserialization to succeed.
    #[doc_field(required, example = "logs/kinetic/")]
    pub key_prefix: String,

    /// Compression algorithm for GCS objects.
    // Free-form string: the set of accepted values is not constrained or
    // validated anywhere in this file.
    #[doc_field(example = "gzip")]
    pub compression: Option<String>,

    /// Encoding format for the GCS object (e.g. 'json' or 'parquet').
    // Free-form string as well; see the note on `compression`.
    #[doc_field(example = "parquet")]
    pub encoding: Option<String>,

    /// Batching configuration for GCS writes.
    // The batch settings type is borrowed from the AWS crate rather than being
    // defined for GCP. With `#[serde(default)]`, an omitted `batch` key falls
    // back to `BatchConfig::default()`.
    #[serde(default)]
    pub batch: aws_common::config::BatchConfig,
}
302
303impl SinkConfig for GcsSinkConfig {
304    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
305        Err(anyhow::anyhow!(
306            "GCS sink build must be implemented in the data plane"
307        ))
308    }
309}
310
/// Google Cloud Pub/Sub configuration.
//
// Shared by `PubSubSourceConfig` and `PubSubSinkConfig` (both flatten it) and
// by the `EventStream` variant of `GcsSourceConfig`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct PubSubConfig {
    /// GCP authentication and project settings.
    pub auth: Option<GcpConfig>,

    /// Pub/Sub topic name.
    #[doc_field(required, example = "my-topic")]
    pub topic: String,

    /// Pub/Sub subscription name (required for sources).
    // NOTE(review): the type is `Option<String>` and nothing in this file
    // rejects a source config that omits it — confirm the "required for
    // sources" rule is enforced where the source is actually built.
    #[doc_field(example = "my-subscription")]
    pub subscription: Option<String>,

    /// Dead Letter Topic name.
    #[doc_field(example = "my-dead-letter-topic")]
    pub dead_letter_topic: Option<String>,

    /// Whether to enable message ordering.
    // Omitted key deserializes to `false` via `bool::default()`.
    #[serde(default)]
    pub enable_message_ordering: bool,

    /// Field name to use as the ordering key.
    // NOTE(review): presumably only meaningful when `enable_message_ordering`
    // is true; this file does not tie the two settings together — verify
    // against the consumer of this config.
    #[doc_field(example = "trace_id")]
    pub ordering_key_field: Option<String>,
}
337
/// Google Cloud Pub/Sub source configuration.
//
// Thin wrapper that registers `PubSubConfig` as the `gcp_pubsub` source for the
// documentation derive.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "gcp_pubsub")]
pub struct PubSubSourceConfig {
    /// Inner Pub/Sub configuration.
    // `#[serde(flatten)]` makes the inner keys (`topic`, `subscription`, ...)
    // appear directly at this struct's level in the serialized form rather than
    // under a `config` key.
    // NOTE(review): `#[doc_field(flatten)]` presumably does the same for the
    // generated docs — confirm against the derive macro.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: PubSubConfig,
}
347
348impl SourceConfig for PubSubSourceConfig {
349    fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
350        Err(anyhow::anyhow!(
351            "Pub/Sub source build must be implemented in the data plane"
352        ))
353    }
354}
355
/// Google Cloud Pub/Sub sink configuration.
//
// Thin wrapper that registers `PubSubConfig` as the `gcp_pubsub` sink for the
// documentation derive. Its shape is identical to `PubSubSourceConfig`; only
// the `#[component(type = ...)]` value differs.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "gcp_pubsub")]
pub struct PubSubSinkConfig {
    /// Inner Pub/Sub configuration.
    // `#[serde(flatten)]` makes the inner keys (`topic`, `dead_letter_topic`,
    // ...) appear directly at this struct's level in the serialized form rather
    // than under a `config` key.
    // NOTE(review): `#[doc_field(flatten)]` presumably does the same for the
    // generated docs — confirm against the derive macro.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: PubSubConfig,
}
365
366impl SinkConfig for PubSubSinkConfig {
367    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
368        Err(anyhow::anyhow!(
369            "Pub/Sub sink build must be implemented in the data plane"
370        ))
371    }
372}
373
/// Serde default for `interval_secs` in the `List` variant of
/// `GcsSourceConfig`: the polling interval, in seconds, used when the key is
/// omitted.
fn default_list_interval() -> u64 {
    const DEFAULT_LIST_INTERVAL_SECS: u64 = 60;
    DEFAULT_LIST_INTERVAL_SECS
}