Skip to main content

kinetic/sources/opentelemetry/
server.rs

1//! OpenTelemetry gRPC server for receiving logs, metrics, and traces.
2
3use arrow_array::{RecordBatch, StringArray};
4use arrow_schema::{DataType, Field, Schema};
5use kinetic_buffers::BufferSender;
6use kinetic_core::healthcheck::Healthcheck;
7use kinetic_core::{ComponentId, EventBatch, EventMetadata};
8use metrics::{Label, counter};
9use otel_common::config::OtelServerConfig;
10use otel_common::proto::{
11    ExportLogsServiceRequest, ExportLogsServiceResponse, LogsService, LogsServiceServer,
12};
13use prost::Message;
14use std::sync::Arc;
15use tonic::{Request, Response, Status};
16use tracing::{error, info};
17
18use kinetic_common::register;
19use kinetic_common::telemetry::EventDuration;
20use std::time::Instant;
21
/// OpenTelemetry gRPC source: owns the configuration and the downstream
/// senders needed to serve OTLP export requests.
pub struct OtelSourceServer {
    /// Server configuration; `grpc_endpoint` is parsed as the socket address
    /// to listen on.
    config: OtelServerConfig,
    /// Identifier of this component, used in the startup log line, in the
    /// metric labels, and in the metadata attached to emitted batches.
    component_id: String,
    /// Downstream buffer for received logs, handed to the logs service when
    /// the server starts. `None` means no logs are forwarded.
    logs_sender: Option<BufferSender>,
    /// Downstream buffer for metrics. Stored by `new` but not read anywhere
    /// in this file yet, hence the `dead_code` allowance.
    #[allow(dead_code)]
    metrics_sender: Option<BufferSender>,
    /// Downstream buffer for traces. Stored by `new` but not read anywhere
    /// in this file yet, hence the `dead_code` allowance.
    #[allow(dead_code)]
    traces_sender: Option<BufferSender>,
    /// Metric labels (`component_id`, `component_type`, `component_kind`)
    /// shared with the gRPC services.
    labels: Arc<[Label]>,
    /// Duration telemetry handle shared with the gRPC services, which emit
    /// the elapsed time of each export call through it.
    event_duration: EventDuration,
}
33
34impl OtelSourceServer {
35    pub fn new(
36        config: OtelServerConfig,
37        component_id: String,
38        logs_sender: Option<BufferSender>,
39        metrics_sender: Option<BufferSender>,
40        traces_sender: Option<BufferSender>,
41    ) -> Self {
42        let labels: Arc<[Label]> = Arc::new([
43            Label::new("component_id", component_id.clone()),
44            Label::new("component_type", "source"),
45            Label::new("component_kind", "opentelemetry"),
46        ]);
47        let event_duration = register!(EventDuration::new(component_id.clone(), "source"));
48        Self {
49            config,
50            component_id,
51            logs_sender,
52            metrics_sender,
53            traces_sender,
54            labels,
55            event_duration,
56        }
57    }
58
59    pub async fn run(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
60        let addr: std::net::SocketAddr = self.config.grpc_endpoint.parse()?;
61        info!(
62            "Starting OpenTelemetry gRPC source '{}' on {}",
63            self.component_id, addr
64        );
65
66        let mut router = tonic::transport::Server::builder();
67
68        // Register services if we have senders for them
69        let server_builder = router.add_service(LogsServiceServer::new(LogsSvc {
70            sender: self.logs_sender.clone(),
71            component_id: self.component_id.clone(),
72            labels: self.labels.clone(),
73            event_duration: self.event_duration.clone(),
74        }));
75
76        // In a real implementation we would conditionally add services based on configuration
77        // and routing, but for now we just add them all if they are requested.
78        // It's a bit tricky with `tonic::transport::Server` because `add_service` changes the type.
79        // We will just start a basic logs server for this iteration to prove the pipeline.
80
81        server_builder.serve(addr).await?;
82
83        Ok(())
84    }
85}
86
87#[async_trait::async_trait]
88impl Healthcheck for OtelSourceServer {
89    async fn check(&self) -> anyhow::Result<()> {
90        let _addr: std::net::SocketAddr = self.config.grpc_endpoint.parse().map_err(|e| {
91            anyhow::anyhow!(
92                "Invalid OTel gRPC endpoint '{}': {}",
93                self.config.grpc_endpoint,
94                e
95            )
96        })?;
97        Ok(())
98    }
99}
100
/// Minimal implementation of `LogsService` to accept incoming OTLP logs.
struct LogsSvc {
    /// Downstream buffer for log batches; when `None`, export requests are
    /// acknowledged without forwarding anything.
    sender: Option<BufferSender>,
    /// Identifier of the owning source component, attached to the metadata
    /// of every emitted batch.
    component_id: String,
    /// Metric labels applied to the received-bytes counter.
    labels: Arc<[Label]>,
    /// Duration telemetry handle; receives the elapsed time of each export call.
    event_duration: EventDuration,
}
108
109#[tonic::async_trait]
110impl LogsService for LogsSvc {
111    async fn export(
112        &self,
113        request: Request<ExportLogsServiceRequest>,
114    ) -> Result<Response<ExportLogsServiceResponse>, Status> {
115        let start = Instant::now();
116        let req = request.into_inner();
117
118        // Increment network bytes received
119        let byte_size = req.encoded_len();
120        counter!("component_received_network_bytes_total", self.labels.iter())
121            .increment(byte_size as u64);
122
123        if let Some(sender) = &self.sender {
124            // Count incoming logs (dummy decoding for now)
125            let mut log_count = 0;
126            for resource_logs in req.resource_logs {
127                for scope_logs in resource_logs.scope_logs {
128                    log_count += scope_logs.log_records.len();
129                }
130            }
131
132            if log_count > 0 {
133                // In a real implementation, we would decode OTLP to Arrow.
134                // For now, we simulate this.
135                let metadata = Arc::new(EventMetadata::new(
136                    "opentelemetry",
137                    ComponentId(self.component_id.clone()),
138                ));
139
140                let schema = Arc::new(Schema::new(vec![Field::new(
141                    "message",
142                    DataType::Utf8,
143                    false,
144                )]));
145
146                let mut messages = Vec::new();
147                messages.extend(std::iter::repeat_n("stub otel log message", log_count));
148
149                let array = StringArray::from(messages);
150                if let Ok(rb) = RecordBatch::try_new(schema, vec![Arc::new(array)]) {
151                    let batch = match EventBatch::new_with_xid(rb, metadata) {
152                        Ok(b) => b,
153                        Err(e) => {
154                            error!("Failed to create EventBatch from OTLP logs: {}", e);
155                            self.event_duration.emit(start.elapsed());
156                            return Err(Status::internal("Failed to create EventBatch"));
157                        }
158                    };
159                    if let Err(e) = sender.send(batch).await {
160                        error!("Failed to send OTLP logs downstream: {:?}", e);
161                        self.event_duration.emit(start.elapsed());
162                        return Err(Status::internal("Failed to enqueue logs"));
163                    }
164                }
165            }
166        }
167
168        self.event_duration.emit(start.elapsed());
169        Ok(Response::new(ExportLogsServiceResponse {
170            partial_success: None,
171        }))
172    }
173}