// kinetic/sinks/shadow_archive.rs

use arrow_select::concat::concat_batches;
use aws_common::client::create_s3_client;
use kinetic_buffers::BufferReceiver;
use kinetic_config::model::ArchiveStorageConfig;
use kinetic_core::encode::Encoder;
use kinetic_core::{EventBatch, ShutdownSignal};
use kinetic_encoder_parquet::ParquetEncoder;
use metrics::{Label, counter};
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::task::JoinHandle;
use tokio::time::{Duration, interval};
use tracing::{error, info, warn};

15pub struct ShadowArchiveSink {
16    config: ArchiveStorageConfig,
17    receiver: BufferReceiver,
18    shutdown: ShutdownSignal,
19    labels: Arc<[Label]>,
20    pipeline_id: String,
21    encoder: ParquetEncoder,
22}
23
24impl ShadowArchiveSink {
25    pub fn new(
26        config: ArchiveStorageConfig,
27        receiver: BufferReceiver,
28        shutdown: ShutdownSignal,
29        pipeline_id: String,
30    ) -> Self {
31        let labels: Arc<[Label]> = Arc::new([
32            Label::new("component_id", "shadow_archive_sink"),
33            Label::new("component_type", "sink"),
34            Label::new("component_kind", "shadow_archive"),
35        ]);
36
37        Self {
38            config,
39            receiver,
40            shutdown,
41            labels,
42            pipeline_id,
43            encoder: ParquetEncoder::default(),
44        }
45    }
46
47    pub fn run(mut self) -> JoinHandle<()> {
48        tokio::spawn(async move {
49            info!("Starting Shadow Archive sink to: {}", self.config.endpoint);
50
51            let is_s3 = self.config.endpoint.starts_with("s3://");
52
53            if is_s3 {
54                self.run_s3().await;
55            } else {
56                self.run_filesystem().await;
57            }
58
59            info!("Shadow Archive sink shutting down");
60        })
61    }
62
63    async fn run_s3(&mut self) {
64        let bucket = self
65            .config
66            .endpoint
67            .strip_prefix("s3://")
68            .unwrap_or(&self.config.endpoint)
69            .trim_end_matches('/');
70        let aws = self.config.auth.as_ref().cloned().unwrap_or_default();
71        let client = create_s3_client(&aws).await;
72
73        let mut flush_interval = interval(Duration::from_secs(60));
74        let mut current_batch: Vec<EventBatch> = Vec::new();
75        let mut current_rows = 0;
76
77        loop {
78            tokio::select! {
79                _ = self.shutdown.recv() => {
80                    if !current_batch.is_empty() {
81                        let _ = self.flush_s3(&client, bucket, &mut current_batch, &mut current_rows).await;
82                    }
83                    break;
84                }
85                _ = flush_interval.tick() => {
86                    if !current_batch.is_empty() {
87                        let _ = self.flush_s3(&client, bucket, &mut current_batch, &mut current_rows).await;
88                    }
89                }
90                batch = self.receiver.recv() => {
91                    let Some(batch) = batch else { break; };
92
93                    // If schema changes, flush current first
94                    if !current_batch.is_empty() && batch.payload.schema() != current_batch[0].payload.schema() {
95                        let _ = self.flush_s3(&client, bucket, &mut current_batch, &mut current_rows).await;
96                    }
97
98                    current_rows += batch.num_rows();
99                    current_batch.push(batch);
100
101                    if current_rows >= 5000 {
102                        let _ = self.flush_s3(&client, bucket, &mut current_batch, &mut current_rows).await;
103                        flush_interval.reset();
104                    }
105                }
106            }
107        }
108    }
109
110    async fn flush_s3(
111        &self,
112        client: &aws_sdk_s3::Client,
113        bucket: &str,
114        batches: &mut Vec<EventBatch>,
115        rows: &mut usize,
116    ) -> anyhow::Result<()> {
117        let now = OffsetDateTime::now_utc();
118        let key = self.construct_key(now);
119
120        let merged = self.merge_batches(batches)?;
121        let encoded = self
122            .encoder
123            .encode(&merged)
124            .map_err(|e| anyhow::anyhow!("Parquet encoding failed: {}", e))?;
125
126        match client
127            .put_object()
128            .bucket(bucket)
129            .key(&key)
130            .body(encoded.into())
131            .send()
132            .await
133        {
134            Ok(_) => {
135                counter!("component_sent_events_total", self.labels.iter()).increment(*rows as u64);
136                batches.clear();
137                *rows = 0;
138                Ok(())
139            }
140            Err(e) => {
141                error!("Shadow Archive S3 flush failed: {}", e);
142                Err(e.into())
143            }
144        }
145    }
146
147    async fn run_filesystem(&mut self) {
148        let base_path = std::path::PathBuf::from(
149            self.config
150                .endpoint
151                .strip_prefix("file://")
152                .unwrap_or(&self.config.endpoint),
153        );
154
155        let mut flush_interval = interval(Duration::from_secs(60));
156        let mut current_batch: Vec<EventBatch> = Vec::new();
157        let mut current_rows = 0;
158
159        loop {
160            tokio::select! {
161                _ = self.shutdown.recv() => {
162                    if !current_batch.is_empty() {
163                        let _ = self.flush_fs(&base_path, &mut current_batch, &mut current_rows).await;
164                    }
165                    break;
166                }
167                _ = flush_interval.tick() => {
168                    if !current_batch.is_empty() {
169                        let _ = self.flush_fs(&base_path, &mut current_batch, &mut current_rows).await;
170                    }
171                }
172                batch = self.receiver.recv() => {
173                    let Some(batch) = batch else { break; };
174
175                    // If schema changes, flush current first
176                    if !current_batch.is_empty() && batch.payload.schema() != current_batch[0].payload.schema() {
177                        let _ = self.flush_fs(&base_path, &mut current_batch, &mut current_rows).await;
178                    }
179
180                    current_rows += batch.num_rows();
181                    current_batch.push(batch);
182
183                    if current_rows >= 5000 {
184                        let _ = self.flush_fs(&base_path, &mut current_batch, &mut current_rows).await;
185                        flush_interval.reset();
186                    }
187                }
188            }
189        }
190    }
191
192    async fn flush_fs(
193        &self,
194        base_path: &std::path::Path,
195        batches: &mut Vec<EventBatch>,
196        rows: &mut usize,
197    ) -> anyhow::Result<()> {
198        let now = OffsetDateTime::now_utc();
199        let relative_path = self.construct_key(now);
200        let full_path = base_path.join(relative_path);
201
202        if let Some(parent) = full_path.parent() {
203            tokio::fs::create_dir_all(parent).await?;
204        }
205
206        let merged = self.merge_batches(batches)?;
207        let encoded = self
208            .encoder
209            .encode(&merged)
210            .map_err(|e| anyhow::anyhow!("Parquet encoding failed: {}", e))?;
211
212        tokio::fs::write(&full_path, encoded).await?;
213
214        counter!("component_sent_events_total", self.labels.iter()).increment(*rows as u64);
215        batches.clear();
216        *rows = 0;
217        Ok(())
218    }
219
220    fn merge_batches(&self, batches: &[EventBatch]) -> anyhow::Result<EventBatch> {
221        if batches.is_empty() {
222            anyhow::bail!("Cannot merge empty batch list");
223        }
224
225        let payloads: Vec<_> = batches.iter().map(|b| &b.payload).collect();
226        let merged_payload = concat_batches(&batches[0].payload.schema(), payloads)?;
227
228        EventBatch::new(merged_payload, batches[0].metadata.clone())
229            .map_err(|e| anyhow::anyhow!("Failed to create merged EventBatch: {}", e))
230    }
231
232    fn construct_key(&self, now: OffsetDateTime) -> String {
233        // hive-hourly: .../org={org_id}/pipeline={pipeline_id}/year={YYYY}/month={MM}/day={DD}/hour={HH}/
234        match self.config.layout.as_str() {
235            "hive-hourly" => format!(
236                "pipeline={}/year={:04}/month={:02}/day={:02}/hour={:02}/{}.parquet",
237                self.pipeline_id,
238                now.year(),
239                now.month() as u8,
240                now.day(),
241                now.hour(),
242                uuid::Uuid::new_v4()
243            ),
244            "hive-daily" => format!(
245                "pipeline={}/year={:04}/month={:02}/day={:02}/{}.parquet",
246                self.pipeline_id,
247                now.year(),
248                now.month() as u8,
249                now.day(),
250                uuid::Uuid::new_v4()
251            ),
252            _ => format!(
253                "{}/{}/{:04}/{:02}/{:02}/{}.parquet",
254                self.pipeline_id,
255                now.year(),
256                now.month() as u8,
257                now.day(),
258                now.hour(),
259                uuid::Uuid::new_v4()
260            ),
261        }
262    }
263}
264
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use kinetic_core::{ComponentId, EventMetadata};
    use std::path::{Path, PathBuf};
    use tempfile::tempdir;

    /// Builds a sink for `endpoint` and `layout` with pipeline id `test-pipeline`.
    ///
    /// The second element owns the channel sender and the shutdown coordinator;
    /// keep it alive for as long as the sink is used.
    fn make_sink(endpoint: String, layout: &str) -> (ShadowArchiveSink, impl Sized) {
        let config = ArchiveStorageConfig {
            endpoint,
            auth: None,
            layout: layout.to_string(),
            ttl_hours: 72,
        };
        let (tx, rx) =
            kinetic_buffers::channel(1, kinetic_buffers::WhenFull::Block, "test".to_string());
        let coord = kinetic_core::ShutdownCoordinator::new();
        let sink =
            ShadowArchiveSink::new(config, rx, coord.register(), "test-pipeline".to_string());
        (sink, (tx, coord))
    }

    /// Builds a batch with a single non-nullable `Int32` column `f1` holding `values`.
    fn int_batch(values: Vec<i32>) -> EventBatch {
        let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
            "f1",
            arrow_schema::DataType::Int32,
            false,
        )]));
        let rb = arrow_array::RecordBatch::try_new(
            schema,
            vec![Arc::new(arrow_array::Int32Array::from(values))],
        )
        .unwrap();
        let metadata = Arc::new(EventMetadata::new("test", ComponentId::from("source")));
        EventBatch::new(rb, metadata).unwrap()
    }

    /// Recursively collects every `*.parquet` file below `dir`.
    fn parquet_files(dir: &Path) -> Vec<PathBuf> {
        let mut found = Vec::new();
        for entry in std::fs::read_dir(dir).unwrap() {
            let path = entry.unwrap().path();
            if path.is_dir() {
                found.extend(parquet_files(&path));
            } else if path.extension().is_some_and(|ext| ext == "parquet") {
                found.push(path);
            }
        }
        found
    }

    #[test]
    fn test_construct_key_layouts() {
        let (sink, _guard) = make_sink("file:///tmp/archive".to_string(), "hive-hourly");
        let now = OffsetDateTime::now_utc();

        // hive-hourly
        let key = sink.construct_key(now);
        assert!(key.starts_with("pipeline=test-pipeline/"), "{key}");
        assert!(key.contains(&format!("/year={:04}/", now.year())), "{key}");
        assert!(key.contains(&format!("/month={:02}/", now.month() as u8)), "{key}");
        assert!(key.contains(&format!("/day={:02}/", now.day())), "{key}");
        assert!(key.contains(&format!("/hour={:02}/", now.hour())), "{key}");
        assert!(key.ends_with(".parquet"), "{key}");
        // Each key carries a fresh uuid, so two flushes never collide.
        assert_ne!(key, sink.construct_key(now));

        // hive-daily
        let mut sink_daily = sink;
        sink_daily.config.layout = "hive-daily".to_string();
        let key = sink_daily.construct_key(now);
        assert!(key.starts_with("pipeline=test-pipeline/"), "{key}");
        assert!(key.contains(&format!("/day={:02}/", now.day())), "{key}");
        assert!(!key.contains("hour="), "{key}");
        assert!(key.ends_with(".parquet"), "{key}");
    }

    #[test]
    fn test_construct_key_default_layout() {
        let (sink, _guard) = make_sink("file:///tmp/archive".to_string(), "flat");
        let key = sink.construct_key(OffsetDateTime::now_utc());

        assert!(key.starts_with("test-pipeline/"), "{key}");
        assert!(!key.contains('='), "{key}");
        // pipeline / year / month / day / hour / file
        assert_eq!(key.split('/').count(), 6, "{key}");
        assert!(key.ends_with(".parquet"), "{key}");
    }

    #[test]
    fn test_merge_batches() {
        let (sink, _guard) = make_sink("file:///tmp/archive".to_string(), "hive-hourly");

        let merged = sink
            .merge_batches(&[int_batch(vec![1, 2, 3]), int_batch(vec![4, 5])])
            .unwrap();
        assert_eq!(merged.num_rows(), 5);

        let single = sink.merge_batches(&[int_batch(vec![7])]).unwrap();
        assert_eq!(single.num_rows(), 1);

        assert!(sink.merge_batches(&[]).is_err());
    }

    #[tokio::test]
    async fn test_flush_fs() {
        let tmp = tempdir().unwrap();
        let (sink, _guard) =
            make_sink(format!("file://{}", tmp.path().display()), "hive-hourly");

        let mut batches = vec![int_batch(vec![1, 2, 3])];
        let mut rows = 3;

        if let Err(e) = sink.flush_fs(tmp.path(), &mut batches, &mut rows).await {
            panic!("flush_fs failed: {e:#}");
        }
        assert_eq!(rows, 0);
        assert!(batches.is_empty());

        let pipeline_dir = tmp.path().join("pipeline=test-pipeline");
        assert!(pipeline_dir.is_dir(), "Should have created pipeline directory");

        // The file lands in an hour partition that depends on the wall clock,
        // so search the whole pipeline directory for it.
        let files = parquet_files(&pipeline_dir);
        assert_eq!(files.len(), 1, "expected exactly one parquet file, found {files:?}");
        let size = std::fs::metadata(&files[0]).unwrap().len();
        assert!(size > 0, "{} is empty", files[0].display());
    }
}