Skip to main content

kinetic/sinks/kafka/
producer.rs

1//! Kafka sink producer loop.
2use kafka_common::{KafkaProducerConfig, create_producer};
3use kinetic_buffers::BufferReceiver;
4use kinetic_core::ShutdownSignal;
5use metrics::{Label, counter};
6use rdkafka::producer::Producer;
7use rdkafka::producer::{FutureProducer, FutureRecord};
8use tracing::{debug, error, info};
9
10use kinetic_common::register;
11use kinetic_common::telemetry::EventDuration;
12use kinetic_core::encode::Encoder;
13use std::sync::Arc;
14use std::time::Instant;
15
16pub struct ProducerTask {
17    producer: FutureProducer,
18    receiver: BufferReceiver,
19    topic: String,
20    component_id: String,
21    shutdown: ShutdownSignal,
22    encoder: Arc<dyn Encoder>,
23    labels: Arc<[Label]>,
24    event_duration: EventDuration,
25}
26
27use crate::topology::healthcheck::Healthcheck;
28use async_trait::async_trait;
29
30#[async_trait]
31impl Healthcheck for ProducerTask {
32    async fn check(&self) -> anyhow::Result<()> {
33        self.producer
34            .client()
35            .fetch_metadata(Some(&self.topic), std::time::Duration::from_secs(5))
36            .map_err(|e| {
37                anyhow::anyhow!("Failed to fetch metadata for topic '{}': {}", self.topic, e)
38            })?;
39        Ok(())
40    }
41}
42
43impl ProducerTask {
44    pub fn new(
45        config: &KafkaProducerConfig,
46        component_id: String,
47        receiver: BufferReceiver,
48        shutdown: ShutdownSignal,
49        encoder: Arc<dyn Encoder>,
50    ) -> Result<Self, kafka_common::client::Error> {
51        let producer = create_producer(config)?;
52        let labels: Arc<[Label]> = Arc::new([
53            Label::new("component_id", component_id.clone()),
54            Label::new("component_type", "sink"),
55            Label::new("component_kind", "kafka"),
56        ]);
57        let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
58
59        Ok(Self {
60            producer,
61            receiver,
62            topic: config.topic.clone(),
63            component_id,
64            shutdown,
65            encoder,
66            labels,
67            event_duration,
68        })
69    }
70
71    pub async fn run(mut self) {
72        info!(
73            "Starting Kafka producer task for component: {}",
74            self.component_id
75        );
76
77        loop {
78            tokio::select! {
79                _ = self.shutdown.recv() => {
80                    info!("Kafka sink '{}' received shutdown signal", self.component_id);
81                    break;
82                }
83                batch = self.receiver.recv() => {
84                    let Some(batch) = batch else {
85                        break;
86                    };
87                    let start = Instant::now();
88
89                    counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
90                    counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
91
92                    let messages = match self.encoder.encode_individual(&batch) {
93                        Ok(msgs) => msgs,
94                        Err(e) => {
95                            error!(
96                                "Failed to serialize batch in Kafka sink '{}': {}",
97                                self.component_id, e
98                            );
99                            self.event_duration.emit(start.elapsed());
100                            continue;
101                        }
102                    };
103
104                    // Split the acknowledgement token into sub-tokens for each message
105                    let tokens = if let Some(token) = batch.ack_token {
106                        token.split(messages.len())
107                    } else {
108                        Vec::new()
109                    };
110
111                    let mut delivery_futures = futures::stream::FuturesUnordered::new();
112
113                    for (i, msg) in messages.into_iter().enumerate() {
114                        let payload = msg.as_ref();
115                        let p_len = payload.len();
116                        let record = FutureRecord::to(&self.topic)
117                            .payload(payload)
118                            .key(b""); // Optional key
119
120                        let token = tokens.get(i).cloned();
121
122                        match self.producer.send_result(record) {
123                            Ok(future) => {
124                                delivery_futures.push(async move {
125                                    (future.await, token, p_len)
126                                });
127                            }
128                            Err((e, _)) => {
129                                error!(
130                                    "Failed to enqueue message in component '{}': {}",
131                                    self.component_id, e
132                                );
133                            }
134                        }
135                    }
136
137                    // Await all deliveries for this batch
138                    use futures::StreamExt;
139                    while let Some((result, token, p_len)) = delivery_futures.next().await {
140                        match result {
141                            Ok(Ok(delivery)) => {
142                                debug!("Sent message delivery status: {:?}", delivery);
143                                counter!("component_sent_events_total", self.labels.iter()).increment(1);
144                                counter!("component_sent_network_bytes_total", self.labels.iter()).increment(p_len as u64);
145                                if let Some(t) = token {
146                                    t.ack();
147                                }
148                            }
149                            Ok(Err((e, _))) => {
150                                counter!("component_errors_total", self.labels.iter()).increment(1);
151                                error!(
152                                    "Failed to deliver message in component '{}': {}",
153                                    self.component_id, e
154                                );
155                            }
156                            Err(_) => {
157                                counter!("component_errors_total", self.labels.iter()).increment(1);
158                                error!(
159                                    "Delivery future was canceled in component '{}'",
160                                    self.component_id
161                                );
162                            }
163                        }
164                    }
165                    self.event_duration.emit(start.elapsed());
166                }
167            }
168        }
169
170        info!(
171            "Kafka producer task for component {} shutting down",
172            self.component_id
173        );
174    }
175}