1use arrow_flight::flight_service_server::FlightServiceServer;
4use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream};
5use arrow_flight::sql::{
6 ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
7 ActionCreatePreparedStatementResult, Any, CommandGetCatalogs, CommandGetCrossReference,
8 CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys, CommandGetPrimaryKeys,
9 CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery,
10 CommandPreparedStatementUpdate, CommandStatementQuery, CommandStatementUpdate,
11 DoPutPreparedStatementResult, TicketStatementQuery,
12};
13use arrow_flight::{
14 Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
15 PutResult, Ticket,
16};
17use flight_common::config::FlightServerConfig;
18use futures::Stream;
19use kinetic_buffers::BufferSender;
20use std::pin::Pin;
21use tonic::{Request, Response, Status, Streaming};
22use tracing::info;
23
24pub struct FlightSourceServer {
25 config: FlightServerConfig,
26 component_id: String,
27 sender: BufferSender,
28}
29
30impl FlightSourceServer {
31 pub fn new(config: FlightServerConfig, component_id: String, sender: BufferSender) -> Self {
32 Self {
33 config,
34 component_id,
35 sender,
36 }
37 }
38
39 pub async fn run(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
40 let addr = self.config.endpoint.parse()?;
41 info!(
42 "Starting Arrow Flight SQL source '{}' on {}",
43 self.component_id, addr
44 );
45
46 let svc = FlightSqlSvc {
47 component_id: self.component_id.clone(),
48 sender: self.sender.clone(),
49 };
50
51 let server = FlightServiceServer::new(svc);
52
53 tonic::transport::Server::builder()
54 .add_service(server)
55 .serve(addr)
56 .await?;
57
58 Ok(())
59 }
60}
61
62#[derive(Clone)]
65struct FlightSqlSvc {
66 #[allow(dead_code)]
67 component_id: String,
68 #[allow(dead_code)]
69 sender: BufferSender,
70}
71
72#[tonic::async_trait]
73impl FlightSqlService for FlightSqlSvc {
74 type FlightService = FlightSqlSvc;
75
76 async fn do_handshake(
77 &self,
78 _request: Request<Streaming<HandshakeRequest>>,
79 ) -> Result<
80 Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + 'static>>>,
81 Status,
82 > {
83 Err(Status::unimplemented("Not implemented"))
84 }
85
86 async fn do_get_fallback(
87 &self,
88 _request: Request<Ticket>,
89 _message: Any,
90 ) -> Result<
91 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
92 Status,
93 > {
94 Err(Status::unimplemented("Not implemented"))
95 }
96
97 async fn get_flight_info_statement(
98 &self,
99 _query: CommandStatementQuery,
100 _request: Request<FlightDescriptor>,
101 ) -> Result<Response<FlightInfo>, Status> {
102 Err(Status::unimplemented("Not implemented"))
103 }
104
105 async fn get_flight_info_prepared_statement(
106 &self,
107 _cmd: CommandPreparedStatementQuery,
108 _request: Request<FlightDescriptor>,
109 ) -> Result<Response<FlightInfo>, Status> {
110 Err(Status::unimplemented("Not implemented"))
111 }
112
113 async fn get_flight_info_catalogs(
114 &self,
115 _query: CommandGetCatalogs,
116 _request: Request<FlightDescriptor>,
117 ) -> Result<Response<FlightInfo>, Status> {
118 Err(Status::unimplemented("Not implemented"))
119 }
120
121 async fn get_flight_info_schemas(
122 &self,
123 _query: CommandGetDbSchemas,
124 _request: Request<FlightDescriptor>,
125 ) -> Result<Response<FlightInfo>, Status> {
126 Err(Status::unimplemented("Not implemented"))
127 }
128
129 async fn get_flight_info_tables(
130 &self,
131 _query: CommandGetTables,
132 _request: Request<FlightDescriptor>,
133 ) -> Result<Response<FlightInfo>, Status> {
134 Err(Status::unimplemented("Not implemented"))
135 }
136
137 async fn get_flight_info_table_types(
138 &self,
139 _query: CommandGetTableTypes,
140 _request: Request<FlightDescriptor>,
141 ) -> Result<Response<FlightInfo>, Status> {
142 Err(Status::unimplemented("Not implemented"))
143 }
144
145 async fn get_flight_info_sql_info(
146 &self,
147 _query: CommandGetSqlInfo,
148 _request: Request<FlightDescriptor>,
149 ) -> Result<Response<FlightInfo>, Status> {
150 Err(Status::unimplemented("Not implemented"))
151 }
152
153 async fn get_flight_info_primary_keys(
154 &self,
155 _query: CommandGetPrimaryKeys,
156 _request: Request<FlightDescriptor>,
157 ) -> Result<Response<FlightInfo>, Status> {
158 Err(Status::unimplemented("Not implemented"))
159 }
160
161 async fn get_flight_info_exported_keys(
162 &self,
163 _query: CommandGetExportedKeys,
164 _request: Request<FlightDescriptor>,
165 ) -> Result<Response<FlightInfo>, Status> {
166 Err(Status::unimplemented("Not implemented"))
167 }
168
169 async fn get_flight_info_imported_keys(
170 &self,
171 _query: CommandGetImportedKeys,
172 _request: Request<FlightDescriptor>,
173 ) -> Result<Response<FlightInfo>, Status> {
174 Err(Status::unimplemented("Not implemented"))
175 }
176
177 async fn get_flight_info_cross_reference(
178 &self,
179 _query: CommandGetCrossReference,
180 _request: Request<FlightDescriptor>,
181 ) -> Result<Response<FlightInfo>, Status> {
182 Err(Status::unimplemented("Not implemented"))
183 }
184
185 async fn do_get_statement(
186 &self,
187 _ticket: TicketStatementQuery,
188 _request: Request<Ticket>,
189 ) -> Result<
190 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
191 Status,
192 > {
193 Err(Status::unimplemented("Not implemented"))
194 }
195
196 async fn do_get_prepared_statement(
197 &self,
198 _query: CommandPreparedStatementQuery,
199 _request: Request<Ticket>,
200 ) -> Result<
201 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
202 Status,
203 > {
204 Err(Status::unimplemented("Not implemented"))
205 }
206
207 async fn do_get_catalogs(
208 &self,
209 _query: CommandGetCatalogs,
210 _request: Request<Ticket>,
211 ) -> Result<
212 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
213 Status,
214 > {
215 Err(Status::unimplemented("Not implemented"))
216 }
217
218 async fn do_get_schemas(
219 &self,
220 _query: CommandGetDbSchemas,
221 _request: Request<Ticket>,
222 ) -> Result<
223 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
224 Status,
225 > {
226 Err(Status::unimplemented("Not implemented"))
227 }
228
229 async fn do_get_tables(
230 &self,
231 _query: CommandGetTables,
232 _request: Request<Ticket>,
233 ) -> Result<
234 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
235 Status,
236 > {
237 Err(Status::unimplemented("Not implemented"))
238 }
239
240 async fn do_get_table_types(
241 &self,
242 _query: CommandGetTableTypes,
243 _request: Request<Ticket>,
244 ) -> Result<
245 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
246 Status,
247 > {
248 Err(Status::unimplemented("Not implemented"))
249 }
250
251 async fn do_get_sql_info(
252 &self,
253 _query: CommandGetSqlInfo,
254 _request: Request<Ticket>,
255 ) -> Result<
256 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
257 Status,
258 > {
259 Err(Status::unimplemented("Not implemented"))
260 }
261
262 async fn do_get_primary_keys(
263 &self,
264 _query: CommandGetPrimaryKeys,
265 _request: Request<Ticket>,
266 ) -> Result<
267 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
268 Status,
269 > {
270 Err(Status::unimplemented("Not implemented"))
271 }
272
273 async fn do_get_exported_keys(
274 &self,
275 _query: CommandGetExportedKeys,
276 _request: Request<Ticket>,
277 ) -> Result<
278 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
279 Status,
280 > {
281 Err(Status::unimplemented("Not implemented"))
282 }
283
284 async fn do_get_imported_keys(
285 &self,
286 _query: CommandGetImportedKeys,
287 _request: Request<Ticket>,
288 ) -> Result<
289 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
290 Status,
291 > {
292 Err(Status::unimplemented("Not implemented"))
293 }
294
295 async fn do_get_cross_reference(
296 &self,
297 _query: CommandGetCrossReference,
298 _request: Request<Ticket>,
299 ) -> Result<
300 Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>>,
301 Status,
302 > {
303 Err(Status::unimplemented("Not implemented"))
304 }
305
306 async fn do_put_statement_update(
307 &self,
308 _ticket: CommandStatementUpdate,
309 _request: Request<PeekableFlightDataStream>,
310 ) -> Result<i64, Status> {
311 info!(
313 "Flight SQL source {} received a statement update",
314 self.component_id
315 );
316 Ok(-1)
317 }
318
319 async fn do_put_prepared_statement_query(
320 &self,
321 _query: CommandPreparedStatementQuery,
322 _request: Request<PeekableFlightDataStream>,
323 ) -> Result<DoPutPreparedStatementResult, Status> {
324 Err(Status::unimplemented("Not implemented"))
325 }
326
327 async fn do_put_prepared_statement_update(
328 &self,
329 _query: CommandPreparedStatementUpdate,
330 _request: Request<PeekableFlightDataStream>,
331 ) -> Result<i64, Status> {
332 Err(Status::unimplemented("Not implemented"))
333 }
334
335 async fn do_put_fallback(
336 &self,
337 _request: Request<PeekableFlightDataStream>,
338 _message: Any,
339 ) -> Result<
340 Response<Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + 'static>>>,
341 Status,
342 > {
343 Err(Status::unimplemented("Not implemented"))
344 }
345
346 async fn do_action_create_prepared_statement(
347 &self,
348 _query: ActionCreatePreparedStatementRequest,
349 _request: Request<Action>,
350 ) -> Result<ActionCreatePreparedStatementResult, Status> {
351 Err(Status::unimplemented("Not implemented"))
352 }
353
354 async fn do_action_close_prepared_statement(
355 &self,
356 _query: ActionClosePreparedStatementRequest,
357 _request: Request<Action>,
358 ) -> Result<(), Status> {
359 Err(Status::unimplemented("Not implemented"))
360 }
361
362 async fn do_action_fallback(
363 &self,
364 _request: Request<Action>,
365 ) -> Result<
366 Response<
367 Pin<Box<dyn Stream<Item = Result<arrow_flight::Result, Status>> + Send + 'static>>,
368 >,
369 Status,
370 > {
371 Err(Status::unimplemented("Not implemented"))
372 }
373
374 async fn register_sql_info(&self, _id: i32, _result: &arrow_flight::sql::SqlInfo) {}
375}