kinetic/sinks/kafka/
producer.rs1use kafka_common::{KafkaProducerConfig, create_producer};
3use kinetic_buffers::BufferReceiver;
4use kinetic_core::ShutdownSignal;
5use metrics::{Label, counter};
6use rdkafka::producer::Producer;
7use rdkafka::producer::{FutureProducer, FutureRecord};
8use tracing::{debug, error, info};
9
10use kinetic_common::register;
11use kinetic_common::telemetry::EventDuration;
12use kinetic_core::encode::Encoder;
13use std::sync::Arc;
14use std::time::Instant;
15
16pub struct ProducerTask {
17 producer: FutureProducer,
18 receiver: BufferReceiver,
19 topic: String,
20 component_id: String,
21 shutdown: ShutdownSignal,
22 encoder: Arc<dyn Encoder>,
23 labels: Arc<[Label]>,
24 event_duration: EventDuration,
25}
26
27use crate::topology::healthcheck::Healthcheck;
28use async_trait::async_trait;
29
30#[async_trait]
31impl Healthcheck for ProducerTask {
32 async fn check(&self) -> anyhow::Result<()> {
33 self.producer
34 .client()
35 .fetch_metadata(Some(&self.topic), std::time::Duration::from_secs(5))
36 .map_err(|e| {
37 anyhow::anyhow!("Failed to fetch metadata for topic '{}': {}", self.topic, e)
38 })?;
39 Ok(())
40 }
41}
42
43impl ProducerTask {
44 pub fn new(
45 config: &KafkaProducerConfig,
46 component_id: String,
47 receiver: BufferReceiver,
48 shutdown: ShutdownSignal,
49 encoder: Arc<dyn Encoder>,
50 ) -> Result<Self, kafka_common::client::Error> {
51 let producer = create_producer(config)?;
52 let labels: Arc<[Label]> = Arc::new([
53 Label::new("component_id", component_id.clone()),
54 Label::new("component_type", "sink"),
55 Label::new("component_kind", "kafka"),
56 ]);
57 let event_duration = register!(EventDuration::new(component_id.clone(), "sink"));
58
59 Ok(Self {
60 producer,
61 receiver,
62 topic: config.topic.clone(),
63 component_id,
64 shutdown,
65 encoder,
66 labels,
67 event_duration,
68 })
69 }
70
71 pub async fn run(mut self) {
72 info!(
73 "Starting Kafka producer task for component: {}",
74 self.component_id
75 );
76
77 loop {
78 tokio::select! {
79 _ = self.shutdown.recv() => {
80 info!("Kafka sink '{}' received shutdown signal", self.component_id);
81 break;
82 }
83 batch = self.receiver.recv() => {
84 let Some(batch) = batch else {
85 break;
86 };
87 let start = Instant::now();
88
89 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
90 counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
91
92 let messages = match self.encoder.encode_individual(&batch) {
93 Ok(msgs) => msgs,
94 Err(e) => {
95 error!(
96 "Failed to serialize batch in Kafka sink '{}': {}",
97 self.component_id, e
98 );
99 self.event_duration.emit(start.elapsed());
100 continue;
101 }
102 };
103
104 let tokens = if let Some(token) = batch.ack_token {
106 token.split(messages.len())
107 } else {
108 Vec::new()
109 };
110
111 let mut delivery_futures = futures::stream::FuturesUnordered::new();
112
113 for (i, msg) in messages.into_iter().enumerate() {
114 let payload = msg.as_ref();
115 let p_len = payload.len();
116 let record = FutureRecord::to(&self.topic)
117 .payload(payload)
118 .key(b""); let token = tokens.get(i).cloned();
121
122 match self.producer.send_result(record) {
123 Ok(future) => {
124 delivery_futures.push(async move {
125 (future.await, token, p_len)
126 });
127 }
128 Err((e, _)) => {
129 error!(
130 "Failed to enqueue message in component '{}': {}",
131 self.component_id, e
132 );
133 }
134 }
135 }
136
137 use futures::StreamExt;
139 while let Some((result, token, p_len)) = delivery_futures.next().await {
140 match result {
141 Ok(Ok(delivery)) => {
142 debug!("Sent message delivery status: {:?}", delivery);
143 counter!("component_sent_events_total", self.labels.iter()).increment(1);
144 counter!("component_sent_network_bytes_total", self.labels.iter()).increment(p_len as u64);
145 if let Some(t) = token {
146 t.ack();
147 }
148 }
149 Ok(Err((e, _))) => {
150 counter!("component_errors_total", self.labels.iter()).increment(1);
151 error!(
152 "Failed to deliver message in component '{}': {}",
153 self.component_id, e
154 );
155 }
156 Err(_) => {
157 counter!("component_errors_total", self.labels.iter()).increment(1);
158 error!(
159 "Delivery future was canceled in component '{}'",
160 self.component_id
161 );
162 }
163 }
164 }
165 self.event_duration.emit(start.elapsed());
166 }
167 }
168 }
169
170 info!(
171 "Kafka producer task for component {} shutting down",
172 self.component_id
173 );
174 }
175}