1use arrow_select::concat::concat_batches;
2use aws_common::client::create_s3_client;
3use kinetic_buffers::BufferReceiver;
4use kinetic_config::model::ArchiveStorageConfig;
5use kinetic_core::encode::Encoder;
6use kinetic_core::{EventBatch, ShutdownSignal};
7use kinetic_encoder_parquet::ParquetEncoder;
8use metrics::{Label, counter};
9use std::sync::Arc;
10use time::OffsetDateTime;
11use tokio::task::JoinHandle;
12use tokio::time::{Duration, interval};
13use tracing::{error, info};
14
15pub struct ShadowArchiveSink {
16 config: ArchiveStorageConfig,
17 receiver: BufferReceiver,
18 shutdown: ShutdownSignal,
19 labels: Arc<[Label]>,
20 pipeline_id: String,
21 encoder: ParquetEncoder,
22}
23
24impl ShadowArchiveSink {
25 pub fn new(
26 config: ArchiveStorageConfig,
27 receiver: BufferReceiver,
28 shutdown: ShutdownSignal,
29 pipeline_id: String,
30 ) -> Self {
31 let labels: Arc<[Label]> = Arc::new([
32 Label::new("component_id", "shadow_archive_sink"),
33 Label::new("component_type", "sink"),
34 Label::new("component_kind", "shadow_archive"),
35 ]);
36
37 Self {
38 config,
39 receiver,
40 shutdown,
41 labels,
42 pipeline_id,
43 encoder: ParquetEncoder::default(),
44 }
45 }
46
47 pub fn run(mut self) -> JoinHandle<()> {
48 tokio::spawn(async move {
49 info!("Starting Shadow Archive sink to: {}", self.config.endpoint);
50
51 let is_s3 = self.config.endpoint.starts_with("s3://");
52
53 if is_s3 {
54 self.run_s3().await;
55 } else {
56 self.run_filesystem().await;
57 }
58
59 info!("Shadow Archive sink shutting down");
60 })
61 }
62
63 async fn run_s3(&mut self) {
64 let bucket = self
65 .config
66 .endpoint
67 .strip_prefix("s3://")
68 .unwrap_or(&self.config.endpoint)
69 .trim_end_matches('/');
70 let aws = self.config.auth.as_ref().cloned().unwrap_or_default();
71 let client = create_s3_client(&aws).await;
72
73 let mut flush_interval = interval(Duration::from_secs(60));
74 let mut current_batch: Vec<EventBatch> = Vec::new();
75 let mut current_rows = 0;
76
77 loop {
78 tokio::select! {
79 _ = self.shutdown.recv() => {
80 if !current_batch.is_empty() {
81 let _ = self.flush_s3(&client, bucket, &mut current_batch, &mut current_rows).await;
82 }
83 break;
84 }
85 _ = flush_interval.tick() => {
86 if !current_batch.is_empty() {
87 let _ = self.flush_s3(&client, bucket, &mut current_batch, &mut current_rows).await;
88 }
89 }
90 batch = self.receiver.recv() => {
91 let Some(batch) = batch else { break; };
92
93 if !current_batch.is_empty() && batch.payload.schema() != current_batch[0].payload.schema() {
95 let _ = self.flush_s3(&client, bucket, &mut current_batch, &mut current_rows).await;
96 }
97
98 current_rows += batch.num_rows();
99 current_batch.push(batch);
100
101 if current_rows >= 5000 {
102 let _ = self.flush_s3(&client, bucket, &mut current_batch, &mut current_rows).await;
103 flush_interval.reset();
104 }
105 }
106 }
107 }
108 }
109
110 async fn flush_s3(
111 &self,
112 client: &aws_sdk_s3::Client,
113 bucket: &str,
114 batches: &mut Vec<EventBatch>,
115 rows: &mut usize,
116 ) -> anyhow::Result<()> {
117 let now = OffsetDateTime::now_utc();
118 let key = self.construct_key(now);
119
120 let merged = self.merge_batches(batches)?;
121 let encoded = self
122 .encoder
123 .encode(&merged)
124 .map_err(|e| anyhow::anyhow!("Parquet encoding failed: {}", e))?;
125
126 match client
127 .put_object()
128 .bucket(bucket)
129 .key(&key)
130 .body(encoded.into())
131 .send()
132 .await
133 {
134 Ok(_) => {
135 counter!("component_sent_events_total", self.labels.iter()).increment(*rows as u64);
136 batches.clear();
137 *rows = 0;
138 Ok(())
139 }
140 Err(e) => {
141 error!("Shadow Archive S3 flush failed: {}", e);
142 Err(e.into())
143 }
144 }
145 }
146
147 async fn run_filesystem(&mut self) {
148 let base_path = std::path::PathBuf::from(
149 self.config
150 .endpoint
151 .strip_prefix("file://")
152 .unwrap_or(&self.config.endpoint),
153 );
154
155 let mut flush_interval = interval(Duration::from_secs(60));
156 let mut current_batch: Vec<EventBatch> = Vec::new();
157 let mut current_rows = 0;
158
159 loop {
160 tokio::select! {
161 _ = self.shutdown.recv() => {
162 if !current_batch.is_empty() {
163 let _ = self.flush_fs(&base_path, &mut current_batch, &mut current_rows).await;
164 }
165 break;
166 }
167 _ = flush_interval.tick() => {
168 if !current_batch.is_empty() {
169 let _ = self.flush_fs(&base_path, &mut current_batch, &mut current_rows).await;
170 }
171 }
172 batch = self.receiver.recv() => {
173 let Some(batch) = batch else { break; };
174
175 if !current_batch.is_empty() && batch.payload.schema() != current_batch[0].payload.schema() {
177 let _ = self.flush_fs(&base_path, &mut current_batch, &mut current_rows).await;
178 }
179
180 current_rows += batch.num_rows();
181 current_batch.push(batch);
182
183 if current_rows >= 5000 {
184 let _ = self.flush_fs(&base_path, &mut current_batch, &mut current_rows).await;
185 flush_interval.reset();
186 }
187 }
188 }
189 }
190 }
191
192 async fn flush_fs(
193 &self,
194 base_path: &std::path::Path,
195 batches: &mut Vec<EventBatch>,
196 rows: &mut usize,
197 ) -> anyhow::Result<()> {
198 let now = OffsetDateTime::now_utc();
199 let relative_path = self.construct_key(now);
200 let full_path = base_path.join(relative_path);
201
202 if let Some(parent) = full_path.parent() {
203 tokio::fs::create_dir_all(parent).await?;
204 }
205
206 let merged = self.merge_batches(batches)?;
207 let encoded = self
208 .encoder
209 .encode(&merged)
210 .map_err(|e| anyhow::anyhow!("Parquet encoding failed: {}", e))?;
211
212 tokio::fs::write(&full_path, encoded).await?;
213
214 counter!("component_sent_events_total", self.labels.iter()).increment(*rows as u64);
215 batches.clear();
216 *rows = 0;
217 Ok(())
218 }
219
220 fn merge_batches(&self, batches: &[EventBatch]) -> anyhow::Result<EventBatch> {
221 if batches.is_empty() {
222 anyhow::bail!("Cannot merge empty batch list");
223 }
224
225 let payloads: Vec<_> = batches.iter().map(|b| &b.payload).collect();
226 let merged_payload = concat_batches(&batches[0].payload.schema(), payloads)?;
227
228 EventBatch::new(merged_payload, batches[0].metadata.clone())
229 .map_err(|e| anyhow::anyhow!("Failed to create merged EventBatch: {}", e))
230 }
231
232 fn construct_key(&self, now: OffsetDateTime) -> String {
233 match self.config.layout.as_str() {
235 "hive-hourly" => format!(
236 "pipeline={}/year={:04}/month={:02}/day={:02}/hour={:02}/{}.parquet",
237 self.pipeline_id,
238 now.year(),
239 now.month() as u8,
240 now.day(),
241 now.hour(),
242 uuid::Uuid::new_v4()
243 ),
244 "hive-daily" => format!(
245 "pipeline={}/year={:04}/month={:02}/day={:02}/{}.parquet",
246 self.pipeline_id,
247 now.year(),
248 now.month() as u8,
249 now.day(),
250 uuid::Uuid::new_v4()
251 ),
252 _ => format!(
253 "{}/{}/{:04}/{:02}/{:02}/{}.parquet",
254 self.pipeline_id,
255 now.year(),
256 now.month() as u8,
257 now.day(),
258 now.hour(),
259 uuid::Uuid::new_v4()
260 ),
261 }
262 }
263}
264
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use kinetic_core::{ComponentId, EventMetadata};
    use std::path::{Path, PathBuf};
    use tempfile::tempdir;

    /// Recursively collects every non-directory entry under `dir`.
    fn list_files(dir: &Path) -> Vec<PathBuf> {
        let mut files = Vec::new();
        for entry in std::fs::read_dir(dir).unwrap() {
            let path = entry.unwrap().path();
            if path.is_dir() {
                files.extend(list_files(&path));
            } else {
                files.push(path);
            }
        }
        files
    }

    #[test]
    fn test_construct_key_layouts() {
        let config = ArchiveStorageConfig {
            endpoint: "file:///tmp/archive".to_string(),
            auth: None,
            layout: "hive-hourly".to_string(),
            ttl_hours: 72,
        };
        let (_tx, rx) =
            kinetic_buffers::channel(1, kinetic_buffers::WhenFull::Block, "test".to_string());
        let coord = kinetic_core::ShutdownCoordinator::new();
        let mut sink =
            ShadowArchiveSink::new(config, rx, coord.register(), "test-pipeline".to_string());

        let now = OffsetDateTime::now_utc();

        // Hive hourly layout.
        let key = sink.construct_key(now);
        assert!(key.starts_with("pipeline=test-pipeline/"), "unexpected key: {key}");
        assert!(key.contains(&format!("/year={:04}/", now.year())));
        assert!(key.contains(&format!("/month={:02}/", now.month() as u8)));
        assert!(key.contains(&format!("/day={:02}/", now.day())));
        assert!(key.contains(&format!("/hour={:02}/", now.hour())));
        assert!(key.ends_with(".parquet"));

        // Hive daily layout omits the hour partition.
        sink.config.layout = "hive-daily".to_string();
        let key = sink.construct_key(now);
        assert!(key.starts_with("pipeline=test-pipeline/"), "unexpected key: {key}");
        assert!(key.contains(&format!("/day={:02}/", now.day())));
        assert!(!key.contains("hour="));
        assert!(key.ends_with(".parquet"));

        // Unknown layouts fall back to `<pipeline>/<year>/<month>/<day>/<hour>/<file>`.
        sink.config.layout = "flat".to_string();
        let key = sink.construct_key(now);
        let segments: Vec<&str> = key.split('/').collect();
        assert_eq!(segments.len(), 6, "unexpected key: {key}");
        assert_eq!(segments[0], "test-pipeline");
        assert_eq!(segments[1], format!("{:04}", now.year()));
        assert!(segments[5].ends_with(".parquet"));

        // Each key gets a fresh file id, so repeated flushes never overwrite each other.
        assert_ne!(sink.construct_key(now), sink.construct_key(now));
    }

    #[tokio::test]
    async fn test_flush_fs() {
        let tmp = tempdir().unwrap();
        let config = ArchiveStorageConfig {
            endpoint: format!("file://{}", tmp.path().display()),
            auth: None,
            layout: "hive-hourly".to_string(),
            ttl_hours: 72,
        };
        let (_tx, rx) =
            kinetic_buffers::channel(1, kinetic_buffers::WhenFull::Block, "test".to_string());
        let coord = kinetic_core::ShutdownCoordinator::new();
        let sink =
            ShadowArchiveSink::new(config, rx, coord.register(), "test-pipeline".to_string());

        let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
            "f1",
            arrow_schema::DataType::Int32,
            false,
        )]));
        let rb = arrow_array::RecordBatch::try_new(
            schema,
            vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();
        let metadata = Arc::new(EventMetadata::new("test", ComponentId::from("source")));
        let batch = EventBatch::new(rb, metadata).unwrap();

        let mut batches = vec![batch];
        let mut rows = 3;

        // `expect` surfaces the full error chain in the panic message if the flush fails.
        sink.flush_fs(tmp.path(), &mut batches, &mut rows)
            .await
            .expect("flush_fs should succeed");
        assert_eq!(rows, 0);
        assert!(batches.is_empty());

        let pipeline_dir = tmp.path().join("pipeline=test-pipeline");
        assert!(pipeline_dir.is_dir(), "Should have created pipeline directory");

        let files = list_files(&pipeline_dir);
        assert_eq!(files.len(), 1, "expected exactly one archive file: {files:?}");
        assert_eq!(
            files[0].extension().and_then(|ext| ext.to_str()),
            Some("parquet")
        );
        assert!(std::fs::metadata(&files[0]).unwrap().len() > 0);
    }
}