// Source: kafka_common/config.rs

//! Configuration structs for Kafka clients.

use kinetic_common::config::{SinkConfig, SinkContext, SourceConfig, SourceContext};
use kinetic_doc_derive::{ComponentDoc, FieldDoc};
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
8/// Shared configuration for Kafka client connections.
9#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
10pub struct KafkaClientConfig {
11    /// Comma-separated list of Kafka broker addresses.
12    #[doc_field(required, example = "localhost:9092")]
13    pub bootstrap_servers: String,
14
15    /// Optional Kafka client identifier.
16    #[serde(default)]
17    #[doc_field(example = "kinetic-worker-1")]
18    pub client_id: Option<String>,
19
20    /// Additional librdkafka configuration properties.
21    #[serde(default)]
22    #[doc_field(example = "{\"compression.type\": \"lz4\"}")]
23    pub properties: std::collections::HashMap<String, String>,
24}
26/// Reads events from one or more Apache Kafka topics using a consumer group.
27#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
28pub struct KafkaConsumerConfig {
29    /// Shared Kafka client connection settings.
30    pub client: KafkaClientConfig,
31
32    /// Kafka consumer group identifier.
33    #[doc_field(required, example = "kinetic-ingest")]
34    pub group_id: String,
35
36    /// List of Kafka topics to consume from.
37    #[doc_field(required, example = "[\"events\", \"logs\"]")]
38    pub topics: Vec<String>,
39
40    /// Where to begin consuming when no committed offset exists.
41    #[serde(default = "default_auto_offset_reset")]
42    #[doc_field(default = "earliest", example = "latest")]
43    pub auto_offset_reset: String,
44}
46impl SourceConfig for KafkaConsumerConfig {
47    fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
48        Err(anyhow::anyhow!(
49            "Kafka source build must be implemented in the data plane"
50        ))
51    }
52}
54/// Produces events to an Apache Kafka topic.
55#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
56pub struct KafkaProducerConfig {
57    /// Shared Kafka client connection settings.
58    pub client: KafkaClientConfig,
59
60    /// The Kafka topic to produce events to.
61    #[doc_field(required, example = "events-ingest")]
62    pub topic: String,
63}
65impl SinkConfig for KafkaProducerConfig {
66    fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
67        Err(anyhow::anyhow!(
68            "Kafka sink build must be implemented in the data plane"
69        ))
70    }
71}
/// Serde default for `KafkaConsumerConfig::auto_offset_reset`.
///
/// Returns the owned string `"earliest"`; it is referenced by name from the
/// field's `#[serde(default = "default_auto_offset_reset")]` attribute.
fn default_auto_offset_reset() -> String {
    String::from("earliest")
}