Skip to main content

kinetic/sources/flight/
server.rs

1//! Arrow Flight SQL source server for receiving queries and commands.
2
3use arrow_flight::flight_service_server::FlightServiceServer;
4use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream};
5use arrow_flight::sql::{
6    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
7    ActionCreatePreparedStatementResult, Any, CommandGetCatalogs, CommandGetCrossReference,
8    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys, CommandGetPrimaryKeys,
9    CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery,
10    CommandPreparedStatementUpdate, CommandStatementQuery, CommandStatementUpdate,
11    DoPutPreparedStatementResult, TicketStatementQuery,
12};
13use arrow_flight::{
14    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
15    PutResult, Ticket,
16};
17use flight_common::config::FlightServerConfig;
18use futures::Stream;
19use kinetic_buffers::BufferSender;
20use std::pin::Pin;
21use tonic::{Request, Response, Status, Streaming};
22use tracing::info;
23
24pub struct FlightSourceServer {
25    config: FlightServerConfig,
26    component_id: String,
27    sender: BufferSender,
28}
29
30impl FlightSourceServer {
31    pub fn new(config: FlightServerConfig, component_id: String, sender: BufferSender) -> Self {
32        Self {
33            config,
34            component_id,
35            sender,
36        }
37    }
38
39    pub async fn run(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
40        let addr = self.config.endpoint.parse()?;
41        info!(
42            "Starting Arrow Flight SQL source '{}' on {}",
43            self.component_id, addr
44        );
45
46        let svc = FlightSqlSvc {
47            component_id: self.component_id.clone(),
48            sender: self.sender.clone(),
49        };
50
51        let server = FlightServiceServer::new(svc);
52
53        tonic::transport::Server::builder()
54            .add_service(server)
55            .serve(addr)
56            .await?;
57
58        Ok(())
59    }
60}
61
62// Minimal stub implementation of FlightSqlService.
63// In a real implementation we would route the SQL updates to downstream sinks.
64#[derive(Clone)]
65struct FlightSqlSvc {
66    #[allow(dead_code)]
67    component_id: String,
68    #[allow(dead_code)]
69    sender: BufferSender,
70}
71
72#[tonic::async_trait]
73impl FlightSqlService for FlightSqlSvc {
74    type FlightService = FlightSqlSvc;
75
76    async fn do_handshake(
77        &self,
78        _request: Request<Streaming<HandshakeRequest>>,
79    ) -> Result<
80        Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + 'static>>>,
81        Status,
82    > {
83        Err(Status::unimplemented("Not implemented"))
84    }
85
86    async fn do_get_fallback(
87        &self,
88        _request: Request<Ticket>,
89        _message: Any,
90    ) -> Result<
91        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
92        Status,
93    > {
94        Err(Status::unimplemented("Not implemented"))
95    }
96
97    async fn get_flight_info_statement(
98        &self,
99        _query: CommandStatementQuery,
100        _request: Request<FlightDescriptor>,
101    ) -> Result<Response<FlightInfo>, Status> {
102        Err(Status::unimplemented("Not implemented"))
103    }
104
105    async fn get_flight_info_prepared_statement(
106        &self,
107        _cmd: CommandPreparedStatementQuery,
108        _request: Request<FlightDescriptor>,
109    ) -> Result<Response<FlightInfo>, Status> {
110        Err(Status::unimplemented("Not implemented"))
111    }
112
113    async fn get_flight_info_catalogs(
114        &self,
115        _query: CommandGetCatalogs,
116        _request: Request<FlightDescriptor>,
117    ) -> Result<Response<FlightInfo>, Status> {
118        Err(Status::unimplemented("Not implemented"))
119    }
120
121    async fn get_flight_info_schemas(
122        &self,
123        _query: CommandGetDbSchemas,
124        _request: Request<FlightDescriptor>,
125    ) -> Result<Response<FlightInfo>, Status> {
126        Err(Status::unimplemented("Not implemented"))
127    }
128
129    async fn get_flight_info_tables(
130        &self,
131        _query: CommandGetTables,
132        _request: Request<FlightDescriptor>,
133    ) -> Result<Response<FlightInfo>, Status> {
134        Err(Status::unimplemented("Not implemented"))
135    }
136
137    async fn get_flight_info_table_types(
138        &self,
139        _query: CommandGetTableTypes,
140        _request: Request<FlightDescriptor>,
141    ) -> Result<Response<FlightInfo>, Status> {
142        Err(Status::unimplemented("Not implemented"))
143    }
144
145    async fn get_flight_info_sql_info(
146        &self,
147        _query: CommandGetSqlInfo,
148        _request: Request<FlightDescriptor>,
149    ) -> Result<Response<FlightInfo>, Status> {
150        Err(Status::unimplemented("Not implemented"))
151    }
152
153    async fn get_flight_info_primary_keys(
154        &self,
155        _query: CommandGetPrimaryKeys,
156        _request: Request<FlightDescriptor>,
157    ) -> Result<Response<FlightInfo>, Status> {
158        Err(Status::unimplemented("Not implemented"))
159    }
160
161    async fn get_flight_info_exported_keys(
162        &self,
163        _query: CommandGetExportedKeys,
164        _request: Request<FlightDescriptor>,
165    ) -> Result<Response<FlightInfo>, Status> {
166        Err(Status::unimplemented("Not implemented"))
167    }
168
169    async fn get_flight_info_imported_keys(
170        &self,
171        _query: CommandGetImportedKeys,
172        _request: Request<FlightDescriptor>,
173    ) -> Result<Response<FlightInfo>, Status> {
174        Err(Status::unimplemented("Not implemented"))
175    }
176
177    async fn get_flight_info_cross_reference(
178        &self,
179        _query: CommandGetCrossReference,
180        _request: Request<FlightDescriptor>,
181    ) -> Result<Response<FlightInfo>, Status> {
182        Err(Status::unimplemented("Not implemented"))
183    }
184
185    async fn do_get_statement(
186        &self,
187        _ticket: TicketStatementQuery,
188        _request: Request<Ticket>,
189    ) -> Result<
190        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
191        Status,
192    > {
193        Err(Status::unimplemented("Not implemented"))
194    }
195
196    async fn do_get_prepared_statement(
197        &self,
198        _query: CommandPreparedStatementQuery,
199        _request: Request<Ticket>,
200    ) -> Result<
201        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
202        Status,
203    > {
204        Err(Status::unimplemented("Not implemented"))
205    }
206
207    async fn do_get_catalogs(
208        &self,
209        _query: CommandGetCatalogs,
210        _request: Request<Ticket>,
211    ) -> Result<
212        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
213        Status,
214    > {
215        Err(Status::unimplemented("Not implemented"))
216    }
217
218    async fn do_get_schemas(
219        &self,
220        _query: CommandGetDbSchemas,
221        _request: Request<Ticket>,
222    ) -> Result<
223        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
224        Status,
225    > {
226        Err(Status::unimplemented("Not implemented"))
227    }
228
229    async fn do_get_tables(
230        &self,
231        _query: CommandGetTables,
232        _request: Request<Ticket>,
233    ) -> Result<
234        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
235        Status,
236    > {
237        Err(Status::unimplemented("Not implemented"))
238    }
239
240    async fn do_get_table_types(
241        &self,
242        _query: CommandGetTableTypes,
243        _request: Request<Ticket>,
244    ) -> Result<
245        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
246        Status,
247    > {
248        Err(Status::unimplemented("Not implemented"))
249    }
250
251    async fn do_get_sql_info(
252        &self,
253        _query: CommandGetSqlInfo,
254        _request: Request<Ticket>,
255    ) -> Result<
256        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
257        Status,
258    > {
259        Err(Status::unimplemented("Not implemented"))
260    }
261
262    async fn do_get_primary_keys(
263        &self,
264        _query: CommandGetPrimaryKeys,
265        _request: Request<Ticket>,
266    ) -> Result<
267        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
268        Status,
269    > {
270        Err(Status::unimplemented("Not implemented"))
271    }
272
273    async fn do_get_exported_keys(
274        &self,
275        _query: CommandGetExportedKeys,
276        _request: Request<Ticket>,
277    ) -> Result<
278        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
279        Status,
280    > {
281        Err(Status::unimplemented("Not implemented"))
282    }
283
284    async fn do_get_imported_keys(
285        &self,
286        _query: CommandGetImportedKeys,
287        _request: Request<Ticket>,
288    ) -> Result<
289        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
290        Status,
291    > {
292        Err(Status::unimplemented("Not implemented"))
293    }
294
295    async fn do_get_cross_reference(
296        &self,
297        _query: CommandGetCrossReference,
298        _request: Request<Ticket>,
299    ) -> Result<
300        Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
301        Status,
302    > {
303        Err(Status::unimplemented("Not implemented"))
304    }
305
306    async fn do_put_statement_update(
307        &self,
308        _ticket: CommandStatementUpdate,
309        _request: Request<PeekableFlightDataStream>,
310    ) -> Result<i64, Status> {
311        // In a real implementation, we would extract the query/data and send it to our pipeline.
312        info!(
313            "Flight SQL source {} received a statement update",
314            self.component_id
315        );
316        Ok(-1)
317    }
318
319    async fn do_put_prepared_statement_query(
320        &self,
321        _query: CommandPreparedStatementQuery,
322        _request: Request<PeekableFlightDataStream>,
323    ) -> Result<DoPutPreparedStatementResult, Status> {
324        Err(Status::unimplemented("Not implemented"))
325    }
326
327    async fn do_put_prepared_statement_update(
328        &self,
329        _query: CommandPreparedStatementUpdate,
330        _request: Request<PeekableFlightDataStream>,
331    ) -> Result<i64, Status> {
332        Err(Status::unimplemented("Not implemented"))
333    }
334
335    async fn do_put_fallback(
336        &self,
337        _request: Request<PeekableFlightDataStream>,
338        _message: Any,
339    ) -> Result<
340        Response<Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + 'static>>>,
341        Status,
342    > {
343        Err(Status::unimplemented("Not implemented"))
344    }
345
346    async fn do_action_create_prepared_statement(
347        &self,
348        _query: ActionCreatePreparedStatementRequest,
349        _request: Request<Action>,
350    ) -> Result<ActionCreatePreparedStatementResult, Status> {
351        Err(Status::unimplemented("Not implemented"))
352    }
353
354    async fn do_action_close_prepared_statement(
355        &self,
356        _query: ActionClosePreparedStatementRequest,
357        _request: Request<Action>,
358    ) -> Result<(), Status> {
359        Err(Status::unimplemented("Not implemented"))
360    }
361
362    async fn do_action_fallback(
363        &self,
364        _request: Request<Action>,
365    ) -> Result<
366        Response<
367            Pin<Box<dyn Stream<Item = Result<arrow_flight::Result, Status>> + Send + 'static>>,
368        >,
369        Status,
370    > {
371        Err(Status::unimplemented("Not implemented"))
372    }
373
374    async fn register_sql_info(&self, _id: i32, _result: &arrow_flight::sql::SqlInfo) {}
375}