Skip to main content

kinetic/transforms/otel_aggregate/
sql_template.rs

1//! SQL template generation for OTel aggregation types.
2//!
3//! Generates DuckDB SQL queries for different aggregation types.
4
5use kinetic_config::model::AggregationType;
6
7/// SQL template for count aggregation
8const COUNT_TEMPLATE: &str = r#"
9SELECT
10    resource_attributes,
11    scope_attributes,
12    metric_name,
13    metric_description,
14    metric_unit,
15    CAST({window_start} AS BIGINT) as start_time_unix_nano,
16    CAST({window_end} AS BIGINT) as time_unix_nano,
17    COUNT(*) as count,
18    NULL::DOUBLE as sum,
19    NULL::DOUBLE as min,
20    NULL::DOUBLE as max,
21    NULL::DOUBLE[] as bucket_bounds,
22    NULL::BIGINT[] as bucket_counts
23FROM {table_name}
24WHERE time_unix_nano >= {window_start} AND time_unix_nano < {window_end}
25GROUP BY
26    resource_attributes,
27    scope_attributes,
28    metric_name,
29    metric_description,
30    metric_unit
31"#;
32
33/// SQL template for histogram aggregation
34const HISTOGRAM_TEMPLATE: &str = r#"
35SELECT
36    resource_attributes,
37    scope_attributes,
38    metric_name,
39    metric_description,
40    metric_unit,
41    CAST({window_start} AS BIGINT) as start_time_unix_nano,
42    CAST({window_end} AS BIGINT) as time_unix_nano,
43    SUM(count) as count,
44    SUM(sum) as sum,
45    MIN(min) as min,
46    MAX(max) as max,
47    bucket_bounds,
48    bucket_counts
49FROM {table_name}
50WHERE time_unix_nano >= {window_start} AND time_unix_nano < {window_end}
51GROUP BY
52    resource_attributes,
53    scope_attributes,
54    metric_name,
55    metric_description,
56    metric_unit,
57    bucket_bounds,
58    bucket_counts
59"#;
60
61/// SQL template for summary aggregation
62const SUMMARY_TEMPLATE: &str = r#"
63SELECT
64    resource_attributes,
65    scope_attributes,
66    metric_name,
67    metric_description,
68    metric_unit,
69    CAST({window_start} AS BIGINT) as start_time_unix_nano,
70    CAST({window_end} AS BIGINT) as time_unix_nano,
71    SUM(count) as count,
72    SUM(sum) as sum,
73    MIN(min) as min,
74    MAX(max) as max,
75    NULL::DOUBLE[] as bucket_bounds,
76    NULL::BIGINT[] as bucket_counts
77FROM {table_name}
78WHERE time_unix_nano >= {window_start} AND time_unix_nano < {window_end}
79GROUP BY
80    resource_attributes,
81    scope_attributes,
82    metric_name,
83    metric_description,
84    metric_unit
85"#;
86
87/// SQL template for exponential histogram aggregation
88const EXPONENTIAL_HISTOGRAM_TEMPLATE: &str = r#"
89SELECT
90    resource_attributes,
91    scope_attributes,
92    metric_name,
93    metric_description,
94    metric_unit,
95    CAST({window_start} AS BIGINT) as start_time_unix_nano,
96    CAST({window_end} AS BIGINT) as time_unix_nano,
97    SUM(count) as count,
98    SUM(sum) as sum,
99    MIN(min) as min,
100    MAX(max) as max,
101    NULL::DOUBLE[] as bucket_bounds,
102    NULL::BIGINT[] as bucket_counts
103FROM {table_name}
104WHERE time_unix_nano >= {window_start} AND time_unix_nano < {window_end}
105GROUP BY
106    resource_attributes,
107    scope_attributes,
108    metric_name,
109    metric_description,
110    metric_unit
111"#;
112
113/// SQL template for generating output metric names
114pub struct SqlTemplate;
115
116impl SqlTemplate {
117    /// Generate SQL for the given aggregation type
118    pub fn generate(
119        aggregation_type: &AggregationType,
120        table_name: &str,
121        window_start: i64,
122        window_end: i64,
123    ) -> String {
124        let template = match aggregation_type {
125            AggregationType::Count => COUNT_TEMPLATE,
126            AggregationType::Histogram => HISTOGRAM_TEMPLATE,
127            AggregationType::Summary => SUMMARY_TEMPLATE,
128            AggregationType::ExponentialHistogram => EXPONENTIAL_HISTOGRAM_TEMPLATE,
129        };
130
131        template
132            .replace("{table_name}", table_name)
133            .replace("{window_start}", &window_start.to_string())
134            .replace("{window_end}", &window_end.to_string())
135    }
136
137    /// Get a SQL template for creating the active table with OTel schema
138    pub fn create_active_table_sql(table_name: &str) -> String {
139        crate::transforms::otel_aggregate::OtelSchema::create_table_sql(table_name)
140    }
141}
142
143#[cfg(test)]
144#[allow(clippy::unwrap_used, clippy::expect_used)]
145mod tests {
146    use super::*;
147
148    #[test]
149    fn test_count_template() {
150        let sql = SqlTemplate::generate(&AggregationType::Count, "sealed_abc123", 1000000, 2000000);
151        assert!(sql.contains("sealed_abc123"));
152        assert!(sql.contains("1000000"));
153        assert!(sql.contains("2000000"));
154        assert!(sql.contains("COUNT(*)"));
155    }
156
157    #[test]
158    fn test_histogram_template() {
159        let sql = SqlTemplate::generate(
160            &AggregationType::Histogram,
161            "sealed_abc123",
162            1000000,
163            2000000,
164        );
165        assert!(sql.contains("sealed_abc123"));
166        assert!(sql.contains("SUM(count)"));
167        assert!(sql.contains("SUM(sum)"));
168    }
169
170    #[test]
171    fn test_summary_template() {
172        let sql =
173            SqlTemplate::generate(&AggregationType::Summary, "sealed_abc123", 1000000, 2000000);
174        assert!(sql.contains("sealed_abc123"));
175        assert!(sql.contains("SUM(count)"));
176        assert!(sql.contains("SUM(sum)"));
177    }
178
179    #[test]
180    fn test_exponential_histogram_template() {
181        let sql = SqlTemplate::generate(
182            &AggregationType::ExponentialHistogram,
183            "sealed_abc123",
184            1000000,
185            2000000,
186        );
187        assert!(sql.contains("sealed_abc123"));
188        assert!(sql.contains("SUM(count)"));
189    }
190}