kinetic/transforms/wasm/
task.rs1use super::runtime::{HotSwapConfig, WasmRuntime};
4use crate::transforms::Transform;
5use async_trait::async_trait;
6use kinetic_buffers::{BufferReceiver, BufferSender};
7use kinetic_config::model::{ErrorPolicy, MapTransformConfigWrapped};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12#[derive(Debug, Default)]
14pub struct TransformPerfStats {
15 pub total_batches: u64,
17 pub total_events: u64,
19 pub wasm_execution_time_ms: u64,
21 pub wasm_fuel_consumed: u64,
23 pub avg_batch_size: f64,
25 pub error_count: u64,
27 pub last_module_swap: Option<std::time::Instant>,
29}
30
31pub struct WasmTransformTask {
33 pub config: MapTransformConfigWrapped,
35 pub inputs: Vec<BufferReceiver>,
37 pub senders: HashMap<String, BufferSender>,
39 pub runtime: WasmRuntime,
41 pub hotswap_config: HotSwapConfig,
43 pub perf_stats: Arc<RwLock<TransformPerfStats>>,
45 pub error_policy: ErrorPolicy,
47}
48
49#[allow(dead_code)]
50impl WasmTransformTask {
51 pub fn new(
53 config: MapTransformConfigWrapped,
54 inputs: Vec<BufferReceiver>,
55 senders: HashMap<String, BufferSender>,
56 runtime: WasmRuntime,
57 ) -> Self {
58 let hotswap_config = HotSwapConfig::default();
60
61 Self {
62 config,
63 inputs,
64 senders,
65 runtime,
66 hotswap_config,
67 perf_stats: Arc::new(RwLock::new(TransformPerfStats::default())),
68 error_policy: ErrorPolicy::DropOnError,
69 }
70 }
71
72 pub async fn get_perf_stats(&self) -> TransformPerfStats {
74 self.perf_stats.read().await.clone()
75 }
76
77 async fn record_batch_processed(
79 &self,
80 batch_size: usize,
81 execution_ms: u64,
82 fuel_consumed: u64,
83 ) {
84 let mut stats = self.perf_stats.write().await;
85 stats.total_batches += 1;
86 stats.total_events += batch_size as u64;
87 stats.wasm_execution_time_ms += execution_ms;
88 stats.wasm_fuel_consumed += fuel_consumed;
89
90 let n = stats.total_batches as f64;
92 stats.avg_batch_size = ((n - 1.0) * stats.avg_batch_size + batch_size as f64) / n;
93 }
94
95 async fn record_error(&self) {
97 let mut stats = self.perf_stats.write().await;
98 stats.error_count += 1;
99 }
100}
101
102#[async_trait]
103impl Transform for WasmTransformTask {
104 async fn run(self: Box<Self>) {
105 tracing::info!("WASM transform task started: {:?}", self.config);
115
116 for mut rx in self.inputs {
118 while let Some(_batch) = rx.recv().await {
119 }
121 }
122 }
123}
124
125impl Clone for TransformPerfStats {
126 fn clone(&self) -> Self {
127 Self {
128 total_batches: self.total_batches,
129 total_events: self.total_events,
130 wasm_execution_time_ms: self.wasm_execution_time_ms,
131 wasm_fuel_consumed: self.wasm_fuel_consumed,
132 avg_batch_size: self.avg_batch_size,
133 error_count: self.error_count,
134 last_module_swap: self.last_module_swap,
135 }
136 }
137}