kinetic/topology/
healthcheck.rs1use futures::future::join_all;
4pub use kinetic_core::Healthcheck;
5use snafu::Snafu;
6use tracing::{error, info};
7
8#[derive(Debug, Snafu)]
9pub enum Error {
10 #[snafu(display("Healthcheck failed for component '{}': {}", component, message))]
11 Failed { component: String, message: String },
12}
13
14pub type Result<T, E = Error> = std::result::Result<T, E>;
15
16pub async fn run_all(checks: Vec<(String, Box<dyn Healthcheck>)>) -> Result<()> {
18 if checks.is_empty() {
19 return Ok(());
20 }
21
22 info!("Running {} sink healthchecks...", checks.len());
23
24 let futures = checks.into_iter().map(|(name, check)| async move {
25 match check.check().await {
26 Ok(_) => {
27 info!("Healthcheck passed: {}", name);
28 Ok(())
29 }
30 Err(e) => {
31 error!("Healthcheck failed: {} - {}", name, e);
32 Err(Error::Failed {
33 component: name,
34 message: e.to_string(),
35 })
36 }
37 }
38 });
39
40 let results = join_all(futures).await;
42
43 for res in results {
45 res?;
46 }
47
48 info!("All healthchecks passed.");
49 Ok(())
50}