veecle_telemetry/collector/
mod.rs

1//! Telemetry data collection and export infrastructure.
2//!
3//! This module provides the core infrastructure for collecting telemetry data and exporting it
4//! to various backends.
5//! It includes the global collector singleton, export trait, and various
6//! built-in exporters.
7//!
8//! # Global Collector
9//!
10//! The collector uses a global singleton pattern to ensure telemetry data is collected
11//! consistently across the entire application.
12//! The collector must be initialized once
13//! using [`set_exporter`] before any telemetry data can be collected.
14//!
15//! # Export Trait
16//!
17//! The [`Export`] trait defines the interface for exporting telemetry data.
18//! Custom exporters can be implemented by providing an implementation of this trait.
19//!
20//! # Built-in Exporters
21//!
22//! - [`ConsoleJsonExporter`] - Exports telemetry data as JSON to stdout
23//! - [`TestExporter`] - Collects telemetry data in memory for testing purposes
24
25#[cfg(feature = "std")]
26mod json_exporter;
27#[cfg(feature = "std")]
28mod pretty_exporter;
29#[cfg(feature = "std")]
30mod test_exporter;
31
32use core::fmt::Debug;
33#[cfg(feature = "enable")]
34use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
35use core::{error, fmt};
36
37#[cfg(feature = "std")]
38pub use json_exporter::ConsoleJsonExporter;
39#[cfg(feature = "std")]
40pub use pretty_exporter::ConsolePrettyExporter;
41#[cfg(feature = "std")]
42#[doc(hidden)]
43pub use test_exporter::TestExporter;
44
45use crate::TraceId;
46#[cfg(feature = "enable")]
47use crate::protocol::ExecutionId;
48use crate::protocol::InstanceMessage;
49#[cfg(feature = "enable")]
50use crate::protocol::{
51    LogMessage, SpanAddEventMessage, SpanAddLinkMessage, SpanCloseMessage, SpanCreateMessage,
52    SpanEnterMessage, SpanExitMessage, SpanSetAttributeMessage, TelemetryMessage, TracingMessage,
53};
54
55/// Trait for exporting telemetry data to external systems.
56///
57/// Implementors of this trait define how telemetry data should be exported,
58/// whether to files, network endpoints, or other destinations.
59///
60/// # Examples
61///
62/// ```rust
63/// use veecle_telemetry::collector::Export;
64/// use veecle_telemetry::protocol::InstanceMessage;
65///
66/// #[derive(Debug)]
67/// struct CustomExporter;
68///
69/// impl Export for CustomExporter {
70///     fn export(&self, message: InstanceMessage<'_>) {
71///         // Custom export logic here
72///         println!("Exporting: {:?}", message);
73///     }
74/// }
75/// ```
76pub trait Export: Debug {
77    /// Exports a telemetry message.
78    ///
79    /// This method is called for each telemetry message that needs to be exported.
80    /// The implementation should handle the message appropriately based on its type.
81    fn export(&self, message: InstanceMessage<'_>);
82}
83
/// Process-wide telemetry collector.
///
/// The collector is the central hand-off point for telemetry data: it carries the execution ID,
/// generates trace IDs and passes messages on to the configured exporter.
///
/// Use [`get_collector`] to obtain the shared instance; the type is not meant to be
/// constructed directly.
#[derive(Debug)]
pub struct Collector {
    /// Collector state; the field only exists when the `enable` feature is active, so the
    /// struct is empty otherwise.
    #[cfg(feature = "enable")]
    inner: CollectorInner,
}
97
/// State of a [`Collector`] when the `enable` feature is active.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct CollectorInner {
    /// Execution ID attached to every message this collector creates itself.
    execution_id: ExecutionId,

    /// Destination for all collected messages.
    exporter: &'static (dyn Export + Sync),

    /// Fixed part of generated trace IDs; a value of zero makes the collector hand out
    /// `TraceId(0)` only.
    trace_id_prefix: u64,
    /// Counter providing the changing part of generated trace IDs.
    trace_id_counter: AtomicU64,
}
108
/// Exporter that discards every message it receives.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct NopExporter;

#[cfg(feature = "enable")]
impl Export for NopExporter {
    /// Drops `_message` without doing anything.
    fn export(&self, _message: InstanceMessage<'_>) {}
}
117
118// The GLOBAL_COLLECTOR static holds a pointer to the global exporter. It is protected by
119// the GLOBAL_INIT static which determines whether GLOBAL_EXPORTER has been initialized.
120#[cfg(feature = "enable")]
121static mut GLOBAL_COLLECTOR: Collector = Collector {
122    inner: CollectorInner {
123        execution_id: ExecutionId::from_raw(0),
124        exporter: &NO_EXPORTER,
125
126        trace_id_prefix: 0,
127        trace_id_counter: AtomicU64::new(0),
128    },
129};
130static NO_COLLECTOR: Collector = Collector {
131    #[cfg(feature = "enable")]
132    inner: CollectorInner {
133        execution_id: ExecutionId::from_raw(0),
134        exporter: &NO_EXPORTER,
135
136        trace_id_prefix: 0,
137        trace_id_counter: AtomicU64::new(0),
138    },
139};
140#[cfg(feature = "enable")]
141static NO_EXPORTER: NopExporter = NopExporter;
142
// Initialization state of `GLOBAL_COLLECTOR`. It takes one of three values:
// - `UNINITIALIZED`: `set_exporter` has not been called yet
// - `INITIALIZING`: `set_exporter` has been called but `GLOBAL_COLLECTOR` hasn't been set yet
// - `INITIALIZED`: `GLOBAL_COLLECTOR` has been set and the collector is active
#[cfg(feature = "enable")]
static GLOBAL_INIT: AtomicUsize = AtomicUsize::new(UNINITIALIZED);

#[cfg(feature = "enable")]
const UNINITIALIZED: usize = 0;
#[cfg(feature = "enable")]
const INITIALIZING: usize = 1;
#[cfg(feature = "enable")]
const INITIALIZED: usize = 2;
156
/// Installs `exporter` as the global exporter and sets up the collector for `execution_id`.
///
/// An [`ExecutionId`] should never be re-used as it's used to collect metadata about the execution and to generate
/// [`TraceId`]s which need to be globally unique.
///
/// # Errors
///
/// Returns [`SetExporterError`] when the collector is no longer uninitialized, i.e. when
/// `set_exporter` has been called before. In that case the arguments are ignored.
#[cfg(feature = "enable")]
pub fn set_exporter(
    execution_id: ExecutionId,
    exporter: &'static (dyn Export + Sync),
) -> Result<(), SetExporterError> {
    // Only the first caller manages to move the state from `UNINITIALIZED` to `INITIALIZING`.
    let claim = GLOBAL_INIT.compare_exchange(
        UNINITIALIZED,
        INITIALIZING,
        Ordering::Acquire,
        Ordering::Relaxed,
    );
    if claim.is_err() {
        return Err(SetExporterError(()));
    }

    // SAFETY: Only the single caller that won the compare-exchange above gets here, and
    // `get_collector` reads `GLOBAL_COLLECTOR` only after it observed `INITIALIZED`, which is
    // stored after this write.
    unsafe { GLOBAL_COLLECTOR = Collector::new(execution_id, exporter) }
    // `Release` publishes the write above to readers that load the state with `Acquire`.
    GLOBAL_INIT.store(INITIALIZED, Ordering::Release);

    Ok(())
}
184
185/// Returns a reference to the collector.
186///
187/// If an exporter has not been set, a no-op implementation is returned.
188pub fn get_collector() -> &'static Collector {
189    #[cfg(not(feature = "enable"))]
190    {
191        &NO_COLLECTOR
192    }
193
194    // Acquire memory ordering guarantees that current thread would see any
195    // memory writes that happened before store of the value
196    // into `GLOBAL_INIT` with memory ordering `Release` or stronger.
197    //
198    // Since the value `INITIALIZED` is written only after `GLOBAL_COLLECTOR` was
199    // initialized, observing it after `Acquire` load here makes both
200    // write to the `GLOBAL_COLLECTOR` static and initialization of the exporter
201    // internal state synchronized with current thread.
202    #[cfg(feature = "enable")]
203    if GLOBAL_INIT.load(Ordering::Acquire) != INITIALIZED {
204        &NO_COLLECTOR
205    } else {
206        // SAFETY: this is guarded by the atomic
207        unsafe {
208            #[expect(clippy::deref_addrof, reason = "false positive")]
209            &*&raw const GLOBAL_COLLECTOR
210        }
211    }
212}
213
/// Error returned by [`set_exporter`] when a global exporter was already installed by an
/// earlier call to [`set_exporter`].
///
/// [`set_exporter`]: fn.set_exporter.html
#[derive(Debug)]
pub struct SetExporterError(());

impl SetExporterError {
    /// Human-readable description used by the [`fmt::Display`] implementation.
    const MESSAGE: &'static str = "a global exporter has already been set";
}

impl fmt::Display for SetExporterError {
    /// Writes the fixed error message; formatting options such as width are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = Self::MESSAGE;
        f.write_str(description)
    }
}

impl error::Error for SetExporterError {}
231
232impl Collector {
233    #[cfg(feature = "enable")]
234    fn new(execution_id: ExecutionId, exporter: &'static (dyn Export + Sync)) -> Self {
235        let execution_id_raw = *execution_id;
236        let trace_id_prefix = (execution_id_raw >> 64) as u64;
237        let initial_counter_value = execution_id_raw as u64;
238
239        Self {
240            inner: CollectorInner {
241                execution_id,
242                exporter,
243                trace_id_prefix,
244                trace_id_counter: AtomicU64::new(initial_counter_value),
245            },
246        }
247    }
248
249    #[inline]
250    pub(crate) fn generate_trace_id(&self) -> TraceId {
251        #[cfg(not(feature = "enable"))]
252        {
253            TraceId(0)
254        }
255
256        #[cfg(feature = "enable")]
257        if self.inner.trace_id_prefix == 0 {
258            TraceId(0)
259        } else {
260            let suffix = self.inner.trace_id_counter.fetch_add(1, Ordering::Relaxed);
261
262            TraceId(((self.inner.trace_id_prefix as u128) << 32) | (suffix as u128))
263        }
264    }
265}
266
#[cfg(feature = "enable")]
impl Collector {
    /// Passes a telemetry message that originates outside of this collector to the exporter.
    ///
    /// External systems can use this method to inject their own telemetry messages into the
    /// collector pipeline.
    /// The message is handed to the configured exporter unchanged.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use veecle_telemetry::collector::get_collector;
    /// use veecle_telemetry::protocol::{
    ///     ExecutionId,
    ///     InstanceMessage,
    ///     TelemetryMessage,
    ///     TimeSyncMessage,
    /// };
    ///
    /// let collector = get_collector();
    /// let message = InstanceMessage {
    ///     execution: ExecutionId::from_raw(1),
    ///     message: TelemetryMessage::TimeSync(TimeSyncMessage {
    ///         local_timestamp: 0,
    ///         since_epoch: 0,
    ///     }),
    /// };
    /// collector.collect_external(message);
    /// ```
    #[inline]
    pub fn collect_external(&self, message: InstanceMessage<'_>) {
        let exporter = self.inner.exporter;
        exporter.export(message);
    }

    /// Reports the creation of a span.
    #[inline]
    pub(crate) fn new_span(&self, span: SpanCreateMessage<'_>) {
        let message = TracingMessage::CreateSpan(span);
        self.tracing_message(message);
    }

    /// Reports that a span has been entered.
    #[inline]
    pub(crate) fn enter_span(&self, enter: SpanEnterMessage) {
        let message = TracingMessage::EnterSpan(enter);
        self.tracing_message(message);
    }

    /// Reports that a span has been exited.
    #[inline]
    pub(crate) fn exit_span(&self, exit: SpanExitMessage) {
        let message = TracingMessage::ExitSpan(exit);
        self.tracing_message(message);
    }

    /// Reports that a span has been closed.
    #[inline]
    pub(crate) fn close_span(&self, span: SpanCloseMessage) {
        let message = TracingMessage::CloseSpan(span);
        self.tracing_message(message);
    }

    /// Reports an event that was added to a span.
    #[inline]
    pub(crate) fn span_event(&self, event: SpanAddEventMessage<'_>) {
        let message = TracingMessage::AddEvent(event);
        self.tracing_message(message);
    }

    /// Reports a link that was added to a span.
    #[inline]
    pub(crate) fn span_link(&self, link: SpanAddLinkMessage) {
        let message = TracingMessage::AddLink(link);
        self.tracing_message(message);
    }

    /// Reports an attribute that was set on a span.
    #[inline]
    pub(crate) fn span_attribute(&self, attribute: SpanSetAttributeMessage<'_>) {
        let message = TracingMessage::SetAttribute(attribute);
        self.tracing_message(message);
    }

    /// Exports a log message tagged with this collector's execution ID.
    #[inline]
    pub(crate) fn log_message(&self, log: LogMessage<'_>) {
        self.export_tagged(TelemetryMessage::Log(log));
    }

    /// Exports a tracing message tagged with this collector's execution ID.
    #[inline]
    fn tracing_message(&self, message: TracingMessage<'_>) {
        self.export_tagged(TelemetryMessage::Tracing(message));
    }

    /// Wraps `message` together with this collector's execution ID and hands the result to
    /// the exporter.
    #[inline]
    fn export_tagged(&self, message: TelemetryMessage<'_>) {
        let tagged = InstanceMessage {
            execution: self.inner.execution_id,
            message,
        };
        self.inner.exporter.export(tagged);
    }
}