1use crate::transforms::Transform;
8use crate::transforms::util::TransformErrorHandler;
9use async_trait::async_trait;
10use kinetic_buffers::{BufferReceiver, BufferSender};
11use kinetic_config::model::{ErrorPolicy, SamplingMode};
12use kinetic_core::EventBatch;
13use kinetic_sampling::{
14 adaptive::{AdaptiveConfig, AdaptiveSampler},
15 hash::{HashConfig, MissingKeyBehaviour},
16 random::{RandomConfig, build_rng},
17 recordbatch::RecordBatchConfig,
18 reservoir::{ReservoirConfig, ReservoirSampler},
19 stratified::{MissingFieldBehaviour, StratifiedConfig},
20 systematic::{SystematicConfig, SystematicSampler},
21 tail::{Decision, Operator, TailConfig, TailRule, TailSampler},
22};
23use metrics::{Label, counter, gauge};
24use rand::SeedableRng;
25use rand::rngs::StdRng;
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::time::Duration;
29use tokio::time::{self, Interval};
30use tracing::{debug, error, info, warn};
31
32use kinetic_common::register;
33use kinetic_common::telemetry::EventDuration;
34use std::time::Instant;
35
36pub struct SampleTask {
38 receiver: BufferReceiver,
39 senders: HashMap<String, BufferSender>,
40 mode: SamplingMode,
41 route_all: bool,
42 component_id: String,
43 labels: Arc<[Label]>,
44 error_handler: TransformErrorHandler,
45 event_duration: EventDuration,
46}
47
48#[async_trait]
49impl Transform for SampleTask {
50 async fn run(self: Box<Self>) {
51 self.run_task().await;
52 }
53}
54
55impl SampleTask {
56 pub fn new(
57 component_id: String,
58 receiver: BufferReceiver,
59 senders: HashMap<String, BufferSender>,
60 mode: SamplingMode,
61 error_policy: ErrorPolicy,
62 route_all: bool,
63 ) -> Self {
64 let labels: Arc<[Label]> = Arc::new([
65 Label::new("component_id", component_id.clone()),
66 Label::new("component_type", "transform"),
67 Label::new("component_kind", "sample"),
68 ]);
69 let error_handler = TransformErrorHandler::new(component_id.clone(), error_policy);
70 let event_duration = register!(EventDuration::new(component_id.clone(), "transform"));
71
72 Self {
73 receiver,
74 senders,
75 mode,
76 route_all,
77 component_id,
78 labels,
79 error_handler,
80 event_duration,
81 }
82 }
83
84 pub async fn run_task(mut self) {
85 info!("Starting Sample transform task: {}", self.component_id);
86
87 match &self.mode {
88 SamplingMode::Random { rate, seed } => {
89 let config = RandomConfig {
90 rate: *rate,
91 seed: *seed,
92 };
93 self.run_random(config).await;
94 }
95 SamplingMode::Systematic { interval } => {
96 match SystematicSampler::new(SystematicConfig {
97 interval: *interval,
98 }) {
99 Ok(sampler) => self.run_systematic(sampler).await,
100 Err(e) => error!(
101 "Sample {}: invalid systematic config: {}",
102 self.component_id, e
103 ),
104 }
105 }
106 SamplingMode::Hash {
107 key,
108 rate,
109 missing_key_behaviour,
110 } => {
111 let behaviour = if missing_key_behaviour == "keep" {
112 MissingKeyBehaviour::Keep
113 } else {
114 MissingKeyBehaviour::Drop
115 };
116 let config = HashConfig {
117 key: key.clone(),
118 rate: *rate,
119 missing_key_behaviour: behaviour,
120 };
121 self.run_hash(config).await;
122 }
123 SamplingMode::Recordbatch { rate, seed } => {
124 let config = RecordBatchConfig {
125 rate: *rate,
126 seed: *seed,
127 };
128 self.run_recordbatch(config).await;
129 }
130 SamplingMode::Stratified {
131 field,
132 strata,
133 default_rate,
134 missing_field_behaviour,
135 seed,
136 } => {
137 let behaviour = if missing_field_behaviour == "drop" {
138 MissingFieldBehaviour::Drop
139 } else {
140 MissingFieldBehaviour::Keep
141 };
142 let config = StratifiedConfig {
143 field: field.clone(),
144 strata: strata.clone(),
145 default_rate: *default_rate,
146 missing_field_behaviour: behaviour,
147 seed: *seed,
148 };
149 self.run_stratified(config, *seed).await;
150 }
151 SamplingMode::Adaptive {
152 base_rate,
153 min_rate,
154 max_rate,
155 error_field,
156 error_value,
157 latency_field,
158 latency_threshold_ms,
159 window_secs,
160 sensitivity,
161 } => {
162 let config = AdaptiveConfig {
163 base_rate: *base_rate,
164 min_rate: *min_rate,
165 max_rate: *max_rate,
166 error_field: error_field.clone(),
167 error_value: error_value.clone(),
168 latency_field: latency_field.clone(),
169 latency_threshold_ms: *latency_threshold_ms,
170 window: Duration::from_secs(*window_secs),
171 sensitivity: *sensitivity,
172 seed: None,
173 };
174 match AdaptiveSampler::new(config) {
175 Ok(sampler) => {
176 self.run_adaptive(sampler).await;
177 }
178 Err(e) => error!(
179 "Sample {}: invalid adaptive config: {}",
180 self.component_id, e
181 ),
182 }
183 }
184 SamplingMode::Reservoir {
185 size,
186 flush_interval_secs,
187 } => {
188 let config = ReservoirConfig {
189 size: *size,
190 flush_interval_secs: *flush_interval_secs,
191 };
192 match ReservoirSampler::new(config) {
193 Ok(sampler) => {
194 self.run_reservoir(sampler).await;
195 }
196 Err(e) => error!(
197 "Sample {}: invalid reservoir config: {}",
198 self.component_id, e
199 ),
200 }
201 }
202 SamplingMode::Tail {
203 group_key,
204 decision_window_secs,
205 max_groups,
206 default_decision,
207 rules,
208 } => {
209 let tail_rules: Vec<TailRule> = rules
210 .iter()
211 .map(|r| TailRule {
212 field: r.field.clone(),
213 operator: parse_operator(&r.operator),
214 value: r.value.clone(),
215 decision: if r.decision == "keep" {
216 Decision::Keep
217 } else {
218 Decision::Drop
219 },
220 })
221 .collect();
222 let config = TailConfig {
223 group_key: group_key.clone(),
224 decision_window_secs: *decision_window_secs,
225 max_groups: *max_groups,
226 default_decision: if default_decision == "keep" {
227 Decision::Keep
228 } else {
229 Decision::Drop
230 },
231 rules: tail_rules,
232 };
233 match TailSampler::new(config) {
234 Ok(sampler) => self.run_tail(sampler).await,
235 Err(e) => error!("Sample {}: invalid tail config: {}", self.component_id, e),
236 }
237 }
238 }
239
240 info!("Sample transform task {} shutting down", self.component_id);
241 }
242
243 async fn run_random(&mut self, config: RandomConfig) {
244 let mut rng = build_rng(config.seed);
245 while let Some(mut batch) = self.receiver.recv().await {
246 let start = Instant::now();
247 let received_rows = batch.num_rows();
248 self.inc_received(received_rows, batch.estimated_size());
249
250 match kinetic_sampling::random::sample(
251 &batch.payload,
252 &config,
253 self.route_all,
254 &mut *rng,
255 ) {
256 Ok(result) => {
257 self.emit_sample_rate(&config.rate.to_string());
258 self.send_sampled(result.selected, received_rows, &mut batch)
259 .await;
260 if let Some(rejected) = result.rejected {
261 self.send_passthrough(rejected).await;
262 }
263 }
264 Err(e) => {
265 self.handle_batch_error(format!("random sample error: {e}"), &mut batch)
266 .await;
267 }
268 }
269 self.event_duration.emit(start.elapsed());
270 }
271 }
272
273 async fn run_systematic(&mut self, sampler: SystematicSampler) {
274 while let Some(mut batch) = self.receiver.recv().await {
275 let start = Instant::now();
276 let received_rows = batch.num_rows();
277 self.inc_received(received_rows, batch.estimated_size());
278 let rate = 1.0 / sampler.config.interval as f64;
279
280 match sampler.sample(&batch.payload, self.route_all) {
281 Ok(result) => {
282 self.emit_sample_rate(&rate.to_string());
283 self.send_sampled(result.selected, received_rows, &mut batch)
284 .await;
285 if let Some(rejected) = result.rejected {
286 self.send_passthrough(rejected).await;
287 }
288 }
289 Err(e) => {
290 self.handle_batch_error(format!("systematic sample error: {e}"), &mut batch)
291 .await;
292 }
293 }
294 self.event_duration.emit(start.elapsed());
295 }
296 }
297
298 async fn run_hash(&mut self, config: HashConfig) {
299 while let Some(mut batch) = self.receiver.recv().await {
300 let start = Instant::now();
301 let received_rows = batch.num_rows();
302 self.inc_received(received_rows, batch.estimated_size());
303
304 match kinetic_sampling::hash::sample(&batch.payload, &config, self.route_all) {
305 Ok(result) => {
306 let rate = config.rate as f64 / 100.0;
307 self.emit_sample_rate(&rate.to_string());
308 self.send_sampled(result.selected, received_rows, &mut batch)
309 .await;
310 if let Some(rejected) = result.rejected {
311 self.send_passthrough(rejected).await;
312 }
313 }
314 Err(e) => {
315 self.handle_batch_error(format!("hash sample error: {e}"), &mut batch)
316 .await;
317 }
318 }
319 self.event_duration.emit(start.elapsed());
320 }
321 }
322
323 async fn run_recordbatch(&mut self, config: RecordBatchConfig) {
324 let mut rng = build_rng(config.seed);
325 while let Some(mut batch) = self.receiver.recv().await {
326 let start = Instant::now();
327 let received_rows = batch.num_rows();
328 self.inc_received(received_rows, batch.estimated_size());
329
330 let payload = batch.payload.clone();
331 match kinetic_sampling::recordbatch::sample(payload, &config, self.route_all, &mut *rng)
332 {
333 Ok(result) => {
334 self.emit_sample_rate(&config.rate.to_string());
335 self.send_sampled(result.selected, received_rows, &mut batch)
336 .await;
337 if let Some(rejected) = result.rejected {
338 self.send_passthrough(rejected).await;
339 }
340 }
341 Err(e) => {
342 self.handle_batch_error(format!("recordbatch sample error: {e}"), &mut batch)
343 .await;
344 }
345 }
346 self.event_duration.emit(start.elapsed());
347 }
348 }
349
350 async fn run_stratified(&mut self, config: StratifiedConfig, seed: Option<u64>) {
351 let mut rng = build_rng(seed);
352 while let Some(mut batch) = self.receiver.recv().await {
353 let start = Instant::now();
354 let received_rows = batch.num_rows();
355 self.inc_received(received_rows, batch.estimated_size());
356
357 match kinetic_sampling::stratified::sample(
358 &batch.payload,
359 &config,
360 self.route_all,
361 &mut *rng,
362 ) {
363 Ok(result) => {
364 self.emit_sample_rate(&config.default_rate.to_string());
365 self.send_sampled(result.selected, received_rows, &mut batch)
366 .await;
367 if let Some(rejected) = result.rejected {
368 self.send_passthrough(rejected).await;
369 }
370 }
371 Err(e) => {
372 self.handle_batch_error(format!("stratified sample error: {e}"), &mut batch)
373 .await;
374 }
375 }
376 self.event_duration.emit(start.elapsed());
377 }
378 }
379
380 async fn run_adaptive(&mut self, sampler: AdaptiveSampler) {
381 let mut rng = StdRng::from_entropy();
382 while let Some(mut batch) = self.receiver.recv().await {
383 let start = Instant::now();
384 let received_rows = batch.num_rows();
385 self.inc_received(received_rows, batch.estimated_size());
386
387 match sampler.sample(&batch.payload, self.route_all, &mut rng) {
388 Ok(result) => {
389 let rate = sampler.current_rate();
390 self.emit_sample_rate(&rate.to_string());
391 self.send_sampled(result.selected, received_rows, &mut batch)
392 .await;
393 if let Some(rejected) = result.rejected {
394 self.send_passthrough(rejected).await;
395 }
396 }
397 Err(e) => {
398 self.handle_batch_error(format!("adaptive sample error: {e}"), &mut batch)
399 .await;
400 }
401 }
402 self.event_duration.emit(start.elapsed());
403 }
404 }
405
406 async fn run_reservoir(&mut self, mut sampler: ReservoirSampler) {
407 let mut rng = StdRng::from_entropy();
408 let flush_secs = sampler.config.flush_interval_secs;
409 let mut flush_ticker: Interval = time::interval(Duration::from_secs(flush_secs.max(1)));
410
411 loop {
412 tokio::select! {
413 maybe_batch = self.receiver.recv() => {
414 match maybe_batch {
415 Some(batch) => {
416 let start = Instant::now();
417 let received_rows = batch.num_rows();
418 self.inc_received(received_rows, batch.estimated_size());
419 sampler.observe(&batch.payload, &mut rng);
420 let reservoir_len = sampler.len();
421 gauge!("sample_reservoir_size", self.labels.iter())
422 .set(reservoir_len as f64);
423 self.event_duration.emit(start.elapsed());
424 }
425 None => break,
426 }
427 }
428 _ = flush_ticker.tick() => {
429 let start = Instant::now();
430 self.flush_reservoir(&mut sampler).await;
431 self.event_duration.emit(start.elapsed());
432 }
433 }
434 }
435 self.flush_reservoir(&mut sampler).await;
436 }
437
438 async fn flush_reservoir(&self, sampler: &mut ReservoirSampler) {
439 match sampler.flush() {
440 Ok(Some(batch)) => {
441 let rows = batch.num_rows();
442 if let Some(sender) = self.senders.get("sampled") {
443 match EventBatch::new(batch, self.make_placeholder_metadata()) {
444 Ok(eb) => {
445 let bytes = eb.estimated_size();
446 match sender.send(eb).await {
447 Ok(_) => self.inc_sent(rows, bytes, "sampled"),
448 Err(e) => error!(
449 "Sample {} reservoir flush send error: {:?}",
450 self.component_id, e
451 ),
452 }
453 }
454 Err(e) => error!(
455 "Sample {} failed to create EventBatch: {}",
456 self.component_id, e
457 ),
458 }
459 }
460 }
461 Ok(None) => {}
462 Err(e) => error!("Sample {} reservoir flush error: {}", self.component_id, e),
463 }
464 }
465
466 async fn run_tail(&mut self, mut sampler: TailSampler) {
467 let drain_interval = Duration::from_secs(1);
468 let mut drain_ticker = time::interval(drain_interval);
469
470 loop {
471 tokio::select! {
472 maybe_batch = self.receiver.recv() => {
473 match maybe_batch {
474 Some(batch) => {
475 let start = Instant::now();
476 let received_rows = batch.num_rows();
477 self.inc_received(received_rows, batch.estimated_size());
478 let evicted = sampler.observe(&batch.payload);
479 for batch in evicted {
480 let rows = batch.num_rows();
481 self.emit_tail_gauges(&sampler);
482 if let Some(sender) = self.senders.get("sampled") {
483 counter!("sample_tail_evictions_total", self.labels.iter()).increment(1);
484 self.send_arrow_batch_to(batch, rows, sender).await;
485 }
486 }
487 self.event_duration.emit(start.elapsed());
488 }
489 None => break,
490 }
491 }
492 _ = drain_ticker.tick() => {
493 let start = Instant::now();
494 let (kept, _dropped) = sampler.drain_expired();
495 for batch in kept {
496 let rows = batch.num_rows();
497 self.emit_tail_gauges(&sampler);
498 if let Some(sender) = self.senders.get("sampled") {
499 self.send_arrow_batch_to(batch, rows, sender).await;
500 }
501 }
502 self.event_duration.emit(start.elapsed());
503 }
504 }
505 }
506 let (kept, _) = sampler.flush_all();
507 for batch in kept {
508 let rows = batch.num_rows();
509 if let Some(sender) = self.senders.get("sampled") {
510 self.send_arrow_batch_to(batch, rows, sender).await;
511 }
512 }
513 }
514
515 fn emit_tail_gauges(&self, sampler: &TailSampler) {
516 gauge!("sample_tail_buffer_groups", self.labels.iter()).set(sampler.group_count() as f64);
517 gauge!("sample_tail_buffer_events", self.labels.iter())
518 .set(sampler.buffered_event_count() as f64);
519 }
520
521 async fn send_sampled(
522 &self,
523 payload: arrow_array::RecordBatch,
524 received_rows: usize,
525 original: &mut EventBatch,
526 ) {
527 let sent_rows = payload.num_rows();
528 let dropped_rows = received_rows.saturating_sub(sent_rows);
529
530 if dropped_rows > 0 {
531 counter!("component_discarded_events_total", self.labels.iter())
532 .increment(dropped_rows as u64);
533 let drop_labels = [
534 Label::new("component_id", self.component_id.clone()),
535 Label::new("component_type", "transform"),
536 Label::new("component_kind", "sample"),
537 Label::new("decision", "dropped"),
538 ];
539 counter!("sample_decisions_total", drop_labels.iter()).increment(dropped_rows as u64);
540 }
541
542 if sent_rows == 0 {
543 if let Some(token) = original.ack_token.take() {
544 token.ack();
545 }
546 return;
547 }
548
549 let keep_labels = [
550 Label::new("component_id", self.component_id.clone()),
551 Label::new("component_type", "transform"),
552 Label::new("component_kind", "sample"),
553 Label::new("decision", "kept"),
554 ];
555 counter!("sample_decisions_total", keep_labels.iter()).increment(sent_rows as u64);
556
557 let new_batch = match EventBatch::new(payload, original.metadata.clone()) {
558 Ok(mut b) => {
559 b.ack_token = original.ack_token.take();
560 b
561 }
562 Err(e) => {
563 error!(
564 "Sample {} failed to create EventBatch: {}",
565 self.component_id, e
566 );
567 return;
568 }
569 };
570
571 if let Some(sender) = self.senders.get("sampled") {
572 let bytes = new_batch.estimated_size();
573 match sender.send(new_batch).await {
574 Ok(_) => self.inc_sent(sent_rows, bytes, "sampled"),
575 Err(e) => error!(
576 "Sample {} failed to send to sampled output: {:?}",
577 self.component_id, e
578 ),
579 }
580 } else {
581 debug!(
582 "Sample {}: no 'sampled' sender wired — events dropped",
583 self.component_id
584 );
585 }
586 }
587
588 async fn send_passthrough(&self, payload: arrow_array::RecordBatch) {
589 let rows = payload.num_rows();
590 if rows == 0 {
591 return;
592 }
593 if let Some(sender) = self.senders.get("passthrough")
594 && let Ok(batch) = EventBatch::new(payload, self.make_placeholder_metadata())
595 {
596 let bytes = batch.estimated_size();
597 match sender.send(batch).await {
598 Ok(_) => self.inc_sent(rows, bytes, "passthrough"),
599 Err(e) => warn!(
600 "Sample {} failed to send to passthrough output: {:?}",
601 self.component_id, e
602 ),
603 }
604 }
605 }
606
607 async fn send_arrow_batch_to(
608 &self,
609 payload: arrow_array::RecordBatch,
610 rows: usize,
611 sender: &BufferSender,
612 ) {
613 if let Ok(eb) = EventBatch::new(payload, self.make_placeholder_metadata()) {
614 let bytes = eb.estimated_size();
615 match sender.send(eb).await {
616 Ok(_) => self.inc_sent(rows, bytes, "sampled"),
617 Err(e) => error!("Sample {} send error: {:?}", self.component_id, e),
618 }
619 }
620 }
621
622 async fn handle_batch_error(&self, message: String, batch: &mut EventBatch) {
623 counter!("component_errors_total", self.labels.iter()).increment(1);
624 self.error_handler
625 .handle_error(&self.senders, message, Some(batch))
626 .await;
627 }
628
629 fn inc_received(&self, rows: usize, bytes: usize) {
630 counter!("component_received_events_total", self.labels.iter()).increment(rows as u64);
631 counter!("component_received_event_bytes_total", self.labels.iter())
632 .increment(bytes as u64);
633 }
634
635 fn inc_sent(&self, rows: usize, bytes: usize, output: &str) {
636 let labels = [
637 Label::new("component_id", self.component_id.clone()),
638 Label::new("component_type", "transform"),
639 Label::new("component_kind", "sample"),
640 Label::new("output", output.to_string()),
641 ];
642 counter!("component_sent_events_total", labels.iter()).increment(rows as u64);
643 counter!("component_sent_event_bytes_total", labels.iter()).increment(bytes as u64);
644 }
645
646 fn emit_sample_rate(&self, rate: &str) {
647 if let Ok(r) = rate.parse::<f64>() {
648 gauge!("sample_rate", self.labels.iter()).set(r);
649 }
650 }
651
652 fn make_placeholder_metadata(&self) -> kinetic_core::ArcEventMetadata {
653 use kinetic_core::{ComponentId, EventMetadata};
654 Arc::new(EventMetadata::new(
655 "unknown",
656 ComponentId(self.component_id.clone()),
657 ))
658 }
659}
660
661fn parse_operator(s: &str) -> Operator {
662 match s {
663 "==" => Operator::Eq,
664 "!=" => Operator::Ne,
665 ">" => Operator::Gt,
666 ">=" => Operator::Gte,
667 "<" => Operator::Lt,
668 "<=" => Operator::Lte,
669 "contains" => Operator::Contains,
670 other => {
671 warn!("Unknown tail rule operator '{}', defaulting to ==", other);
672 Operator::Eq
673 }
674 }
675}