Skip to main content

kinetic/topology/
fanout.rs

1//! Fanout mechanism for cloning EventBatches to multiple downstream channels.
2
3use arc_swap::ArcSwap;
4use kinetic_buffers::BufferSender;
5use kinetic_core::EventBatch;
6use std::sync::Arc;
7use tracing::{debug, error};
8
9/// Sends an incoming `EventBatch` to multiple downstream receivers.
10/// Because `EventBatch` contains `Arc`s internally (like `RecordBatch` and `ArcEventMetadata`),
11/// cloning it is very cheap and does not duplicate the underlying columnar data.
12#[derive(Clone, Debug, Default)]
13pub struct Fanout {
14    downstreams: Arc<ArcSwap<Vec<BufferSender>>>,
15}
16
17impl Fanout {
18    /// Create a new empty Fanout.
19    pub fn new() -> Self {
20        Self::default()
21    }
22
23    /// Add a new downstream destination.
24    pub fn add_sender(&self, sender: BufferSender) {
25        let old = self.downstreams.load();
26        let mut new_senders = (**old).clone();
27        new_senders.push(sender);
28        self.downstreams.store(Arc::new(new_senders));
29    }
30
31    /// Replace all existing downstream destinations with a new set.
32    pub fn replace_senders(&self, new_senders: Vec<BufferSender>) {
33        self.downstreams.store(Arc::new(new_senders));
34    }
35
36    /// Is the fanout empty?
37    pub fn is_empty(&self) -> bool {
38        self.downstreams.load().is_empty()
39    }
40
41    /// Returns a copy of the current list of downstream senders.
42    pub fn get_senders(&self) -> Vec<BufferSender> {
43        (**self.downstreams.load()).clone()
44    }
45
46    /// Creates a BufferSender that feeds into this Fanout.
47    /// This spawns a background task to relay batches from the channel to the fanout.
48    pub fn clone_to_sender(&self, component_id: String) -> BufferSender {
49        let (tx, mut rx) =
50            kinetic_buffers::channel(100, kinetic_buffers::WhenFull::Block, component_id);
51        let fanout = self.clone();
52
53        tokio::spawn(async move {
54            while let Some(batch) = rx.recv().await {
55                fanout.send(batch).await;
56            }
57        });
58
59        tx
60    }
61
62    /// Sends a batch to all downstream consumers.
63    pub async fn send(&self, mut batch: EventBatch) {
64        let downstreams_guard = self.downstreams.load();
65        if downstreams_guard.is_empty() {
66            debug!("Fanout dropped batch because there are no downstream consumers");
67            // If there's an ack token, we should acknowledge it here as it's been "intentionally" dropped
68            if let Some(token) = batch.ack_token.take() {
69                token.ack();
70            }
71            return;
72        }
73
74        let num_downstreams = downstreams_guard.len();
75        if let Some(token) = batch.ack_token.take() {
76            let mut tokens = token.split(num_downstreams);
77            // Process all but the last one by cloning
78            for sender in downstreams_guard.iter().take(num_downstreams - 1) {
79                let mut b = batch.clone();
80                b.ack_token = Some(tokens.remove(0));
81                if let Err(e) = sender.send(b).await {
82                    error!("Failed to send batch to downstream component: {:?}", e);
83                }
84            }
85            // Process the last one by taking ownership of the remaining batch
86            if let Some(last_sender) = downstreams_guard.last() {
87                batch.ack_token = Some(tokens.remove(0));
88                if let Err(e) = last_sender.send(batch).await {
89                    error!("Failed to send batch to downstream component: {:?}", e);
90                }
91            }
92        } else {
93            // No ack token, just clone for all but the last
94            for sender in downstreams_guard.iter().take(num_downstreams - 1) {
95                if let Err(e) = sender.send(batch.clone()).await {
96                    error!("Failed to send batch to downstream component: {:?}", e);
97                }
98            }
99            if let Some(last_sender) = downstreams_guard.last()
100                && let Err(e) = last_sender.send(batch).await
101            {
102                error!("Failed to send batch to downstream component: {:?}", e);
103            }
104        }
105    }
106}
107
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use arrow_array::RecordBatch;
    use arrow_schema::{DataType, Field, Schema};
    use kinetic_core::{ComponentId, EventMetadata};
    use std::sync::Arc;

    /// Builds a zero-row batch with a single non-nullable `Int32` column `a`.
    fn empty_batch() -> EventBatch {
        let fields = vec![Field::new("a", DataType::Int32, false)];
        let payload = RecordBatch::new_empty(Arc::new(Schema::new(fields)));
        let source = ComponentId("src1".to_string());
        let metadata = Arc::new(EventMetadata::new("pipe1", source));
        EventBatch::new(payload, metadata).expect("failed to create batch")
    }

    /// A batch sent through a fanout with two senders arrives on both channels.
    #[tokio::test]
    async fn test_fanout_send() {
        use kinetic_buffers::{channel, WhenFull};

        let (first_tx, mut first_rx) = channel(10, WhenFull::Block, "c1".to_string());
        let (second_tx, mut second_rx) = channel(10, WhenFull::Block, "c2".to_string());

        let fanout = Fanout::new();
        fanout.add_sender(first_tx);
        fanout.add_sender(second_tx);

        fanout.send(empty_batch()).await;

        let first = first_rx.recv().await.unwrap();
        let second = second_rx.recv().await.unwrap();

        assert_eq!(first.payload.num_rows(), 0);
        assert_eq!(second.payload.num_rows(), 0);
    }
}