// kinetic/transforms/otel_aggregate/sql_template.rs
use kinetic_config::model::AggregationType;

/// SQL template selected for `AggregationType::Count`.
///
/// Produces one output row per distinct combination of `resource_attributes`,
/// `scope_attributes`, `metric_name`, `metric_description` and `metric_unit`,
/// counting the input rows whose `time_unix_nano` falls in the half-open
/// window `[{window_start}, {window_end})`.
///
/// `sum`, `min`, `max`, `bucket_bounds` and `bucket_counts` are emitted as
/// typed NULLs, so the column list matches the other templates in this file.
///
/// Placeholders to substitute before execution: `{table_name}`,
/// `{window_start}`, `{window_end}`.
const COUNT_TEMPLATE: &str = r#"
SELECT
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit,
    CAST({window_start} AS BIGINT) as start_time_unix_nano,
    CAST({window_end} AS BIGINT) as time_unix_nano,
    COUNT(*) as count,
    NULL::DOUBLE as sum,
    NULL::DOUBLE as min,
    NULL::DOUBLE as max,
    NULL::DOUBLE[] as bucket_bounds,
    NULL::BIGINT[] as bucket_counts
FROM {table_name}
WHERE time_unix_nano >= {window_start} AND time_unix_nano < {window_end}
GROUP BY
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit
"#;

/// SQL template selected for `AggregationType::Histogram`.
///
/// For rows whose `time_unix_nano` falls in the half-open window
/// `[{window_start}, {window_end})`, sums `count` and `sum` and takes
/// `MIN(min)` / `MAX(max)` per group. The group key is the metric identity
/// columns plus `bucket_bounds` and `bucket_counts`, both of which are passed
/// through unchanged.
///
/// NOTE(review): because `bucket_counts` is part of the GROUP BY key, input
/// rows that share `bucket_bounds` but carry different `bucket_counts` end up
/// in separate output rows rather than being merged bucket-by-bucket. Confirm
/// that this is the intended histogram semantics.
///
/// Placeholders to substitute before execution: `{table_name}`,
/// `{window_start}`, `{window_end}`.
const HISTOGRAM_TEMPLATE: &str = r#"
SELECT
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit,
    CAST({window_start} AS BIGINT) as start_time_unix_nano,
    CAST({window_end} AS BIGINT) as time_unix_nano,
    SUM(count) as count,
    SUM(sum) as sum,
    MIN(min) as min,
    MAX(max) as max,
    bucket_bounds,
    bucket_counts
FROM {table_name}
WHERE time_unix_nano >= {window_start} AND time_unix_nano < {window_end}
GROUP BY
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit,
    bucket_bounds,
    bucket_counts
"#;

/// SQL template selected for `AggregationType::Summary`.
///
/// For rows whose `time_unix_nano` falls in the half-open window
/// `[{window_start}, {window_end})`, sums `count` and `sum` and takes
/// `MIN(min)` / `MAX(max)` per distinct combination of `resource_attributes`,
/// `scope_attributes`, `metric_name`, `metric_description` and `metric_unit`.
///
/// `bucket_bounds` and `bucket_counts` are emitted as typed NULL arrays, so
/// the column list matches the other templates in this file.
///
/// Placeholders to substitute before execution: `{table_name}`,
/// `{window_start}`, `{window_end}`.
const SUMMARY_TEMPLATE: &str = r#"
SELECT
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit,
    CAST({window_start} AS BIGINT) as start_time_unix_nano,
    CAST({window_end} AS BIGINT) as time_unix_nano,
    SUM(count) as count,
    SUM(sum) as sum,
    MIN(min) as min,
    MAX(max) as max,
    NULL::DOUBLE[] as bucket_bounds,
    NULL::BIGINT[] as bucket_counts
FROM {table_name}
WHERE time_unix_nano >= {window_start} AND time_unix_nano < {window_end}
GROUP BY
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit
"#;

/// SQL template selected for `AggregationType::ExponentialHistogram`.
///
/// For rows whose `time_unix_nano` falls in the half-open window
/// `[{window_start}, {window_end})`, sums `count` and `sum` and takes
/// `MIN(min)` / `MAX(max)` per distinct combination of `resource_attributes`,
/// `scope_attributes`, `metric_name`, `metric_description` and `metric_unit`.
///
/// `bucket_bounds` and `bucket_counts` are emitted as typed NULL arrays; no
/// exponential-bucket columns are selected.
///
/// NOTE(review): the SQL text is the same as the summary template's. Confirm
/// whether dropping the bucket data for exponential histograms is intended.
///
/// Placeholders to substitute before execution: `{table_name}`,
/// `{window_start}`, `{window_end}`.
const EXPONENTIAL_HISTOGRAM_TEMPLATE: &str = r#"
SELECT
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit,
    CAST({window_start} AS BIGINT) as start_time_unix_nano,
    CAST({window_end} AS BIGINT) as time_unix_nano,
    SUM(count) as count,
    SUM(sum) as sum,
    MIN(min) as min,
    MAX(max) as max,
    NULL::DOUBLE[] as bucket_bounds,
    NULL::BIGINT[] as bucket_counts
FROM {table_name}
WHERE time_unix_nano >= {window_start} AND time_unix_nano < {window_end}
GROUP BY
    resource_attributes,
    scope_attributes,
    metric_name,
    metric_description,
    metric_unit
"#;

/// Namespace type for the SQL-building helpers of the OTel aggregate transform.
///
/// Carries no data; its functionality is exposed through associated functions.
pub struct SqlTemplate;

116impl SqlTemplate {
117 pub fn generate(
119 aggregation_type: &AggregationType,
120 table_name: &str,
121 window_start: i64,
122 window_end: i64,
123 ) -> String {
124 let template = match aggregation_type {
125 AggregationType::Count => COUNT_TEMPLATE,
126 AggregationType::Histogram => HISTOGRAM_TEMPLATE,
127 AggregationType::Summary => SUMMARY_TEMPLATE,
128 AggregationType::ExponentialHistogram => EXPONENTIAL_HISTOGRAM_TEMPLATE,
129 };
130
131 template
132 .replace("{table_name}", table_name)
133 .replace("{window_start}", &window_start.to_string())
134 .replace("{window_end}", &window_end.to_string())
135 }
136
137 pub fn create_active_table_sql(table_name: &str) -> String {
139 crate::transforms::otel_aggregate::OtelSchema::create_table_sql(table_name)
140 }
141}
142
// Unit tests for `SqlTemplate::generate`: one test per aggregation type.
#[cfg(test)]
// Lints relaxed for test code.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    const TABLE: &str = "sealed_abc123";
    const WINDOW_START: i64 = 1000000;
    const WINDOW_END: i64 = 2000000;

    /// Checks the parts of the rendered SQL that every template shares:
    /// the table name, the half-open window predicate, the window bounds
    /// projected as timestamps, and the absence of leftover placeholders.
    fn assert_fully_rendered(sql: &str) {
        assert!(sql.contains("sealed_abc123"));
        assert!(sql.contains("FROM sealed_abc123"));
        assert!(sql.contains("time_unix_nano >= 1000000 AND time_unix_nano < 2000000"));
        assert!(sql.contains("CAST(1000000 AS BIGINT) as start_time_unix_nano"));
        assert!(sql.contains("CAST(2000000 AS BIGINT) as time_unix_nano"));
        // The templates contain no braces other than placeholders, so any
        // remaining brace means a placeholder was not substituted.
        assert!(!sql.contains('{'), "unsubstituted placeholder in: {sql}");
        assert!(!sql.contains('}'), "unsubstituted placeholder in: {sql}");
    }

    #[test]
    fn test_count_template() {
        let sql = SqlTemplate::generate(&AggregationType::Count, TABLE, WINDOW_START, WINDOW_END);
        assert_fully_rendered(&sql);
        assert!(sql.contains("1000000"));
        assert!(sql.contains("2000000"));
        assert!(sql.contains("COUNT(*)"));
        assert!(sql.contains("NULL::DOUBLE as sum"));
    }

    #[test]
    fn test_histogram_template() {
        let sql = SqlTemplate::generate(
            &AggregationType::Histogram,
            TABLE,
            WINDOW_START,
            WINDOW_END,
        );
        assert_fully_rendered(&sql);
        assert!(sql.contains("SUM(count)"));
        assert!(sql.contains("SUM(sum)"));
        // Bucket columns are passed through rather than replaced by NULLs.
        assert!(!sql.contains("NULL::"));
        assert!(sql.contains("bucket_bounds"));
        assert!(sql.contains("bucket_counts"));
    }

    #[test]
    fn test_summary_template() {
        let sql =
            SqlTemplate::generate(&AggregationType::Summary, TABLE, WINDOW_START, WINDOW_END);
        assert_fully_rendered(&sql);
        assert!(sql.contains("SUM(count)"));
        assert!(sql.contains("SUM(sum)"));
        assert!(sql.contains("NULL::DOUBLE[] as bucket_bounds"));
    }

    #[test]
    fn test_exponential_histogram_template() {
        let sql = SqlTemplate::generate(
            &AggregationType::ExponentialHistogram,
            TABLE,
            WINDOW_START,
            WINDOW_END,
        );
        assert_fully_rendered(&sql);
        assert!(sql.contains("SUM(count)"));
        assert!(sql.contains("NULL::BIGINT[] as bucket_counts"));
    }
}