Skip to main content

kinetic/sinks/gcp/
gcs.rs

1//! Google Cloud Storage (GCS) Sink.
2
3use arrow_select::concat::concat_batches;
4use gcloud_storage::client::Client as StorageClient;
5use gcloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
6use gcp_common::{
7    client::create_storage_client,
8    config::{GcpConfig, GcsSinkConfig},
9};
10use kinetic_buffers::BufferReceiver;
11use kinetic_core::{EventBatch, ShutdownSignal};
12use metrics::{Label, counter};
13use time::OffsetDateTime;
14use tokio::time::{Duration, interval};
15use tracing::{debug, error, info};
16
17use kinetic_core::encode::Encoder;
18use std::sync::Arc;
19
20pub struct GcsSinkTask {
21    config: GcsSinkConfig,
22    component_id: String,
23    receiver: BufferReceiver,
24    shutdown: ShutdownSignal,
25    encoder: Arc<dyn Encoder>,
26    labels: Arc<[Label]>,
27}
28
29use async_trait::async_trait;
30use kinetic_core::healthcheck::Healthcheck;
31
32#[async_trait]
33impl Healthcheck for GcsSinkTask {
34    async fn check(&self) -> anyhow::Result<()> {
35        let gcp: GcpConfig = self.config.auth.as_ref().cloned().unwrap_or_default();
36        let client: StorageClient = create_storage_client(&gcp).await?;
37
38        let req = gcloud_storage::http::objects::list::ListObjectsRequest {
39            bucket: self.config.bucket.clone(),
40            max_results: Some(1),
41            ..Default::default()
42        };
43
44        client.list_objects(&req).await.map_err(|e| {
45            anyhow::anyhow!(
46                "GCS healthcheck failed for bucket '{}': {}",
47                self.config.bucket,
48                e
49            )
50        })?;
51        Ok(())
52    }
53}
54
55impl GcsSinkTask {
56    pub fn new(
57        config: GcsSinkConfig,
58        component_id: String,
59        receiver: BufferReceiver,
60        shutdown: ShutdownSignal,
61        encoder: Arc<dyn Encoder>,
62    ) -> Self {
63        let labels: Arc<[Label]> = Arc::new([
64            Label::new("component_id", component_id.clone()),
65            Label::new("component_type", "sink"),
66            Label::new("component_kind", "gcp_cloud_storage"),
67        ]);
68
69        Self {
70            config,
71            component_id,
72            receiver,
73            shutdown,
74            encoder,
75            labels,
76        }
77    }
78
79    pub async fn run(mut self) {
80        info!(
81            "Starting GCS sink '{}' to bucket: {}",
82            self.component_id, self.config.bucket
83        );
84
85        let gcp: GcpConfig = self.config.auth.as_ref().cloned().unwrap_or_default();
86        let client: StorageClient = match create_storage_client(&gcp).await {
87            Ok(c) => c,
88            Err(e) => {
89                error!("Failed to create GCS client: {}", e);
90                return;
91            }
92        };
93
94        let mut current_batch: Vec<EventBatch> = Vec::new();
95        let mut current_rows = 0;
96
97        let timeout = self.config.batch.timeout_secs.unwrap_or(30);
98        let mut flush_interval = interval(Duration::from_secs(timeout));
99        let max_size = self.config.batch.max_size.unwrap_or(1000);
100
101        let mut shutdown = self.shutdown.clone();
102
103        loop {
104            tokio::select! {
105                reason = shutdown.recv() => {
106                    info!("GCS sink '{}' received shutdown signal: {:?}", self.component_id, reason);
107                    if !current_batch.is_empty() && let Err(e) = self.flush_batch(&client, &mut current_batch, &mut current_rows).await {
108                        error!("Final flush failed in GCS sink {}: {}", self.component_id, e);
109                    }
110                    break;
111                }
112                _ = flush_interval.tick() => {
113                    if !current_batch.is_empty() && let Err(e) = self.flush_batch(&client, &mut current_batch, &mut current_rows).await {
114                        error!(
115                            "Scheduled flush failed in GCS sink {}: {}",
116                            self.component_id, e
117                        );
118                    }
119                }
120                batch = self.receiver.recv() => {
121                    let Some(batch) = batch else {
122                        info!("GCS sink '{}' receiver closed", self.component_id);
123                        if !current_batch.is_empty()
124                            && let Err(e) = self
125                                .flush_batch(&client, &mut current_batch, &mut current_rows)
126                                .await
127                        {
128                            error!(
129                                "Final flush after receiver closed failed in GCS sink {}: {}",
130                                self.component_id, e
131                            );
132                        }
133                        break;
134                    };
135
136                    counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
137                    counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
138
139                    if !current_batch.is_empty() && batch.payload.schema() != current_batch[0].payload.schema() {
140                        debug!("Schema change detected in GCS sink {}, flushing current buffer before accepting new schema", self.component_id);
141                        self.flush_with_retry(&client, &mut current_batch, &mut current_rows, &mut shutdown).await;
142                        if !current_batch.is_empty() {
143                            break;
144                        }
145                        flush_interval.reset();
146                    }
147
148                    current_rows += batch.num_rows();
149                    current_batch.push(batch);
150
151                    if current_rows >= max_size {
152                        self.flush_with_retry(&client, &mut current_batch, &mut current_rows, &mut shutdown).await;
153                        flush_interval.reset();
154                    }
155                }
156            }
157        }
158
159        info!("GCS sink task {} shutting down", self.component_id);
160    }
161
162    async fn flush_with_retry(
163        &self,
164        client: &StorageClient,
165        batches: &mut Vec<EventBatch>,
166        rows: &mut usize,
167        shutdown: &mut ShutdownSignal,
168    ) {
169        let mut backoff = Duration::from_secs(1);
170        while !batches.is_empty() {
171            match self.flush_batch(client, batches, rows).await {
172                Ok(_) => break,
173                Err(e) => {
174                    error!(
175                        "GCS flush failed for component {}: {}. Retrying in {:?}...",
176                        self.component_id, e, backoff
177                    );
178                    tokio::select! {
179                        _ = shutdown.recv() => {
180                            error!("Shutdown received during GCS flush retry for {}. Data in current buffer may be lost.", self.component_id);
181                            return;
182                        }
183                        _ = tokio::time::sleep(backoff) => {
184                            backoff = (backoff * 2).min(Duration::from_secs(60));
185                        }
186                    }
187                }
188            }
189        }
190    }
191
192    async fn flush_batch(
193        &self,
194        client: &StorageClient,
195        batches: &mut Vec<EventBatch>,
196        rows: &mut usize,
197    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
198        if batches.is_empty() {
199            return Ok(());
200        }
201
202        debug!("GCS sink {} flushing {} rows", self.component_id, rows);
203
204        let record_batches: Vec<_> = batches.iter().map(|b| b.payload.clone()).collect();
205        let schema = record_batches[0].schema();
206        let merged_batch = concat_batches(&schema, record_batches.iter())?;
207
208        let metadata = batches[0].metadata.clone();
209        let event_batch = EventBatch::new(merged_batch, metadata)?;
210
211        let buf = self.encoder.encode(&event_batch)?;
212
213        let now = OffsetDateTime::now_utc();
214        let extension = if self.config.encoding.as_deref() == Some("parquet") {
215            "parquet"
216        } else {
217            "json"
218        };
219        let name = format!(
220            "{}{}.{}",
221            self.config.key_prefix,
222            now.unix_timestamp_nanos(),
223            extension
224        );
225
226        let buf_len = buf.len();
227
228        let upload_type = UploadType::Simple(Media::new(name.clone()));
229
230        let _: gcloud_storage::http::objects::Object = client
231            .upload_object(
232                &UploadObjectRequest {
233                    bucket: self.config.bucket.clone(),
234                    ..Default::default()
235                },
236                // PERFORMANCE: upload_object requires owned Vec<u8>, triggering a copy from Bytes.
237                buf.to_vec(),
238                &upload_type,
239            )
240            .await?;
241
242        counter!("component_sent_network_bytes_total", self.labels.iter())
243            .increment(buf_len as u64);
244        counter!("component_sent_events_total", self.labels.iter()).increment(*rows as u64);
245        counter!("component_sent_event_bytes_total", self.labels.iter())
246            .increment(event_batch.estimated_size() as u64);
247
248        info!(
249            "Successfully uploaded {} rows to gs://{}/{}",
250            rows, self.config.bucket, name
251        );
252
253        for mut batch in batches.drain(..) {
254            if let Some(token) = batch.ack_token.take() {
255                token.ack();
256            }
257        }
258
259        *rows = 0;
260        Ok(())
261    }
262}