1use crate::transforms::Transform;
4use crate::transforms::util::TransformErrorHandler;
5use async_trait::async_trait;
6use duckdb_engine::instance::Config as DuckDbConfig;
7use duckdb_engine::{DuckDbInstance, GenerationalSwap, Result, WindowTimer};
8use kinetic_buffers::{BufferReceiver, BufferSender};
9use kinetic_config::OtelAggregateConfig;
10use kinetic_config::model::ErrorPolicy;
11use kinetic_config::model::OtelAggregation;
12use kinetic_core::{ArcEventMetadata, ComponentId, EventBatch, EventMetadata};
13use metrics::{Label, counter};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tracing::{debug, error, info, warn};
18
/// SQL template that rolls up the metric points of one sealed window.
///
/// Rows are grouped by their time range, resource/scope attributes, metric
/// identity (name, description, unit, type) and point attributes. Within a group,
/// `Sum` metrics are totalled with `SUM(sum_value)` and `Gauge` metrics are
/// averaged with `AVG(gauge_value)`; any other metric type is reported as
/// `'Unknown'` with a `NULL` value.
///
/// The `#{table_name}` placeholder is substituted by
/// `OtelAggregateTask::aggregation_type_to_sql` before execution.
const METRICS_SQL: &str = r#"
SELECT
    CAST(start_time_unix_nano AS BIGINT) as start_time_unix_nano,
    CAST(end_time_unix_nano AS BIGINT) as end_time_unix_nano,
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit,
    CASE
        WHEN metric_type = 'Sum' THEN 'Sum'
        WHEN metric_type = 'Gauge' THEN 'Gauge'
        ELSE 'Unknown'
    END as metric_type,
    CASE
        WHEN metric_type = 'Sum' THEN SUM(sum_value)
        WHEN metric_type = 'Gauge' THEN AVG(gauge_value)
        ELSE NULL
    END as value,
    attributes
FROM #{table_name}
GROUP BY
    start_time_unix_nano,
    end_time_unix_nano,
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit,
    metric_type,
    attributes;
"#;
51
/// SQL template that summarises the spans of one sealed window per `trace_id`.
///
/// For every trace it reports the earliest start and latest end timestamp, the
/// span count, the name of a span whose `parent_span_id` is `NULL` (the root),
/// the overall duration in nanoseconds, the distinct span names, the distinct
/// names of spans whose `status_code` is `'Error'`, and how many spans errored.
///
/// `error_spans` is built with a `FILTER` clause rather than a
/// `CASE … ELSE NULL` expression: DuckDB's `list` aggregate does not skip NULL
/// inputs, so the `CASE` form would put a spurious `NULL` element in the list for
/// every trace that has at least one non-error span. With the filter,
/// `error_spans` holds only real span names and is `NULL` when no span errored.
///
/// The `#{table_name}` placeholder is substituted by
/// `OtelAggregateTask::aggregation_type_to_sql` before execution.
const TRACES_SQL: &str = r#"
SELECT
    CAST(MIN(start_time_unix_nano) AS BIGINT) as start_time_unix_nano,
    CAST(MAX(end_time_unix_nano) AS BIGINT) as end_time_unix_nano,
    trace_id,
    COUNT(*) as span_count,
    MAX(CASE WHEN parent_span_id IS NULL THEN name ELSE NULL END) as root_span_name,
    CAST(MAX(end_time_unix_nano) - MIN(start_time_unix_nano) AS BIGINT) as duration_ns,
    LIST(DISTINCT name) as span_names,
    LIST(DISTINCT name) FILTER (WHERE status_code = 'Error') as error_spans,
    COUNT(CASE WHEN status_code = 'Error' THEN 1 ELSE NULL END) as error_count
FROM #{table_name}
GROUP BY trace_id;
"#;
66
67pub struct OtelAggregateTask {
68 receiver: BufferReceiver,
69 senders: HashMap<String, BufferSender>,
70 config: OtelAggregateConfig,
71 component_id: String,
72 pipeline_id: String,
73 window_duration: Duration,
74 error_handler: TransformErrorHandler,
75 labels: Arc<[Label]>,
76}
77
78#[async_trait]
79impl Transform for OtelAggregateTask {
80 async fn run(self: Box<Self>) {
81 self.run_task().await;
82 }
83}
84
85impl OtelAggregateTask {
86 pub fn new(
87 component_id: String,
88 pipeline_id: String,
89 receiver: BufferReceiver,
90 senders: HashMap<String, BufferSender>,
91 config: OtelAggregateConfig,
92 error_policy: ErrorPolicy,
93 ) -> Self {
94 let window_duration = parse_duration(&config.window_duration).unwrap_or_else(|e| {
95 warn!(
96 "Invalid window duration '{}', defaulting to 60s: {}",
97 config.window_duration, e
98 );
99 Duration::from_secs(60)
100 });
101
102 let error_handler = TransformErrorHandler::new(component_id.clone(), error_policy);
103 let labels: Arc<[Label]> = Arc::new([
104 Label::new("component_id", component_id.clone()),
105 Label::new("component_type", "transform"),
106 Label::new("component_kind", "otel_aggregate"),
107 ]);
108
109 Self {
110 receiver,
111 senders,
112 config,
113 component_id,
114 pipeline_id,
115 window_duration,
116 error_handler,
117 labels,
118 }
119 }
120
121 pub async fn run_task(mut self) {
122 info!(
123 "Starting OtelAggregate transform task: {} with {:?} aggregation",
124 self.component_id, self.config.aggregation
125 );
126
127 let duckdb_config = DuckDbConfig {
128 memory_limit: self
129 .config
130 .memory_limit
131 .clone()
132 .unwrap_or_else(|| "8GB".to_string()),
133 temp_directory: self.config.temp_directory.clone(),
134 enable_external_access: false,
135 };
136
137 let mut instance = match DuckDbInstance::new(duckdb_config) {
138 Ok(inst) => inst,
139 Err(e) => {
140 error!(
141 "Failed to create DuckDB instance for {}: {}",
142 self.component_id, e
143 );
144 return;
145 }
146 };
147
148 let mut generational_swap = GenerationalSwap::new();
149 let mut window_timer = WindowTimer::from_duration(self.window_duration);
150
151 loop {
152 tokio::select! {
153 biased;
154
155 _ = window_timer.tick() => {
156 if let Err(e) = self.seal_and_aggregate(&mut instance, &mut generational_swap).await
157 && !self
158 .error_handler.handle_error(&self.senders, format!("seal_and_aggregate failed: {}", e), None)
159 .await
160 {
161 break;
162 }
163 }
164
165 maybe_batch = self.receiver.recv() => {
166 match maybe_batch {
167 Some(batch) => {
168 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
169 counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
170
171 if let Some(raw_sender) = self.senders.get("raw")
173 && let Err(e) = raw_sender.send(batch.clone()).await
174 {
175 error!(
176 "OtelAggregate {} failed to send raw batch: {:?}",
177 self.component_id, e
178 );
179 }
180
181 if let Err(e) = self.process_batch(&mut instance, &mut generational_swap, &batch).await
182 && !self
183 .error_handler.handle_error(&self.senders, format!("process_batch failed: {}", e), Some(&batch))
184 .await
185 {
186 break;
187 }
188
189 if let Some(token) = batch.ack_token {
190 token.ack();
191 }
192 }
193 None => {
194 info!("OtelAggregate transform task {} input channel closed", self.component_id);
195 if let Err(e) = self.seal_and_aggregate(&mut instance, &mut generational_swap).await {
197 let _ = self
198 .error_handler
199 .handle_error(
200 &self.senders,
201 format!("seal_and_aggregate on shutdown failed: {}", e),
202 None,
203 )
204 .await;
205 }
206 break;
207 }
208 }
209 }
210 }
211 }
212
213 info!(
214 "OtelAggregate transform task {} shutting down",
215 self.component_id
216 );
217 }
218
219 async fn process_batch(
220 &self,
221 instance: &mut DuckDbInstance,
222 swap: &mut GenerationalSwap,
223 batch: &EventBatch,
224 ) -> Result<()> {
225 debug!(
226 "OtelAggregate {} received batch of {} rows",
227 self.component_id,
228 batch.num_rows()
229 );
230
231 if !swap.is_initialized() {
232 swap.create_active_table_from_batch(instance.conn_mut(), &batch.payload)?;
233 }
234
235 swap.append_batch(instance.conn_mut(), &batch.payload)?;
236
237 Ok(())
238 }
239
240 async fn seal_and_aggregate(
241 &mut self,
242 instance: &mut DuckDbInstance,
243 swap: &mut GenerationalSwap,
244 ) -> Result<()> {
245 if !swap.has_data() {
246 debug!(
247 "OtelAggregate {} window closed but no data to aggregate",
248 self.component_id
249 );
250 return Ok(());
251 }
252
253 info!(
254 "OtelAggregate {} sealing window and running SQL",
255 self.component_id
256 );
257
258 let sealed_uuid = swap.seal(instance.conn_mut())?;
259 let sealed_table = format!("sealed_{}", sealed_uuid.simple());
260
261 let sql = self.aggregation_type_to_sql(&sealed_table);
262
263 debug!("OtelAggregate {} executing SQL: {}", self.component_id, sql);
264
265 let batches = instance.query_arrow(&sql)?;
266
267 let sender = self
268 .senders
269 .get("aggregated")
270 .or_else(|| self.senders.get("default"));
271
272 if let Some(sender) = sender {
273 for batch in batches {
274 if batch.num_rows() == 0 {
275 continue;
276 }
277
278 let metadata = EventMetadata::new(
279 self.pipeline_id.clone(),
280 ComponentId(self.component_id.clone()),
281 );
282
283 match EventBatch::new(batch, ArcEventMetadata::new(metadata)) {
284 Ok(event_batch) => {
285 let rows = event_batch.num_rows();
286 let bytes = event_batch.estimated_size();
287 if let Err(e) = sender.send(event_batch).await {
288 counter!("component_errors_total", self.labels.iter()).increment(1);
289 error!(
290 "OtelAggregate {} failed to send aggregated batch: {:?}",
291 self.component_id, e
292 );
293 } else {
294 counter!("component_sent_events_total", self.labels.iter())
295 .increment(rows as u64);
296 counter!("component_sent_event_bytes_total", self.labels.iter())
297 .increment(bytes as u64);
298 }
299 }
300 Err(e) => {
301 counter!("component_errors_total", self.labels.iter()).increment(1);
302 error!("Failed to create EventBatch in otel_aggregate: {}", e);
303 }
304 }
305 }
306 }
307 duckdb_engine::generational::cleanup_sealed(instance.conn_mut(), &sealed_uuid)?;
308
309 info!(
310 "OtelAggregate {} completed window aggregation",
311 self.component_id
312 );
313
314 Ok(())
315 }
316
317 fn aggregation_type_to_sql(&self, table_name: &str) -> String {
318 match self.config.aggregation {
319 OtelAggregation::Metrics => METRICS_SQL.replace("#{table_name}", table_name),
320 OtelAggregation::Traces => TRACES_SQL.replace("#{table_name}", table_name),
321 }
322 }
323}
324
325fn parse_duration(s: &str) -> std::result::Result<Duration, String> {
326 kinetic_common::parse_duration(s).map_err(|e| e.to_string())
327}
328
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Builds a task with a 15s window, no outputs and the given aggregation,
    /// wired to a throwaway input channel named `channel_name`.
    fn task_with(
        aggregation: kinetic_config::model::OtelAggregation,
        channel_name: &str,
    ) -> OtelAggregateTask {
        let config = OtelAggregateConfig {
            aggregation,
            window_duration: "15s".to_string(),
            naming_convention: None,
            otel_schema_version: None,
            memory_limit: None,
            temp_directory: None,
            outputs: std::collections::HashMap::new(),
        };
        let (_, receiver) = kinetic_buffers::channel(
            1,
            kinetic_buffers::WhenFull::Block,
            channel_name.to_string(),
        );

        OtelAggregateTask {
            receiver,
            senders: std::collections::HashMap::new(),
            config,
            component_id: "test".to_string(),
            pipeline_id: "test_pipeline".to_string(),
            window_duration: Duration::from_secs(15),
            error_handler: TransformErrorHandler::new("test".to_string(), ErrorPolicy::DropOnError),
            labels: Arc::new([]),
        }
    }

    /// Seconds, minutes and milliseconds suffixes are all understood.
    #[test]
    fn test_parse_duration() {
        let cases = [
            ("15s", Duration::from_secs(15)),
            ("5m", Duration::from_secs(300)),
            ("100ms", Duration::from_millis(100)),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_duration(input).unwrap(), expected);
        }
    }

    /// Each aggregation kind yields its own query with the table name substituted.
    #[test]
    fn test_aggregation_type_to_sql() {
        let metrics_task = task_with(kinetic_config::model::OtelAggregation::Metrics, "test");
        let sql = metrics_task.aggregation_type_to_sql("my_table");
        assert!(sql.contains("FROM my_table"));
        assert!(sql.contains("metric_name"));

        let traces_task = task_with(
            kinetic_config::model::OtelAggregation::Traces,
            "test_traces",
        );
        let sql = traces_task.aggregation_type_to_sql("my_table");
        assert!(sql.contains("FROM my_table"));
        assert!(sql.contains("trace_id"));
    }
}