kinetic/sinks/dlq/
client.rs1use aws_config::BehaviorVersion;
2use aws_config::Region;
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::Client;
5use std::time::SystemTime;
6use uuid::Uuid;
7
8pub struct S3DlqClient {
9 client: Client,
10 bucket: String,
11 prefix: String,
12}
13
14impl S3DlqClient {
15 pub async fn new(region: &str, bucket: String, prefix: String) -> Self {
16 let region_provider = RegionProviderChain::first_try(Region::new(region.to_string()))
17 .or_default_provider()
18 .or_else(Region::new("us-east-1"));
19
20 let config = aws_config::defaults(BehaviorVersion::latest())
21 .region(region_provider)
22 .load()
23 .await;
24 let client = Client::new(&config);
25
26 Self {
27 client,
28 bucket,
29 prefix,
30 }
31 }
32
33 pub async fn check_health(&self) -> Result<(), aws_sdk_s3::Error> {
34 self.client
35 .head_bucket()
36 .bucket(&self.bucket)
37 .send()
38 .await?;
39 Ok(())
40 }
41
42 pub async fn write_batch(
43 &self,
44 pipeline_name: &str,
45 stage: &str,
46 payload: bytes::Bytes,
47 encoding: &str,
48 ) -> Result<(), aws_sdk_s3::Error> {
49 let timestamp = SystemTime::now()
50 .duration_since(SystemTime::UNIX_EPOCH)
51 .map(|duration| duration.as_millis())
52 .unwrap_or(0);
53 let batch_id = Uuid::new_v4().to_string();
54
55 let key = format!(
56 "{}/{}/{}/{}_{}.{}",
57 self.prefix, pipeline_name, stage, timestamp, batch_id, encoding
58 );
59
60 self.client
61 .put_object()
62 .bucket(&self.bucket)
63 .key(key)
64 .body(payload.into())
65 .send()
66 .await?;
67
68 Ok(())
69 }
70}