kinetic/sinks/flight/
client.rs1use arrow_flight::sql::client::FlightSqlServiceClient;
2use flight_common::config::FlightClientConfig;
3use kinetic_buffers::BufferReceiver;
4use metrics::{Label, counter};
5use std::sync::Arc;
6use tonic::transport::Channel;
7use tracing::{debug, error, info, warn};
8
9pub struct FlightSinkClient {
10 config: FlightClientConfig,
11 component_id: String,
12 receiver: BufferReceiver,
13 labels: Arc<[Label]>,
14}
15
16impl FlightSinkClient {
17 pub fn new(config: FlightClientConfig, component_id: String, receiver: BufferReceiver) -> Self {
18 let labels: Arc<[Label]> = Arc::new([
19 Label::new("component_id", component_id.clone()),
20 Label::new("component_type", "sink"),
21 Label::new("component_kind", "flight_sql"),
22 ]);
23
24 Self {
25 config,
26 component_id,
27 receiver,
28 labels,
29 }
30 }
31
32 async fn connect(&self) -> anyhow::Result<FlightSqlServiceClient<Channel>> {
33 let channel = Channel::from_shared(self.config.endpoint.clone())
34 .map_err(|e| anyhow::anyhow!("Invalid Flight SQL endpoint: {}", e))?
35 .connect()
36 .await?;
37
38 let mut sql_client = FlightSqlServiceClient::new(channel);
39
40 if let Some(commands) = &self.config.setup_commands {
42 for cmd in commands {
43 debug!(
44 "Flight SQL sink {} executing setup command: {}",
45 self.component_id, cmd
46 );
47 if let Err(e) = sql_client.execute_update(cmd.clone(), None).await {
48 error!(
49 "Flight SQL sink {} failed to execute setup command '{}': {}",
50 self.component_id, cmd, e
51 );
52 }
54 }
55 }
56
57 Ok(sql_client)
58 }
59
60 pub async fn run(mut self) {
61 info!(
62 "Starting Arrow Flight SQL sink '{}' to {}",
63 self.component_id, self.config.endpoint
64 );
65
66 let mut client: Option<FlightSqlServiceClient<Channel>> = None;
67 let table_name = self
68 .config
69 .table_name
70 .clone()
71 .unwrap_or_else(|| "events".to_string());
72
73 while let Some(mut batch) = self.receiver.recv().await {
74 let row_count = batch.num_rows();
75 let byte_size = batch.estimated_size();
76
77 counter!("component_received_events_total", self.labels.iter())
78 .increment(row_count as u64);
79 counter!("component_received_event_bytes_total", self.labels.iter())
80 .increment(byte_size as u64);
81
82 debug!(
83 "Flight SQL sink {} routing batch of {} rows to table {}",
84 self.component_id, row_count, table_name
85 );
86
87 let mut retry_count = 0;
88 let max_retries = 3;
89
90 loop {
91 if client.is_none() {
93 match self.connect().await {
94 Ok(c) => client = Some(c),
95 Err(e) => {
96 error!(
97 "Flight SQL sink {} failed to connect to {}: {}. Retrying in 5s...",
98 self.component_id, self.config.endpoint, e
99 );
100 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
101 continue;
102 }
103 }
104 }
105
106 let command = arrow_flight::sql::CommandStatementIngest {
107 options: std::collections::HashMap::new(),
108 table_definition_options: None,
109 table: table_name.clone(),
110 schema: None,
111 catalog: None,
112 temporary: false,
113 transaction_id: None,
114 };
115
116 let stream = futures::stream::iter(vec![Ok(batch.payload.clone())]);
117
118 let res: anyhow::Result<i64> = if let Some(ref mut c) = client {
119 c.execute_ingest(command, stream)
120 .await
121 .map_err(|e| anyhow::anyhow!("Flight error: {}", e))
122 } else {
123 Err(anyhow::anyhow!("Client not connected"))
124 };
125
126 match res {
127 Ok(rows) => {
128 debug!(
129 "Flight SQL sink {} ingested {} rows.",
130 self.component_id, rows
131 );
132
133 counter!("component_sent_events_total", self.labels.iter())
134 .increment(row_count as u64);
135 counter!("component_sent_event_bytes_total", self.labels.iter())
136 .increment(byte_size as u64);
137 counter!("component_sent_network_bytes_total", self.labels.iter())
140 .increment(byte_size as u64);
141
142 if let Some(token) = batch.ack_token.take() {
143 token.ack();
144 }
145 break;
146 }
147 Err(e) => {
148 error!(
149 "Flight SQL sink {} ingestion failed: {}",
150 self.component_id, e
151 );
152 client = None; retry_count += 1;
154 if retry_count >= max_retries {
155 warn!(
156 "Flight SQL sink {} reached max retries, dropping batch",
157 self.component_id
158 );
159 counter!("component_discarded_events_total", self.labels.iter())
160 .increment(row_count as u64);
161 break;
162 }
163 let wait_secs = 2u64.pow(retry_count as u32);
165 tokio::time::sleep(tokio::time::Duration::from_secs(wait_secs)).await;
166 }
167 }
168 }
169 }
170
171 info!(
172 "Arrow Flight SQL sink '{}' shutting down",
173 self.component_id
174 );
175 }
176}