Skip to main content

kinetic_buffers/
memory.rs

1//! In-memory buffering with backpressure for Kinetic.
2
3use kinetic_core::EventBatch;
4use metrics::{Label, counter, gauge};
5use std::sync::Arc;
6use tokio::sync::mpsc;
7use tracing::warn;
8
9/// Policy for what to do when a buffer is full.
10#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
11#[serde(rename_all = "snake_case")]
12#[derive(Default)]
13pub enum WhenFull {
14    /// Backpressure upstream (stop accepting messages until space is available).
15    #[default]
16    Block,
17    // Note: Kinetic plan specifies `block` is the only strategy for zero data loss.
18    // We explicitly do not implement `DropNewest` or `DropOldest`.
19}
20
21/// A sender handle to a memory buffer.
22#[derive(Debug, Clone)]
23pub struct BufferSender {
24    tx: mpsc::Sender<EventBatch>,
25    when_full: WhenFull,
26    labels: Arc<[Label]>,
27}
28
29impl BufferSender {
30    /// Sends a batch to the buffer.
31    ///
32    /// If the buffer is full and `when_full` is `Block`, this will await until space is available,
33    /// propagating backpressure upstream.
34    pub async fn send(&self, batch: EventBatch) -> Result<(), mpsc::error::SendError<EventBatch>> {
35        let events = batch.num_rows();
36        let bytes = batch.estimated_size();
37        match self.when_full {
38            WhenFull::Block => match self.tx.reserve().await {
39                Ok(permit) => {
40                    permit.send(batch);
41                    self.emit_received(events, bytes);
42                    Ok(())
43                }
44                Err(_) => Err(mpsc::error::SendError(batch)),
45            },
46        }
47    }
48
49    /// Returns the current number of free slots in the buffer.
50    pub fn capacity(&self) -> usize {
51        self.tx.capacity()
52    }
53
54    /// Returns the maximum capacity of the buffer.
55    pub fn max_capacity(&self) -> usize {
56        self.tx.max_capacity()
57    }
58
59    /// Returns the utilization of the buffer as a fraction between 0.0 and 1.0.
60    pub fn utilization(&self) -> f64 {
61        let max = self.tx.max_capacity();
62        if max == 0 {
63            return 1.0;
64        }
65        let current = max - self.tx.capacity();
66        current as f64 / max as f64
67    }
68
69    /// Try to send without blocking. Returns an error if full.
70    pub fn try_send(&self, batch: EventBatch) -> Result<(), mpsc::error::TrySendError<EventBatch>> {
71        let events = batch.num_rows();
72        let bytes = batch.estimated_size();
73        let res = self.tx.try_send(batch);
74        match &res {
75            Ok(_) => {
76                self.emit_received(events, bytes);
77            }
78            Err(mpsc::error::TrySendError::Full(_)) => {
79                counter!("buffer_full_events_total", self.labels.iter()).increment(1);
80                // Emit BufferEventsDropped if we were load shedding, but we only block.
81            }
82            _ => {}
83        }
84        res
85    }
86
87    fn emit_received(&self, events: usize, bytes: usize) {
88        counter!("buffer_received_events_total", self.labels.iter()).increment(events as u64);
89        counter!("buffer_received_bytes_total", self.labels.iter()).increment(bytes as u64);
90        gauge!("buffer_size_events", self.labels.iter()).increment(events as f64);
91        gauge!("buffer_size_bytes", self.labels.iter()).increment(bytes as f64);
92    }
93}
94
95/// A receiver handle from a memory buffer.
96#[derive(Debug)]
97pub struct BufferReceiver {
98    rx: mpsc::Receiver<EventBatch>,
99    labels: Arc<[Label]>,
100}
101
102impl BufferReceiver {
103    /// Receives the next batch from the buffer.
104    pub async fn recv(&mut self) -> Option<EventBatch> {
105        let batch = self.rx.recv().await;
106        if let Some(ref b) = batch {
107            self.emit_sent(b);
108        }
109        batch
110    }
111
112    /// Tries to receive the next batch from the buffer without blocking.
113    pub fn try_recv(&mut self) -> Result<EventBatch, mpsc::error::TryRecvError> {
114        let res = self.rx.try_recv();
115        if let Ok(ref b) = res {
116            self.emit_sent(b);
117        }
118        res
119    }
120
121    fn emit_sent(&self, batch: &EventBatch) {
122        let events = batch.num_rows();
123        let bytes = batch.estimated_size();
124        counter!("buffer_sent_events_total", self.labels.iter()).increment(events as u64);
125        counter!("buffer_sent_bytes_total", self.labels.iter()).increment(bytes as u64);
126        gauge!("buffer_size_events", self.labels.iter()).decrement(events as f64);
127        gauge!("buffer_size_bytes", self.labels.iter()).decrement(bytes as f64);
128    }
129}
130
131/// Creates a new memory buffer with the given capacity (in batches).
132pub fn channel(
133    capacity: usize,
134    when_full: WhenFull,
135    component_id: String,
136) -> (BufferSender, BufferReceiver) {
137    if when_full != WhenFull::Block {
138        warn!(
139            "Configured when_full policy {:?} is not supported, falling back to block",
140            when_full
141        );
142    }
143
144    let capacity = capacity.max(1);
145    let (tx, rx) = mpsc::channel(capacity);
146    let labels: Arc<[Label]> = Arc::new([
147        Label::new("component_id", component_id),
148        Label::new("buffer_type", "memory"),
149    ]);
150
151    // Emit BufferCreated metrics
152    gauge!("buffer_max_size_events", labels.iter()).set(capacity as f64);
153
154    (
155        BufferSender {
156            tx,
157            when_full: WhenFull::Block,
158            labels: labels.clone(),
159        },
160        BufferReceiver { rx, labels },
161    )
162}
163
#[cfg(test)]
#[allow(clippy::unwrap_used)] // Test code: a failed unwrap is the desired test failure.
mod tests {
    use super::*;
    use arrow_array::RecordBatch;
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::{ComponentId, EventMetadata};
    use std::sync::Arc;

    /// Builds a zero-row batch with a one-column schema.
    fn empty_batch() -> EventBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let record_batch = RecordBatch::new_empty(schema);
        let meta = Arc::new(EventMetadata::new("pipe1", ComponentId("src1".to_string())));
        EventBatch::new(record_batch, meta).expect("failed to create batch")
    }

    #[tokio::test]
    async fn test_memory_buffer_block() {
        let (tx, mut rx) = channel(1, WhenFull::Block, "test_buffer".to_string());

        // Fill the buffer.
        tx.try_send(empty_batch()).unwrap();

        // The next try_send must report `Full` (not `Closed`) and hand the batch back.
        assert!(matches!(
            tx.try_send(empty_batch()),
            Err(mpsc::error::TrySendError::Full(_))
        ));

        // Receive to free space.
        assert!(rx.recv().await.is_some());

        // Now we can send again.
        assert!(tx.try_send(empty_batch()).is_ok());
    }

    #[tokio::test]
    async fn test_send_waits_for_free_slot() {
        let (tx, mut rx) = channel(1, WhenFull::Block, "test_send_waits".to_string());
        tx.send(empty_batch()).await.unwrap();

        // With the only slot taken, a blocking send must still be pending after its first
        // poll. `biased` polls the send first, then the always-ready fallback.
        let completed = tokio::select! {
            biased;
            _ = tx.send(empty_batch()) => true,
            _ = std::future::ready(()) => false,
        };
        assert!(!completed, "send completed even though the buffer was full");

        // Freeing the slot lets a blocking send go through.
        assert!(rx.recv().await.is_some());
        tx.send(empty_batch()).await.unwrap();
        assert!(rx.recv().await.is_some());
    }

    #[tokio::test]
    async fn test_send_fails_when_receiver_dropped() {
        let (tx, rx) = channel(1, WhenFull::Block, "test_closed".to_string());
        drop(rx);

        assert!(tx.send(empty_batch()).await.is_err());
        assert!(matches!(
            tx.try_send(empty_batch()),
            Err(mpsc::error::TrySendError::Closed(_))
        ));
    }

    #[test]
    fn test_capacity_and_utilization() {
        // A zero capacity is clamped so the channel can still be constructed.
        let (tx, _rx) = channel(0, WhenFull::Block, "test_clamp".to_string());
        assert_eq!(tx.max_capacity(), 1);

        let (tx, _rx) = channel(4, WhenFull::Block, "test_util".to_string());
        assert!(tx.utilization().abs() < f64::EPSILON);

        tx.try_send(empty_batch()).unwrap();
        tx.try_send(empty_batch()).unwrap();
        assert_eq!(tx.capacity(), 2);
        assert!((tx.utilization() - 0.5).abs() < f64::EPSILON);
    }
}