Skip to main content

kinetic/sinks/dlq/
client.rs

1use aws_config::BehaviorVersion;
2use aws_config::Region;
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::Client;
5use std::time::SystemTime;
6use uuid::Uuid;
7
8pub struct S3DlqClient {
9    client: Client,
10    bucket: String,
11    prefix: String,
12}
13
14impl S3DlqClient {
15    pub async fn new(region: &str, bucket: String, prefix: String) -> Self {
16        let region_provider = RegionProviderChain::first_try(Region::new(region.to_string()))
17            .or_default_provider()
18            .or_else(Region::new("us-east-1"));
19
20        let config = aws_config::defaults(BehaviorVersion::latest())
21            .region(region_provider)
22            .load()
23            .await;
24        let client = Client::new(&config);
25
26        Self {
27            client,
28            bucket,
29            prefix,
30        }
31    }
32
33    pub async fn check_health(&self) -> Result<(), aws_sdk_s3::Error> {
34        self.client
35            .head_bucket()
36            .bucket(&self.bucket)
37            .send()
38            .await?;
39        Ok(())
40    }
41
42    pub async fn write_batch(
43        &self,
44        pipeline_name: &str,
45        stage: &str,
46        payload: bytes::Bytes,
47        encoding: &str,
48    ) -> Result<(), aws_sdk_s3::Error> {
49        let timestamp = SystemTime::now()
50            .duration_since(SystemTime::UNIX_EPOCH)
51            .map(|duration| duration.as_millis())
52            .unwrap_or(0);
53        let batch_id = Uuid::new_v4().to_string();
54
55        let key = format!(
56            "{}/{}/{}/{}_{}.{}",
57            self.prefix, pipeline_name, stage, timestamp, batch_id, encoding
58        );
59
60        self.client
61            .put_object()
62            .bucket(&self.bucket)
63            .key(key)
64            .body(payload.into())
65            .send()
66            .await?;
67
68        Ok(())
69    }
70}