1use arrow_ipc::reader::StreamReader;
4use arrow_ipc::writer::StreamWriter;
5use kinetic_core::EventBatch;
6use metrics::{Label, counter, gauge};
7use std::collections::VecDeque;
8use std::fs::{self, File};
9use std::io::{BufReader, BufWriter};
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tracing::{debug, error, info};
13
14pub struct DiskBuffer {
16 data_dir: PathBuf,
17 component_id: String,
18 pending_files: VecDeque<PathBuf>,
19 labels: Arc<[Label]>,
20 pub max_size_bytes: Option<u64>,
21 pub current_size_bytes: u64,
22}
23
24impl DiskBuffer {
25 pub fn new<P: AsRef<Path>>(
27 data_dir: P,
28 component_id: String,
29 max_size_bytes: Option<u64>,
30 ) -> anyhow::Result<Self> {
31 let path = data_dir.as_ref().join(&component_id);
32 fs::create_dir_all(&path)?;
33
34 let labels: Arc<[Label]> = Arc::new([
35 Label::new("component_id", component_id.clone()),
36 Label::new("buffer_type", "disk"),
37 ]);
38
39 if let Some(max) = max_size_bytes {
40 gauge!("buffer_max_size_bytes", labels.iter()).set(max as f64);
41 }
42
43 let mut buffer = Self {
44 data_dir: path,
45 component_id,
46 pending_files: VecDeque::new(),
47 labels,
48 max_size_bytes,
49 current_size_bytes: 0,
50 };
51
52 buffer.load_existing()?;
53 Ok(buffer)
54 }
55
56 fn load_existing(&mut self) -> anyhow::Result<()> {
58 let mut files = Vec::new();
59 let mut total_size = 0;
60 for entry in fs::read_dir(&self.data_dir)? {
61 let entry = entry?;
62 let path = entry.path();
63 if path.is_file() && path.extension().is_some_and(|ext| ext == "arrow") {
64 total_size += entry.metadata()?.len();
65 files.push(path);
66 }
67 }
68
69 files.sort();
71 self.pending_files.extend(files);
72 self.current_size_bytes = total_size;
73
74 if !self.pending_files.is_empty() {
75 info!(
76 "Loaded {} existing batches ({} bytes) for component '{}'",
77 self.pending_files.len(),
78 self.current_size_bytes,
79 self.component_id
80 );
81 }
82
83 Ok(())
84 }
85
86 pub fn enqueue(&mut self, batch: &EventBatch) -> anyhow::Result<()> {
88 if let Some(max) = self.max_size_bytes
89 && self.current_size_bytes >= max
90 {
91 anyhow::bail!(
92 "Disk buffer is full ({} >= {} bytes)",
93 self.current_size_bytes,
94 max
95 );
96 }
97
98 let events = batch.num_rows();
99 let bytes = batch.estimated_size();
100
101 let timestamp = std::time::SystemTime::now()
103 .duration_since(std::time::UNIX_EPOCH)?
104 .as_nanos();
105 let id = xid::new().to_string();
106
107 let filename = format!("{:020}_{}.arrow", timestamp, id);
108 let file_path = self.data_dir.join(filename);
109
110 let metadata_json = serde_json::to_string(&batch.metadata)?;
112 let mut schema_metadata = batch.payload.schema().metadata().clone();
113 schema_metadata.insert("kinetic.metadata".to_string(), metadata_json);
114
115 let new_schema = batch
116 .payload
117 .schema()
118 .as_ref()
119 .clone()
120 .with_metadata(schema_metadata);
121 let rb_with_metadata = arrow_array::RecordBatch::try_new(
122 std::sync::Arc::new(new_schema),
123 batch.payload.columns().to_vec(),
124 )?;
125
126 let file = File::create(&file_path)?;
127 let mut writer = StreamWriter::try_new(BufWriter::new(file), &rb_with_metadata.schema())?;
128 writer.write(&rb_with_metadata)?;
129 writer.finish()?;
130
131 let written_size = fs::metadata(&file_path)?.len();
132 self.current_size_bytes += written_size;
133
134 self.pending_files.push_back(file_path);
135
136 if let Some(token) = batch.ack_token.clone() {
139 token.ack();
140 }
141
142 counter!("buffer_received_events_total", self.labels.iter()).increment(events as u64);
143 counter!("buffer_received_bytes_total", self.labels.iter()).increment(bytes as u64);
144 gauge!("buffer_size_events", self.labels.iter()).increment(events as f64);
145 gauge!("buffer_size_bytes", self.labels.iter()).increment(bytes as f64);
146
147 debug!("Enqueued batch to disk: component={}", self.component_id);
148 Ok(())
149 }
150
151 pub fn dequeue(&mut self) -> anyhow::Result<Option<EventBatch>> {
153 let Some(file_path) = self.pending_files.pop_front() else {
154 return Ok(None);
155 };
156
157 let file_size = fs::metadata(&file_path)?.len();
158
159 let batch = {
160 let file = File::open(&file_path)?;
161 let reader = StreamReader::try_new(BufReader::new(file), None)?;
162
163 let mut record_batches = Vec::new();
165 for b in reader {
166 record_batches.push(b?);
167 }
168
169 if record_batches.is_empty() {
170 anyhow::bail!("Arrow file was empty: {:?}", file_path);
171 }
172
173 let Some(rb) = record_batches.into_iter().next() else {
174 anyhow::bail!("Arrow file had no record batches: {:?}", file_path);
175 };
176
177 let schema = rb.schema();
179 let metadata_json = schema
180 .metadata()
181 .get("kinetic.metadata")
182 .ok_or_else(|| anyhow::anyhow!("Missing metadata in disk-buffered batch"))?;
183 let metadata: kinetic_core::EventMetadata = serde_json::from_str(metadata_json)?;
184
185 EventBatch::new(rb, std::sync::Arc::new(metadata))?
186 };
187
188 let events = batch.num_rows();
189 let bytes = batch.estimated_size();
190
191 fs::remove_file(&file_path)?;
193 self.current_size_bytes = self.current_size_bytes.saturating_sub(file_size);
194
195 counter!("buffer_sent_events_total", self.labels.iter()).increment(events as u64);
196 counter!("buffer_sent_bytes_total", self.labels.iter()).increment(bytes as u64);
197 gauge!("buffer_size_events", self.labels.iter()).decrement(events as f64);
198 gauge!("buffer_size_bytes", self.labels.iter()).decrement(bytes as f64);
199
200 debug!("Dequeued batch from disk: component={}", self.component_id);
201 Ok(Some(batch))
202 }
203
204 pub fn len(&self) -> usize {
206 self.pending_files.len()
207 }
208
209 pub fn is_empty(&self) -> bool {
210 self.pending_files.is_empty()
211 }
212}
213
214pub fn channel(
216 data_dir: PathBuf,
217 component_id: String,
218 max_size_bytes: Option<u64>,
219) -> anyhow::Result<(crate::BufferSender, crate::BufferReceiver)> {
220 let disk_buffer = DiskBuffer::new(data_dir, component_id.clone(), max_size_bytes)?;
221
222 let (in_tx, mut in_rx) = crate::channel(
224 100,
225 crate::WhenFull::Block,
226 format!("{}_disk_in", component_id),
227 );
228 let (out_tx, out_rx) = crate::channel(
229 100,
230 crate::WhenFull::Block,
231 format!("{}_disk_out", component_id),
232 );
233
234 let notify = Arc::new(tokio::sync::Notify::new());
235
236 let shared_buffer = Arc::new(std::sync::Mutex::new(disk_buffer));
238
239 let shared_buffer_in = shared_buffer.clone();
241 let notify_in = notify.clone();
242 let component_id_in = component_id.clone();
243 tokio::spawn(async move {
244 while let Some(batch) = in_rx.recv().await {
245 loop {
247 let is_full = {
248 let b = shared_buffer_in.lock().unwrap_or_else(|p| p.into_inner());
249 match b.max_size_bytes {
250 Some(max) => b.current_size_bytes >= max,
251 None => false,
252 }
253 };
254 if is_full {
255 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
256 continue;
257 }
258 break;
259 }
260
261 let mut b = shared_buffer_in.lock().unwrap_or_else(|p| p.into_inner());
262 if let Err(e) = b.enqueue(&batch) {
263 error!(message = "Failed to enqueue batch to disk", %e, component_id = component_id_in);
264 } else {
265 notify_in.notify_one();
266 }
267 }
268 });
269
270 let shared_buffer_out = shared_buffer;
272 let notify_out = notify;
273 let component_id_out = component_id;
274 tokio::spawn(async move {
275 loop {
276 let batch_opt = {
277 let mut b = shared_buffer_out.lock().unwrap_or_else(|p| p.into_inner());
278 if b.is_empty() {
279 None
280 } else {
281 match b.dequeue() {
282 Ok(Some(batch)) => Some(batch),
283 Ok(None) => None,
284 Err(e) => {
285 error!(message = "Failed to dequeue batch from disk", %e, component_id = component_id_out);
286 None
287 }
288 }
289 }
290 };
291
292 if let Some(batch) = batch_opt {
293 if let Err(e) = out_tx.send(batch).await {
294 error!(message = "Failed to send batch from disk to downstream", %e, component_id = component_id_out);
295 return;
296 }
297 } else {
298 notify_out.notified().await;
299 }
300 }
301 });
302
303 Ok((in_tx, out_rx))
304}