1use kinetic_common::config::{SinkConfig, SinkContext, SourceConfig, SourceContext};
4use kinetic_doc_derive::{ComponentDoc, FieldDoc};
5use serde::{Deserialize, Serialize};
6use tokio::task::JoinHandle;
7
8#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, FieldDoc)]
10pub struct KafkaClientConfig {
11 #[doc_field(required, example = "localhost:9092")]
13 pub bootstrap_servers: String,
14
15 #[serde(default)]
17 #[doc_field(example = "kinetic-worker-1")]
18 pub client_id: Option<String>,
19
20 #[serde(default)]
22 #[doc_field(example = "{\"compression.type\": \"lz4\"}")]
23 pub properties: std::collections::HashMap<String, String>,
24}
25
26#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
28pub struct KafkaConsumerConfig {
29 pub client: KafkaClientConfig,
31
32 #[doc_field(required, example = "kinetic-ingest")]
34 pub group_id: String,
35
36 #[doc_field(required, example = "[\"events\", \"logs\"]")]
38 pub topics: Vec<String>,
39
40 #[serde(default = "default_auto_offset_reset")]
42 #[doc_field(default = "earliest", example = "latest")]
43 pub auto_offset_reset: String,
44}
45
46impl SourceConfig for KafkaConsumerConfig {
47 fn build(&self, _cx: SourceContext) -> anyhow::Result<JoinHandle<()>> {
48 Err(anyhow::anyhow!(
49 "Kafka source build must be implemented in the data plane"
50 ))
51 }
52}
53
54#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, ComponentDoc)]
56pub struct KafkaProducerConfig {
57 pub client: KafkaClientConfig,
59
60 #[doc_field(required, example = "events-ingest")]
62 pub topic: String,
63}
64
65impl SinkConfig for KafkaProducerConfig {
66 fn build(&self, _cx: SinkContext) -> anyhow::Result<JoinHandle<()>> {
67 Err(anyhow::anyhow!(
68 "Kafka sink build must be implemented in the data plane"
69 ))
70 }
71}
72
/// Serde default for `KafkaConsumerConfig::auto_offset_reset`.
///
/// Returns the owned string `"earliest"`.
fn default_auto_offset_reset() -> String {
    String::from("earliest")
}