1use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6#[derive(Clone, Debug)]
12pub struct AckToken {
13 inner: Arc<AckInner>,
14}
15
16impl PartialEq for AckToken {
17 fn eq(&self, other: &Self) -> bool {
18 Arc::ptr_eq(&self.inner, &other.inner)
19 }
20}
21
22impl Eq for AckToken {}
23
24#[derive(Debug)]
25struct AckInner {
26 notifier: Arc<dyn BatchNotifier>,
27 pending_acks: AtomicUsize,
28}
29
30impl AckToken {
31 pub fn new(notifier: Arc<dyn BatchNotifier>) -> Self {
33 Self {
34 inner: Arc::new(AckInner {
35 notifier,
36 pending_acks: AtomicUsize::new(1),
37 }),
38 }
39 }
40
41 pub fn split(self, n: usize) -> Vec<AckToken> {
50 if n == 0 {
51 self.ack();
52 return Vec::new();
53 }
54 if n == 1 {
55 return vec![self];
56 }
57
58 self.inner.pending_acks.fetch_add(n - 1, Ordering::SeqCst);
61
62 let mut tokens = Vec::with_capacity(n);
63 for _ in 0..n {
64 tokens.push(self.clone());
65 }
66
67 tokens
71 }
72
73 pub fn ack(self) {
78 if self.inner.pending_acks.fetch_sub(1, Ordering::SeqCst) == 1 {
81 self.inner.notifier.on_ack();
82 }
83 }
84
85 pub fn is_acknowledged(&self) -> bool {
87 self.inner.pending_acks.load(Ordering::SeqCst) == 0
88 }
89}
90
/// Receiver for the completion signal of an acknowledgement token.
///
/// Implementors must be `Send + Sync` and `Debug`; the trait is used behind
/// `Arc<dyn BatchNotifier>`.
pub trait BatchNotifier: std::fmt::Debug + Send + Sync {
    /// Called by `AckToken::ack` when the pending acknowledgement count
    /// transitions from one to zero.
    fn on_ack(&self);
}
96
97impl Drop for AckInner {
98 fn drop(&mut self) {
99 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;

    /// Test double that counts how many times `on_ack` is invoked.
    #[derive(Debug)]
    struct MockNotifier {
        call_count: Arc<AtomicUsize>,
    }

    impl BatchNotifier for MockNotifier {
        fn on_ack(&self) {
            self.call_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Builds a fresh token together with the counter its notifier bumps.
    fn counted_token() -> (AckToken, Arc<AtomicUsize>) {
        let call_count = Arc::new(AtomicUsize::new(0));
        let notifier = MockNotifier {
            call_count: Arc::clone(&call_count),
        };
        (AckToken::new(Arc::new(notifier)), call_count)
    }

    #[test]
    fn test_ack_token_triggers_once() {
        let (token, call_count) = counted_token();
        assert!(!token.is_acknowledged());

        // Acking a clone is observed through the original handle.
        let duplicate = token.clone();
        duplicate.ack();

        assert!(token.is_acknowledged());
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_ack_token_split() {
        let (token, call_count) = counted_token();
        let tokens = token.split(3);
        assert_eq!(tokens.len(), 3);

        let fired = || call_count.load(Ordering::SeqCst);

        // Ack the first two tokens in turn; after each one the next token
        // must still be pending and the notifier must not have run.
        for pair in tokens.windows(2) {
            pair[0].clone().ack();
            assert!(!pair[1].is_acknowledged());
            assert_eq!(fired(), 0);
        }

        // The final ack is the one that triggers the notifier.
        tokens[2].clone().ack();
        assert_eq!(fired(), 1);
    }
}