Skip to main content

kinetic/topology/
healthcheck.rs

1//! Sink healthcheck runner.
2
3use futures::future::join_all;
4pub use kinetic_core::Healthcheck;
5use snafu::Snafu;
6use tracing::{error, info};
7
8#[derive(Debug, Snafu)]
9pub enum Error {
10    #[snafu(display("Healthcheck failed for component '{}': {}", component, message))]
11    Failed { component: String, message: String },
12}
13
14pub type Result<T, E = Error> = std::result::Result<T, E>;
15
16/// Runs all provided healthchecks concurrently.
17pub async fn run_all(checks: Vec<(String, Box<dyn Healthcheck>)>) -> Result<()> {
18    if checks.is_empty() {
19        return Ok(());
20    }
21
22    info!("Running {} sink healthchecks...", checks.len());
23
24    let futures = checks.into_iter().map(|(name, check)| async move {
25        match check.check().await {
26            Ok(_) => {
27                info!("Healthcheck passed: {}", name);
28                Ok(())
29            }
30            Err(e) => {
31                error!("Healthcheck failed: {} - {}", name, e);
32                Err(Error::Failed {
33                    component: name,
34                    message: e.to_string(),
35                })
36            }
37        }
38    });
39
40    // Run all checks concurrently
41    let results = join_all(futures).await;
42
43    // If any failed, return an error. We just return the first error for now.
44    for res in results {
45        res?;
46    }
47
48    info!("All healthchecks passed.");
49    Ok(())
50}