1use crate::config::{KafkaConsumerConfig, KafkaProducerConfig};
4use rdkafka::ClientConfig;
5use rdkafka::consumer::{Consumer, StreamConsumer};
6use rdkafka::producer::FutureProducer;
7use snafu::Snafu;
8
9#[derive(Debug, Snafu)]
10pub enum Error {
11 #[snafu(display("Failed to create Kafka client: {}", source))]
12 ClientCreation { source: rdkafka::error::KafkaError },
13
14 #[snafu(display("Failed to subscribe to topics: {}", source))]
15 Subscription { source: rdkafka::error::KafkaError },
16}
17
18pub type Result<T, E = Error> = std::result::Result<T, E>;
19
20pub fn create_consumer(config: &KafkaConsumerConfig) -> Result<StreamConsumer> {
22 let mut client_config = ClientConfig::new();
23
24 client_config
25 .set("bootstrap.servers", &config.client.bootstrap_servers)
26 .set("group.id", &config.group_id)
27 .set("auto.offset.reset", &config.auto_offset_reset)
28 .set("enable.auto.commit", "false"); if let Some(client_id) = &config.client.client_id {
31 client_config.set("client.id", client_id);
32 }
33
34 for (k, v) in &config.client.properties {
35 if k == "security.protocol" || k.starts_with("ssl.") || k.starts_with("sasl.") {
36 tracing::warn!(
37 "Overriding Kafka security property '{}' via custom properties. This may have security implications.",
38 k
39 );
40 }
41 client_config.set(k, v);
42 }
43
44 let consumer: StreamConsumer = client_config
45 .create()
46 .map_err(|e| Error::ClientCreation { source: e })?;
47
48 let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
49 consumer
50 .subscribe(&topics)
51 .map_err(|e| Error::Subscription { source: e })?;
52
53 Ok(consumer)
54}
55
56pub fn create_producer(config: &KafkaProducerConfig) -> Result<FutureProducer> {
58 let mut client_config = ClientConfig::new();
59
60 client_config.set("bootstrap.servers", &config.client.bootstrap_servers);
61
62 if let Some(client_id) = &config.client.client_id {
63 client_config.set("client.id", client_id);
64 }
65
66 for (k, v) in &config.client.properties {
67 if k == "security.protocol" || k.starts_with("ssl.") || k.starts_with("sasl.") {
68 tracing::warn!(
69 "Overriding Kafka security property '{}' via custom properties. This may have security implications.",
70 k
71 );
72 }
73 client_config.set(k, v);
74 }
75
76 let producer: FutureProducer = client_config
77 .create()
78 .map_err(|e| Error::ClientCreation { source: e })?;
79
80 Ok(producer)
81}