Skip to main content

kinetic/sinks/gcp/
pubsub.rs

1//! Google Cloud Pub/Sub Sink.
2
3use gcloud_googleapis::pubsub::v1::PubsubMessage;
4use gcloud_pubsub::client::Client as PubSubClient;
5use gcloud_pubsub::publisher::Publisher;
6use gcp_common::{client::create_pubsub_client, config::PubSubSinkConfig};
7use kinetic_buffers::BufferReceiver;
8use kinetic_core::{EventBatch, ShutdownSignal};
9use metrics::{Label, counter};
10use tracing::{debug, error, info, warn};
11
12use arrow_array::{Array, StringArray};
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15
/// Sink task that publishes encoded event batches to a Google Cloud Pub/Sub topic.
///
/// Each batch read from `receiver` is encoded with `encoder` into a single
/// Pub/Sub message. When a dead-letter topic is configured, batches whose
/// primary publish fails are also sent there.
pub struct PubSubSinkTask {
    /// Sink settings; the nested `config` supplies the topic, auth, ordering
    /// and dead-letter options read by this task.
    config: PubSubSinkConfig,
    /// Component identifier used in log messages and as the `component_id` metric label.
    component_id: String,
    /// Buffer the task reads event batches from.
    receiver: BufferReceiver,
    /// Signal that stops the run loop.
    shutdown: ShutdownSignal,
    /// Serializes an event batch into the message payload.
    encoder: Arc<dyn Encoder>,
    /// Metric labels built once in `new` and attached to every counter this task emits.
    labels: Arc<[Label]>,
}
24
25use async_trait::async_trait;
26use kinetic_core::healthcheck::Healthcheck;
27
28#[async_trait]
29impl Healthcheck for PubSubSinkTask {
30    async fn check(&self) -> anyhow::Result<()> {
31        let gcp = self
32            .config
33            .config
34            .auth
35            .as_ref()
36            .cloned()
37            .unwrap_or_default();
38        let client: PubSubClient = create_pubsub_client(&gcp).await?;
39        let topic = client.topic(&self.config.config.topic);
40
41        if topic.exists(None).await? {
42            Ok(())
43        } else {
44            anyhow::bail!(
45                "Pub/Sub topic '{}' does not exist",
46                self.config.config.topic
47            )
48        }
49    }
50}
51
52impl PubSubSinkTask {
53    pub fn new(
54        config: PubSubSinkConfig,
55        component_id: String,
56        receiver: BufferReceiver,
57        shutdown: ShutdownSignal,
58        encoder: Arc<dyn Encoder>,
59    ) -> Self {
60        let labels: Arc<[Label]> = Arc::new([
61            Label::new("component_id", component_id.clone()),
62            Label::new("component_type", "sink"),
63            Label::new("component_kind", "gcp_pubsub"),
64        ]);
65
66        Self {
67            config,
68            component_id,
69            receiver,
70            shutdown,
71            encoder,
72            labels,
73        }
74    }
75
76    pub async fn run(mut self) {
77        info!(
78            "Starting Pub/Sub sink '{}' to topic: {}",
79            self.component_id, self.config.config.topic
80        );
81
82        let gcp = self
83            .config
84            .config
85            .auth
86            .as_ref()
87            .cloned()
88            .unwrap_or_default();
89        let client: PubSubClient = match create_pubsub_client(&gcp).await {
90            Ok(c) => c,
91            Err(e) => {
92                error!("Failed to create Pub/Sub client: {}", e);
93                return;
94            }
95        };
96
97        let topic = client.topic(&self.config.config.topic);
98        let dlq_topic = self
99            .config
100            .config
101            .dead_letter_topic
102            .as_ref()
103            .map(|t| client.topic(t));
104
105        let mut shutdown = self.shutdown.clone();
106
107        let publisher = topic.new_publisher(None);
108        let dlq_publisher = dlq_topic.as_ref().map(|t| t.new_publisher(None));
109
110        loop {
111            tokio::select! {
112                reason = shutdown.recv() => {
113                    info!("Pub/Sub sink '{}' received shutdown signal: {:?}", self.component_id, reason);
114                    break;
115                }
116                batch = self.receiver.recv() => {
117                    let Some(batch) = batch else {
118                        info!("Pub/Sub sink '{}' receiver closed", self.component_id);
119                        break;
120                    };
121
122                    counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
123                    counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
124
125                    if let Err(e) = self.process_batch(&publisher, dlq_publisher.as_ref(), batch).await {
126                        error!("Failed to process batch in Pub/Sub sink {}: {}", self.component_id, e);
127                    }
128                }
129            }
130        }
131
132        info!("Pub/Sub sink task {} shutting down", self.component_id);
133    }
134
135    async fn process_batch(
136        &self,
137        publisher: &Publisher,
138        dlq_publisher: Option<&Publisher>,
139        mut batch: EventBatch,
140    ) -> anyhow::Result<()> {
141        let buf = self.encoder.encode(&batch)?;
142        let buf_len = buf.len();
143
144        // Handle ordering key if configured
145        let ordering_key = if self.config.config.enable_message_ordering {
146            self.config
147                .config
148                .ordering_key_field
149                .as_ref()
150                .and_then(|field_name| {
151                    // Try to extract the value from the configured field in the first row of the batch
152                    if let Some((idx, _)) = batch.payload.schema().column_with_name(field_name) {
153                        let col = batch.payload.column(idx);
154                        if let Some(arr) = col
155                            .as_any()
156                            .downcast_ref::<StringArray>()
157                            .filter(|arr| !arr.is_empty() && !arr.is_null(0))
158                        {
159                            return Some(arr.value(0).to_string());
160                        }
161                    }
162                    // Fallback to tenant_profile if field is missing or empty
163                    batch.metadata.tenant_profile.clone()
164                })
165        } else {
166            None
167        };
168
169        let message = PubsubMessage {
170            // PERFORMANCE: PubsubMessage requires owned Vec<u8>, triggering a copy from Bytes.
171            data: buf.to_vec(),
172            ordering_key: ordering_key.unwrap_or_default(),
173            ..Default::default()
174        };
175
176        let awaiter = publisher.publish(message).await;
177
178        match awaiter.get().await {
179            Ok(id) => {
180                debug!("Published message {} to Pub/Sub topic", id);
181                counter!("component_sent_network_bytes_total", self.labels.iter())
182                    .increment(buf_len as u64);
183                counter!("component_sent_events_total", self.labels.iter())
184                    .increment(batch.num_rows() as u64);
185                counter!("component_sent_event_bytes_total", self.labels.iter())
186                    .increment(batch.estimated_size() as u64);
187
188                if let Some(token) = batch.ack_token.take() {
189                    token.ack();
190                }
191            }
192            Err(e) => {
193                counter!("component_errors_total", self.labels.iter()).increment(1);
194                error!("Failed to publish to Pub/Sub: {}", e);
195
196                if let Some(dlq) = dlq_publisher {
197                    warn!("Routing failed batch to DLQ topic");
198                    let dlq_message = PubsubMessage {
199                        // PERFORMANCE: PubsubMessage requires owned Vec<u8>, triggering a copy from Bytes.
200                        data: buf.to_vec(),
201                        ..Default::default()
202                    };
203                    if let Err(dlq_err) = dlq.publish(dlq_message).await.get().await {
204                        counter!("component_dlq_errors_total", self.labels.iter()).increment(1);
205                        error!(
206                            "Failed to publish to DLQ Pub/Sub topic for component {}: {}",
207                            self.component_id, dlq_err
208                        );
209                        return Err(anyhow::anyhow!(
210                            "Pub/Sub publish failed: {}, DLQ publish also failed: {}",
211                            e,
212                            dlq_err
213                        ));
214                    }
215                }
216
217                return Err(anyhow::anyhow!("Pub/Sub publish failed: {}", e));
218            }
219        }
220
221        Ok(())
222    }
223}