Skip to main content

kinetic/sources/internal/
logs.rs

//! Internal logs source that hooks into the tracing subscriber to emit the application's own log messages as events.
2
3use arrow_array::{RecordBatch, StringArray};
4use arrow_schema::{DataType, Field, Schema};
5use kinetic_buffers::BufferSender;
6use kinetic_core::{ComponentId, EventBatch, EventMetadata, ShutdownSignal};
7use std::sync::{Arc, Mutex};
8use tokio::sync::mpsc;
9use tracing::{error, info, subscriber::Subscriber};
10use tracing_subscriber::Layer;
11
12use metrics::{Label, counter};
13use std::sync::LazyLock;
14
15pub struct InternalLogsTask {
16    sender: BufferSender,
17    #[allow(dead_code)]
18    error_sender: BufferSender,
19    component_id: String,
20    pipeline_id: String,
21    log_rx: mpsc::Receiver<String>,
22    shutdown: ShutdownSignal,
23    labels: Arc<[Label]>,
24}
25
26/// A tracing Layer that captures log messages and sends them to a channel.
27pub struct LogCaptureLayer {
28    tx: mpsc::Sender<String>,
29}
30
31impl LogCaptureLayer {
32    pub fn new(tx: mpsc::Sender<String>) -> Self {
33        Self { tx }
34    }
35}
36
37impl<S> Layer<S> for LogCaptureLayer
38where
39    S: Subscriber,
40{
41    fn on_event(
42        &self,
43        event: &tracing::Event<'_>,
44        _ctx: tracing_subscriber::layer::Context<'_, S>,
45    ) {
46        let mut visitor = MessageVisitor::default();
47        event.record(&mut visitor);
48        if let Some(msg) = visitor.message {
49            // We use try_send to avoid blocking the logging thread if the source is slow.
50            // If the channel is full, logs are dropped (which is better than deadlocking).
51            let _ = self.tx.try_send(msg);
52        }
53    }
54}
55
56#[derive(Default)]
57struct MessageVisitor {
58    message: Option<String>,
59}
60
61impl tracing::field::Visit for MessageVisitor {
62    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
63        if field.name() == "message" {
64            self.message = Some(format!("{:?}", value));
65        }
66    }
67
68    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
69        if field.name() == "message" {
70            self.message = Some(value.to_string());
71        }
72    }
73}
74
75static LOG_SENDER: LazyLock<mpsc::Sender<String>> = LazyLock::new(|| {
76    let (tx, rx) = mpsc::channel(1024);
77    if let Ok(mut guard) = LOG_RECEIVER.lock() {
78        *guard = Some(rx);
79    }
80    tx
81});
82
83static LOG_RECEIVER: LazyLock<Mutex<Option<mpsc::Receiver<String>>>> =
84    LazyLock::new(|| Mutex::new(None));
85
86/// Returns a Sender that can be used to feed logs into the InternalLogs source.
87pub fn log_sender() -> mpsc::Sender<String> {
88    LOG_SENDER.clone()
89}
90
91/// Returns the Receiver for the InternalLogs source.
92/// Note: This can only be called once as it moves the receiver.
93pub fn take_log_receiver() -> Option<mpsc::Receiver<String>> {
94    // Ensure sender is initialized so receiver is created
95    let _ = &*LOG_SENDER;
96    LOG_RECEIVER.lock().ok()?.take()
97}
98
99impl InternalLogsTask {
100    pub fn new(
101        component_id: String,
102        pipeline_id: String,
103        sender: BufferSender,
104        error_sender: BufferSender,
105        log_rx: mpsc::Receiver<String>,
106        shutdown: ShutdownSignal,
107    ) -> Self {
108        let labels: Arc<[Label]> = Arc::new([
109            Label::new("component_id", component_id.clone()),
110            Label::new("component_type", "source"),
111            Label::new("component_kind", "internal_logs"),
112        ]);
113
114        Self {
115            sender,
116            error_sender,
117            component_id,
118            pipeline_id,
119            log_rx,
120            shutdown,
121            labels,
122        }
123    }
124
125    pub async fn run(mut self) {
126        info!("Starting internal_logs source: {}", self.component_id);
127
128        let metadata = Arc::new(EventMetadata::new(
129            self.pipeline_id.clone(),
130            ComponentId(self.component_id.clone()),
131        ));
132
133        let schema = Arc::new(Schema::new(vec![Field::new(
134            "message",
135            DataType::Utf8,
136            false,
137        )]));
138
139        loop {
140            tokio::select! {
141                _ = self.shutdown.recv() => {
142                    info!("internal_logs source '{}' received shutdown signal", self.component_id);
143                    break;
144                }
145                log_msg_opt = self.log_rx.recv() => {
146                    match log_msg_opt {
147                        Some(log_msg) => {
148                            let messages = StringArray::from(vec![log_msg]);
149
150                            let record_batch = match RecordBatch::try_new(schema.clone(), vec![Arc::new(messages)])
151                            {
152                                Ok(rb) => rb,
153                                Err(e) => {
154                                    error!("Failed to create RecordBatch for internal logs: {}", e);
155                                    continue;
156                                }
157                            };
158
159                            match EventBatch::new_with_xid(record_batch, metadata.clone()) {
160                                Ok(batch) => {
161                                    let rows = batch.num_rows();
162                                    let bytes = batch.estimated_size();
163                                    counter!("component_received_events_total", self.labels.iter()).increment(rows as u64);
164                                    counter!("component_received_event_bytes_total", self.labels.iter()).increment(bytes as u64);
165
166                                    if let Err(e) = self.sender.send(batch).await {
167                                        error!("Failed to send internal logs downstream: {:?}", e);
168                                    } else {
169                                        counter!("component_sent_events_total", self.labels.iter()).increment(rows as u64);
170                                        counter!("component_sent_event_bytes_total", self.labels.iter()).increment(bytes as u64);
171                                    }
172                                }
173                                Err(e) => {
174                                    error!("Failed to create EventBatch for internal logs: {}", e);
175                                }
176                            }
177                        }
178                        None => {
179                            break;
180                        }
181                    }
182                }
183            }
184        }
185    }
186}