1use arrow_select::concat::concat_batches;
4use aws_common::{client::create_s3_client, config::S3SinkConfig};
5use aws_sdk_s3::Client as S3Client;
6use kinetic_buffers::BufferReceiver;
7use kinetic_core::{EventBatch, ShutdownSignal};
8use metrics::{Label, counter};
9use time::OffsetDateTime;
10use tokio::time::{Duration, interval};
11use tracing::{debug, error, info};
12
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15
16use kinetic_common::register;
17use kinetic_common::telemetry::EventDuration;
18use std::time::Instant;
19
20pub struct S3SinkTask {
21 config: S3SinkConfig,
22 component_id: String,
23 receiver: BufferReceiver,
24 shutdown: ShutdownSignal,
25 encoder: Arc<dyn Encoder>,
26 labels: Arc<[Label]>,
27 event_duration: EventDuration,
28}
29
30use crate::topology::healthcheck::Healthcheck;
31use async_trait::async_trait;
32
33#[async_trait]
34impl Healthcheck for S3SinkTask {
35 async fn check(&self) -> anyhow::Result<()> {
36 let aws = self.config.auth.as_ref().cloned().unwrap_or_default();
37 let client = create_s3_client(&aws).await;
38 client
39 .head_bucket()
40 .bucket(&self.config.bucket)
41 .send()
42 .await
43 .map_err(|e| {
44 anyhow::anyhow!("Failed to head bucket '{}': {}", self.config.bucket, e)
45 })?;
46 Ok(())
47 }
48}
49
50impl S3SinkTask {
51 pub fn new(
52 config: S3SinkConfig,
53 component_id: String,
54 receiver: BufferReceiver,
55 shutdown: ShutdownSignal,
56 encoder: Arc<dyn Encoder>,
57 ) -> Self {
58 let labels: Arc<[Label]> = Arc::new([
59 Label::new("component_id", component_id.clone()),
60 Label::new("component_type", "sink"),
61 Label::new("component_kind", "aws_s3"),
62 ]);
63 let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
64
65 Self {
66 config,
67 component_id,
68 receiver,
69 shutdown,
70 encoder,
71 labels,
72 event_duration,
73 }
74 }
75
76 pub async fn run(mut self) {
77 info!(
78 "Starting S3 sink '{}' to bucket: {}",
79 self.component_id, self.config.bucket
80 );
81
82 let aws = self.config.auth.as_ref().cloned().unwrap_or_default();
83 let client = create_s3_client(&aws).await;
84
85 let mut current_batch: Vec<EventBatch> = Vec::new();
86 let mut current_rows = 0;
87
88 let timeout = self.config.batch.timeout_secs.unwrap_or(30);
89 let mut flush_interval = interval(Duration::from_secs(timeout));
90 let max_size = self.config.batch.max_size.unwrap_or(1000);
91
92 let mut shutdown = self.shutdown.clone();
93
94 loop {
95 tokio::select! {
96 reason = shutdown.recv() => {
97 info!("S3 sink '{}' received shutdown signal: {:?}", self.component_id, reason);
98 if !current_batch.is_empty() {
99 if let Err(e) = self.flush_batch(&client, &mut current_batch, &mut current_rows).await {
101 error!("Final flush failed in S3 sink {}: {}", self.component_id, e);
102 }
103 }
104 break;
105 }
106 _ = flush_interval.tick() => {
107 if !current_batch.is_empty() {
108 if let Err(e) = self.flush_batch(&client, &mut current_batch, &mut current_rows).await {
111 error!(
112 "Scheduled flush failed in S3 sink {}: {}",
113 self.component_id, e
114 );
115 }
116 }
117 }
118 batch = self.receiver.recv() => {
119 let Some(batch) = batch else {
120 info!("S3 sink '{}' receiver closed", self.component_id);
121 if !current_batch.is_empty()
122 {
123 let start = Instant::now();
124 if let Err(e) = self
125 .flush_batch(&client, &mut current_batch, &mut current_rows)
126 .await
127 {
128 error!(
129 "Final flush after receiver closed failed in S3 sink {}: {}",
130 self.component_id, e
131 );
132 }
133 self.event_duration.emit(start.elapsed());
134 }
135 break;
136 };
137 let start = Instant::now();
138
139 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
140 counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
141
142 if !current_batch.is_empty() && batch.payload.schema() != current_batch[0].payload.schema() {
144 debug!("Schema change detected in S3 sink {}, flushing current buffer before accepting new schema", self.component_id);
145 self.flush_with_retry(&client, &mut current_batch, &mut current_rows, &mut shutdown).await;
146 if !current_batch.is_empty() {
148 break;
149 }
150 flush_interval.reset();
151 }
152
153 current_rows += batch.num_rows();
155 current_batch.push(batch);
156
157 if current_rows >= max_size {
159 self.flush_with_retry(&client, &mut current_batch, &mut current_rows, &mut shutdown).await;
160 flush_interval.reset();
161 }
162 self.event_duration.emit(start.elapsed());
163 }
164 }
165 }
166
167 info!("S3 sink task {} shutting down", self.component_id);
168 }
169
170 async fn flush_with_retry(
171 &self,
172 client: &S3Client,
173 batches: &mut Vec<EventBatch>,
174 rows: &mut usize,
175 shutdown: &mut ShutdownSignal,
176 ) {
177 let mut backoff = Duration::from_secs(1);
178 while !batches.is_empty() {
179 match self.flush_batch(client, batches, rows).await {
180 Ok(_) => break,
181 Err(e) => {
182 error!(
183 "S3 flush failed for component {}: {}. Retrying in {:?}...",
184 self.component_id, e, backoff
185 );
186 tokio::select! {
187 _ = shutdown.recv() => {
188 error!("Shutdown received during S3 flush retry for {}. Data in current buffer may be lost.", self.component_id);
189 return;
190 }
191 _ = tokio::time::sleep(backoff) => {
192 backoff = (backoff * 2).min(Duration::from_secs(60));
193 }
194 }
195 }
196 }
197 }
198 }
199
200 async fn flush_batch(
201 &self,
202 client: &S3Client,
203 batches: &mut Vec<EventBatch>,
204 rows: &mut usize,
205 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
206 if batches.is_empty() {
207 return Ok(());
208 }
209
210 debug!("S3 sink {} flushing {} rows", self.component_id, rows);
211
212 let record_batches: Vec<_> = batches.iter().map(|b| b.payload.clone()).collect();
214 let schema = record_batches[0].schema();
215 let merged_batch = concat_batches(&schema, record_batches.iter())?;
216
217 let metadata = batches[0].metadata.clone();
219 let event_batch = EventBatch::new(merged_batch, metadata)?;
220
221 let buf = self.encoder.encode(&event_batch)?;
222
223 let now = OffsetDateTime::now_utc();
224 let extension = if self.config.encoding.as_deref() == Some("parquet") {
225 "parquet"
226 } else {
227 "json"
228 };
229 let key = format!(
230 "{}{}.{}",
231 self.config.key_prefix,
232 now.unix_timestamp_nanos(),
233 extension
234 );
235
236 let body = aws_sdk_s3::primitives::ByteStream::from(buf.to_vec());
237 let buf_len = buf.len();
238
239 match client
241 .put_object()
242 .bucket(&self.config.bucket)
243 .key(&key)
244 .body(body)
245 .send()
246 .await
247 {
248 Ok(_) => {
249 counter!("component_sent_network_bytes_total", self.labels.iter())
250 .increment(buf_len as u64);
251 counter!("component_sent_events_total", self.labels.iter()).increment(*rows as u64);
252 counter!("component_sent_event_bytes_total", self.labels.iter())
254 .increment(event_batch.estimated_size() as u64);
255 }
256 Err(e) => {
257 counter!("component_errors_total", self.labels.iter()).increment(1);
258 return Err(e.into());
259 }
260 }
261
262 info!(
263 "Successfully uploaded {} rows to s3://{}/{}",
264 rows, self.config.bucket, key
265 );
266
267 for mut batch in batches.drain(..) {
269 if let Some(token) = batch.ack_token.take() {
270 token.ack();
271 }
272 }
273
274 *rows = 0;
275 Ok(())
276 }
277}