1use kafka_common::{KafkaConsumerConfig, create_consumer};
4use kinetic_buffers::BufferSender;
5use kinetic_core::healthcheck::Healthcheck;
6use kinetic_core::{ArcEventMetadata, ComponentId, EventBatch, EventMetadata, ShutdownSignal};
7use rdkafka::Message;
8use rdkafka::TopicPartitionList;
9use rdkafka::consumer::{Consumer, StreamConsumer};
10use std::fmt;
11use std::sync::Arc;
12use tokio::time::{Duration, sleep};
13use tracing::{debug, error, info, warn};
14
/// Acknowledgement hook attached to batches emitted by the Kafka source.
///
/// It carries everything needed to commit consumer offsets when the batch it
/// is attached to is acknowledged.
struct KafkaBatchNotifier {
    /// Identifier of the owning component, used as a field in log events.
    component_id: String,
    /// Shared handle to the consumer that performs the offset commit.
    consumer: Arc<StreamConsumer>,
    /// Offsets handed to the consumer's `commit` call on acknowledgement.
    offsets: TopicPartitionList,
}
20
21impl fmt::Debug for KafkaBatchNotifier {
22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23 f.debug_struct("KafkaBatchNotifier")
24 .field("component_id", &self.component_id)
25 .finish()
26 }
27}
28
29impl kinetic_core::ack::BatchNotifier for KafkaBatchNotifier {
30 fn on_ack(&self) {
31 debug!(
32 message = "Kafka source batch acknowledged, committing offsets.",
33 component_id = %self.component_id
34 );
35 if let Err(e) = self
36 .consumer
37 .commit(&self.offsets, rdkafka::consumer::CommitMode::Async)
38 {
39 error!(
40 message = "Failed to commit Kafka offsets.",
41 component_id = %self.component_id,
42 err = %e
43 );
44 }
45 }
46}
47
48use kinetic_common::register;
49use kinetic_common::telemetry::EventDuration;
50use kinetic_core::encode::Decoder;
51use metrics::{Label, counter};
52use std::time::Instant;
53
/// Kafka source task: receives messages, decodes them into event batches and
/// forwards the batches to a buffer.
pub struct ConsumerTask {
    /// Consumer shared with the acknowledgement notifiers that commit offsets.
    consumer: Arc<StreamConsumer>,
    /// Buffer that merged event batches are sent to.
    sender: BufferSender,
    // Not read anywhere in this file, hence the lint suppression.
    // NOTE(review): presumably reserved for routing failed events — confirm or remove.
    #[allow(dead_code)]
    error_sender: BufferSender,
    /// Metadata attached to every batch decoded or built by this task.
    metadata_template: ArcEventMetadata,
    /// Identifier of this component, used in logs and error messages.
    component_id: String,
    /// Signal that ends the receive loop in `run`.
    shutdown: ShutdownSignal,
    /// Decoder that turns a message payload into an `EventBatch`.
    decoder: Arc<dyn Decoder>,
    /// Base labels applied to every metric emitted by this task.
    labels: Arc<[Label]>,
    /// Telemetry handle that records the time spent handling each message.
    event_duration: EventDuration,
}
66
67impl ConsumerTask {
68 pub fn new(
69 config: &KafkaConsumerConfig,
70 component_id: String,
71 pipeline_id: String,
72 sender: BufferSender,
73 error_sender: BufferSender,
74 shutdown: ShutdownSignal,
75 decoder: Arc<dyn Decoder>,
76 ) -> Result<Self, kafka_common::client::Error> {
77 let consumer = create_consumer(config)?;
78 let metadata_template = Arc::new(EventMetadata::new(
79 pipeline_id,
80 ComponentId(component_id.clone()),
81 ));
82 let labels: Arc<[Label]> = Arc::new([
83 Label::new("component_id", component_id.clone()),
84 Label::new("component_type", "source"),
85 Label::new("component_kind", "kafka"),
86 ]);
87 let event_duration = register!(EventDuration::new(component_id.clone(), "source"));
88
89 Ok(Self {
90 consumer: Arc::new(consumer),
91 sender,
92 error_sender,
93 metadata_template,
94 component_id,
95 shutdown,
96 decoder,
97 labels,
98 event_duration,
99 })
100 }
101
102 pub async fn run(self) {
103 info!(
104 message = "Starting Kafka consumer task.",
105 component_id = %self.component_id
106 );
107
108 let mut batches: Vec<EventBatch> = Vec::new();
109 let mut offsets = TopicPartitionList::new();
110 let mut interval = tokio::time::interval(Duration::from_secs(5));
111 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
112
113 let mut shutdown = self.shutdown.clone();
114 let mut is_paused = false;
115
116 loop {
117 let utilization = self.sender.utilization();
119 if !is_paused && utilization >= 0.8 {
120 warn!(
121 message = "Buffer utilization high, pausing Kafka ingestion.",
122 component_id = %self.component_id,
123 utilization = %format!("{:.2}%", utilization * 100.0)
124 );
125 if let Ok(assignment) = self.consumer.assignment() {
126 if let Err(e) = self.consumer.pause(&assignment) {
127 error!(
128 message = "Failed to pause Kafka consumer.",
129 component_id = %self.component_id,
130 err = %e
131 );
132 } else {
133 is_paused = true;
134 }
135 }
136 } else if is_paused && utilization < 0.5 {
137 info!(
138 message = "Buffer utilization stabilized, resuming Kafka ingestion.",
139 component_id = %self.component_id,
140 utilization = %format!("{:.2}%", utilization * 100.0)
141 );
142 if let Ok(assignment) = self.consumer.assignment() {
143 if let Err(e) = self.consumer.resume(&assignment) {
144 error!(
145 message = "Failed to resume Kafka consumer.",
146 component_id = %self.component_id,
147 err = %e
148 );
149 } else {
150 is_paused = false;
151 }
152 }
153 }
154
155 tokio::select! {
156 _ = shutdown.recv() => {
157 info!(
158 message = "Received shutdown signal.",
159 component_id = %self.component_id
160 );
161 if !batches.is_empty() {
162 self.flush_batches(&mut batches, &mut offsets).await;
163 }
164 self.consumer.unsubscribe();
165 break;
166 }
167 _ = interval.tick() => {
168 if !batches.is_empty() {
169 self.flush_batches(&mut batches, &mut offsets).await;
170 }
171 }
172 msg = self.consumer.recv() => {
173 match msg {
174 Err(e) => {
175 error!(
176 message = "Kafka receive error.",
177 component_id = %self.component_id,
178 err = %e,
179 stage = "receiving",
180 error_type = "network_failure"
181 );
182 let mut err_labels = self.labels.to_vec();
183 err_labels.push(Label::new("stage", "receiving"));
184 err_labels.push(Label::new("error_type", "network_failure"));
185 counter!("component_errors_total", err_labels).increment(1);
186 sleep(Duration::from_secs(1)).await;
187 }
188 Ok(m) => {
189 let start = Instant::now();
190 if let Err(e) = offsets.add_partition_offset(m.topic(), m.partition(), rdkafka::Offset::Offset(m.offset() + 1)) {
192 error!(
193 message = "Failed to add partition offset to list.",
194 err = %e,
195 topic = %m.topic(),
196 partition = %m.partition()
197 );
198 }
199
200 if let Some(p) = m.payload() {
201 let p_len = p.len();
202 counter!("component_received_network_bytes_total", self.labels.iter()).increment(p_len as u64);
203
204 match self.decoder.decode(p, self.metadata_template.clone()) {
208 Ok(batch) => {
209 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
210 counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
211 batches.push(batch);
212 if batches.len() >= 1000 {
213 self.flush_batches(&mut batches, &mut offsets).await;
214 interval.reset();
215 }
216 }
217 Err(e) => {
218 error!(
219 message = "Failed to decode Kafka message.",
220 component_id = %self.component_id,
221 err = %e,
222 stage = "receiving",
223 error_type = "decode_failure"
224 );
225 let mut err_labels = self.labels.to_vec();
226 err_labels.push(Label::new("stage", "receiving"));
227 err_labels.push(Label::new("error_type", "decode_failure"));
228 counter!("component_errors_total", err_labels).increment(1);
229 }
230 }
231 }
232 self.event_duration.emit(start.elapsed());
233 }
234 }
235 }
236 }
237 }
238 }
239
240 async fn flush_batches(&self, batches: &mut Vec<EventBatch>, offsets: &mut TopicPartitionList) {
241 if batches.is_empty() {
242 return;
243 }
244
245 let mut groups: std::collections::HashMap<Arc<arrow_schema::Schema>, Vec<EventBatch>> =
247 std::collections::HashMap::new();
248
249 for batch in batches.drain(..) {
250 groups
251 .entry(batch.payload.schema())
252 .or_default()
253 .push(batch);
254 }
255
256 for (schema, group_batches) in groups {
257 let record_batches: Vec<_> = group_batches.iter().map(|b| b.payload.clone()).collect();
258
259 let merged_payload =
260 match arrow_select::concat::concat_batches(&schema, record_batches.iter()) {
261 Ok(rb) => rb,
262 Err(e) => {
263 error!(
264 message = "Failed to concat batches in Kafka source.",
265 component_id = %self.component_id,
266 err = %e,
267 stage = "processing",
268 error_type = "concat_failure"
269 );
270 let mut err_labels = self.labels.to_vec();
271 err_labels.push(Label::new("stage", "processing"));
272 err_labels.push(Label::new("error_type", "concat_failure"));
273 counter!("component_errors_total", err_labels).increment(1);
274 continue;
275 }
276 };
277
278 let mut event_batch =
279 match EventBatch::new(merged_payload, self.metadata_template.clone()) {
280 Ok(eb) => eb,
281 Err(e) => {
282 error!(
283 message = "Failed to create EventBatch.",
284 component_id = %self.component_id,
285 err = %e,
286 stage = "processing",
287 error_type = "event_batch_failure"
288 );
289 let mut err_labels = self.labels.to_vec();
290 err_labels.push(Label::new("stage", "processing"));
291 err_labels.push(Label::new("error_type", "event_batch_failure"));
292 counter!("component_errors_total", err_labels).increment(1);
293 continue;
294 }
295 };
296
297 let notifier = KafkaBatchNotifier {
299 component_id: self.component_id.clone(),
300 consumer: self.consumer.clone(),
301 offsets: offsets.clone(),
302 };
303 let token = kinetic_core::ack::AckToken::new(Arc::new(notifier));
304 event_batch.ack_token = Some(token);
305
306 let num_rows = event_batch.num_rows();
307 if let Err(e) = self.sender.send(event_batch).await {
308 let mut err_labels = self.labels.to_vec();
309 err_labels.push(Label::new("stage", "processing"));
310 err_labels.push(Label::new("error_type", "buffer_send_failure"));
311 counter!("component_errors_total", err_labels).increment(1);
312 error!(
313 message = "Failed to send batch to buffer in Kafka source.",
314 component_id = %self.component_id,
315 err = %e,
316 stage = "processing",
317 error_type = "buffer_send_failure"
318 );
319 } else {
320 counter!("component_sent_events_total", self.labels.iter())
321 .increment(num_rows as u64);
322 }
323 }
324
325 *offsets = TopicPartitionList::new();
326 }
327}
328
329#[async_trait::async_trait]
330impl Healthcheck for ConsumerTask {
331 async fn check(&self) -> anyhow::Result<()> {
332 self.consumer
334 .fetch_metadata(None, Duration::from_secs(5))
335 .map(|_| ())
336 .map_err(|e| {
337 anyhow::anyhow!(
338 "Kafka healthcheck failed for component '{}': {}",
339 self.component_id,
340 e
341 )
342 })
343 }
344}