Skip to main content

kinetic/transforms/sample/
task.rs

1//! Sampling transform task.
2//!
3//! Wires the `kinetic-sampling` library into the Kinetic pipeline topology.
4//! Handles channel I/O, named outputs (`sampled` and `passthrough`), metrics
5//! emission, error policy, and shutdown.
6
7use crate::transforms::Transform;
8use crate::transforms::util::TransformErrorHandler;
9use async_trait::async_trait;
10use kinetic_buffers::{BufferReceiver, BufferSender};
11use kinetic_config::model::{ErrorPolicy, SamplingMode};
12use kinetic_core::EventBatch;
13use kinetic_sampling::{
14    adaptive::{AdaptiveConfig, AdaptiveSampler},
15    hash::{HashConfig, MissingKeyBehaviour},
16    random::{RandomConfig, build_rng},
17    recordbatch::RecordBatchConfig,
18    reservoir::{ReservoirConfig, ReservoirSampler},
19    stratified::{MissingFieldBehaviour, StratifiedConfig},
20    systematic::{SystematicConfig, SystematicSampler},
21    tail::{Decision, Operator, TailConfig, TailRule, TailSampler},
22};
23use metrics::{Label, counter, gauge};
24use rand::SeedableRng;
25use rand::rngs::StdRng;
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::time::Duration;
29use tokio::time::{self, Interval};
30use tracing::{debug, error, info, warn};
31
32use kinetic_common::register;
33use kinetic_common::telemetry::EventDuration;
34use std::time::Instant;
35
/// Sampling transform task.
///
/// Owns the input channel and the named output channels of one `sample`
/// transform and drives the sampler selected by [`SamplingMode`] until the
/// input channel closes.
pub struct SampleTask {
    /// Input channel; every mode-specific loop pulls batches from it with `recv()`.
    receiver: BufferReceiver,
    /// Output channels keyed by output name. This task looks up `"sampled"`
    /// and `"passthrough"`; the whole map is also handed to the error handler.
    senders: HashMap<String, BufferSender>,
    /// Sampling strategy and its parameters; `run_task` dispatches on it.
    mode: SamplingMode,
    /// Forwarded unchanged to every sampler call.
    /// NOTE(review): presumably asks the sampler to also return rejected rows
    /// for the `passthrough` output — confirm against `kinetic-sampling`.
    route_all: bool,
    /// Component identifier used in log messages, metric labels and metadata.
    component_id: String,
    /// Base metric labels (`component_id`, `component_type`, `component_kind`).
    labels: Arc<[Label]>,
    /// Applies the configured error policy to batches that fail sampling.
    error_handler: TransformErrorHandler,
    /// Emits the time spent handling each batch or timer tick.
    event_duration: EventDuration,
}
47
48#[async_trait]
49impl Transform for SampleTask {
50    async fn run(self: Box<Self>) {
51        self.run_task().await;
52    }
53}
54
55impl SampleTask {
56    pub fn new(
57        component_id: String,
58        receiver: BufferReceiver,
59        senders: HashMap<String, BufferSender>,
60        mode: SamplingMode,
61        error_policy: ErrorPolicy,
62        route_all: bool,
63    ) -> Self {
64        let labels: Arc<[Label]> = Arc::new([
65            Label::new("component_id", component_id.clone()),
66            Label::new("component_type", "transform"),
67            Label::new("component_kind", "sample"),
68        ]);
69        let error_handler = TransformErrorHandler::new(component_id.clone(), error_policy);
70        let event_duration = register!(EventDuration::new(component_id.clone(), "transform"));
71
72        Self {
73            receiver,
74            senders,
75            mode,
76            route_all,
77            component_id,
78            labels,
79            error_handler,
80            event_duration,
81        }
82    }
83
    /// Runs the task to completion, consuming it.
    ///
    /// Translates the configured [`SamplingMode`] into the matching
    /// `kinetic-sampling` configuration or sampler and hands control to the
    /// mode-specific receive loop. Samplers with a fallible constructor
    /// (systematic, adaptive, reservoir, tail) that reject their configuration
    /// cause an error to be logged; the task then finishes without reading
    /// any batches.
    pub async fn run_task(mut self) {
        info!("Starting Sample transform task: {}", self.component_id);

        match &self.mode {
            SamplingMode::Random { rate, seed } => {
                let config = RandomConfig {
                    rate: *rate,
                    seed: *seed,
                };
                self.run_random(config).await;
            }
            SamplingMode::Systematic { interval } => {
                match SystematicSampler::new(SystematicConfig {
                    interval: *interval,
                }) {
                    Ok(sampler) => self.run_systematic(sampler).await,
                    Err(e) => error!(
                        "Sample {}: invalid systematic config: {}",
                        self.component_id, e
                    ),
                }
            }
            SamplingMode::Hash {
                key,
                rate,
                missing_key_behaviour,
            } => {
                // Anything other than the literal "keep" is treated as drop.
                let behaviour = if missing_key_behaviour == "keep" {
                    MissingKeyBehaviour::Keep
                } else {
                    MissingKeyBehaviour::Drop
                };
                let config = HashConfig {
                    key: key.clone(),
                    rate: *rate,
                    missing_key_behaviour: behaviour,
                };
                self.run_hash(config).await;
            }
            SamplingMode::Recordbatch { rate, seed } => {
                let config = RecordBatchConfig {
                    rate: *rate,
                    seed: *seed,
                };
                self.run_recordbatch(config).await;
            }
            SamplingMode::Stratified {
                field,
                strata,
                default_rate,
                missing_field_behaviour,
                seed,
            } => {
                // Anything other than the literal "drop" is treated as keep
                // (the opposite default from hash mode above).
                let behaviour = if missing_field_behaviour == "drop" {
                    MissingFieldBehaviour::Drop
                } else {
                    MissingFieldBehaviour::Keep
                };
                let config = StratifiedConfig {
                    field: field.clone(),
                    strata: strata.clone(),
                    default_rate: *default_rate,
                    missing_field_behaviour: behaviour,
                    seed: *seed,
                };
                self.run_stratified(config, *seed).await;
            }
            SamplingMode::Adaptive {
                base_rate,
                min_rate,
                max_rate,
                error_field,
                error_value,
                latency_field,
                latency_threshold_ms,
                window_secs,
                sensitivity,
            } => {
                let config = AdaptiveConfig {
                    base_rate: *base_rate,
                    min_rate: *min_rate,
                    max_rate: *max_rate,
                    error_field: error_field.clone(),
                    error_value: error_value.clone(),
                    latency_field: latency_field.clone(),
                    latency_threshold_ms: *latency_threshold_ms,
                    window: Duration::from_secs(*window_secs),
                    sensitivity: *sensitivity,
                    // The pipeline config carries no seed for adaptive mode.
                    seed: None,
                };
                match AdaptiveSampler::new(config) {
                    Ok(sampler) => {
                        self.run_adaptive(sampler).await;
                    }
                    Err(e) => error!(
                        "Sample {}: invalid adaptive config: {}",
                        self.component_id, e
                    ),
                }
            }
            SamplingMode::Reservoir {
                size,
                flush_interval_secs,
            } => {
                let config = ReservoirConfig {
                    size: *size,
                    flush_interval_secs: *flush_interval_secs,
                };
                match ReservoirSampler::new(config) {
                    Ok(sampler) => {
                        self.run_reservoir(sampler).await;
                    }
                    Err(e) => error!(
                        "Sample {}: invalid reservoir config: {}",
                        self.component_id, e
                    ),
                }
            }
            SamplingMode::Tail {
                group_key,
                decision_window_secs,
                max_groups,
                default_decision,
                rules,
            } => {
                // Convert the string-typed config rules into sampler rules.
                // A decision other than the literal "keep" maps to `Decision::Drop`.
                let tail_rules: Vec<TailRule> = rules
                    .iter()
                    .map(|r| TailRule {
                        field: r.field.clone(),
                        operator: parse_operator(&r.operator),
                        value: r.value.clone(),
                        decision: if r.decision == "keep" {
                            Decision::Keep
                        } else {
                            Decision::Drop
                        },
                    })
                    .collect();
                let config = TailConfig {
                    group_key: group_key.clone(),
                    decision_window_secs: *decision_window_secs,
                    max_groups: *max_groups,
                    default_decision: if default_decision == "keep" {
                        Decision::Keep
                    } else {
                        Decision::Drop
                    },
                    rules: tail_rules,
                };
                match TailSampler::new(config) {
                    Ok(sampler) => self.run_tail(sampler).await,
                    Err(e) => error!("Sample {}: invalid tail config: {}", self.component_id, e),
                }
            }
        }

        // Reached both after a normal run (input closed) and after a rejected config.
        info!("Sample transform task {} shutting down", self.component_id);
    }
242
243    async fn run_random(&mut self, config: RandomConfig) {
244        let mut rng = build_rng(config.seed);
245        while let Some(mut batch) = self.receiver.recv().await {
246            let start = Instant::now();
247            let received_rows = batch.num_rows();
248            self.inc_received(received_rows, batch.estimated_size());
249
250            match kinetic_sampling::random::sample(
251                &batch.payload,
252                &config,
253                self.route_all,
254                &mut *rng,
255            ) {
256                Ok(result) => {
257                    self.emit_sample_rate(&config.rate.to_string());
258                    self.send_sampled(result.selected, received_rows, &mut batch)
259                        .await;
260                    if let Some(rejected) = result.rejected {
261                        self.send_passthrough(rejected).await;
262                    }
263                }
264                Err(e) => {
265                    self.handle_batch_error(format!("random sample error: {e}"), &mut batch)
266                        .await;
267                }
268            }
269            self.event_duration.emit(start.elapsed());
270        }
271    }
272
273    async fn run_systematic(&mut self, sampler: SystematicSampler) {
274        while let Some(mut batch) = self.receiver.recv().await {
275            let start = Instant::now();
276            let received_rows = batch.num_rows();
277            self.inc_received(received_rows, batch.estimated_size());
278            let rate = 1.0 / sampler.config.interval as f64;
279
280            match sampler.sample(&batch.payload, self.route_all) {
281                Ok(result) => {
282                    self.emit_sample_rate(&rate.to_string());
283                    self.send_sampled(result.selected, received_rows, &mut batch)
284                        .await;
285                    if let Some(rejected) = result.rejected {
286                        self.send_passthrough(rejected).await;
287                    }
288                }
289                Err(e) => {
290                    self.handle_batch_error(format!("systematic sample error: {e}"), &mut batch)
291                        .await;
292                }
293            }
294            self.event_duration.emit(start.elapsed());
295        }
296    }
297
298    async fn run_hash(&mut self, config: HashConfig) {
299        while let Some(mut batch) = self.receiver.recv().await {
300            let start = Instant::now();
301            let received_rows = batch.num_rows();
302            self.inc_received(received_rows, batch.estimated_size());
303
304            match kinetic_sampling::hash::sample(&batch.payload, &config, self.route_all) {
305                Ok(result) => {
306                    let rate = config.rate as f64 / 100.0;
307                    self.emit_sample_rate(&rate.to_string());
308                    self.send_sampled(result.selected, received_rows, &mut batch)
309                        .await;
310                    if let Some(rejected) = result.rejected {
311                        self.send_passthrough(rejected).await;
312                    }
313                }
314                Err(e) => {
315                    self.handle_batch_error(format!("hash sample error: {e}"), &mut batch)
316                        .await;
317                }
318            }
319            self.event_duration.emit(start.elapsed());
320        }
321    }
322
323    async fn run_recordbatch(&mut self, config: RecordBatchConfig) {
324        let mut rng = build_rng(config.seed);
325        while let Some(mut batch) = self.receiver.recv().await {
326            let start = Instant::now();
327            let received_rows = batch.num_rows();
328            self.inc_received(received_rows, batch.estimated_size());
329
330            let payload = batch.payload.clone();
331            match kinetic_sampling::recordbatch::sample(payload, &config, self.route_all, &mut *rng)
332            {
333                Ok(result) => {
334                    self.emit_sample_rate(&config.rate.to_string());
335                    self.send_sampled(result.selected, received_rows, &mut batch)
336                        .await;
337                    if let Some(rejected) = result.rejected {
338                        self.send_passthrough(rejected).await;
339                    }
340                }
341                Err(e) => {
342                    self.handle_batch_error(format!("recordbatch sample error: {e}"), &mut batch)
343                        .await;
344                }
345            }
346            self.event_duration.emit(start.elapsed());
347        }
348    }
349
350    async fn run_stratified(&mut self, config: StratifiedConfig, seed: Option<u64>) {
351        let mut rng = build_rng(seed);
352        while let Some(mut batch) = self.receiver.recv().await {
353            let start = Instant::now();
354            let received_rows = batch.num_rows();
355            self.inc_received(received_rows, batch.estimated_size());
356
357            match kinetic_sampling::stratified::sample(
358                &batch.payload,
359                &config,
360                self.route_all,
361                &mut *rng,
362            ) {
363                Ok(result) => {
364                    self.emit_sample_rate(&config.default_rate.to_string());
365                    self.send_sampled(result.selected, received_rows, &mut batch)
366                        .await;
367                    if let Some(rejected) = result.rejected {
368                        self.send_passthrough(rejected).await;
369                    }
370                }
371                Err(e) => {
372                    self.handle_batch_error(format!("stratified sample error: {e}"), &mut batch)
373                        .await;
374                }
375            }
376            self.event_duration.emit(start.elapsed());
377        }
378    }
379
380    async fn run_adaptive(&mut self, sampler: AdaptiveSampler) {
381        let mut rng = StdRng::from_entropy();
382        while let Some(mut batch) = self.receiver.recv().await {
383            let start = Instant::now();
384            let received_rows = batch.num_rows();
385            self.inc_received(received_rows, batch.estimated_size());
386
387            match sampler.sample(&batch.payload, self.route_all, &mut rng) {
388                Ok(result) => {
389                    let rate = sampler.current_rate();
390                    self.emit_sample_rate(&rate.to_string());
391                    self.send_sampled(result.selected, received_rows, &mut batch)
392                        .await;
393                    if let Some(rejected) = result.rejected {
394                        self.send_passthrough(rejected).await;
395                    }
396                }
397                Err(e) => {
398                    self.handle_batch_error(format!("adaptive sample error: {e}"), &mut batch)
399                        .await;
400                }
401            }
402            self.event_duration.emit(start.elapsed());
403        }
404    }
405
406    async fn run_reservoir(&mut self, mut sampler: ReservoirSampler) {
407        let mut rng = StdRng::from_entropy();
408        let flush_secs = sampler.config.flush_interval_secs;
409        let mut flush_ticker: Interval = time::interval(Duration::from_secs(flush_secs.max(1)));
410
411        loop {
412            tokio::select! {
413                maybe_batch = self.receiver.recv() => {
414                    match maybe_batch {
415                        Some(batch) => {
416                            let start = Instant::now();
417                            let received_rows = batch.num_rows();
418                            self.inc_received(received_rows, batch.estimated_size());
419                            sampler.observe(&batch.payload, &mut rng);
420                            let reservoir_len = sampler.len();
421                            gauge!("sample_reservoir_size", self.labels.iter())
422                                .set(reservoir_len as f64);
423                            self.event_duration.emit(start.elapsed());
424                        }
425                        None => break,
426                    }
427                }
428                _ = flush_ticker.tick() => {
429                    let start = Instant::now();
430                    self.flush_reservoir(&mut sampler).await;
431                    self.event_duration.emit(start.elapsed());
432                }
433            }
434        }
435        self.flush_reservoir(&mut sampler).await;
436    }
437
438    async fn flush_reservoir(&self, sampler: &mut ReservoirSampler) {
439        match sampler.flush() {
440            Ok(Some(batch)) => {
441                let rows = batch.num_rows();
442                if let Some(sender) = self.senders.get("sampled") {
443                    match EventBatch::new(batch, self.make_placeholder_metadata()) {
444                        Ok(eb) => {
445                            let bytes = eb.estimated_size();
446                            match sender.send(eb).await {
447                                Ok(_) => self.inc_sent(rows, bytes, "sampled"),
448                                Err(e) => error!(
449                                    "Sample {} reservoir flush send error: {:?}",
450                                    self.component_id, e
451                                ),
452                            }
453                        }
454                        Err(e) => error!(
455                            "Sample {} failed to create EventBatch: {}",
456                            self.component_id, e
457                        ),
458                    }
459                }
460            }
461            Ok(None) => {}
462            Err(e) => error!("Sample {} reservoir flush error: {}", self.component_id, e),
463        }
464    }
465
466    async fn run_tail(&mut self, mut sampler: TailSampler) {
467        let drain_interval = Duration::from_secs(1);
468        let mut drain_ticker = time::interval(drain_interval);
469
470        loop {
471            tokio::select! {
472                maybe_batch = self.receiver.recv() => {
473                    match maybe_batch {
474                        Some(batch) => {
475                            let start = Instant::now();
476                            let received_rows = batch.num_rows();
477                            self.inc_received(received_rows, batch.estimated_size());
478                            let evicted = sampler.observe(&batch.payload);
479                            for batch in evicted {
480                                let rows = batch.num_rows();
481                                self.emit_tail_gauges(&sampler);
482                                if let Some(sender) = self.senders.get("sampled") {
483                                    counter!("sample_tail_evictions_total", self.labels.iter()).increment(1);
484                                    self.send_arrow_batch_to(batch, rows, sender).await;
485                                }
486                            }
487                            self.event_duration.emit(start.elapsed());
488                        }
489                        None => break,
490                    }
491                }
492                _ = drain_ticker.tick() => {
493                    let start = Instant::now();
494                    let (kept, _dropped) = sampler.drain_expired();
495                    for batch in kept {
496                        let rows = batch.num_rows();
497                        self.emit_tail_gauges(&sampler);
498                        if let Some(sender) = self.senders.get("sampled") {
499                            self.send_arrow_batch_to(batch, rows, sender).await;
500                        }
501                    }
502                    self.event_duration.emit(start.elapsed());
503                }
504            }
505        }
506        let (kept, _) = sampler.flush_all();
507        for batch in kept {
508            let rows = batch.num_rows();
509            if let Some(sender) = self.senders.get("sampled") {
510                self.send_arrow_batch_to(batch, rows, sender).await;
511            }
512        }
513    }
514
515    fn emit_tail_gauges(&self, sampler: &TailSampler) {
516        gauge!("sample_tail_buffer_groups", self.labels.iter()).set(sampler.group_count() as f64);
517        gauge!("sample_tail_buffer_events", self.labels.iter())
518            .set(sampler.buffered_event_count() as f64);
519    }
520
521    async fn send_sampled(
522        &self,
523        payload: arrow_array::RecordBatch,
524        received_rows: usize,
525        original: &mut EventBatch,
526    ) {
527        let sent_rows = payload.num_rows();
528        let dropped_rows = received_rows.saturating_sub(sent_rows);
529
530        if dropped_rows > 0 {
531            counter!("component_discarded_events_total", self.labels.iter())
532                .increment(dropped_rows as u64);
533            let drop_labels = [
534                Label::new("component_id", self.component_id.clone()),
535                Label::new("component_type", "transform"),
536                Label::new("component_kind", "sample"),
537                Label::new("decision", "dropped"),
538            ];
539            counter!("sample_decisions_total", drop_labels.iter()).increment(dropped_rows as u64);
540        }
541
542        if sent_rows == 0 {
543            if let Some(token) = original.ack_token.take() {
544                token.ack();
545            }
546            return;
547        }
548
549        let keep_labels = [
550            Label::new("component_id", self.component_id.clone()),
551            Label::new("component_type", "transform"),
552            Label::new("component_kind", "sample"),
553            Label::new("decision", "kept"),
554        ];
555        counter!("sample_decisions_total", keep_labels.iter()).increment(sent_rows as u64);
556
557        let new_batch = match EventBatch::new(payload, original.metadata.clone()) {
558            Ok(mut b) => {
559                b.ack_token = original.ack_token.take();
560                b
561            }
562            Err(e) => {
563                error!(
564                    "Sample {} failed to create EventBatch: {}",
565                    self.component_id, e
566                );
567                return;
568            }
569        };
570
571        if let Some(sender) = self.senders.get("sampled") {
572            let bytes = new_batch.estimated_size();
573            match sender.send(new_batch).await {
574                Ok(_) => self.inc_sent(sent_rows, bytes, "sampled"),
575                Err(e) => error!(
576                    "Sample {} failed to send to sampled output: {:?}",
577                    self.component_id, e
578                ),
579            }
580        } else {
581            debug!(
582                "Sample {}: no 'sampled' sender wired — events dropped",
583                self.component_id
584            );
585        }
586    }
587
588    async fn send_passthrough(&self, payload: arrow_array::RecordBatch) {
589        let rows = payload.num_rows();
590        if rows == 0 {
591            return;
592        }
593        if let Some(sender) = self.senders.get("passthrough")
594            && let Ok(batch) = EventBatch::new(payload, self.make_placeholder_metadata())
595        {
596            let bytes = batch.estimated_size();
597            match sender.send(batch).await {
598                Ok(_) => self.inc_sent(rows, bytes, "passthrough"),
599                Err(e) => warn!(
600                    "Sample {} failed to send to passthrough output: {:?}",
601                    self.component_id, e
602                ),
603            }
604        }
605    }
606
607    async fn send_arrow_batch_to(
608        &self,
609        payload: arrow_array::RecordBatch,
610        rows: usize,
611        sender: &BufferSender,
612    ) {
613        if let Ok(eb) = EventBatch::new(payload, self.make_placeholder_metadata()) {
614            let bytes = eb.estimated_size();
615            match sender.send(eb).await {
616                Ok(_) => self.inc_sent(rows, bytes, "sampled"),
617                Err(e) => error!("Sample {} send error: {:?}", self.component_id, e),
618            }
619        }
620    }
621
622    async fn handle_batch_error(&self, message: String, batch: &mut EventBatch) {
623        counter!("component_errors_total", self.labels.iter()).increment(1);
624        self.error_handler
625            .handle_error(&self.senders, message, Some(batch))
626            .await;
627    }
628
629    fn inc_received(&self, rows: usize, bytes: usize) {
630        counter!("component_received_events_total", self.labels.iter()).increment(rows as u64);
631        counter!("component_received_event_bytes_total", self.labels.iter())
632            .increment(bytes as u64);
633    }
634
635    fn inc_sent(&self, rows: usize, bytes: usize, output: &str) {
636        let labels = [
637            Label::new("component_id", self.component_id.clone()),
638            Label::new("component_type", "transform"),
639            Label::new("component_kind", "sample"),
640            Label::new("output", output.to_string()),
641        ];
642        counter!("component_sent_events_total", labels.iter()).increment(rows as u64);
643        counter!("component_sent_event_bytes_total", labels.iter()).increment(bytes as u64);
644    }
645
646    fn emit_sample_rate(&self, rate: &str) {
647        if let Ok(r) = rate.parse::<f64>() {
648            gauge!("sample_rate", self.labels.iter()).set(r);
649        }
650    }
651
652    fn make_placeholder_metadata(&self) -> kinetic_core::ArcEventMetadata {
653        use kinetic_core::{ComponentId, EventMetadata};
654        Arc::new(EventMetadata::new(
655            "unknown",
656            ComponentId(self.component_id.clone()),
657        ))
658    }
659}
660
661fn parse_operator(s: &str) -> Operator {
662    match s {
663        "==" => Operator::Eq,
664        "!=" => Operator::Ne,
665        ">" => Operator::Gt,
666        ">=" => Operator::Gte,
667        "<" => Operator::Lt,
668        "<=" => Operator::Lte,
669        "contains" => Operator::Contains,
670        other => {
671            warn!("Unknown tail rule operator '{}', defaulting to ==", other);
672            Operator::Eq
673        }
674    }
675}