kinetic/sources/opentelemetry/
server.rs1use arrow_array::{RecordBatch, StringArray};
4use arrow_schema::{DataType, Field, Schema};
5use kinetic_buffers::BufferSender;
6use kinetic_core::healthcheck::Healthcheck;
7use kinetic_core::{ComponentId, EventBatch, EventMetadata};
8use metrics::{Label, counter};
9use otel_common::config::OtelServerConfig;
10use otel_common::proto::{
11 ExportLogsServiceRequest, ExportLogsServiceResponse, LogsService, LogsServiceServer,
12};
13use prost::Message;
14use std::sync::Arc;
15use tonic::{Request, Response, Status};
16use tracing::{error, info};
17
18use kinetic_common::register;
19use kinetic_common::telemetry::EventDuration;
20use std::time::Instant;
21
22pub struct OtelSourceServer {
23 config: OtelServerConfig,
24 component_id: String,
25 logs_sender: Option<BufferSender>,
26 #[allow(dead_code)]
27 metrics_sender: Option<BufferSender>,
28 #[allow(dead_code)]
29 traces_sender: Option<BufferSender>,
30 labels: Arc<[Label]>,
31 event_duration: EventDuration,
32}
33
34impl OtelSourceServer {
35 pub fn new(
36 config: OtelServerConfig,
37 component_id: String,
38 logs_sender: Option<BufferSender>,
39 metrics_sender: Option<BufferSender>,
40 traces_sender: Option<BufferSender>,
41 ) -> Self {
42 let labels: Arc<[Label]> = Arc::new([
43 Label::new("component_id", component_id.clone()),
44 Label::new("component_type", "source"),
45 Label::new("component_kind", "opentelemetry"),
46 ]);
47 let event_duration = register!(EventDuration::new(component_id.clone(), "source"));
48 Self {
49 config,
50 component_id,
51 logs_sender,
52 metrics_sender,
53 traces_sender,
54 labels,
55 event_duration,
56 }
57 }
58
59 pub async fn run(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
60 let addr: std::net::SocketAddr = self.config.grpc_endpoint.parse()?;
61 info!(
62 "Starting OpenTelemetry gRPC source '{}' on {}",
63 self.component_id, addr
64 );
65
66 let mut router = tonic::transport::Server::builder();
67
68 let server_builder = router.add_service(LogsServiceServer::new(LogsSvc {
70 sender: self.logs_sender.clone(),
71 component_id: self.component_id.clone(),
72 labels: self.labels.clone(),
73 event_duration: self.event_duration.clone(),
74 }));
75
76 server_builder.serve(addr).await?;
82
83 Ok(())
84 }
85}
86
87#[async_trait::async_trait]
88impl Healthcheck for OtelSourceServer {
89 async fn check(&self) -> anyhow::Result<()> {
90 let _addr: std::net::SocketAddr = self.config.grpc_endpoint.parse().map_err(|e| {
91 anyhow::anyhow!(
92 "Invalid OTel gRPC endpoint '{}': {}",
93 self.config.grpc_endpoint,
94 e
95 )
96 })?;
97 Ok(())
98 }
99}
100
101struct LogsSvc {
103 sender: Option<BufferSender>,
104 component_id: String,
105 labels: Arc<[Label]>,
106 event_duration: EventDuration,
107}
108
109#[tonic::async_trait]
110impl LogsService for LogsSvc {
111 async fn export(
112 &self,
113 request: Request<ExportLogsServiceRequest>,
114 ) -> Result<Response<ExportLogsServiceResponse>, Status> {
115 let start = Instant::now();
116 let req = request.into_inner();
117
118 let byte_size = req.encoded_len();
120 counter!("component_received_network_bytes_total", self.labels.iter())
121 .increment(byte_size as u64);
122
123 if let Some(sender) = &self.sender {
124 let mut log_count = 0;
126 for resource_logs in req.resource_logs {
127 for scope_logs in resource_logs.scope_logs {
128 log_count += scope_logs.log_records.len();
129 }
130 }
131
132 if log_count > 0 {
133 let metadata = Arc::new(EventMetadata::new(
136 "opentelemetry",
137 ComponentId(self.component_id.clone()),
138 ));
139
140 let schema = Arc::new(Schema::new(vec![Field::new(
141 "message",
142 DataType::Utf8,
143 false,
144 )]));
145
146 let mut messages = Vec::new();
147 messages.extend(std::iter::repeat_n("stub otel log message", log_count));
148
149 let array = StringArray::from(messages);
150 if let Ok(rb) = RecordBatch::try_new(schema, vec![Arc::new(array)]) {
151 let batch = match EventBatch::new_with_xid(rb, metadata) {
152 Ok(b) => b,
153 Err(e) => {
154 error!("Failed to create EventBatch from OTLP logs: {}", e);
155 self.event_duration.emit(start.elapsed());
156 return Err(Status::internal("Failed to create EventBatch"));
157 }
158 };
159 if let Err(e) = sender.send(batch).await {
160 error!("Failed to send OTLP logs downstream: {:?}", e);
161 self.event_duration.emit(start.elapsed());
162 return Err(Status::internal("Failed to enqueue logs"));
163 }
164 }
165 }
166 }
167
168 self.event_duration.emit(start.elapsed());
169 Ok(Response::new(ExportLogsServiceResponse {
170 partial_success: None,
171 }))
172 }
173}