kinetic/sources/internal/
logs.rs1use arrow_array::{RecordBatch, StringArray};
4use arrow_schema::{DataType, Field, Schema};
5use kinetic_buffers::BufferSender;
6use kinetic_core::{ComponentId, EventBatch, EventMetadata, ShutdownSignal};
7use std::sync::{Arc, Mutex};
8use tokio::sync::mpsc;
9use tracing::{error, info, subscriber::Subscriber};
10use tracing_subscriber::Layer;
11
12use metrics::{Label, counter};
13use std::sync::LazyLock;
14
15pub struct InternalLogsTask {
16 sender: BufferSender,
17 #[allow(dead_code)]
18 error_sender: BufferSender,
19 component_id: String,
20 pipeline_id: String,
21 log_rx: mpsc::Receiver<String>,
22 shutdown: ShutdownSignal,
23 labels: Arc<[Label]>,
24}
25
26pub struct LogCaptureLayer {
28 tx: mpsc::Sender<String>,
29}
30
31impl LogCaptureLayer {
32 pub fn new(tx: mpsc::Sender<String>) -> Self {
33 Self { tx }
34 }
35}
36
37impl<S> Layer<S> for LogCaptureLayer
38where
39 S: Subscriber,
40{
41 fn on_event(
42 &self,
43 event: &tracing::Event<'_>,
44 _ctx: tracing_subscriber::layer::Context<'_, S>,
45 ) {
46 let mut visitor = MessageVisitor::default();
47 event.record(&mut visitor);
48 if let Some(msg) = visitor.message {
49 let _ = self.tx.try_send(msg);
52 }
53 }
54}
55
56#[derive(Default)]
57struct MessageVisitor {
58 message: Option<String>,
59}
60
61impl tracing::field::Visit for MessageVisitor {
62 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
63 if field.name() == "message" {
64 self.message = Some(format!("{:?}", value));
65 }
66 }
67
68 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
69 if field.name() == "message" {
70 self.message = Some(value.to_string());
71 }
72 }
73}
74
75static LOG_SENDER: LazyLock<mpsc::Sender<String>> = LazyLock::new(|| {
76 let (tx, rx) = mpsc::channel(1024);
77 if let Ok(mut guard) = LOG_RECEIVER.lock() {
78 *guard = Some(rx);
79 }
80 tx
81});
82
83static LOG_RECEIVER: LazyLock<Mutex<Option<mpsc::Receiver<String>>>> =
84 LazyLock::new(|| Mutex::new(None));
85
86pub fn log_sender() -> mpsc::Sender<String> {
88 LOG_SENDER.clone()
89}
90
91pub fn take_log_receiver() -> Option<mpsc::Receiver<String>> {
94 let _ = &*LOG_SENDER;
96 LOG_RECEIVER.lock().ok()?.take()
97}
98
99impl InternalLogsTask {
100 pub fn new(
101 component_id: String,
102 pipeline_id: String,
103 sender: BufferSender,
104 error_sender: BufferSender,
105 log_rx: mpsc::Receiver<String>,
106 shutdown: ShutdownSignal,
107 ) -> Self {
108 let labels: Arc<[Label]> = Arc::new([
109 Label::new("component_id", component_id.clone()),
110 Label::new("component_type", "source"),
111 Label::new("component_kind", "internal_logs"),
112 ]);
113
114 Self {
115 sender,
116 error_sender,
117 component_id,
118 pipeline_id,
119 log_rx,
120 shutdown,
121 labels,
122 }
123 }
124
125 pub async fn run(mut self) {
126 info!("Starting internal_logs source: {}", self.component_id);
127
128 let metadata = Arc::new(EventMetadata::new(
129 self.pipeline_id.clone(),
130 ComponentId(self.component_id.clone()),
131 ));
132
133 let schema = Arc::new(Schema::new(vec![Field::new(
134 "message",
135 DataType::Utf8,
136 false,
137 )]));
138
139 loop {
140 tokio::select! {
141 _ = self.shutdown.recv() => {
142 info!("internal_logs source '{}' received shutdown signal", self.component_id);
143 break;
144 }
145 log_msg_opt = self.log_rx.recv() => {
146 match log_msg_opt {
147 Some(log_msg) => {
148 let messages = StringArray::from(vec![log_msg]);
149
150 let record_batch = match RecordBatch::try_new(schema.clone(), vec![Arc::new(messages)])
151 {
152 Ok(rb) => rb,
153 Err(e) => {
154 error!("Failed to create RecordBatch for internal logs: {}", e);
155 continue;
156 }
157 };
158
159 match EventBatch::new_with_xid(record_batch, metadata.clone()) {
160 Ok(batch) => {
161 let rows = batch.num_rows();
162 let bytes = batch.estimated_size();
163 counter!("component_received_events_total", self.labels.iter()).increment(rows as u64);
164 counter!("component_received_event_bytes_total", self.labels.iter()).increment(bytes as u64);
165
166 if let Err(e) = self.sender.send(batch).await {
167 error!("Failed to send internal logs downstream: {:?}", e);
168 } else {
169 counter!("component_sent_events_total", self.labels.iter()).increment(rows as u64);
170 counter!("component_sent_event_bytes_total", self.labels.iter()).increment(bytes as u64);
171 }
172 }
173 Err(e) => {
174 error!("Failed to create EventBatch for internal logs: {}", e);
175 }
176 }
177 }
178 None => {
179 break;
180 }
181 }
182 }
183 }
184 }
185 }
186}