kinetic/sinks/opentelemetry/
client.rs1use kinetic_buffers::BufferReceiver;
4use kinetic_core::healthcheck::Healthcheck;
5use otel_common::config::OtelClientConfig;
6use otel_common::proto::{ExportLogsServiceRequest, LogsServiceClient};
7use reqwest::Url;
8use tracing::{debug, error, info};
9
10pub struct OtelSinkClient {
11 config: OtelClientConfig,
12 component_id: String,
13 receiver: BufferReceiver,
14}
15
16impl OtelSinkClient {
17 pub fn new(config: OtelClientConfig, component_id: String, receiver: BufferReceiver) -> Self {
18 Self {
19 config,
20 component_id,
21 receiver,
22 }
23 }
24
25 pub async fn run(mut self) {
26 info!(
27 "Starting OpenTelemetry gRPC sink '{}' to {}",
28 self.component_id, self.config.endpoint
29 );
30
31 let mut client = loop {
33 match LogsServiceClient::connect(self.config.endpoint.clone()).await {
34 Ok(c) => break c,
35 Err(e) => {
36 error!(
37 "Failed to connect to OTLP endpoint {}: {}",
38 self.config.endpoint, e
39 );
40 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
41 }
42 }
43 };
44
45 while let Some(batch) = self.receiver.recv().await {
46 debug!(
47 "OTLP sink {} sending batch of {} rows",
48 self.component_id,
49 batch.num_rows()
50 );
51
52 let req = tonic::Request::new(ExportLogsServiceRequest {
57 resource_logs: vec![],
58 });
59
60 match client.export(req).await {
61 Ok(response) => {
62 debug!("OTLP export successful: {:?}", response);
63 }
64 Err(e) => {
65 error!(
66 "OTLP export failed in component '{}': {}",
67 self.component_id, e
68 );
69 }
71 }
72 }
73
74 info!(
75 "OpenTelemetry gRPC sink '{}' shutting down",
76 self.component_id
77 );
78 }
79}
80
81#[async_trait::async_trait]
82impl Healthcheck for OtelSinkClient {
83 async fn check(&self) -> anyhow::Result<()> {
84 let url = Url::parse(&self.config.endpoint).map_err(|e| {
85 anyhow::anyhow!("Invalid OTLP endpoint '{}': {}", self.config.endpoint, e)
86 })?;
87
88 let host = url.host_str().ok_or_else(|| {
89 anyhow::anyhow!("Missing host in OTLP endpoint '{}'", self.config.endpoint)
90 })?;
91 let port = url.port_or_known_default().unwrap_or(4317);
92
93 let addr = format!("{}:{}", host, port);
94 tokio::time::timeout(
95 tokio::time::Duration::from_secs(5),
96 tokio::net::TcpStream::connect(&addr),
97 )
98 .await
99 .map_err(|e| anyhow::anyhow!("OTLP sink healthcheck timed out for {}: {}", addr, e))?
100 .map_err(|e| anyhow::anyhow!("OTLP sink healthcheck failed for {}: {}", addr, e))?;
101
102 Ok(())
103 }
104}