Skip to main content

kinetic/transforms/otel_aggregate/
task.rs

1//! OTel aggregate transform task.
2
3use crate::transforms::Transform;
4use crate::transforms::util::TransformErrorHandler;
5use async_trait::async_trait;
6use duckdb_engine::instance::Config as DuckDbConfig;
7use duckdb_engine::{DuckDbInstance, GenerationalSwap, Result, WindowTimer};
8use kinetic_buffers::{BufferReceiver, BufferSender};
9use kinetic_config::OtelAggregateConfig;
10use kinetic_config::model::ErrorPolicy;
11use kinetic_config::model::OtelAggregation;
12use kinetic_core::{ArcEventMetadata, ComponentId, EventBatch, EventMetadata};
13use metrics::{Label, counter};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tracing::{debug, error, info, warn};
18
/// SQL template for the `Metrics` aggregation mode.
///
/// `#{table_name}` is a placeholder that must be replaced with the name of the
/// table to aggregate before the statement is executed.
///
/// Rows are grouped by the window bounds, resource and scope attributes, the
/// metric identity (name, description, unit, type) and the data point
/// attributes. Within each group `Sum` metrics are combined with
/// `SUM(sum_value)` and `Gauge` metrics with `AVG(gauge_value)`; every other
/// metric type is emitted with `metric_type = 'Unknown'` and a `NULL` value.
const METRICS_SQL: &str = r#"
SELECT
    CAST(start_time_unix_nano AS BIGINT) as start_time_unix_nano,
    CAST(end_time_unix_nano AS BIGINT) as end_time_unix_nano,
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit,
    CASE
        WHEN metric_type = 'Sum' THEN 'Sum'
        WHEN metric_type = 'Gauge' THEN 'Gauge'
        ELSE 'Unknown'
    END as metric_type,
    CASE
        WHEN metric_type = 'Sum' THEN SUM(sum_value)
        WHEN metric_type = 'Gauge' THEN AVG(gauge_value)
        ELSE NULL
    END as value,
    attributes
FROM #{table_name}
GROUP BY
    start_time_unix_nano,
    end_time_unix_nano,
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit,
    metric_type,
    attributes;
"#;
51
/// SQL template for the `Traces` aggregation mode.
///
/// `#{table_name}` is a placeholder that must be replaced with the name of the
/// table to aggregate before the statement is executed.
///
/// Produces one row per `trace_id` containing the earliest span start, the
/// latest span end, their difference as `duration_ns`, the span count, the
/// name of a span whose `parent_span_id` is `NULL` (`root_span_name`), the
/// distinct span names, the distinct names of spans with
/// `status_code = 'Error'`, and the number of such spans.
///
/// NOTE(review): the `CASE` feeding `error_spans` yields `NULL` for every
/// non-error span — confirm whether DuckDB's `LIST` keeps that `NULL` as an
/// element of the resulting list.
const TRACES_SQL: &str = r#"
SELECT
    CAST(MIN(start_time_unix_nano) AS BIGINT) as start_time_unix_nano,
    CAST(MAX(end_time_unix_nano) AS BIGINT) as end_time_unix_nano,
    trace_id,
    COUNT(*) as span_count,
    MAX(CASE WHEN parent_span_id IS NULL THEN name ELSE NULL END) as root_span_name,
    CAST(MAX(end_time_unix_nano) - MIN(start_time_unix_nano) AS BIGINT) as duration_ns,
    LIST(DISTINCT name) as span_names,
    LIST(DISTINCT CASE WHEN status_code = 'Error' THEN name ELSE NULL END) as error_spans,
    COUNT(CASE WHEN status_code = 'Error' THEN 1 ELSE NULL END) as error_count
FROM #{table_name}
GROUP BY trace_id;
"#;
66
/// Windowed OTel aggregation transform backed by a DuckDB instance.
///
/// Incoming batches are appended to a DuckDB table; on every window tick (and
/// once more when the input channel closes) the accumulated rows are sealed,
/// aggregated with SQL, and the result is forwarded to an output channel.
pub struct OtelAggregateTask {
    /// Input channel delivering the event batches to aggregate.
    receiver: BufferReceiver,
    /// Output channels keyed by output name. This task looks up `"raw"` for
    /// the unmodified input and `"aggregated"` (falling back to `"default"`)
    /// for the aggregation result.
    senders: HashMap<String, BufferSender>,
    /// Transform configuration: aggregation mode plus DuckDB memory limit and
    /// temp directory settings.
    config: OtelAggregateConfig,
    /// Identifier of this component; used in log messages and stamped on the
    /// metadata of emitted batches.
    component_id: String,
    /// Identifier of the owning pipeline; stamped on the metadata of emitted
    /// batches.
    pipeline_id: String,
    /// Length of one aggregation window, parsed from `config.window_duration`
    /// by `new`, which falls back to 60s when the value cannot be parsed.
    window_duration: Duration,
    /// Decides, per the configured error policy, whether the task keeps
    /// running after a processing or aggregation failure.
    error_handler: TransformErrorHandler,
    /// Metric labels attached to every counter this task increments.
    labels: Arc<[Label]>,
}
77
78#[async_trait]
79impl Transform for OtelAggregateTask {
80    async fn run(self: Box<Self>) {
81        self.run_task().await;
82    }
83}
84
85impl OtelAggregateTask {
86    pub fn new(
87        component_id: String,
88        pipeline_id: String,
89        receiver: BufferReceiver,
90        senders: HashMap<String, BufferSender>,
91        config: OtelAggregateConfig,
92        error_policy: ErrorPolicy,
93    ) -> Self {
94        let window_duration = parse_duration(&config.window_duration).unwrap_or_else(|e| {
95            warn!(
96                "Invalid window duration '{}', defaulting to 60s: {}",
97                config.window_duration, e
98            );
99            Duration::from_secs(60)
100        });
101
102        let error_handler = TransformErrorHandler::new(component_id.clone(), error_policy);
103        let labels: Arc<[Label]> = Arc::new([
104            Label::new("component_id", component_id.clone()),
105            Label::new("component_type", "transform"),
106            Label::new("component_kind", "otel_aggregate"),
107        ]);
108
109        Self {
110            receiver,
111            senders,
112            config,
113            component_id,
114            pipeline_id,
115            window_duration,
116            error_handler,
117            labels,
118        }
119    }
120
121    pub async fn run_task(mut self) {
122        info!(
123            "Starting OtelAggregate transform task: {} with {:?} aggregation",
124            self.component_id, self.config.aggregation
125        );
126
127        let duckdb_config = DuckDbConfig {
128            memory_limit: self
129                .config
130                .memory_limit
131                .clone()
132                .unwrap_or_else(|| "8GB".to_string()),
133            temp_directory: self.config.temp_directory.clone(),
134            enable_external_access: false,
135        };
136
137        let mut instance = match DuckDbInstance::new(duckdb_config) {
138            Ok(inst) => inst,
139            Err(e) => {
140                error!(
141                    "Failed to create DuckDB instance for {}: {}",
142                    self.component_id, e
143                );
144                return;
145            }
146        };
147
148        let mut generational_swap = GenerationalSwap::new();
149        let mut window_timer = WindowTimer::from_duration(self.window_duration);
150
151        loop {
152            tokio::select! {
153                biased;
154
155                _ = window_timer.tick() => {
156                    if let Err(e) = self.seal_and_aggregate(&mut instance, &mut generational_swap).await
157                        && !self
158                            .error_handler.handle_error(&self.senders, format!("seal_and_aggregate failed: {}", e), None)
159                            .await
160                        {
161                            break;
162                        }
163                }
164
165                maybe_batch = self.receiver.recv() => {
166                    match maybe_batch {
167                        Some(batch) => {
168                            counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
169                            counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
170
171                            // Fork mode: send raw data to 'raw' output if it exists
172                            if let Some(raw_sender) = self.senders.get("raw")
173                                && let Err(e) = raw_sender.send(batch.clone()).await
174                            {
175                                error!(
176                                    "OtelAggregate {} failed to send raw batch: {:?}",
177                                    self.component_id, e
178                                );
179                            }
180
181                            if let Err(e) = self.process_batch(&mut instance, &mut generational_swap, &batch).await
182                                && !self
183                                    .error_handler.handle_error(&self.senders, format!("process_batch failed: {}", e), Some(&batch))
184                                    .await
185                                {
186                                    break;
187                                }
188
189                            if let Some(token) = batch.ack_token {
190                                token.ack();
191                            }
192                        }
193                        None => {
194                            info!("OtelAggregate transform task {} input channel closed", self.component_id);
195                            // Final aggregation before shutting down
196                            if let Err(e) = self.seal_and_aggregate(&mut instance, &mut generational_swap).await {
197                                let _ = self
198                                    .error_handler
199                                    .handle_error(
200                                        &self.senders,
201                                        format!("seal_and_aggregate on shutdown failed: {}", e),
202                                        None,
203                                    )
204                                    .await;
205                            }
206                            break;
207                        }
208                    }
209                }
210            }
211        }
212
213        info!(
214            "OtelAggregate transform task {} shutting down",
215            self.component_id
216        );
217    }
218
219    async fn process_batch(
220        &self,
221        instance: &mut DuckDbInstance,
222        swap: &mut GenerationalSwap,
223        batch: &EventBatch,
224    ) -> Result<()> {
225        debug!(
226            "OtelAggregate {} received batch of {} rows",
227            self.component_id,
228            batch.num_rows()
229        );
230
231        if !swap.is_initialized() {
232            swap.create_active_table_from_batch(instance.conn_mut(), &batch.payload)?;
233        }
234
235        swap.append_batch(instance.conn_mut(), &batch.payload)?;
236
237        Ok(())
238    }
239
240    async fn seal_and_aggregate(
241        &mut self,
242        instance: &mut DuckDbInstance,
243        swap: &mut GenerationalSwap,
244    ) -> Result<()> {
245        if !swap.has_data() {
246            debug!(
247                "OtelAggregate {} window closed but no data to aggregate",
248                self.component_id
249            );
250            return Ok(());
251        }
252
253        info!(
254            "OtelAggregate {} sealing window and running SQL",
255            self.component_id
256        );
257
258        let sealed_uuid = swap.seal(instance.conn_mut())?;
259        let sealed_table = format!("sealed_{}", sealed_uuid.simple());
260
261        let sql = self.aggregation_type_to_sql(&sealed_table);
262
263        debug!("OtelAggregate {} executing SQL: {}", self.component_id, sql);
264
265        let batches = instance.query_arrow(&sql)?;
266
267        let sender = self
268            .senders
269            .get("aggregated")
270            .or_else(|| self.senders.get("default"));
271
272        if let Some(sender) = sender {
273            for batch in batches {
274                if batch.num_rows() == 0 {
275                    continue;
276                }
277
278                let metadata = EventMetadata::new(
279                    self.pipeline_id.clone(),
280                    ComponentId(self.component_id.clone()),
281                );
282
283                match EventBatch::new(batch, ArcEventMetadata::new(metadata)) {
284                    Ok(event_batch) => {
285                        let rows = event_batch.num_rows();
286                        let bytes = event_batch.estimated_size();
287                        if let Err(e) = sender.send(event_batch).await {
288                            counter!("component_errors_total", self.labels.iter()).increment(1);
289                            error!(
290                                "OtelAggregate {} failed to send aggregated batch: {:?}",
291                                self.component_id, e
292                            );
293                        } else {
294                            counter!("component_sent_events_total", self.labels.iter())
295                                .increment(rows as u64);
296                            counter!("component_sent_event_bytes_total", self.labels.iter())
297                                .increment(bytes as u64);
298                        }
299                    }
300                    Err(e) => {
301                        counter!("component_errors_total", self.labels.iter()).increment(1);
302                        error!("Failed to create EventBatch in otel_aggregate: {}", e);
303                    }
304                }
305            }
306        }
307        duckdb_engine::generational::cleanup_sealed(instance.conn_mut(), &sealed_uuid)?;
308
309        info!(
310            "OtelAggregate {} completed window aggregation",
311            self.component_id
312        );
313
314        Ok(())
315    }
316
317    fn aggregation_type_to_sql(&self, table_name: &str) -> String {
318        match self.config.aggregation {
319            OtelAggregation::Metrics => METRICS_SQL.replace("#{table_name}", table_name),
320            OtelAggregation::Traces => TRACES_SQL.replace("#{table_name}", table_name),
321        }
322    }
323}
324
325fn parse_duration(s: &str) -> std::result::Result<Duration, String> {
326    kinetic_common::parse_duration(s).map_err(|e| e.to_string())
327}
328
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Builds a task with no outputs and a throwaway input channel, configured
    /// for the given aggregation mode.
    fn task_for(
        aggregation: kinetic_config::model::OtelAggregation,
        channel_name: &str,
    ) -> OtelAggregateTask {
        let config = OtelAggregateConfig {
            aggregation,
            window_duration: "15s".to_string(),
            naming_convention: None,
            otel_schema_version: None,
            memory_limit: None,
            temp_directory: None,
            outputs: std::collections::HashMap::new(),
        };
        let (_, receiver) = kinetic_buffers::channel(
            1,
            kinetic_buffers::WhenFull::Block,
            channel_name.to_string(),
        );
        OtelAggregateTask {
            receiver,
            senders: std::collections::HashMap::new(),
            config,
            component_id: "test".to_string(),
            pipeline_id: "test_pipeline".to_string(),
            window_duration: Duration::from_secs(15),
            error_handler: TransformErrorHandler::new("test".to_string(), ErrorPolicy::DropOnError),
            labels: Arc::new([]),
        }
    }

    #[test]
    fn test_parse_duration() {
        let cases = [
            ("15s", Duration::from_secs(15)),
            ("5m", Duration::from_secs(300)),
            ("100ms", Duration::from_millis(100)),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_duration(input).unwrap(), expected);
        }
    }

    #[test]
    fn test_aggregation_type_to_sql() {
        let metrics_task = task_for(kinetic_config::model::OtelAggregation::Metrics, "test");
        let metrics_sql = metrics_task.aggregation_type_to_sql("my_table");
        assert!(metrics_sql.contains("FROM my_table"));
        assert!(metrics_sql.contains("metric_name"));

        let traces_task = task_for(
            kinetic_config::model::OtelAggregation::Traces,
            "test_traces",
        );
        let traces_sql = traces_task.aggregation_type_to_sql("my_table");
        assert!(traces_sql.contains("FROM my_table"));
        assert!(traces_sql.contains("trace_id"));
    }
}