// kinetic/sinks/flight/client.rs

use arrow_flight::sql::client::FlightSqlServiceClient;
use flight_common::config::FlightClientConfig;
use kinetic_buffers::BufferReceiver;
use metrics::{Label, counter};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tonic::transport::Channel;
use tracing::{debug, error, info, warn};
8
9pub struct FlightSinkClient {
10    config: FlightClientConfig,
11    component_id: String,
12    receiver: BufferReceiver,
13    labels: Arc<[Label]>,
14}
15
16impl FlightSinkClient {
17    pub fn new(config: FlightClientConfig, component_id: String, receiver: BufferReceiver) -> Self {
18        let labels: Arc<[Label]> = Arc::new([
19            Label::new("component_id", component_id.clone()),
20            Label::new("component_type", "sink"),
21            Label::new("component_kind", "flight_sql"),
22        ]);
23
24        Self {
25            config,
26            component_id,
27            receiver,
28            labels,
29        }
30    }
31
32    async fn connect(&self) -> anyhow::Result<FlightSqlServiceClient<Channel>> {
33        let channel = Channel::from_shared(self.config.endpoint.clone())
34            .map_err(|e| anyhow::anyhow!("Invalid Flight SQL endpoint: {}", e))?
35            .connect()
36            .await?;
37
38        let mut sql_client = FlightSqlServiceClient::new(channel);
39
40        // Execute any pre-connection setup commands
41        if let Some(commands) = &self.config.setup_commands {
42            for cmd in commands {
43                debug!(
44                    "Flight SQL sink {} executing setup command: {}",
45                    self.component_id, cmd
46                );
47                if let Err(e) = sql_client.execute_update(cmd.clone(), None).await {
48                    error!(
49                        "Flight SQL sink {} failed to execute setup command '{}': {}",
50                        self.component_id, cmd, e
51                    );
52                    // We continue anyway, but maybe this should be fatal depending on user preference
53                }
54            }
55        }
56
57        Ok(sql_client)
58    }
59
60    pub async fn run(mut self) {
61        info!(
62            "Starting Arrow Flight SQL sink '{}' to {}",
63            self.component_id, self.config.endpoint
64        );
65
66        let mut client: Option<FlightSqlServiceClient<Channel>> = None;
67        let table_name = self
68            .config
69            .table_name
70            .clone()
71            .unwrap_or_else(|| "events".to_string());
72
73        while let Some(mut batch) = self.receiver.recv().await {
74            let row_count = batch.num_rows();
75            let byte_size = batch.estimated_size();
76
77            counter!("component_received_events_total", self.labels.iter())
78                .increment(row_count as u64);
79            counter!("component_received_event_bytes_total", self.labels.iter())
80                .increment(byte_size as u64);
81
82            debug!(
83                "Flight SQL sink {} routing batch of {} rows to table {}",
84                self.component_id, row_count, table_name
85            );
86
87            let mut retry_count = 0;
88            let max_retries = 3;
89
90            loop {
91                // Ensure we have a connected client
92                if client.is_none() {
93                    match self.connect().await {
94                        Ok(c) => client = Some(c),
95                        Err(e) => {
96                            error!(
97                                "Flight SQL sink {} failed to connect to {}: {}. Retrying in 5s...",
98                                self.component_id, self.config.endpoint, e
99                            );
100                            tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
101                            continue;
102                        }
103                    }
104                }
105
106                let command = arrow_flight::sql::CommandStatementIngest {
107                    options: std::collections::HashMap::new(),
108                    table_definition_options: None,
109                    table: table_name.clone(),
110                    schema: None,
111                    catalog: None,
112                    temporary: false,
113                    transaction_id: None,
114                };
115
116                let stream = futures::stream::iter(vec![Ok(batch.payload.clone())]);
117
118                let res: anyhow::Result<i64> = if let Some(ref mut c) = client {
119                    c.execute_ingest(command, stream)
120                        .await
121                        .map_err(|e| anyhow::anyhow!("Flight error: {}", e))
122                } else {
123                    Err(anyhow::anyhow!("Client not connected"))
124                };
125
126                match res {
127                    Ok(rows) => {
128                        debug!(
129                            "Flight SQL sink {} ingested {} rows.",
130                            self.component_id, rows
131                        );
132
133                        counter!("component_sent_events_total", self.labels.iter())
134                            .increment(row_count as u64);
135                        counter!("component_sent_event_bytes_total", self.labels.iter())
136                            .increment(byte_size as u64);
137                        // Network bytes is tricky since tonic doesn't easily expose it here,
138                        // but we can estimate or use the payload size as a proxy as per spec.
139                        counter!("component_sent_network_bytes_total", self.labels.iter())
140                            .increment(byte_size as u64);
141
142                        if let Some(token) = batch.ack_token.take() {
143                            token.ack();
144                        }
145                        break;
146                    }
147                    Err(e) => {
148                        error!(
149                            "Flight SQL sink {} ingestion failed: {}",
150                            self.component_id, e
151                        );
152                        client = None; // Reset client on error to force reconnect
153                        retry_count += 1;
154                        if retry_count >= max_retries {
155                            warn!(
156                                "Flight SQL sink {} reached max retries, dropping batch",
157                                self.component_id
158                            );
159                            counter!("component_discarded_events_total", self.labels.iter())
160                                .increment(row_count as u64);
161                            break;
162                        }
163                        // Exponential backoff
164                        let wait_secs = 2u64.pow(retry_count as u32);
165                        tokio::time::sleep(tokio::time::Duration::from_secs(wait_secs)).await;
166                    }
167                }
168            }
169        }
170
171        info!(
172            "Arrow Flight SQL sink '{}' shutting down",
173            self.component_id
174        );
175    }
176}