Skip to main content

kinetic_core/
ack.rs

1//! End-to-end acknowledgement tracking for pipeline events.
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6/// A token representing a batch of events that requires acknowledgement.
7///
8/// When an event batch is successfully written to a sink (or dropped intentionally
9/// by a transform), this token is triggered to notify the original source that it
10/// can commit the batch (e.g., commit Kafka offsets).
11#[derive(Clone, Debug)]
12pub struct AckToken {
13    inner: Arc<AckInner>,
14}
15
16impl PartialEq for AckToken {
17    fn eq(&self, other: &Self) -> bool {
18        Arc::ptr_eq(&self.inner, &other.inner)
19    }
20}
21
22impl Eq for AckToken {}
23
24#[derive(Debug)]
25struct AckInner {
26    notifier: Arc<dyn BatchNotifier>,
27    pending_acks: AtomicUsize,
28}
29
30impl AckToken {
31    /// Creates a new `AckToken` wrapping the given notifier.
32    pub fn new(notifier: Arc<dyn BatchNotifier>) -> Self {
33        Self {
34            inner: Arc::new(AckInner {
35                notifier,
36                pending_acks: AtomicUsize::new(1),
37            }),
38        }
39    }
40
41    /// Splits this token into `n` tokens.
42    ///
43    /// The underlying notifier will only be called after all `n` tokens have been acknowledged.
44    /// This is used by the Fanout mechanism to ensure all downstream components have processed
45    /// the batch before acknowledging it to the source.
46    ///
47    /// If `n` is 0, the original token is effectively acknowledged immediately.
48    /// If `n` is 1, it returns itself.
49    pub fn split(self, n: usize) -> Vec<AckToken> {
50        if n == 0 {
51            self.ack();
52            return Vec::new();
53        }
54        if n == 1 {
55            return vec![self];
56        }
57
58        // We already have 1 pending ack from the original token.
59        // We need to add n - 1 more.
60        self.inner.pending_acks.fetch_add(n - 1, Ordering::SeqCst);
61
62        let mut tokens = Vec::with_capacity(n);
63        for _ in 0..n {
64            tokens.push(self.clone());
65        }
66
67        // Note: `self` is dropped here. This is safe because `AckToken` does not implement
68        // a custom `Drop` that decrements the `pending_acks`. The actual acks are only decremented
69        // when `ack()` or `nack()` is explicitly called. If this design changes, this will break.
70        tokens
71    }
72
73    /// Acknowledges the batch.
74    ///
75    /// Only when all split tokens have been acknowledged will the underlying
76    /// notifier be triggered.
77    pub fn ack(self) {
78        // fetch_sub returns the previous value.
79        // If it was 1, we are the last one to acknowledge.
80        if self.inner.pending_acks.fetch_sub(1, Ordering::SeqCst) == 1 {
81            self.inner.notifier.on_ack();
82        }
83    }
84
85    /// Returns `true` if this token has been fully acknowledged.
86    pub fn is_acknowledged(&self) -> bool {
87        self.inner.pending_acks.load(Ordering::SeqCst) == 0
88    }
89}
90
91/// A trait implemented by sources to receive acknowledgement notifications.
92pub trait BatchNotifier: Send + Sync + std::fmt::Debug {
93    /// Called when the batch associated with this notifier has been fully processed.
94    fn on_ack(&self);
95}
96
97impl Drop for AckInner {
98    fn drop(&mut self) {
99        // If the token is dropped without being acknowledged, we could potentially
100        // trigger a nack() or log a warning. For now, Kinetic's backpressure and
101        // retry model assumes that a dropped token (e.g. during shutdown or unrecoverable error)
102        // simply means the source won't commit, leading to a replay on restart.
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109    use std::sync::atomic::AtomicUsize;
110
111    #[derive(Debug)]
112    struct MockNotifier {
113        call_count: Arc<AtomicUsize>,
114    }
115
116    impl BatchNotifier for MockNotifier {
117        fn on_ack(&self) {
118            self.call_count.fetch_add(1, Ordering::SeqCst);
119        }
120    }
121
122    #[test]
123    fn test_ack_token_triggers_once() {
124        let call_count = Arc::new(AtomicUsize::new(0));
125        let notifier = Arc::new(MockNotifier {
126            call_count: Arc::clone(&call_count),
127        });
128
129        let token = AckToken::new(notifier);
130
131        assert!(!token.is_acknowledged());
132
133        token.clone().ack();
134        assert!(token.is_acknowledged());
135        assert_eq!(call_count.load(Ordering::SeqCst), 1);
136    }
137
138    #[test]
139    fn test_ack_token_split() {
140        let call_count = Arc::new(AtomicUsize::new(0));
141        let notifier = Arc::new(MockNotifier {
142            call_count: Arc::clone(&call_count),
143        });
144
145        let token = AckToken::new(notifier);
146        let tokens = token.split(3);
147        assert_eq!(tokens.len(), 3);
148
149        tokens[0].clone().ack();
150        assert!(!tokens[1].is_acknowledged());
151        assert_eq!(call_count.load(Ordering::SeqCst), 0);
152
153        tokens[1].clone().ack();
154        assert!(!tokens[2].is_acknowledged());
155        assert_eq!(call_count.load(Ordering::SeqCst), 0);
156
157        tokens[2].clone().ack();
158        assert_eq!(call_count.load(Ordering::SeqCst), 1);
159    }
160}