Skip to main content

kinetic/transforms/otel_aggregate/
otel_schema.rs

//! OTel schema mapping for aggregation transforms.
//!
//! Maps OpenTelemetry metric data models to DuckDB table schemas.

/// Supported OTel schema version identifier.
pub const OTEL_SCHEMA_VERSION_1_0: &str = "1.0";

// ---------------------------------------------------------------------------
// Column names for the standard attributes carried by every metric row.
// ---------------------------------------------------------------------------

/// Column name for the resource attributes.
pub const RESOURCE_ATTRS: &str = "resource_attributes";
/// Column name for the instrumentation-scope attributes.
pub const SCOPE_ATTRS: &str = "scope_attributes";
/// Column name for the metric name.
pub const METRIC_NAME: &str = "metric_name";
/// Column name for the metric description.
pub const METRIC_DESCRIPTION: &str = "metric_description";
/// Column name for the metric unit.
pub const METRIC_UNIT: &str = "metric_unit";
/// Column name for the data point timestamp (`time_unix_nano`).
pub const TIME_UNIX_NANO: &str = "time_unix_nano";
/// Column name for the data point start timestamp (`start_time_unix_nano`).
pub const START_TIME_UNIX_NANO: &str = "start_time_unix_nano";

// ---------------------------------------------------------------------------
// Histogram-specific column names.
// ---------------------------------------------------------------------------

/// Column name for the histogram count.
pub const COUNT: &str = "count";
/// Column name for the histogram sum.
pub const SUM: &str = "sum";
/// Column name for the histogram minimum.
pub const MIN: &str = "min";
/// Column name for the histogram maximum.
pub const MAX: &str = "max";
/// Column name for the histogram bucket boundaries.
pub const BUCKET_BOUNDS: &str = "bucket_bounds";
/// Column name for the per-bucket counts.
pub const BUCKET_COUNTS: &str = "bucket_counts";

// ---------------------------------------------------------------------------
// Summary-specific column names.
// ---------------------------------------------------------------------------

/// Column name for summary quantile values.
///
/// Declared here for completeness; it is not part of the table produced by
/// `OtelSchema::create_table_sql` nor of `OtelSchema::select_columns`.
pub const QUANTILE_VALUES: &str = "quantile_values";

/// Schema mapping for OTel metrics tables.
///
/// This is a field-less unit type used purely as a namespace: everything it
/// offers is exposed through associated functions, so no instance is needed.
#[derive(Debug, Clone, Copy, Default)]
pub struct OtelSchema;

31impl OtelSchema {
32    /// Get the standard OTel metric table schema as a SQL CREATE TABLE statement
33    pub fn create_table_sql(table_name: &str) -> String {
34        format!(
35            "CREATE TABLE {} (
36                {} VARCHAR,
37                {} VARCHAR,
38                {} VARCHAR NOT NULL,
39                {} VARCHAR,
40                {} VARCHAR,
41                {} BIGINT NOT NULL,
42                {} BIGINT,
43                {} DOUBLE,
44                {} BIGINT,
45                {} DOUBLE,
46                {} DOUBLE,
47                {} DOUBLE[],
48                {} BIGINT[]
49            )",
50            table_name,
51            RESOURCE_ATTRS,
52            SCOPE_ATTRS,
53            METRIC_NAME,
54            METRIC_DESCRIPTION,
55            METRIC_UNIT,
56            TIME_UNIX_NANO,
57            START_TIME_UNIX_NANO,
58            COUNT,
59            SUM,
60            MIN,
61            MAX,
62            BUCKET_BOUNDS,
63            BUCKET_COUNTS,
64        )
65    }
66
67    /// Get column names for selecting from raw OTel format
68    pub fn select_columns() -> &'static [&'static str] {
69        &[
70            RESOURCE_ATTRS,
71            SCOPE_ATTRS,
72            METRIC_NAME,
73            METRIC_DESCRIPTION,
74            METRIC_UNIT,
75            TIME_UNIX_NANO,
76            START_TIME_UNIX_NANO,
77            COUNT,
78            SUM,
79            MIN,
80            MAX,
81            BUCKET_BOUNDS,
82            BUCKET_COUNTS,
83        ]
84    }
85
86    /// Get group by columns for aggregation
87    pub fn group_by_columns() -> &'static [&'static str] {
88        &[
89            RESOURCE_ATTRS,
90            SCOPE_ATTRS,
91            METRIC_NAME,
92            METRIC_DESCRIPTION,
93            METRIC_UNIT,
94        ]
95    }
96}
97
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Returns `true` when some line of `sql` starts (after indentation) with
    /// `column` followed by a space, i.e. the column is declared under exactly
    /// that name. A plain substring check is not enough: `"count"` also
    /// matches inside `"bucket_counts"`.
    fn declares_column(sql: &str, column: &str) -> bool {
        let prefix = [column, " "].concat();
        sql.lines()
            .any(|line| line.trim_start().starts_with(prefix.as_str()))
    }

    #[test]
    fn test_create_table_sql() {
        let sql = OtelSchema::create_table_sql("test_metrics");
        assert!(sql.contains("CREATE TABLE test_metrics"));

        // Every selectable column must be declared by the table definition.
        for column in OtelSchema::select_columns() {
            assert!(
                declares_column(&sql, column),
                "column `{}` is missing from the CREATE TABLE statement",
                column
            );
        }

        // The two mandatory columns carry a NOT NULL constraint.
        assert!(sql.contains("metric_name VARCHAR NOT NULL"));
        assert!(sql.contains("time_unix_nano BIGINT NOT NULL"));
    }

    #[test]
    fn test_select_columns() {
        let cols = OtelSchema::select_columns();
        assert_eq!(cols.len(), 13);
        assert!(cols.contains(&"metric_name"));
        assert!(cols.contains(&"count"));
        assert!(cols.contains(&"sum"));

        // No column may be listed twice.
        for (idx, column) in cols.iter().enumerate() {
            assert!(
                !cols[..idx].contains(column),
                "column `{}` is listed more than once",
                column
            );
        }
    }

    #[test]
    fn test_group_by_columns() {
        let cols = OtelSchema::group_by_columns();
        assert_eq!(cols.len(), 5);
        assert!(cols.contains(&"metric_name"));
        assert!(!cols.contains(&"count"));
        assert!(!cols.contains(&"time_unix_nano"));

        // Grouping keys must all be selectable columns.
        let selectable = OtelSchema::select_columns();
        for column in cols {
            assert!(
                selectable.contains(column),
                "group-by column `{}` is not a selectable column",
                column
            );
        }
    }
}