kinetic_buffers/
memory.rs1use kinetic_core::EventBatch;
4use metrics::{Label, counter, gauge};
5use std::sync::Arc;
6use tokio::sync::mpsc;
7use tracing::warn;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
11#[serde(rename_all = "snake_case")]
12#[derive(Default)]
13pub enum WhenFull {
14 #[default]
16 Block,
17 }
20
/// Sending half of the in-memory buffer.
///
/// Wraps a bounded `tokio` mpsc sender of [`EventBatch`] values and emits
/// buffer metrics tagged with `labels` whenever a batch is accepted.
#[derive(Debug, Clone)]
pub struct BufferSender {
    /// Underlying bounded channel sender.
    tx: mpsc::Sender<EventBatch>,
    /// Policy consulted by `send` to decide how to behave when the channel is full.
    when_full: WhenFull,
    /// Labels attached to every metric emitted by this sender.
    labels: Arc<[Label]>,
}
28
29impl BufferSender {
30 pub async fn send(&self, batch: EventBatch) -> Result<(), mpsc::error::SendError<EventBatch>> {
35 let events = batch.num_rows();
36 let bytes = batch.estimated_size();
37 match self.when_full {
38 WhenFull::Block => match self.tx.reserve().await {
39 Ok(permit) => {
40 permit.send(batch);
41 self.emit_received(events, bytes);
42 Ok(())
43 }
44 Err(_) => Err(mpsc::error::SendError(batch)),
45 },
46 }
47 }
48
49 pub fn capacity(&self) -> usize {
51 self.tx.capacity()
52 }
53
54 pub fn max_capacity(&self) -> usize {
56 self.tx.max_capacity()
57 }
58
59 pub fn utilization(&self) -> f64 {
61 let max = self.tx.max_capacity();
62 if max == 0 {
63 return 1.0;
64 }
65 let current = max - self.tx.capacity();
66 current as f64 / max as f64
67 }
68
69 pub fn try_send(&self, batch: EventBatch) -> Result<(), mpsc::error::TrySendError<EventBatch>> {
71 let events = batch.num_rows();
72 let bytes = batch.estimated_size();
73 let res = self.tx.try_send(batch);
74 match &res {
75 Ok(_) => {
76 self.emit_received(events, bytes);
77 }
78 Err(mpsc::error::TrySendError::Full(_)) => {
79 counter!("buffer_full_events_total", self.labels.iter()).increment(1);
80 }
82 _ => {}
83 }
84 res
85 }
86
87 fn emit_received(&self, events: usize, bytes: usize) {
88 counter!("buffer_received_events_total", self.labels.iter()).increment(events as u64);
89 counter!("buffer_received_bytes_total", self.labels.iter()).increment(bytes as u64);
90 gauge!("buffer_size_events", self.labels.iter()).increment(events as f64);
91 gauge!("buffer_size_bytes", self.labels.iter()).increment(bytes as f64);
92 }
93}
94
/// Receiving half of the in-memory buffer.
///
/// Wraps a `tokio` mpsc receiver of [`EventBatch`] values and emits buffer
/// metrics tagged with `labels` whenever a batch is taken out.
#[derive(Debug)]
pub struct BufferReceiver {
    /// Underlying channel receiver.
    rx: mpsc::Receiver<EventBatch>,
    /// Labels attached to every metric emitted by this receiver.
    labels: Arc<[Label]>,
}
101
102impl BufferReceiver {
103 pub async fn recv(&mut self) -> Option<EventBatch> {
105 let batch = self.rx.recv().await;
106 if let Some(ref b) = batch {
107 self.emit_sent(b);
108 }
109 batch
110 }
111
112 pub fn try_recv(&mut self) -> Result<EventBatch, mpsc::error::TryRecvError> {
114 let res = self.rx.try_recv();
115 if let Ok(ref b) = res {
116 self.emit_sent(b);
117 }
118 res
119 }
120
121 fn emit_sent(&self, batch: &EventBatch) {
122 let events = batch.num_rows();
123 let bytes = batch.estimated_size();
124 counter!("buffer_sent_events_total", self.labels.iter()).increment(events as u64);
125 counter!("buffer_sent_bytes_total", self.labels.iter()).increment(bytes as u64);
126 gauge!("buffer_size_events", self.labels.iter()).decrement(events as f64);
127 gauge!("buffer_size_bytes", self.labels.iter()).decrement(bytes as f64);
128 }
129}
130
131pub fn channel(
133 capacity: usize,
134 when_full: WhenFull,
135 component_id: String,
136) -> (BufferSender, BufferReceiver) {
137 if when_full != WhenFull::Block {
138 warn!(
139 "Configured when_full policy {:?} is not supported, falling back to block",
140 when_full
141 );
142 }
143
144 let capacity = capacity.max(1);
145 let (tx, rx) = mpsc::channel(capacity);
146 let labels: Arc<[Label]> = Arc::new([
147 Label::new("component_id", component_id),
148 Label::new("buffer_type", "memory"),
149 ]);
150
151 gauge!("buffer_max_size_events", labels.iter()).set(capacity as f64);
153
154 (
155 BufferSender {
156 tx,
157 when_full: WhenFull::Block,
158 labels: labels.clone(),
159 },
160 BufferReceiver { rx, labels },
161 )
162}
163
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use arrow_array::RecordBatch;
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::{ComponentId, EventMetadata};
    use std::sync::Arc;

    /// Builds a zero-row batch with a single non-nullable `Int32` column "a".
    fn empty_batch() -> EventBatch {
        let fields = vec![Field::new("a", DataType::Int32, false)];
        let record_batch = RecordBatch::new_empty(Arc::new(Schema::new(fields)));
        let metadata = EventMetadata::new("pipe1", ComponentId("src1".to_string()));
        EventBatch::new(record_batch, Arc::new(metadata)).expect("failed to create batch")
    }

    /// A capacity-one buffer rejects a second `try_send` until the first
    /// batch has been received.
    #[tokio::test]
    async fn test_memory_buffer_block() {
        let (sender, mut receiver) = channel(1, WhenFull::Block, "test_buffer".to_string());

        // The first batch occupies the only slot.
        sender.try_send(empty_batch()).unwrap();

        // The buffer is now full, so a second non-blocking send must fail.
        assert!(sender.try_send(empty_batch()).is_err());

        // Draining one batch frees the slot again.
        assert!(receiver.recv().await.is_some());

        // With the slot free, sending succeeds once more.
        assert!(sender.try_send(empty_batch()).is_ok());
    }
}