Skip to main content

kinetic/sources/kafka/
consumer.rs

1//! Kafka source consumer loop.
2
3use kafka_common::{KafkaConsumerConfig, create_consumer};
4use kinetic_buffers::BufferSender;
5use kinetic_core::healthcheck::Healthcheck;
6use kinetic_core::{ArcEventMetadata, ComponentId, EventBatch, EventMetadata, ShutdownSignal};
7use rdkafka::Message;
8use rdkafka::TopicPartitionList;
9use rdkafka::consumer::{Consumer, StreamConsumer};
10use std::fmt;
11use std::sync::Arc;
12use tokio::time::{Duration, sleep};
13use tracing::{debug, error, info, warn};
14
15struct KafkaBatchNotifier {
16    component_id: String,
17    consumer: Arc<StreamConsumer>,
18    offsets: TopicPartitionList,
19}
20
21impl fmt::Debug for KafkaBatchNotifier {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        f.debug_struct("KafkaBatchNotifier")
24            .field("component_id", &self.component_id)
25            .finish()
26    }
27}
28
29impl kinetic_core::ack::BatchNotifier for KafkaBatchNotifier {
30    fn on_ack(&self) {
31        debug!(
32            message = "Kafka source batch acknowledged, committing offsets.",
33            component_id = %self.component_id
34        );
35        if let Err(e) = self
36            .consumer
37            .commit(&self.offsets, rdkafka::consumer::CommitMode::Async)
38        {
39            error!(
40                message = "Failed to commit Kafka offsets.",
41                component_id = %self.component_id,
42                err = %e
43            );
44        }
45    }
46}
47
48use kinetic_common::register;
49use kinetic_common::telemetry::EventDuration;
50use kinetic_core::encode::Decoder;
51use metrics::{Label, counter};
52use std::time::Instant;
53
54pub struct ConsumerTask {
55    consumer: Arc<StreamConsumer>,
56    sender: BufferSender,
57    #[allow(dead_code)]
58    error_sender: BufferSender,
59    metadata_template: ArcEventMetadata,
60    component_id: String,
61    shutdown: ShutdownSignal,
62    decoder: Arc<dyn Decoder>,
63    labels: Arc<[Label]>,
64    event_duration: EventDuration,
65}
66
67impl ConsumerTask {
68    pub fn new(
69        config: &KafkaConsumerConfig,
70        component_id: String,
71        pipeline_id: String,
72        sender: BufferSender,
73        error_sender: BufferSender,
74        shutdown: ShutdownSignal,
75        decoder: Arc<dyn Decoder>,
76    ) -> Result<Self, kafka_common::client::Error> {
77        let consumer = create_consumer(config)?;
78        let metadata_template = Arc::new(EventMetadata::new(
79            pipeline_id,
80            ComponentId(component_id.clone()),
81        ));
82        let labels: Arc<[Label]> = Arc::new([
83            Label::new("component_id", component_id.clone()),
84            Label::new("component_type", "source"),
85            Label::new("component_kind", "kafka"),
86        ]);
87        let event_duration = register!(EventDuration::new(component_id.clone(), "source"));
88
89        Ok(Self {
90            consumer: Arc::new(consumer),
91            sender,
92            error_sender,
93            metadata_template,
94            component_id,
95            shutdown,
96            decoder,
97            labels,
98            event_duration,
99        })
100    }
101
102    pub async fn run(self) {
103        info!(
104            message = "Starting Kafka consumer task.",
105            component_id = %self.component_id
106        );
107
108        let mut batches: Vec<EventBatch> = Vec::new();
109        let mut offsets = TopicPartitionList::new();
110        let mut interval = tokio::time::interval(Duration::from_secs(5));
111        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
112
113        let mut shutdown = self.shutdown.clone();
114        let mut is_paused = false;
115
116        loop {
117            // Backpressure: pause if buffer > 80%, resume if < 50%
118            let utilization = self.sender.utilization();
119            if !is_paused && utilization >= 0.8 {
120                warn!(
121                    message = "Buffer utilization high, pausing Kafka ingestion.",
122                    component_id = %self.component_id,
123                    utilization = %format!("{:.2}%", utilization * 100.0)
124                );
125                if let Ok(assignment) = self.consumer.assignment() {
126                    if let Err(e) = self.consumer.pause(&assignment) {
127                        error!(
128                            message = "Failed to pause Kafka consumer.",
129                            component_id = %self.component_id,
130                            err = %e
131                        );
132                    } else {
133                        is_paused = true;
134                    }
135                }
136            } else if is_paused && utilization < 0.5 {
137                info!(
138                    message = "Buffer utilization stabilized, resuming Kafka ingestion.",
139                    component_id = %self.component_id,
140                    utilization = %format!("{:.2}%", utilization * 100.0)
141                );
142                if let Ok(assignment) = self.consumer.assignment() {
143                    if let Err(e) = self.consumer.resume(&assignment) {
144                        error!(
145                            message = "Failed to resume Kafka consumer.",
146                            component_id = %self.component_id,
147                            err = %e
148                        );
149                    } else {
150                        is_paused = false;
151                    }
152                }
153            }
154
155            tokio::select! {
156                _ = shutdown.recv() => {
157                    info!(
158                        message = "Received shutdown signal.",
159                        component_id = %self.component_id
160                    );
161                    if !batches.is_empty() {
162                        self.flush_batches(&mut batches, &mut offsets).await;
163                    }
164                    self.consumer.unsubscribe();
165                    break;
166                }
167                _ = interval.tick() => {
168                    if !batches.is_empty() {
169                        self.flush_batches(&mut batches, &mut offsets).await;
170                    }
171                }
172                msg = self.consumer.recv() => {
173                    match msg {
174                        Err(e) => {
175                            error!(
176                                message = "Kafka receive error.",
177                                component_id = %self.component_id,
178                                err = %e,
179                                stage = "receiving",
180                                error_type = "network_failure"
181                            );
182                            let mut err_labels = self.labels.to_vec();
183                            err_labels.push(Label::new("stage", "receiving"));
184                            err_labels.push(Label::new("error_type", "network_failure"));
185                            counter!("component_errors_total", err_labels).increment(1);
186                            sleep(Duration::from_secs(1)).await;
187                        }
188                        Ok(m) => {
189                            let start = Instant::now();
190                            // Store offset for committing later on ACK
191                            if let Err(e) = offsets.add_partition_offset(m.topic(), m.partition(), rdkafka::Offset::Offset(m.offset() + 1)) {
192                                error!(
193                                    message = "Failed to add partition offset to list.",
194                                    err = %e,
195                                    topic = %m.topic(),
196                                    partition = %m.partition()
197                                );
198                            }
199
200                            if let Some(p) = m.payload() {
201                                let p_len = p.len();
202                                counter!("component_received_network_bytes_total", self.labels.iter()).increment(p_len as u64);
203
204                                // TODO: Registry Validation (Security Rule)
205                                // Security: Validate all incoming schemas against the Registry before decoding.
206
207                                match self.decoder.decode(p, self.metadata_template.clone()) {
208                                    Ok(batch) => {
209                                        counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
210                                        counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
211                                        batches.push(batch);
212                                        if batches.len() >= 1000 {
213                                            self.flush_batches(&mut batches, &mut offsets).await;
214                                            interval.reset();
215                                        }
216                                    }
217                                    Err(e) => {
218                                        error!(
219                                            message = "Failed to decode Kafka message.",
220                                            component_id = %self.component_id,
221                                            err = %e,
222                                            stage = "receiving",
223                                            error_type = "decode_failure"
224                                        );
225                                        let mut err_labels = self.labels.to_vec();
226                                        err_labels.push(Label::new("stage", "receiving"));
227                                        err_labels.push(Label::new("error_type", "decode_failure"));
228                                        counter!("component_errors_total", err_labels).increment(1);
229                                    }
230                                }
231                            }
232                            self.event_duration.emit(start.elapsed());
233                        }
234                    }
235                }
236            }
237        }
238    }
239
240    async fn flush_batches(&self, batches: &mut Vec<EventBatch>, offsets: &mut TopicPartitionList) {
241        if batches.is_empty() {
242            return;
243        }
244
245        // Group batches by schema to ensure concat_batches succeeds
246        let mut groups: std::collections::HashMap<Arc<arrow_schema::Schema>, Vec<EventBatch>> =
247            std::collections::HashMap::new();
248
249        for batch in batches.drain(..) {
250            groups
251                .entry(batch.payload.schema())
252                .or_default()
253                .push(batch);
254        }
255
256        for (schema, group_batches) in groups {
257            let record_batches: Vec<_> = group_batches.iter().map(|b| b.payload.clone()).collect();
258
259            let merged_payload =
260                match arrow_select::concat::concat_batches(&schema, record_batches.iter()) {
261                    Ok(rb) => rb,
262                    Err(e) => {
263                        error!(
264                            message = "Failed to concat batches in Kafka source.",
265                            component_id = %self.component_id,
266                            err = %e,
267                            stage = "processing",
268                            error_type = "concat_failure"
269                        );
270                        let mut err_labels = self.labels.to_vec();
271                        err_labels.push(Label::new("stage", "processing"));
272                        err_labels.push(Label::new("error_type", "concat_failure"));
273                        counter!("component_errors_total", err_labels).increment(1);
274                        continue;
275                    }
276                };
277
278            let mut event_batch =
279                match EventBatch::new(merged_payload, self.metadata_template.clone()) {
280                    Ok(eb) => eb,
281                    Err(e) => {
282                        error!(
283                            message = "Failed to create EventBatch.",
284                            component_id = %self.component_id,
285                            err = %e,
286                            stage = "processing",
287                            error_type = "event_batch_failure"
288                        );
289                        let mut err_labels = self.labels.to_vec();
290                        err_labels.push(Label::new("stage", "processing"));
291                        err_labels.push(Label::new("error_type", "event_batch_failure"));
292                        counter!("component_errors_total", err_labels).increment(1);
293                        continue;
294                    }
295                };
296
297            // Combine ACK tokens if multiple batches were merged
298            let notifier = KafkaBatchNotifier {
299                component_id: self.component_id.clone(),
300                consumer: self.consumer.clone(),
301                offsets: offsets.clone(),
302            };
303            let token = kinetic_core::ack::AckToken::new(Arc::new(notifier));
304            event_batch.ack_token = Some(token);
305
306            let num_rows = event_batch.num_rows();
307            if let Err(e) = self.sender.send(event_batch).await {
308                let mut err_labels = self.labels.to_vec();
309                err_labels.push(Label::new("stage", "processing"));
310                err_labels.push(Label::new("error_type", "buffer_send_failure"));
311                counter!("component_errors_total", err_labels).increment(1);
312                error!(
313                    message = "Failed to send batch to buffer in Kafka source.",
314                    component_id = %self.component_id,
315                    err = %e,
316                    stage = "processing",
317                    error_type = "buffer_send_failure"
318                );
319            } else {
320                counter!("component_sent_events_total", self.labels.iter())
321                    .increment(num_rows as u64);
322            }
323        }
324
325        *offsets = TopicPartitionList::new();
326    }
327}
328
329#[async_trait::async_trait]
330impl Healthcheck for ConsumerTask {
331    async fn check(&self) -> anyhow::Result<()> {
332        // Attempt to fetch metadata from the cluster with a timeout
333        self.consumer
334            .fetch_metadata(None, Duration::from_secs(5))
335            .map(|_| ())
336            .map_err(|e| {
337                anyhow::anyhow!(
338                    "Kafka healthcheck failed for component '{}': {}",
339                    self.component_id,
340                    e
341                )
342            })
343    }
344}