1use kinetic_common::config::{SinkConfig, SinkContext, SourceConfig, SourceContext};
4use kinetic_doc_derive::{ComponentDoc, FieldDoc};
5use serde::{Deserialize, Serialize};
6use tokio::task::JoinHandle;
7
/// Authentication and project settings shared by the GCP component configs
/// in this file.
///
/// Every field is optional at the type level. `Debug` is implemented by hand
/// rather than derived so that `credentials_json` is never printed verbatim.
#[derive(Clone, Deserialize, Serialize, Default, PartialEq, FieldDoc)]
pub struct GcpConfig {
    /// GCP project identifier.
    ///
    /// NOTE(review): tagged `required` for the doc generator even though the
    /// Rust type is `Option`, so deserialization alone does not reject a
    /// missing value — confirm where the requirement is enforced.
    #[doc_field(required, example = "my-gcp-project")]
    pub project_id: Option<String>,

    /// Path to a credentials file; the example points at a service-account
    /// JSON file on disk.
    #[doc_field(example = "/secrets/gcp-sa.json")]
    pub credentials_file: Option<String>,

    /// Credentials supplied inline — presumably the JSON document itself
    /// rather than a path; confirm against the consumer.
    ///
    /// Tagged `secret` for the doc generator and masked by the `Debug` impl.
    /// NOTE(review): `Serialize` is derived without a skip attribute, so the
    /// value is still emitted in clear when the config is serialized.
    #[doc_field(secret)]
    pub credentials_json: Option<String>,
}
25
26impl std::fmt::Debug for GcpConfig {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 f.debug_struct("GcpConfig")
29 .field("project_id", &self.project_id)
30 .field("credentials_file", &self.credentials_file)
31 .field(
32 "credentials_json",
33 &self.credentials_json.as_ref().map(|_| "***REDACTED***"),
34 )
35 .finish()
36 }
37}
38
/// Configuration for the Google Cloud Storage source.
///
/// Serde uses internal tagging here: the `mode` key selects the variant and
/// variant names are snake_cased, i.e. `mode: list` or `mode: event_stream`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum GcsSourceConfig {
    /// List mode: periodically polls for new objects.
    List {
        /// Optional GCP authentication and project settings.
        auth: Option<GcpConfig>,
        /// GCS bucket to list objects from.
        bucket: String,
        /// Optional key prefix filter.
        prefix: Option<String>,
        /// Polling interval in seconds; falls back to
        /// `default_list_interval()` when omitted.
        #[serde(default = "default_list_interval")]
        interval_secs: u64,
        /// Whether to delete objects from GCS after they have been
        /// successfully processed. Defaults to `false` when omitted.
        #[serde(default)]
        delete_after_read: bool,
    },
    /// Event stream mode: consumes GCS notifications from Pub/Sub.
    EventStream {
        /// Pub/Sub configuration for consuming GCS event notifications.
        pubsub: PubSubConfig,
        /// GCS bucket to read objects from.
        bucket: String,
        /// Whether to listen for OBJECT_METADATA_UPDATE events. Defaults to
        /// `false` when omitted.
        #[serde(default)]
        include_metadata_updates: bool,
    },
}
75
76impl GcsSourceConfig {
77 pub fn component_metadata() -> kinetic_doc_types::ComponentMetadata {
79 use kinetic_doc_types::{
80 ComponentMetadata, FieldMetadata, HasFieldsMetadata, VariantMetadata,
81 };
82
83 let variants = vec![
84 VariantMetadata {
85 name: "list".to_string(),
86 description: "List mode: periodically polls for new objects.".to_string(),
87 fields: vec![
88 FieldMetadata {
89 name: "auth".to_string(),
90 rust_type: "Option<GcpConfig>".to_string(),
91 user_type: "GcpConfig (optional)".to_string(),
92 required: false,
93 default: None,
94 example: None,
95 secret: false,
96 description: "GCP authentication and project settings.".to_string(),
97 children: <GcpConfig as HasFieldsMetadata>::fields_metadata(),
98 category: Some("Authentication".to_string()),
99 enum_values: Vec::new(),
100 variants: Vec::new(),
101 reference: None,
102 reference_type: None,
103 },
104 FieldMetadata {
105 name: "bucket".to_string(),
106 rust_type: "String".to_string(),
107 user_type: "string".to_string(),
108 required: true,
109 default: None,
110 example: Some("my-gcs-bucket".to_string()),
111 secret: false,
112 description: "GCS bucket to list objects from.".to_string(),
113 children: Vec::new(),
114 category: None,
115 enum_values: Vec::new(),
116 variants: Vec::new(),
117 reference: None,
118 reference_type: None,
119 },
120 FieldMetadata {
121 name: "prefix".to_string(),
122 rust_type: "Option<String>".to_string(),
123 user_type: "string (optional)".to_string(),
124 required: false,
125 default: None,
126 example: Some("logs/".to_string()),
127 secret: false,
128 description: "Key prefix filter.".to_string(),
129 children: Vec::new(),
130 category: None,
131 enum_values: Vec::new(),
132 variants: Vec::new(),
133 reference: None,
134 reference_type: None,
135 },
136 FieldMetadata {
137 name: "interval_secs".to_string(),
138 rust_type: "u64".to_string(),
139 user_type: "unsigned integer".to_string(),
140 required: false,
141 default: Some("60".to_string()),
142 example: Some("300".to_string()),
143 secret: false,
144 description: "Polling interval in seconds.".to_string(),
145 children: Vec::new(),
146 category: None,
147 enum_values: Vec::new(),
148 variants: Vec::new(),
149 reference: None,
150 reference_type: None,
151 },
152 FieldMetadata {
153 name: "delete_after_read".to_string(),
154 rust_type: "bool".to_string(),
155 user_type: "boolean".to_string(),
156 required: false,
157 default: Some("false".to_string()),
158 example: Some("true".to_string()),
159 secret: false,
160 description: "Whether to delete objects from GCS after they have been successfully processed.".to_string(),
161 children: Vec::new(),
162 category: None,
163 enum_values: Vec::new(),
164 variants: Vec::new(),
165 reference: None,
166 reference_type: None,
167 },
168 ],
169 example: Some("mode: list\nbucket: my-gcs-bucket\ninterval_secs: 60".to_string()),
170 },
171 VariantMetadata {
172 name: "event_stream".to_string(),
173 description: "Event stream mode: consumes GCS notifications from Pub/Sub.".to_string(),
174 fields: vec![
175 FieldMetadata {
176 name: "pubsub".to_string(),
177 rust_type: "PubSubConfig".to_string(),
178 user_type: "PubSubConfig".to_string(),
179 required: true,
180 default: None,
181 example: None,
182 secret: false,
183 description: "Pub/Sub configuration for consuming GCS event notifications.".to_string(),
184 children: <PubSubConfig as HasFieldsMetadata>::fields_metadata(),
185 category: Some("Event Stream".to_string()),
186 enum_values: Vec::new(),
187 variants: Vec::new(),
188 reference: None,
189 reference_type: None,
190 },
191 FieldMetadata {
192 name: "bucket".to_string(),
193 rust_type: "String".to_string(),
194 user_type: "string".to_string(),
195 required: true,
196 default: None,
197 example: Some("my-gcs-bucket".to_string()),
198 secret: false,
199 description: "GCS bucket to read objects from.".to_string(),
200 children: Vec::new(),
201 category: None,
202 enum_values: Vec::new(),
203 variants: Vec::new(),
204 reference: None,
205 reference_type: None,
206 },
207 FieldMetadata {
208 name: "include_metadata_updates".to_string(),
209 rust_type: "bool".to_string(),
210 user_type: "boolean".to_string(),
211 required: false,
212 default: Some("false".to_string()),
213 example: Some("true".to_string()),
214 secret: false,
215 description: "Whether to listen for OBJECT_METADATA_UPDATE events.".to_string(),
216 children: Vec::new(),
217 category: None,
218 enum_values: Vec::new(),
219 variants: Vec::new(),
220 reference: None,
221 reference_type: None,
222 },
223 ],
224 example: Some("mode: event_stream\nbucket: my-gcs-bucket\npubsub:\n topic: my-topic\n subscription: my-sub".to_string()),
225 }
226 ];
227
228 let fields = vec![FieldMetadata {
229 name: "mode".to_string(),
230 rust_type: "GcsSourceMode".to_string(),
231 user_type: "string".to_string(),
232 required: true,
233 default: None,
234 example: Some("list".to_string()),
235 secret: false,
236 description:
237 "Source mode: `list` for polling or `event_stream` for Pub/Sub notifications."
238 .to_string(),
239 children: Vec::new(),
240 category: None,
241 enum_values: Vec::new(),
242 variants,
243 reference: None,
244 reference_type: None,
245 }];
246
247 ComponentMetadata {
248 component_type: "source".to_string(),
249 name: "gcp_cloud_storage".to_string(),
250 status: "stable".to_string(),
251 description: "Reads events from Google Cloud Storage objects. Supports both list-based polling and Pub/Sub notifications.".to_string(),
252 fields,
253 metrics: Vec::new(),
254 outputs: Vec::new(),
255 env_vars: Vec::new(),
256 permissions: Vec::new(),
257 }
258 }
259}
260
261impl kinetic_doc_types::HasFieldsMetadata for GcsSourceConfig {
262 fn fields_metadata() -> Vec<kinetic_doc_types::FieldMetadata> {
263 Self::component_metadata().fields
264 }
265}
266
267impl SourceConfig for GcsSourceConfig {
268 fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
269 Err(anyhow::anyhow!(
270 "GCS source build must be implemented in the data plane"
271 ))
272 }
273}
274
/// Configuration for the `gcp_cloud_storage` sink.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "gcp_cloud_storage")]
pub struct GcsSinkConfig {
    /// Optional GCP authentication and project settings.
    pub auth: Option<GcpConfig>,

    /// GCS bucket name (mandatory).
    #[doc_field(required, example = "my-data-lake")]
    pub bucket: String,

    /// Object key prefix (mandatory) — presumably prepended to the name of
    /// every object written; confirm against the data-plane implementation.
    #[doc_field(required, example = "logs/kinetic/")]
    pub key_prefix: String,

    /// Optional compression setting, kept as a free-form string (example:
    /// `gzip`); accepted values are not validated in this file.
    #[doc_field(example = "gzip")]
    pub compression: Option<String>,

    /// Optional encoding setting, kept as a free-form string (example:
    /// `parquet`); accepted values are not validated in this file.
    #[doc_field(example = "parquet")]
    pub encoding: Option<String>,

    /// Batching settings; `BatchConfig::default()` is used when omitted.
    ///
    /// NOTE(review): the type comes from `aws_common` although this is a GCP
    /// sink — presumably shared on purpose; confirm.
    #[serde(default)]
    pub batch: aws_common::config::BatchConfig,
}
302
303impl SinkConfig for GcsSinkConfig {
304 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
305 Err(anyhow::anyhow!(
306 "GCS sink build must be implemented in the data plane"
307 ))
308 }
309}
310
/// Pub/Sub settings shared by the `gcp_pubsub` source and sink configs and by
/// the `event_stream` mode of the GCS source.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
pub struct PubSubConfig {
    /// Optional GCP authentication and project settings.
    pub auth: Option<GcpConfig>,

    /// Pub/Sub topic name (mandatory).
    #[doc_field(required, example = "my-topic")]
    pub topic: String,

    /// Optional Pub/Sub subscription name.
    #[doc_field(example = "my-subscription")]
    pub subscription: Option<String>,

    /// Optional dead-letter topic name.
    #[doc_field(example = "my-dead-letter-topic")]
    pub dead_letter_topic: Option<String>,

    /// Enables message ordering. Defaults to `false` when omitted.
    #[serde(default)]
    pub enable_message_ordering: bool,

    /// Optional field name (example: `trace_id`) — presumably the event field
    /// whose value is used as the ordering key; confirm against the consumer.
    #[doc_field(example = "trace_id")]
    pub ordering_key_field: Option<String>,
}
337
/// Configuration for the `gcp_pubsub` source.
///
/// A thin wrapper: the `PubSubConfig` fields are flattened, so they sit at
/// the top level of the component's configuration rather than under a
/// `config` key.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "source", name = "gcp_pubsub")]
pub struct PubSubSourceConfig {
    /// Shared Pub/Sub settings, flattened for both serde and the generated docs.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: PubSubConfig,
}
347
348impl SourceConfig for PubSubSourceConfig {
349 fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
350 Err(anyhow::anyhow!(
351 "Pub/Sub source build must be implemented in the data plane"
352 ))
353 }
354}
355
/// Configuration for the `gcp_pubsub` sink.
///
/// A thin wrapper: the `PubSubConfig` fields are flattened, so they sit at
/// the top level of the component's configuration rather than under a
/// `config` key.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
#[component(type = "sink", name = "gcp_pubsub")]
pub struct PubSubSinkConfig {
    /// Shared Pub/Sub settings, flattened for both serde and the generated docs.
    #[serde(flatten)]
    #[doc_field(flatten)]
    pub config: PubSubConfig,
}
365
366impl SinkConfig for PubSubSinkConfig {
367 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
368 Err(anyhow::anyhow!(
369 "Pub/Sub sink build must be implemented in the data plane"
370 ))
371 }
372}
373
/// Serde fallback for `interval_secs` in the `List` variant of
/// `GcsSourceConfig`: the polling interval, in seconds, used when the key is
/// omitted.
fn default_list_interval() -> u64 {
    const FALLBACK_SECS: u64 = 60;
    FALLBACK_SECS
}