Skip to main content

kafka_common/
client.rs

1//! Helper functions to create rdkafka clients from configuration.
2
3use crate::config::{KafkaConsumerConfig, KafkaProducerConfig};
4use rdkafka::ClientConfig;
5use rdkafka::consumer::{Consumer, StreamConsumer};
6use rdkafka::producer::FutureProducer;
7use snafu::Snafu;
8
9#[derive(Debug, Snafu)]
10pub enum Error {
11    #[snafu(display("Failed to create Kafka client: {}", source))]
12    ClientCreation { source: rdkafka::error::KafkaError },
13
14    #[snafu(display("Failed to subscribe to topics: {}", source))]
15    Subscription { source: rdkafka::error::KafkaError },
16}
17
18pub type Result<T, E = Error> = std::result::Result<T, E>;
19
20/// Creates a `StreamConsumer` based on the given configuration.
21pub fn create_consumer(config: &KafkaConsumerConfig) -> Result<StreamConsumer> {
22    let mut client_config = ClientConfig::new();
23
24    client_config
25        .set("bootstrap.servers", &config.client.bootstrap_servers)
26        .set("group.id", &config.group_id)
27        .set("auto.offset.reset", &config.auto_offset_reset)
28        .set("enable.auto.commit", "false"); // We manage offsets manually
29
30    if let Some(client_id) = &config.client.client_id {
31        client_config.set("client.id", client_id);
32    }
33
34    for (k, v) in &config.client.properties {
35        if k == "security.protocol" || k.starts_with("ssl.") || k.starts_with("sasl.") {
36            tracing::warn!(
37                "Overriding Kafka security property '{}' via custom properties. This may have security implications.",
38                k
39            );
40        }
41        client_config.set(k, v);
42    }
43
44    let consumer: StreamConsumer = client_config
45        .create()
46        .map_err(|e| Error::ClientCreation { source: e })?;
47
48    let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
49    consumer
50        .subscribe(&topics)
51        .map_err(|e| Error::Subscription { source: e })?;
52
53    Ok(consumer)
54}
55
56/// Creates a `FutureProducer` based on the given configuration.
57pub fn create_producer(config: &KafkaProducerConfig) -> Result<FutureProducer> {
58    let mut client_config = ClientConfig::new();
59
60    client_config.set("bootstrap.servers", &config.client.bootstrap_servers);
61
62    if let Some(client_id) = &config.client.client_id {
63        client_config.set("client.id", client_id);
64    }
65
66    for (k, v) in &config.client.properties {
67        if k == "security.protocol" || k.starts_with("ssl.") || k.starts_with("sasl.") {
68            tracing::warn!(
69                "Overriding Kafka security property '{}' via custom properties. This may have security implications.",
70                k
71            );
72        }
73        client_config.set(k, v);
74    }
75
76    let producer: FutureProducer = client_config
77        .create()
78        .map_err(|e| Error::ClientCreation { source: e })?;
79
80    Ok(producer)
81}