Skip to content

Commit

Permalink
[Instrumenation.GrpcCore] File scoped namespace (open-telemetry#575)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kielek authored Aug 10, 2022
1 parent b4c61fe commit 485409e
Show file tree
Hide file tree
Showing 16 changed files with 1,720 additions and 1,736 deletions.
127 changes: 63 additions & 64 deletions src/OpenTelemetry.Instrumentation.GrpcCore/AsyncStreamReaderProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,84 +14,83 @@
// limitations under the License.
// </copyright>

namespace OpenTelemetry.Instrumentation.GrpcCore
{
using System;
using System.Threading;
using System.Threading.Tasks;
using global::Grpc.Core;
using System;
using System.Threading;
using System.Threading.Tasks;
using global::Grpc.Core;

namespace OpenTelemetry.Instrumentation.GrpcCore;

/// <summary>
/// A proxy stream reader with callbacks for interesting events.
/// </summary>
/// <remarks>
/// Borrowed heavily from
/// https://github.com/opentracing-contrib/csharp-grpc/blob/master/src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.cs.
/// </remarks>
/// <typeparam name="T">The message type.</typeparam>
/// <seealso cref="global::Grpc.Core.IAsyncStreamReader{T}" />
internal class AsyncStreamReaderProxy<T> : IAsyncStreamReader<T>
{
/// <summary>
/// A proxy stream reader with callbacks for interesting events.
/// The reader.
/// </summary>
/// <remarks>
/// Borrowed heavily from
/// https://github.com/opentracing-contrib/csharp-grpc/blob/master/src/OpenTracing.Contrib.Grpc/Streaming/TracingAsyncStreamReader.cs.
/// </remarks>
/// <typeparam name="T">The message type.</typeparam>
/// <seealso cref="global::Grpc.Core.IAsyncStreamReader{T}" />
internal class AsyncStreamReaderProxy<T> : IAsyncStreamReader<T>
{
/// <summary>
/// The reader.
/// </summary>
private readonly IAsyncStreamReader<T> reader;
private readonly IAsyncStreamReader<T> reader;

/// <summary>
/// The on message action.
/// </summary>
private readonly Action<T> onMessage;
/// <summary>
/// The on message action.
/// </summary>
private readonly Action<T> onMessage;

/// <summary>
/// The on stream end action.
/// </summary>
private readonly Action onStreamEnd;
/// <summary>
/// The on stream end action.
/// </summary>
private readonly Action onStreamEnd;

/// <summary>
/// The on exception action.
/// </summary>
private readonly Action<Exception> onException;
/// <summary>
/// The on exception action.
/// </summary>
private readonly Action<Exception> onException;

/// <summary>
/// Initializes a new instance of the <see cref="AsyncStreamReaderProxy{T}"/> class.
/// </summary>
/// <param name="reader">The reader.</param>
/// <param name="onMessage">The on message action, if any.</param>
/// <param name="onStreamEnd">The on stream end action, if any.</param>
/// <param name="onException">The on exception action, if any.</param>
public AsyncStreamReaderProxy(IAsyncStreamReader<T> reader, Action<T> onMessage = null, Action onStreamEnd = null, Action<Exception> onException = null)
{
this.reader = reader;
this.onMessage = onMessage;
this.onStreamEnd = onStreamEnd;
this.onException = onException;
}
/// <summary>
/// Initializes a new instance of the <see cref="AsyncStreamReaderProxy{T}"/> class.
/// </summary>
/// <param name="reader">The reader.</param>
/// <param name="onMessage">The on message action, if any.</param>
/// <param name="onStreamEnd">The on stream end action, if any.</param>
/// <param name="onException">The on exception action, if any.</param>
public AsyncStreamReaderProxy(IAsyncStreamReader<T> reader, Action<T> onMessage = null, Action onStreamEnd = null, Action<Exception> onException = null)
{
this.reader = reader;
this.onMessage = onMessage;
this.onStreamEnd = onStreamEnd;
this.onException = onException;
}

/// <inheritdoc/>
public T Current => this.reader.Current;
/// <inheritdoc/>
public T Current => this.reader.Current;

/// <inheritdoc/>
public async Task<bool> MoveNext(CancellationToken cancellationToken)
/// <inheritdoc/>
public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
try
{
try
var hasNext = await this.reader.MoveNext(cancellationToken).ConfigureAwait(false);
if (hasNext)
{
var hasNext = await this.reader.MoveNext(cancellationToken).ConfigureAwait(false);
if (hasNext)
{
this.onMessage?.Invoke(this.Current);
}
else
{
this.onStreamEnd?.Invoke();
}

return hasNext;
this.onMessage?.Invoke(this.Current);
}
catch (Exception ex)
else
{
this.onException?.Invoke(ex);
throw;
this.onStreamEnd?.Invoke();
}

return hasNext;
}
catch (Exception ex)
{
this.onException?.Invoke(ex);
throw;
}
}
}
145 changes: 72 additions & 73 deletions src/OpenTelemetry.Instrumentation.GrpcCore/ClientStreamWriterProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,95 +14,94 @@
// limitations under the License.
// </copyright>

namespace OpenTelemetry.Instrumentation.GrpcCore
using System;
using System.Threading.Tasks;
using global::Grpc.Core;

namespace OpenTelemetry.Instrumentation.GrpcCore;

/// <summary>
/// A proxy client stream writer.
/// </summary>
/// <remarks>
/// Borrowed heavily from
/// https://github.com/opentracing-contrib/csharp-grpc/blob/master/src/OpenTracing.Contrib.Grpc/Streaming/TracingClientStreamWriter.cs.
/// </remarks>
/// <typeparam name="T">The message type.</typeparam>
/// <seealso cref="global::Grpc.Core.IClientStreamWriter{T}" />
internal class ClientStreamWriterProxy<T> : IClientStreamWriter<T>
{
using System;
using System.Threading.Tasks;
using global::Grpc.Core;
/// <summary>
/// The writer.
/// </summary>
private readonly IClientStreamWriter<T> writer;

/// <summary>
/// A proxy client stream writer.
/// The on write action.
/// </summary>
/// <remarks>
/// Borrowed heavily from
/// https://github.com/opentracing-contrib/csharp-grpc/blob/master/src/OpenTracing.Contrib.Grpc/Streaming/TracingClientStreamWriter.cs.
/// </remarks>
/// <typeparam name="T">The message type.</typeparam>
/// <seealso cref="global::Grpc.Core.IClientStreamWriter{T}" />
internal class ClientStreamWriterProxy<T> : IClientStreamWriter<T>
{
/// <summary>
/// The writer.
/// </summary>
private readonly IClientStreamWriter<T> writer;
private readonly Action<T> onWrite;

/// <summary>
/// The on write action.
/// </summary>
private readonly Action<T> onWrite;
/// <summary>
/// The on complete action.
/// </summary>
private readonly Action onComplete;

/// <summary>
/// The on complete action.
/// </summary>
private readonly Action onComplete;
/// <summary>
/// The on exception action.
/// </summary>
private readonly Action<Exception> onException;

/// <summary>
/// The on exception action.
/// </summary>
private readonly Action<Exception> onException;
/// <summary>
/// Initializes a new instance of the <see cref="ClientStreamWriterProxy{T}"/> class.
/// </summary>
/// <param name="writer">The writer.</param>
/// <param name="onWrite">The on write action if any.</param>
/// <param name="onComplete">The on complete action, if any.</param>
/// <param name="onException">The on exception action, if any.</param>
public ClientStreamWriterProxy(IClientStreamWriter<T> writer, Action<T> onWrite = null, Action onComplete = null, Action<Exception> onException = null)
{
this.writer = writer;
this.onWrite = onWrite;
this.onComplete = onComplete;
this.onException = onException;
}

/// <summary>
/// Initializes a new instance of the <see cref="ClientStreamWriterProxy{T}"/> class.
/// </summary>
/// <param name="writer">The writer.</param>
/// <param name="onWrite">The on write action if any.</param>
/// <param name="onComplete">The on complete action, if any.</param>
/// <param name="onException">The on exception action, if any.</param>
public ClientStreamWriterProxy(IClientStreamWriter<T> writer, Action<T> onWrite = null, Action onComplete = null, Action<Exception> onException = null)
/// <inheritdoc/>
public WriteOptions WriteOptions
{
get => this.writer.WriteOptions;
set => this.writer.WriteOptions = value;
}

/// <inheritdoc/>
public async Task WriteAsync(T message)
{
this.onWrite?.Invoke(message);

try
{
this.writer = writer;
this.onWrite = onWrite;
this.onComplete = onComplete;
this.onException = onException;
await this.writer.WriteAsync(message).ConfigureAwait(false);
}

/// <inheritdoc/>
public WriteOptions WriteOptions
catch (Exception e)
{
get => this.writer.WriteOptions;
set => this.writer.WriteOptions = value;
this.onException?.Invoke(e);
throw;
}
}

/// <inheritdoc/>
public async Task WriteAsync(T message)
{
this.onWrite?.Invoke(message);
/// <inheritdoc/>
public async Task CompleteAsync()
{
this.onComplete?.Invoke();

try
{
await this.writer.WriteAsync(message).ConfigureAwait(false);
}
catch (Exception e)
{
this.onException?.Invoke(e);
throw;
}
try
{
await this.writer.CompleteAsync().ConfigureAwait(false);
}

/// <inheritdoc/>
public async Task CompleteAsync()
catch (Exception e)
{
this.onComplete?.Invoke();

try
{
await this.writer.CompleteAsync().ConfigureAwait(false);
}
catch (Exception e)
{
this.onException?.Invoke(e);
throw;
}
this.onException?.Invoke(e);
throw;
}
}
}
Loading

0 comments on commit 485409e

Please sign in to comment.