kinetic/sinks/gcp/
pubsub.rs1use gcloud_googleapis::pubsub::v1::PubsubMessage;
4use gcloud_pubsub::client::Client as PubSubClient;
5use gcloud_pubsub::publisher::Publisher;
6use gcp_common::{client::create_pubsub_client, config::PubSubSinkConfig};
7use kinetic_buffers::BufferReceiver;
8use kinetic_core::{EventBatch, ShutdownSignal};
9use metrics::{Label, counter};
10use tracing::{debug, error, info, warn};
11
12use arrow_array::{Array, StringArray};
13use kinetic_core::encode::Encoder;
14use std::sync::Arc;
15
16pub struct PubSubSinkTask {
17 config: PubSubSinkConfig,
18 component_id: String,
19 receiver: BufferReceiver,
20 shutdown: ShutdownSignal,
21 encoder: Arc<dyn Encoder>,
22 labels: Arc<[Label]>,
23}
24
25use async_trait::async_trait;
26use kinetic_core::healthcheck::Healthcheck;
27
28#[async_trait]
29impl Healthcheck for PubSubSinkTask {
30 async fn check(&self) -> anyhow::Result<()> {
31 let gcp = self
32 .config
33 .config
34 .auth
35 .as_ref()
36 .cloned()
37 .unwrap_or_default();
38 let client: PubSubClient = create_pubsub_client(&gcp).await?;
39 let topic = client.topic(&self.config.config.topic);
40
41 if topic.exists(None).await? {
42 Ok(())
43 } else {
44 anyhow::bail!(
45 "Pub/Sub topic '{}' does not exist",
46 self.config.config.topic
47 )
48 }
49 }
50}
51
52impl PubSubSinkTask {
53 pub fn new(
54 config: PubSubSinkConfig,
55 component_id: String,
56 receiver: BufferReceiver,
57 shutdown: ShutdownSignal,
58 encoder: Arc<dyn Encoder>,
59 ) -> Self {
60 let labels: Arc<[Label]> = Arc::new([
61 Label::new("component_id", component_id.clone()),
62 Label::new("component_type", "sink"),
63 Label::new("component_kind", "gcp_pubsub"),
64 ]);
65
66 Self {
67 config,
68 component_id,
69 receiver,
70 shutdown,
71 encoder,
72 labels,
73 }
74 }
75
76 pub async fn run(mut self) {
77 info!(
78 "Starting Pub/Sub sink '{}' to topic: {}",
79 self.component_id, self.config.config.topic
80 );
81
82 let gcp = self
83 .config
84 .config
85 .auth
86 .as_ref()
87 .cloned()
88 .unwrap_or_default();
89 let client: PubSubClient = match create_pubsub_client(&gcp).await {
90 Ok(c) => c,
91 Err(e) => {
92 error!("Failed to create Pub/Sub client: {}", e);
93 return;
94 }
95 };
96
97 let topic = client.topic(&self.config.config.topic);
98 let dlq_topic = self
99 .config
100 .config
101 .dead_letter_topic
102 .as_ref()
103 .map(|t| client.topic(t));
104
105 let mut shutdown = self.shutdown.clone();
106
107 let publisher = topic.new_publisher(None);
108 let dlq_publisher = dlq_topic.as_ref().map(|t| t.new_publisher(None));
109
110 loop {
111 tokio::select! {
112 reason = shutdown.recv() => {
113 info!("Pub/Sub sink '{}' received shutdown signal: {:?}", self.component_id, reason);
114 break;
115 }
116 batch = self.receiver.recv() => {
117 let Some(batch) = batch else {
118 info!("Pub/Sub sink '{}' receiver closed", self.component_id);
119 break;
120 };
121
122 counter!("component_received_events_total", self.labels.iter()).increment(batch.num_rows() as u64);
123 counter!("component_received_event_bytes_total", self.labels.iter()).increment(batch.estimated_size() as u64);
124
125 if let Err(e) = self.process_batch(&publisher, dlq_publisher.as_ref(), batch).await {
126 error!("Failed to process batch in Pub/Sub sink {}: {}", self.component_id, e);
127 }
128 }
129 }
130 }
131
132 info!("Pub/Sub sink task {} shutting down", self.component_id);
133 }
134
135 async fn process_batch(
136 &self,
137 publisher: &Publisher,
138 dlq_publisher: Option<&Publisher>,
139 mut batch: EventBatch,
140 ) -> anyhow::Result<()> {
141 let buf = self.encoder.encode(&batch)?;
142 let buf_len = buf.len();
143
144 let ordering_key = if self.config.config.enable_message_ordering {
146 self.config
147 .config
148 .ordering_key_field
149 .as_ref()
150 .and_then(|field_name| {
151 if let Some((idx, _)) = batch.payload.schema().column_with_name(field_name) {
153 let col = batch.payload.column(idx);
154 if let Some(arr) = col
155 .as_any()
156 .downcast_ref::<StringArray>()
157 .filter(|arr| !arr.is_empty() && !arr.is_null(0))
158 {
159 return Some(arr.value(0).to_string());
160 }
161 }
162 batch.metadata.tenant_profile.clone()
164 })
165 } else {
166 None
167 };
168
169 let message = PubsubMessage {
170 data: buf.to_vec(),
172 ordering_key: ordering_key.unwrap_or_default(),
173 ..Default::default()
174 };
175
176 let awaiter = publisher.publish(message).await;
177
178 match awaiter.get().await {
179 Ok(id) => {
180 debug!("Published message {} to Pub/Sub topic", id);
181 counter!("component_sent_network_bytes_total", self.labels.iter())
182 .increment(buf_len as u64);
183 counter!("component_sent_events_total", self.labels.iter())
184 .increment(batch.num_rows() as u64);
185 counter!("component_sent_event_bytes_total", self.labels.iter())
186 .increment(batch.estimated_size() as u64);
187
188 if let Some(token) = batch.ack_token.take() {
189 token.ack();
190 }
191 }
192 Err(e) => {
193 counter!("component_errors_total", self.labels.iter()).increment(1);
194 error!("Failed to publish to Pub/Sub: {}", e);
195
196 if let Some(dlq) = dlq_publisher {
197 warn!("Routing failed batch to DLQ topic");
198 let dlq_message = PubsubMessage {
199 data: buf.to_vec(),
201 ..Default::default()
202 };
203 if let Err(dlq_err) = dlq.publish(dlq_message).await.get().await {
204 counter!("component_dlq_errors_total", self.labels.iter()).increment(1);
205 error!(
206 "Failed to publish to DLQ Pub/Sub topic for component {}: {}",
207 self.component_id, dlq_err
208 );
209 return Err(anyhow::anyhow!(
210 "Pub/Sub publish failed: {}, DLQ publish also failed: {}",
211 e,
212 dlq_err
213 ));
214 }
215 }
216
217 return Err(anyhow::anyhow!("Pub/Sub publish failed: {}", e));
218 }
219 }
220
221 Ok(())
222 }
223}