// kinetic/sinks/aws/s3.rs

1//! AWS S3 Sink.
2
3use arrow_select::concat::concat_batches;
4use aws_common::{client::create_s3_client, config::S3SinkConfig};
5use aws_sdk_s3::Client as S3Client;
6use kinetic_buffers::BufferReceiver;
7use kinetic_core::{EventBatch, ShutdownSignal};
8use metrics::{Label, counter};
9use time::OffsetDateTime;
10use tokio::time::{Duration, interval};
11use tracing::{debug, error, info};
12
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15
16use kinetic_common::register;
17use kinetic_common::telemetry::EventDuration;
18use std::time::Instant;
19
20pub struct S3SinkTask {
21    config: S3SinkConfig,
22    component_id: String,
23    receiver: BufferReceiver,
24    shutdown: ShutdownSignal,
25    encoder: Arc<dyn Encoder>,
26    labels: Arc<[Label]>,
27    event_duration: EventDuration,
28}
29
30use crate::topology::healthcheck::Healthcheck;
31use async_trait::async_trait;
32
33#[async_trait]
34impl Healthcheck for S3SinkTask {
35    async fn check(&self) -> anyhow::Result<()> {
36        let aws = self.config.auth.as_ref().cloned().unwrap_or_default();
37        let client = create_s3_client(&aws).await;
38        client
39            .head_bucket()
40            .bucket(&self.config.bucket)
41            .send()
42            .await
43            .map_err(|e| {
44                anyhow::anyhow!("Failed to head bucket '{}': {}", self.config.bucket, e)
45            })?;
46        Ok(())
47    }
48}
49
50impl S3SinkTask {
51    pub fn new(
52        config: S3SinkConfig,
53        component_id: String,
54        receiver: BufferReceiver,
55        shutdown: ShutdownSignal,
56        encoder: Arc<dyn Encoder>,
57    ) -> Self {
58        let labels: Arc<[Label]> = Arc::new([
59            Label::new("component_id", component_id.clone()),
60            Label::new("component_type", "sink"),
61            Label::new("component_kind", "aws_s3"),
62        ]);
63        let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
64
65        Self {
66            config,
67            component_id,
68            receiver,
69            shutdown,
70            encoder,
71            labels,
72            event_duration,
73        }
74    }
75
76    pub async fn run(mut self) {
77        info!(
78            "Starting S3 sink '{}' to bucket: {}",
79            self.component_id, self.config.bucket
80        );
81
82        let aws = self.config.auth.as_ref().cloned().unwrap_or_default();
83        let client = create_s3_client(&aws).await;
84
85        let mut current_batch: Vec<EventBatch> = Vec::new();
86        let mut current_rows = 0;
87
88        let timeout = self.config.batch.timeout_secs.unwrap_or(30);
89        let mut flush_interval = interval(Duration::from_secs(timeout));
90        let max_size = self.config.batch.max_size.unwrap_or(1000);
91
92        let mut shutdown = self.shutdown.clone();
93
94        loop {
95            tokio::select! {
96                reason = shutdown.recv() => {
97                    info!("S3 sink '{}' received shutdown signal: {:?}", self.component_id, reason);
98                    if !current_batch.is_empty() {
99                        // Attempt one last flush. If it fails, we might lose data but we are shutting down.
100                        if let Err(e) = self.flush_batch(&client, &mut current_batch, &mut current_rows).await {
101                            error!("Final flush failed in S3 sink {}: {}", self.component_id, e);
102                        }
103                    }
104                    break;
105                }
106                _ = flush_interval.tick() => {
107                    if !current_batch.is_empty() {
108                        // We don't block the main loop for scheduled flushes, just try it.
109                        // If it fails, the next batch or next tick will try again.
110                        if let Err(e) = self.flush_batch(&client, &mut current_batch, &mut current_rows).await {
111                            error!(
112                                "Scheduled flush failed in S3 sink {}: {}",
113                                self.component_id, e
114                            );
115                        }
116                    }
117                }
118                batch = self.receiver.recv() => {
119                    let Some(batch) = batch else {
120                        info!("S3 sink '{}' receiver closed", self.component_id);
121                        if !current_batch.is_empty()
122                        {
123                            let start = Instant::now();
124                            if let Err(e) = self
125                                .flush_batch(&client, &mut current_batch, &mut current_rows)
126                                .await
127                            {
128                                error!(
129                                    "Final flush after receiver closed failed in S3 sink {}: {}",
130                                    self.component_id, e
131                                );
132                            }
133                            self.event_duration.emit(start.elapsed());
134                        }
135                        break;
136                    };
137                    let start = Instant::now();
138
139                    counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
140                    counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
141
142                    // 1. Check for schema change. If changed, we MUST flush existing data first.
143                    if !current_batch.is_empty() && batch.payload.schema() != current_batch[0].payload.schema() {
144                        debug!("Schema change detected in S3 sink {}, flushing current buffer before accepting new schema", self.component_id);
145                        self.flush_with_retry(&client, &mut current_batch, &mut current_rows, &mut shutdown).await;
146                        // If we returned from flush_with_retry and it's still not empty, it means we are shutting down.
147                        if !current_batch.is_empty() {
148                            break;
149                        }
150                        flush_interval.reset();
151                    }
152
153                    // 2. Add to current batch
154                    current_rows += batch.num_rows();
155                    current_batch.push(batch);
156
157                    // 3. If over size, flush with retry to apply backpressure
158                    if current_rows >= max_size {
159                        self.flush_with_retry(&client, &mut current_batch, &mut current_rows, &mut shutdown).await;
160                        flush_interval.reset();
161                    }
162                    self.event_duration.emit(start.elapsed());
163                }
164            }
165        }
166
167        info!("S3 sink task {} shutting down", self.component_id);
168    }
169
170    async fn flush_with_retry(
171        &self,
172        client: &S3Client,
173        batches: &mut Vec<EventBatch>,
174        rows: &mut usize,
175        shutdown: &mut ShutdownSignal,
176    ) {
177        let mut backoff = Duration::from_secs(1);
178        while !batches.is_empty() {
179            match self.flush_batch(client, batches, rows).await {
180                Ok(_) => break,
181                Err(e) => {
182                    error!(
183                        "S3 flush failed for component {}: {}. Retrying in {:?}...",
184                        self.component_id, e, backoff
185                    );
186                    tokio::select! {
187                        _ = shutdown.recv() => {
188                            error!("Shutdown received during S3 flush retry for {}. Data in current buffer may be lost.", self.component_id);
189                            return;
190                        }
191                        _ = tokio::time::sleep(backoff) => {
192                            backoff = (backoff * 2).min(Duration::from_secs(60));
193                        }
194                    }
195                }
196            }
197        }
198    }
199
200    async fn flush_batch(
201        &self,
202        client: &S3Client,
203        batches: &mut Vec<EventBatch>,
204        rows: &mut usize,
205    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
206        if batches.is_empty() {
207            return Ok(());
208        }
209
210        debug!("S3 sink {} flushing {} rows", self.component_id, rows);
211
212        // Prepare the merged batch
213        let record_batches: Vec<_> = batches.iter().map(|b| b.payload.clone()).collect();
214        let schema = record_batches[0].schema();
215        let merged_batch = concat_batches(&schema, record_batches.iter())?;
216
217        // Wrap the merged batch in a temporary EventBatch for encoding
218        let metadata = batches[0].metadata.clone();
219        let event_batch = EventBatch::new(merged_batch, metadata)?;
220
221        let buf = self.encoder.encode(&event_batch)?;
222
223        let now = OffsetDateTime::now_utc();
224        let extension = if self.config.encoding.as_deref() == Some("parquet") {
225            "parquet"
226        } else {
227            "json"
228        };
229        let key = format!(
230            "{}{}.{}",
231            self.config.key_prefix,
232            now.unix_timestamp_nanos(),
233            extension
234        );
235
236        let body = aws_sdk_s3::primitives::ByteStream::from(buf.to_vec());
237        let buf_len = buf.len();
238
239        // The actual S3 upload
240        match client
241            .put_object()
242            .bucket(&self.config.bucket)
243            .key(&key)
244            .body(body)
245            .send()
246            .await
247        {
248            Ok(_) => {
249                counter!("component_sent_network_bytes_total", self.labels.iter())
250                    .increment(buf_len as u64);
251                counter!("component_sent_events_total", self.labels.iter()).increment(*rows as u64);
252                // For simplicity, we use the estimated size of the EventBatch as sent_event_bytes
253                counter!("component_sent_event_bytes_total", self.labels.iter())
254                    .increment(event_batch.estimated_size() as u64);
255            }
256            Err(e) => {
257                counter!("component_errors_total", self.labels.iter()).increment(1);
258                return Err(e.into());
259            }
260        }
261
262        info!(
263            "Successfully uploaded {} rows to s3://{}/{}",
264            rows, self.config.bucket, key
265        );
266
267        // Success! Now clear and acknowledge all batches
268        for mut batch in batches.drain(..) {
269            if let Some(token) = batch.ack_token.take() {
270                token.ack();
271            }
272        }
273
274        *rows = 0;
275        Ok(())
276    }
277}