Skip to main content

kinetic_buffers/
disk.rs

1//! Persistent disk-backed buffering for Kinetic.
2
3use arrow_ipc::reader::StreamReader;
4use arrow_ipc::writer::StreamWriter;
5use kinetic_core::EventBatch;
6use metrics::{Label, counter, gauge};
7use std::collections::VecDeque;
8use std::fs::{self, File};
9use std::io::{BufReader, BufWriter};
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tracing::{debug, error, info};
13
14/// A persistent buffer that stores Arrow batches on disk.
15pub struct DiskBuffer {
16    data_dir: PathBuf,
17    component_id: String,
18    pending_files: VecDeque<PathBuf>,
19    labels: Arc<[Label]>,
20    pub max_size_bytes: Option<u64>,
21    pub current_size_bytes: u64,
22}
23
24impl DiskBuffer {
25    /// Creates a new DiskBuffer or loads existing state from disk.
26    pub fn new<P: AsRef<Path>>(
27        data_dir: P,
28        component_id: String,
29        max_size_bytes: Option<u64>,
30    ) -> anyhow::Result<Self> {
31        let path = data_dir.as_ref().join(&component_id);
32        fs::create_dir_all(&path)?;
33
34        let labels: Arc<[Label]> = Arc::new([
35            Label::new("component_id", component_id.clone()),
36            Label::new("buffer_type", "disk"),
37        ]);
38
39        if let Some(max) = max_size_bytes {
40            gauge!("buffer_max_size_bytes", labels.iter()).set(max as f64);
41        }
42
43        let mut buffer = Self {
44            data_dir: path,
45            component_id,
46            pending_files: VecDeque::new(),
47            labels,
48            max_size_bytes,
49            current_size_bytes: 0,
50        };
51
52        buffer.load_existing()?;
53        Ok(buffer)
54    }
55
56    /// Loads existing batch files from the data directory.
57    fn load_existing(&mut self) -> anyhow::Result<()> {
58        let mut files = Vec::new();
59        let mut total_size = 0;
60        for entry in fs::read_dir(&self.data_dir)? {
61            let entry = entry?;
62            let path = entry.path();
63            if path.is_file() && path.extension().is_some_and(|ext| ext == "arrow") {
64                total_size += entry.metadata()?.len();
65                files.push(path);
66            }
67        }
68
69        // Sort by file name (which includes timestamp/xid) to preserve order
70        files.sort();
71        self.pending_files.extend(files);
72        self.current_size_bytes = total_size;
73
74        if !self.pending_files.is_empty() {
75            info!(
76                "Loaded {} existing batches ({} bytes) for component '{}'",
77                self.pending_files.len(),
78                self.current_size_bytes,
79                self.component_id
80            );
81        }
82
83        Ok(())
84    }
85
86    /// Enqueues a batch to disk.
87    pub fn enqueue(&mut self, batch: &EventBatch) -> anyhow::Result<()> {
88        if let Some(max) = self.max_size_bytes
89            && self.current_size_bytes >= max
90        {
91            anyhow::bail!(
92                "Disk buffer is full ({} >= {} bytes)",
93                self.current_size_bytes,
94                max
95            );
96        }
97
98        let events = batch.num_rows();
99        let bytes = batch.estimated_size();
100
101        // Generate a unique filename using timestamp and xid
102        let timestamp = std::time::SystemTime::now()
103            .duration_since(std::time::UNIX_EPOCH)?
104            .as_nanos();
105        let id = xid::new().to_string();
106
107        let filename = format!("{:020}_{}.arrow", timestamp, id);
108        let file_path = self.data_dir.join(filename);
109
110        // Serialize metadata to JSON and store in Arrow schema metadata
111        let metadata_json = serde_json::to_string(&batch.metadata)?;
112        let mut schema_metadata = batch.payload.schema().metadata().clone();
113        schema_metadata.insert("kinetic.metadata".to_string(), metadata_json);
114
115        let new_schema = batch
116            .payload
117            .schema()
118            .as_ref()
119            .clone()
120            .with_metadata(schema_metadata);
121        let rb_with_metadata = arrow_array::RecordBatch::try_new(
122            std::sync::Arc::new(new_schema),
123            batch.payload.columns().to_vec(),
124        )?;
125
126        let file = File::create(&file_path)?;
127        let mut writer = StreamWriter::try_new(BufWriter::new(file), &rb_with_metadata.schema())?;
128        writer.write(&rb_with_metadata)?;
129        writer.finish()?;
130
131        let written_size = fs::metadata(&file_path)?.len();
132        self.current_size_bytes += written_size;
133
134        self.pending_files.push_back(file_path);
135
136        // Ack immediately once successfully on disk.
137        // This shifts durability from end-to-end to source-to-disk.
138        if let Some(token) = batch.ack_token.clone() {
139            token.ack();
140        }
141
142        counter!("buffer_received_events_total", self.labels.iter()).increment(events as u64);
143        counter!("buffer_received_bytes_total", self.labels.iter()).increment(bytes as u64);
144        gauge!("buffer_size_events", self.labels.iter()).increment(events as f64);
145        gauge!("buffer_size_bytes", self.labels.iter()).increment(bytes as f64);
146
147        debug!("Enqueued batch to disk: component={}", self.component_id);
148        Ok(())
149    }
150
151    /// Dequeues the next batch from disk.
152    pub fn dequeue(&mut self) -> anyhow::Result<Option<EventBatch>> {
153        let Some(file_path) = self.pending_files.pop_front() else {
154            return Ok(None);
155        };
156
157        let file_size = fs::metadata(&file_path)?.len();
158
159        let batch = {
160            let file = File::open(&file_path)?;
161            let reader = StreamReader::try_new(BufReader::new(file), None)?;
162
163            // We expect exactly one batch per file in this simple implementation
164            let mut record_batches = Vec::new();
165            for b in reader {
166                record_batches.push(b?);
167            }
168
169            if record_batches.is_empty() {
170                anyhow::bail!("Arrow file was empty: {:?}", file_path);
171            }
172
173            let Some(rb) = record_batches.into_iter().next() else {
174                anyhow::bail!("Arrow file had no record batches: {:?}", file_path);
175            };
176
177            // Extract metadata from schema
178            let schema = rb.schema();
179            let metadata_json = schema
180                .metadata()
181                .get("kinetic.metadata")
182                .ok_or_else(|| anyhow::anyhow!("Missing metadata in disk-buffered batch"))?;
183            let metadata: kinetic_core::EventMetadata = serde_json::from_str(metadata_json)?;
184
185            EventBatch::new(rb, std::sync::Arc::new(metadata))?
186        };
187
188        let events = batch.num_rows();
189        let bytes = batch.estimated_size();
190
191        // Clean up the file after successful read
192        fs::remove_file(&file_path)?;
193        self.current_size_bytes = self.current_size_bytes.saturating_sub(file_size);
194
195        counter!("buffer_sent_events_total", self.labels.iter()).increment(events as u64);
196        counter!("buffer_sent_bytes_total", self.labels.iter()).increment(bytes as u64);
197        gauge!("buffer_size_events", self.labels.iter()).decrement(events as f64);
198        gauge!("buffer_size_bytes", self.labels.iter()).decrement(bytes as f64);
199
200        debug!("Dequeued batch from disk: component={}", self.component_id);
201        Ok(Some(batch))
202    }
203
204    /// Returns the number of pending batches on disk.
205    pub fn len(&self) -> usize {
206        self.pending_files.len()
207    }
208
209    pub fn is_empty(&self) -> bool {
210        self.pending_files.is_empty()
211    }
212}
213
214/// Creates a new disk-backed buffer bridge.
215pub fn channel(
216    data_dir: PathBuf,
217    component_id: String,
218    max_size_bytes: Option<u64>,
219) -> anyhow::Result<(crate::BufferSender, crate::BufferReceiver)> {
220    let disk_buffer = DiskBuffer::new(data_dir, component_id.clone(), max_size_bytes)?;
221
222    // Bridging channels
223    let (in_tx, mut in_rx) = crate::channel(
224        100,
225        crate::WhenFull::Block,
226        format!("{}_disk_in", component_id),
227    );
228    let (out_tx, out_rx) = crate::channel(
229        100,
230        crate::WhenFull::Block,
231        format!("{}_disk_out", component_id),
232    );
233
234    let notify = Arc::new(tokio::sync::Notify::new());
235
236    // Create shared state
237    let shared_buffer = Arc::new(std::sync::Mutex::new(disk_buffer));
238
239    // Ingress task
240    let shared_buffer_in = shared_buffer.clone();
241    let notify_in = notify.clone();
242    let component_id_in = component_id.clone();
243    tokio::spawn(async move {
244        while let Some(batch) = in_rx.recv().await {
245            // Apply backpressure if full
246            loop {
247                let is_full = {
248                    let b = shared_buffer_in.lock().unwrap_or_else(|p| p.into_inner());
249                    match b.max_size_bytes {
250                        Some(max) => b.current_size_bytes >= max,
251                        None => false,
252                    }
253                };
254                if is_full {
255                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
256                    continue;
257                }
258                break;
259            }
260
261            let mut b = shared_buffer_in.lock().unwrap_or_else(|p| p.into_inner());
262            if let Err(e) = b.enqueue(&batch) {
263                error!(message = "Failed to enqueue batch to disk", %e, component_id = component_id_in);
264            } else {
265                notify_in.notify_one();
266            }
267        }
268    });
269
270    // Egress task
271    let shared_buffer_out = shared_buffer;
272    let notify_out = notify;
273    let component_id_out = component_id;
274    tokio::spawn(async move {
275        loop {
276            let batch_opt = {
277                let mut b = shared_buffer_out.lock().unwrap_or_else(|p| p.into_inner());
278                if b.is_empty() {
279                    None
280                } else {
281                    match b.dequeue() {
282                        Ok(Some(batch)) => Some(batch),
283                        Ok(None) => None,
284                        Err(e) => {
285                            error!(message = "Failed to dequeue batch from disk", %e, component_id = component_id_out);
286                            None
287                        }
288                    }
289                }
290            };
291
292            if let Some(batch) = batch_opt {
293                if let Err(e) = out_tx.send(batch).await {
294                    error!(message = "Failed to send batch from disk to downstream", %e, component_id = component_id_out);
295                    return;
296                }
297            } else {
298                notify_out.notified().await;
299            }
300        }
301    });
302
303    Ok((in_tx, out_rx))
304}