Skip to main content

kinetic/sinks/opentelemetry/
client.rs

1//! OpenTelemetry gRPC client for exporting logs, metrics, and traces.
2
3use kinetic_buffers::BufferReceiver;
4use kinetic_core::healthcheck::Healthcheck;
5use otel_common::config::OtelClientConfig;
6use otel_common::proto::{ExportLogsServiceRequest, LogsServiceClient};
7use reqwest::Url;
8use tracing::{debug, error, info};
9
10pub struct OtelSinkClient {
11    config: OtelClientConfig,
12    component_id: String,
13    receiver: BufferReceiver,
14}
15
16impl OtelSinkClient {
17    pub fn new(config: OtelClientConfig, component_id: String, receiver: BufferReceiver) -> Self {
18        Self {
19            config,
20            component_id,
21            receiver,
22        }
23    }
24
25    pub async fn run(mut self) {
26        info!(
27            "Starting OpenTelemetry gRPC sink '{}' to {}",
28            self.component_id, self.config.endpoint
29        );
30
31        // Simple reconnect loop
32        let mut client = loop {
33            match LogsServiceClient::connect(self.config.endpoint.clone()).await {
34                Ok(c) => break c,
35                Err(e) => {
36                    error!(
37                        "Failed to connect to OTLP endpoint {}: {}",
38                        self.config.endpoint, e
39                    );
40                    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
41                }
42            }
43        };
44
45        while let Some(batch) = self.receiver.recv().await {
46            debug!(
47                "OTLP sink {} sending batch of {} rows",
48                self.component_id,
49                batch.num_rows()
50            );
51
52            // In a real implementation we would convert the Arrow RecordBatch back into
53            // OTLP protobufs (e.g. ExportLogsServiceRequest).
54            // Here we just send an empty request to prove the connectivity pipeline.
55
56            let req = tonic::Request::new(ExportLogsServiceRequest {
57                resource_logs: vec![],
58            });
59
60            match client.export(req).await {
61                Ok(response) => {
62                    debug!("OTLP export successful: {:?}", response);
63                }
64                Err(e) => {
65                    error!(
66                        "OTLP export failed in component '{}': {}",
67                        self.component_id, e
68                    );
69                    // Depending on retry configuration, we could push back to a retry queue.
70                }
71            }
72        }
73
74        info!(
75            "OpenTelemetry gRPC sink '{}' shutting down",
76            self.component_id
77        );
78    }
79}
80
81#[async_trait::async_trait]
82impl Healthcheck for OtelSinkClient {
83    async fn check(&self) -> anyhow::Result<()> {
84        let url = Url::parse(&self.config.endpoint).map_err(|e| {
85            anyhow::anyhow!("Invalid OTLP endpoint '{}': {}", self.config.endpoint, e)
86        })?;
87
88        let host = url.host_str().ok_or_else(|| {
89            anyhow::anyhow!("Missing host in OTLP endpoint '{}'", self.config.endpoint)
90        })?;
91        let port = url.port_or_known_default().unwrap_or(4317);
92
93        let addr = format!("{}:{}", host, port);
94        tokio::time::timeout(
95            tokio::time::Duration::from_secs(5),
96            tokio::net::TcpStream::connect(&addr),
97        )
98        .await
99        .map_err(|e| anyhow::anyhow!("OTLP sink healthcheck timed out for {}: {}", addr, e))?
100        .map_err(|e| anyhow::anyhow!("OTLP sink healthcheck failed for {}: {}", addr, e))?;
101
102        Ok(())
103    }
104}