1use arrow_select::concat::concat_batches;
4use gcloud_storage::client::Client as StorageClient;
5use gcloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
6use gcp_common::{
7 client::create_storage_client,
8 config::{GcpConfig, GcsSinkConfig},
9};
10use kinetic_buffers::BufferReceiver;
11use kinetic_core::{EventBatch, ShutdownSignal};
12use metrics::{Label, counter};
13use time::OffsetDateTime;
14use tokio::time::{Duration, interval};
15use tracing::{debug, error, info};
16
17use kinetic_core::encode::Encoder;
18use std::sync::Arc;
19
/// A sink task that buffers incoming [`EventBatch`]es and uploads them as
/// objects to a Google Cloud Storage bucket.
///
/// Construct it with `GcsSinkTask::new` and drive it with `GcsSinkTask::run`.
pub struct GcsSinkTask {
    /// Sink settings. The code in this file reads `bucket`, `auth`,
    /// `key_prefix`, `encoding`, `batch.timeout_secs` and `batch.max_size`.
    config: GcsSinkConfig,
    /// Identifier of this component; used in log messages and as the
    /// `component_id` metric label.
    component_id: String,
    /// Source of incoming event batches.
    receiver: BufferReceiver,
    /// Shutdown notification awaited by the run loop and by flush retries.
    shutdown: ShutdownSignal,
    /// Encoder that turns the merged batch into the bytes that are uploaded.
    encoder: Arc<dyn Encoder>,
    /// Metric labels attached to every counter this task emits.
    labels: Arc<[Label]>,
}
28
29use async_trait::async_trait;
30use kinetic_core::healthcheck::Healthcheck;
31
32#[async_trait]
33impl Healthcheck for GcsSinkTask {
34 async fn check(&self) -> anyhow::Result<()> {
35 let gcp: GcpConfig = self.config.auth.as_ref().cloned().unwrap_or_default();
36 let client: StorageClient = create_storage_client(&gcp).await?;
37
38 let req = gcloud_storage::http::objects::list::ListObjectsRequest {
39 bucket: self.config.bucket.clone(),
40 max_results: Some(1),
41 ..Default::default()
42 };
43
44 client.list_objects(&req).await.map_err(|e| {
45 anyhow::anyhow!(
46 "GCS healthcheck failed for bucket '{}': {}",
47 self.config.bucket,
48 e
49 )
50 })?;
51 Ok(())
52 }
53}
54
55impl GcsSinkTask {
56 pub fn new(
57 config: GcsSinkConfig,
58 component_id: String,
59 receiver: BufferReceiver,
60 shutdown: ShutdownSignal,
61 encoder: Arc<dyn Encoder>,
62 ) -> Self {
63 let labels: Arc<[Label]> = Arc::new([
64 Label::new("component_id", component_id.clone()),
65 Label::new("component_type", "sink"),
66 Label::new("component_kind", "gcp_cloud_storage"),
67 ]);
68
69 Self {
70 config,
71 component_id,
72 receiver,
73 shutdown,
74 encoder,
75 labels,
76 }
77 }
78
79 pub async fn run(mut self) {
80 info!(
81 "Starting GCS sink '{}' to bucket: {}",
82 self.component_id, self.config.bucket
83 );
84
85 let gcp: GcpConfig = self.config.auth.as_ref().cloned().unwrap_or_default();
86 let client: StorageClient = match create_storage_client(&gcp).await {
87 Ok(c) => c,
88 Err(e) => {
89 error!("Failed to create GCS client: {}", e);
90 return;
91 }
92 };
93
94 let mut current_batch: Vec<EventBatch> = Vec::new();
95 let mut current_rows = 0;
96
97 let timeout = self.config.batch.timeout_secs.unwrap_or(30);
98 let mut flush_interval = interval(Duration::from_secs(timeout));
99 let max_size = self.config.batch.max_size.unwrap_or(1000);
100
101 let mut shutdown = self.shutdown.clone();
102
103 loop {
104 tokio::select! {
105 reason = shutdown.recv() => {
106 info!("GCS sink '{}' received shutdown signal: {:?}", self.component_id, reason);
107 if !current_batch.is_empty() && let Err(e) = self.flush_batch(&client, &mut current_batch, &mut current_rows).await {
108 error!("Final flush failed in GCS sink {}: {}", self.component_id, e);
109 }
110 break;
111 }
112 _ = flush_interval.tick() => {
113 if !current_batch.is_empty() && let Err(e) = self.flush_batch(&client, &mut current_batch, &mut current_rows).await {
114 error!(
115 "Scheduled flush failed in GCS sink {}: {}",
116 self.component_id, e
117 );
118 }
119 }
120 batch = self.receiver.recv() => {
121 let Some(batch) = batch else {
122 info!("GCS sink '{}' receiver closed", self.component_id);
123 if !current_batch.is_empty()
124 && let Err(e) = self
125 .flush_batch(&client, &mut current_batch, &mut current_rows)
126 .await
127 {
128 error!(
129 "Final flush after receiver closed failed in GCS sink {}: {}",
130 self.component_id, e
131 );
132 }
133 break;
134 };
135
136 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
137 counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
138
139 if !current_batch.is_empty() && batch.payload.schema() != current_batch[0].payload.schema() {
140 debug!("Schema change detected in GCS sink {}, flushing current buffer before accepting new schema", self.component_id);
141 self.flush_with_retry(&client, &mut current_batch, &mut current_rows, &mut shutdown).await;
142 if !current_batch.is_empty() {
143 break;
144 }
145 flush_interval.reset();
146 }
147
148 current_rows += batch.num_rows();
149 current_batch.push(batch);
150
151 if current_rows >= max_size {
152 self.flush_with_retry(&client, &mut current_batch, &mut current_rows, &mut shutdown).await;
153 flush_interval.reset();
154 }
155 }
156 }
157 }
158
159 info!("GCS sink task {} shutting down", self.component_id);
160 }
161
162 async fn flush_with_retry(
163 &self,
164 client: &StorageClient,
165 batches: &mut Vec<EventBatch>,
166 rows: &mut usize,
167 shutdown: &mut ShutdownSignal,
168 ) {
169 let mut backoff = Duration::from_secs(1);
170 while !batches.is_empty() {
171 match self.flush_batch(client, batches, rows).await {
172 Ok(_) => break,
173 Err(e) => {
174 error!(
175 "GCS flush failed for component {}: {}. Retrying in {:?}...",
176 self.component_id, e, backoff
177 );
178 tokio::select! {
179 _ = shutdown.recv() => {
180 error!("Shutdown received during GCS flush retry for {}. Data in current buffer may be lost.", self.component_id);
181 return;
182 }
183 _ = tokio::time::sleep(backoff) => {
184 backoff = (backoff * 2).min(Duration::from_secs(60));
185 }
186 }
187 }
188 }
189 }
190 }
191
192 async fn flush_batch(
193 &self,
194 client: &StorageClient,
195 batches: &mut Vec<EventBatch>,
196 rows: &mut usize,
197 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
198 if batches.is_empty() {
199 return Ok(());
200 }
201
202 debug!("GCS sink {} flushing {} rows", self.component_id, rows);
203
204 let record_batches: Vec<_> = batches.iter().map(|b| b.payload.clone()).collect();
205 let schema = record_batches[0].schema();
206 let merged_batch = concat_batches(&schema, record_batches.iter())?;
207
208 let metadata = batches[0].metadata.clone();
209 let event_batch = EventBatch::new(merged_batch, metadata)?;
210
211 let buf = self.encoder.encode(&event_batch)?;
212
213 let now = OffsetDateTime::now_utc();
214 let extension = if self.config.encoding.as_deref() == Some("parquet") {
215 "parquet"
216 } else {
217 "json"
218 };
219 let name = format!(
220 "{}{}.{}",
221 self.config.key_prefix,
222 now.unix_timestamp_nanos(),
223 extension
224 );
225
226 let buf_len = buf.len();
227
228 let upload_type = UploadType::Simple(Media::new(name.clone()));
229
230 let _: gcloud_storage::http::objects::Object = client
231 .upload_object(
232 &UploadObjectRequest {
233 bucket: self.config.bucket.clone(),
234 ..Default::default()
235 },
236 buf.to_vec(),
238 &upload_type,
239 )
240 .await?;
241
242 counter!("component_sent_network_bytes_total", self.labels.iter())
243 .increment(buf_len as u64);
244 counter!("component_sent_events_total", self.labels.iter()).increment(*rows as u64);
245 counter!("component_sent_event_bytes_total", self.labels.iter())
246 .increment(event_batch.estimated_size() as u64);
247
248 info!(
249 "Successfully uploaded {} rows to gs://{}/{}",
250 rows, self.config.bucket, name
251 );
252
253 for mut batch in batches.drain(..) {
254 if let Some(token) = batch.ack_token.take() {
255 token.ack();
256 }
257 }
258
259 *rows = 0;
260 Ok(())
261 }
262}