8 changes: 8 additions & 0 deletions src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt
@@ -0,0 +1,8 @@
OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter<System.Diagnostics.Activity!>! exporter, OpenTelemetry.BatchExportProcessorOptions<System.Diagnostics.Activity!>! options) -> void
OpenTelemetry.BatchExportProcessor<T>.BatchExportProcessor(OpenTelemetry.BaseExporter<T!>! exporter, OpenTelemetry.BatchExportProcessorOptions<T!>! options) -> void
OpenTelemetry.BatchExportProcessorOptions<T>.UseThreads.get -> bool
OpenTelemetry.BatchExportProcessorOptions<T>.UseThreads.set -> void
OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter<OpenTelemetry.Logs.LogRecord!>! exporter, OpenTelemetry.BatchExportProcessorOptions<OpenTelemetry.Logs.LogRecord!>! options) -> void
OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter<OpenTelemetry.Metrics.Metric!>! exporter, OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions! options) -> void
OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.UseThreads.get -> bool
OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.UseThreads.set -> void
Comment on lines +7 to +8
Member

I'd prefer to avoid adding a public-facing API for this because it exposes an implementation detail. Since it sounds like your primary concern is WebAssembly/Blazor support, would it be possible to gate it using some kind of platform check? Basically we want

if (Blazor) { don't rely on threads } else { rely on threads }

I understand other folks are mainly concerned with long-running, mostly idle threads, but maybe it makes sense to separate these concerns.
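
For illustration, the kind of internal platform gate being suggested might look like the sketch below; `OperatingSystem.IsBrowser()` is the check the PR author points to further down, while the wrapper class and method name here are purely hypothetical:

```csharp
// Hypothetical sketch of an internal platform gate, not part of the PR.
// OperatingSystem.IsBrowser() (available on .NET 5+) is the check referenced below.
internal static class ExporterThreadingGate
{
    public static bool CanUseDedicatedThread()
    {
#if NET
        // Blazor WebAssembly cannot block a dedicated background thread,
        // so a Task-based worker would be used there instead.
        return !OperatingSystem.IsBrowser();
#else
        // Older targets (e.g. netstandard2.0) have no browser check; assume threads work.
        return true;
#endif
    }
}
```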

Author

@jeromelaban jeromelaban Aug 14, 2025


Hiding the option was the intent originally, but I get the concern. The idea was to provide a way to exercise the new path without introducing a breaking change.

For the WebAssembly detection, this is one of the places where the threads option is used:

if (OperatingSystem.IsBrowser() || !this.useThreads)

There's a similar change for the other class:

if (OperatingSystem.IsBrowser() || !this.useThreads)

Having a set of tests for .NET WebAssembly (which is broader than Blazor) would make sense. There's a similar structure available in SkiaSharp, though it's not runnable through dotnet test (last time I checked). Are there specific areas that should be tested, other than the two main classes touched here?


Since it sounds like your primary concern is WebAssembly/Blazor support, would it be possible to gate using some kind of platform check?

From the discussion in #5838, exposing this option would also enable exporters to use the thread pool instead of unnecessarily creating threads.
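
As a rough illustration of that point, a drain loop that runs on the thread pool instead of a dedicated thread could look roughly like the sketch below. This is not the PR's actual BatchExportTaskWorker; the field names are borrowed from BatchExportProcessor<T> for readability only.

```csharp
// Illustrative sketch only; not the PR's BatchExportTaskWorker implementation.
// The fields used here (circularBuffer, exporter, MaxExportBatchSize,
// ScheduledDelayMilliseconds) mirror those on BatchExportProcessor<T>.
private async Task ExportLoopAsync(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            // Yield back to the thread pool between batches instead of blocking
            // a dedicated thread on a WaitHandle.
            await Task.Delay(this.ScheduledDelayMilliseconds, cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            break;
        }

        if (this.circularBuffer.Count > 0)
        {
            using var batch = new Batch<T>(this.circularBuffer, this.MaxExportBatchSize);
            this.exporter.Export(batch);
        }
    }
}
```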

Author


@alanwest Would you have additional thoughts on this thread? Thanks! (Specifically on the visibility of the thread-use API.)

228 changes: 63 additions & 165 deletions src/OpenTelemetry/BatchExportProcessor.cs
@@ -1,7 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using System.Runtime.CompilerServices;
using OpenTelemetry.Internal;

@@ -24,12 +23,8 @@ public abstract class BatchExportProcessor<T> : BaseExportProcessor<T>
internal readonly int ExporterTimeoutMilliseconds;

private readonly CircularBuffer<T> circularBuffer;
private readonly Thread exporterThread;
private readonly AutoResetEvent exportTrigger = new(false);
private readonly ManualResetEvent dataExportedNotification = new(false);
private readonly ManualResetEvent shutdownTrigger = new(false);
private long shutdownDrainTarget = long.MaxValue;
private long droppedCount;
private readonly BatchExportWorker<T> worker;
private readonly bool useThreads;
private bool disposed;

/// <summary>
@@ -46,31 +41,50 @@ protected BatchExportProcessor(
int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds,
int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds,
int maxExportBatchSize = DefaultMaxExportBatchSize)
: this(exporter, new BatchExportProcessorOptions<T>
{
MaxQueueSize = maxQueueSize,
ScheduledDelayMilliseconds = scheduledDelayMilliseconds,
ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds,
MaxExportBatchSize = maxExportBatchSize,
UseThreads = true,
})
{
}

/// <summary>
/// Initializes a new instance of the <see cref="BatchExportProcessor{T}"/> class.
/// </summary>
/// <param name="exporter">Exporter instance.</param>
/// <param name="options">Configuration options for the batch export processor.</param>
protected BatchExportProcessor(
BaseExporter<T> exporter,
BatchExportProcessorOptions<T> options)
: base(exporter)
{
Guard.ThrowIfNull(options);

var maxQueueSize = options?.MaxQueueSize ?? 0;
Guard.ThrowIfOutOfRange(maxQueueSize, min: 1);
Guard.ThrowIfOutOfRange(maxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(maxQueueSize));
Guard.ThrowIfOutOfRange(scheduledDelayMilliseconds, min: 1);
Guard.ThrowIfOutOfRange(exporterTimeoutMilliseconds, min: 0);

this.circularBuffer = new CircularBuffer<T>(maxQueueSize);
this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds;
this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds;
this.MaxExportBatchSize = maxExportBatchSize;
this.exporterThread = new Thread(this.ExporterProc)
{
IsBackground = true,
#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1
Name = $"OpenTelemetry-{nameof(BatchExportProcessor<T>)}-{exporter.GetType().Name}",
#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1
};
this.exporterThread.Start();
this.ScheduledDelayMilliseconds = options?.ScheduledDelayMilliseconds ?? 0;
this.ExporterTimeoutMilliseconds = options?.ExporterTimeoutMilliseconds ?? -1;
this.MaxExportBatchSize = options?.MaxExportBatchSize ?? 0;
this.useThreads = options?.UseThreads ?? true;

Guard.ThrowIfOutOfRange(this.MaxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(options.MaxQueueSize));
Guard.ThrowIfOutOfRange(this.ScheduledDelayMilliseconds, min: 1);
Guard.ThrowIfOutOfRange(this.ExporterTimeoutMilliseconds, min: 0);

this.worker = this.CreateWorker();
this.worker.Start();
}

/// <summary>
/// Gets the number of telemetry objects dropped by the processor.
/// </summary>
internal long DroppedCount => Volatile.Read(ref this.droppedCount);
internal long DroppedCount => this.worker.DroppedCount;

/// <summary>
/// Gets the number of telemetry objects received by the processor.
@@ -89,20 +103,14 @@ internal bool TryExport(T data)
{
if (this.circularBuffer.Count >= this.MaxExportBatchSize)
{
try
{
this.exportTrigger.Set();
}
catch (ObjectDisposedException)
{
}
this.worker.TriggerExport();
}

return true; // enqueue succeeded
}

// either the queue is full or exceeded the spin limit, drop the item on the floor
Interlocked.Increment(ref this.droppedCount);
this.worker.IncrementDroppedCount();

return false;
}
@@ -116,113 +124,27 @@ protected override void OnExport(T data)
/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
var tail = this.circularBuffer.RemovedCount;
var head = this.circularBuffer.AddedCount;

if (head == tail)
{
return true; // nothing to flush
}

try
{
this.exportTrigger.Set();
}
catch (ObjectDisposedException)
{
return false;
}

if (timeoutMilliseconds == 0)
{
return false;
}

var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger };

var sw = timeoutMilliseconds == Timeout.Infinite
? null
: Stopwatch.StartNew();

// There is a chance that the export thread finished processing all the data from the queue,
// and signaled before we enter wait here, use polling to prevent being blocked indefinitely.
const int pollingMilliseconds = 1000;

while (true)
{
if (sw == null)
{
try
{
WaitHandle.WaitAny(triggers, pollingMilliseconds);
}
catch (ObjectDisposedException)
{
return false;
}
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return this.circularBuffer.RemovedCount >= head;
}

try
{
WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds));
}
catch (ObjectDisposedException)
{
return false;
}
}

if (this.circularBuffer.RemovedCount >= head)
{
return true;
}

if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue)
{
return false;
}
}
return this.worker.WaitForExport(timeoutMilliseconds);
}

/// <inheritdoc/>
protected override bool OnShutdown(int timeoutMilliseconds)
{
Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount);

try
{
this.shutdownTrigger.Set();
}
catch (ObjectDisposedException)
{
return false;
}
var result = this.worker.Shutdown(timeoutMilliseconds);

OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount);

if (timeoutMilliseconds == Timeout.Infinite)
{
this.exporterThread.Join();
return this.exporter.Shutdown();
return this.exporter.Shutdown() && result;
}

if (timeoutMilliseconds == 0)
{
return this.exporter.Shutdown(0);
return this.exporter.Shutdown(0) && result;
}

var sw = Stopwatch.StartNew();
this.exporterThread.Join(timeoutMilliseconds);
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
return this.exporter.Shutdown((int)Math.Max(timeout, 0));
return this.exporter.Shutdown(timeoutMilliseconds) && result;
}

/// <inheritdoc/>
@@ -232,9 +154,7 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
this.exportTrigger.Dispose();
this.dataExportedNotification.Dispose();
this.shutdownTrigger.Dispose();
this.worker?.Dispose();
}

this.disposed = true;
@@ -243,49 +163,27 @@ protected override void Dispose(bool disposing)
base.Dispose(disposing);
}

private void ExporterProc()
private BatchExportWorker<T> CreateWorker()
{
var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger };

while (true)
#if NET
// Use task-based worker for browser platform where threading may be limited
if (ThreadingHelper.IsThreadingDisabled() || !this.useThreads)
{
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
if (this.circularBuffer.Count < this.MaxExportBatchSize)
{
try
{
WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds);
}
catch (ObjectDisposedException)
{
// the exporter is somehow disposed before the worker thread could finish its job
return;
}
}

if (this.circularBuffer.Count > 0)
{
using (var batch = new Batch<T>(this.circularBuffer, this.MaxExportBatchSize))
{
this.exporter.Export(batch);
}

try
{
this.dataExportedNotification.Set();
this.dataExportedNotification.Reset();
}
catch (ObjectDisposedException)
{
// the exporter is somehow disposed before the worker thread could finish its job
return;
}
}

if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget))
{
return;
}
return new BatchExportTaskWorker<T>(
this.circularBuffer,
this.exporter,
this.MaxExportBatchSize,
this.ScheduledDelayMilliseconds,
this.ExporterTimeoutMilliseconds);
}
#endif

// Use thread-based worker for all other platforms
return new BatchExportThreadWorker<T>(
this.circularBuffer,
this.exporter,
this.MaxExportBatchSize,
this.ScheduledDelayMilliseconds,
this.ExporterTimeoutMilliseconds);
}
}
5 changes: 5 additions & 0 deletions src/OpenTelemetry/BatchExportProcessorOptions.cs
@@ -29,4 +29,9 @@ public class BatchExportProcessorOptions<T>
/// Gets or sets the maximum batch size of every export. It must be smaller or equal to <see cref="MaxQueueSize"/>. The default value is 512.
/// </summary>
public int MaxExportBatchSize { get; set; } = BatchExportProcessor<T>.DefaultMaxExportBatchSize;

/// <summary>
/// Gets or sets a value indicating whether to use threads. Enables the use of <see cref="Thread" /> when <see langword="true"/>; otherwise <see cref="Task"/> is used. The default value is <see langword="true"/>.
/// </summary>
public bool UseThreads { get; set; } = true;
}
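
For reference, a usage sketch of the new options-based constructor and the UseThreads flag added in this PR. Here, myActivityExporter is a placeholder for any BaseExporter<Activity>; the other values shown are the SDK defaults.

```csharp
using System.Diagnostics;
using OpenTelemetry;

// Sketch: opting out of the dedicated exporter thread via the new option.
// myActivityExporter is a placeholder for any BaseExporter<Activity>.
var options = new BatchExportProcessorOptions<Activity>
{
    MaxQueueSize = 2048,
    ScheduledDelayMilliseconds = 5000,
    ExporterTimeoutMilliseconds = 30000,
    MaxExportBatchSize = 512,
    UseThreads = false, // run the export loop without a dedicated background thread
};

using var processor = new BatchActivityExportProcessor(myActivityExporter, options);
```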