kinetic/topology/
fanout.rs1use arc_swap::ArcSwap;
4use kinetic_buffers::BufferSender;
5use kinetic_core::EventBatch;
6use std::sync::Arc;
7use tracing::{debug, error};
8
9#[derive(Clone, Debug, Default)]
13pub struct Fanout {
14 downstreams: Arc<ArcSwap<Vec<BufferSender>>>,
15}
16
17impl Fanout {
18 pub fn new() -> Self {
20 Self::default()
21 }
22
23 pub fn add_sender(&self, sender: BufferSender) {
25 let old = self.downstreams.load();
26 let mut new_senders = (**old).clone();
27 new_senders.push(sender);
28 self.downstreams.store(Arc::new(new_senders));
29 }
30
31 pub fn replace_senders(&self, new_senders: Vec<BufferSender>) {
33 self.downstreams.store(Arc::new(new_senders));
34 }
35
36 pub fn is_empty(&self) -> bool {
38 self.downstreams.load().is_empty()
39 }
40
41 pub fn get_senders(&self) -> Vec<BufferSender> {
43 (**self.downstreams.load()).clone()
44 }
45
46 pub fn clone_to_sender(&self, component_id: String) -> BufferSender {
49 let (tx, mut rx) =
50 kinetic_buffers::channel(100, kinetic_buffers::WhenFull::Block, component_id);
51 let fanout = self.clone();
52
53 tokio::spawn(async move {
54 while let Some(batch) = rx.recv().await {
55 fanout.send(batch).await;
56 }
57 });
58
59 tx
60 }
61
62 pub async fn send(&self, mut batch: EventBatch) {
64 let downstreams_guard = self.downstreams.load();
65 if downstreams_guard.is_empty() {
66 debug!("Fanout dropped batch because there are no downstream consumers");
67 if let Some(token) = batch.ack_token.take() {
69 token.ack();
70 }
71 return;
72 }
73
74 let num_downstreams = downstreams_guard.len();
75 if let Some(token) = batch.ack_token.take() {
76 let mut tokens = token.split(num_downstreams);
77 for sender in downstreams_guard.iter().take(num_downstreams - 1) {
79 let mut b = batch.clone();
80 b.ack_token = Some(tokens.remove(0));
81 if let Err(e) = sender.send(b).await {
82 error!("Failed to send batch to downstream component: {:?}", e);
83 }
84 }
85 if let Some(last_sender) = downstreams_guard.last() {
87 batch.ack_token = Some(tokens.remove(0));
88 if let Err(e) = last_sender.send(batch).await {
89 error!("Failed to send batch to downstream component: {:?}", e);
90 }
91 }
92 } else {
93 for sender in downstreams_guard.iter().take(num_downstreams - 1) {
95 if let Err(e) = sender.send(batch.clone()).await {
96 error!("Failed to send batch to downstream component: {:?}", e);
97 }
98 }
99 if let Some(last_sender) = downstreams_guard.last()
100 && let Err(e) = last_sender.send(batch).await
101 {
102 error!("Failed to send batch to downstream component: {:?}", e);
103 }
104 }
105 }
106}
107
108#[cfg(test)]
109#[allow(clippy::unwrap_used, clippy::expect_used)]
110mod tests {
111 use super::*;
112 use arrow_array::RecordBatch;
113 use arrow_schema::{DataType, Field, Schema};
114 use kinetic_core::{ComponentId, EventMetadata};
115 use std::sync::Arc;
116
117 fn empty_batch() -> EventBatch {
118 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
119 let record_batch = RecordBatch::new_empty(schema);
120 let meta = Arc::new(EventMetadata::new("pipe1", ComponentId("src1".to_string())));
121 EventBatch::new(record_batch, meta).expect("failed to create batch")
122 }
123
124 #[tokio::test]
125 async fn test_fanout_send() {
126 let fanout = Fanout::new();
127 let (tx1, mut rx1) =
128 kinetic_buffers::channel(10, kinetic_buffers::WhenFull::Block, "c1".to_string());
129 let (tx2, mut rx2) =
130 kinetic_buffers::channel(10, kinetic_buffers::WhenFull::Block, "c2".to_string());
131
132 fanout.add_sender(tx1);
133 fanout.add_sender(tx2);
134
135 let batch = empty_batch();
136 fanout.send(batch).await;
137
138 let b1 = rx1.recv().await.unwrap();
139 let b2 = rx2.recv().await.unwrap();
140
141 assert_eq!(b1.payload.num_rows(), 0);
142 assert_eq!(b2.payload.num_rows(), 0);
143 }
144}