Skip to main content

kinetic/transforms/wasm/
runtime.rs

1//! WASM runtime for executing transform modules
2
3use std::sync::Arc;
4use tokio::sync::RwLock;
5
6/// Runtime for loading and executing WASM modules
7#[derive(Debug)]
8pub struct WasmRuntime {
9    /// Shared wasmtime engine (per-process)
10    engine: Arc<wasmtime::Engine>,
11    /// Hot-swappable module (per-transform)
12    module: Arc<RwLock<wasmtime::Module>>,
13    /// Current module checksum for comparison
14    module_hash: Arc<RwLock<String>>,
15}
16
17impl WasmRuntime {
18    /// Create a new runtime with compiled WASM bytes
19    pub fn new(engine: Arc<wasmtime::Engine>, wasm_bytes: &[u8]) -> anyhow::Result<Self> {
20        let module = wasmtime::Module::new(&engine, wasm_bytes)?;
21        let hash = sha256(wasm_bytes);
22
23        Ok(Self {
24            engine,
25            module: Arc::new(RwLock::new(module)),
26            module_hash: Arc::new(RwLock::new(hash)),
27        })
28    }
29
30    /// Get the current module hash
31    pub async fn get_module_hash(&self) -> String {
32        self.module_hash.read().await.clone()
33    }
34
35    /// Hot-swap the module with new WASM bytes
36    pub async fn hot_swap(&self, new_wasm_bytes: &[u8]) -> anyhow::Result<()> {
37        // Compile new module
38        let new_module = wasmtime::Module::new(&self.engine, new_wasm_bytes)?;
39        let new_hash = sha256(new_wasm_bytes);
40
41        // Atomically swap
42        let mut module_guard = self.module.write().await;
43        let mut hash_guard = self.module_hash.write().await;
44
45        *module_guard = new_module;
46        *hash_guard = new_hash;
47
48        Ok(())
49    }
50
51    /// Create a new instance for batch processing
52    pub async fn instantiate(&self) -> anyhow::Result<wasmtime::Instance> {
53        let module = self.module.read().await;
54
55        // Create store for execution
56        // TODO: Add fuel metering when fuel feature is enabled
57        let mut store = wasmtime::Store::new(&self.engine, ());
58
59        // Create instance
60        let instance = wasmtime::Instance::new(&mut store, &module, &[])?;
61
62        // TODO: Return store and instance for execution
63        // For now, we return the instance but this needs refinement
64        // to properly manage store lifetime
65
66        Ok(instance)
67    }
68
69    /// Check if the module exports the required interface
70    pub async fn check_abi_compatibility(&self) -> bool {
71        let module = self.module.read().await;
72
73        // Required exports for map transform
74        let required_exports = ["transform", "get_schema"];
75
76        for export in &required_exports {
77            if module.get_export(export).is_none() {
78                return false;
79            }
80        }
81
82        true
83    }
84}
85
86fn sha256(data: &[u8]) -> String {
87    use sha2::{Digest, Sha256};
88    let mut hasher = Sha256::new();
89    hasher.update(data);
90    hex::encode(hasher.finalize())
91}
92
/// Settings that control how a transform's module may be hot-swapped.
#[derive(Debug, Clone)]
pub struct HotSwapConfig {
    /// Whether hot-swap is turned on for this transform.
    pub enabled: bool,
    /// Length of the health gate, in seconds.
    pub health_gate_secs: u64,
    /// Whether to roll back automatically when the health gate fails.
    pub auto_rollback: bool,
    /// Development mode: watch the file system for changes.
    pub dev_mode: bool,
}
105
106impl Default for HotSwapConfig {
107    fn default() -> Self {
108        Self {
109            enabled: false,
110            health_gate_secs: 30,
111            auto_rollback: true,
112            dev_mode: false,
113        }
114    }
115}