Skip to content

Commit

Permalink
[Event Hubs Client] Producer Dead Code Removal (Azure#18424)
Browse files Browse the repository at this point in the history
The focus of these changes is to trim the dead code paths for publishing
a single event from the producer client and assocaited tests.  This overload
existed in early pre-GA betas but has been determined to be unlikely to
return to the public API in the future.
  • Loading branch information
jsquire authored and jongio committed Feb 9, 2021
1 parent 47a0150 commit f449415
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -491,64 +491,6 @@ internal virtual async Task<PartitionPublishingProperties> GetPartitionPublishin
}
}

/// <summary>
/// Sends the <see cref="EventData" /> to the associated Event Hub. To avoid the
/// overhead associated with measuring and validating the size in the client, validation will
/// be delegated to the Event Hubs service and is deferred until the operation is invoked.
/// The call will fail if the size of the specified <paramref name="eventData"/> exceeds the
/// maximum allowable size of a single event.
/// </summary>
///
/// <param name="eventData">The event data to send.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
///
/// <returns>
/// A task to be resolved on when the operation has completed; if no exception is thrown when awaited, the
/// Event Hubs service has acknowledge receipt and assumed responsibility for delivery of the event.
/// </returns>
///
/// <seealso cref="SendAsync(EventData, SendEventOptions, CancellationToken)" />
/// <seealso cref="SendAsync(IEnumerable{EventData}, CancellationToken)" />
/// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)" />
/// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
///
internal virtual async Task SendAsync(EventData eventData,
CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(eventData, nameof(eventData));
await SendAsync(new[] { eventData }, null, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Sends the <see cref="EventData" /> to the associated Event Hub. To avoid the
/// overhead associated with measuring and validating the size in the client, validation will
/// be delegated to the Event Hubs service and is deferred until the operation is invoked.
/// The call will fail if the size of the specified <paramref name="eventData"/> exceeds the
/// maximum allowable size of a single event.
/// </summary>
///
/// <param name="eventData">The event data to send.</param>
/// <param name="options">The set of options to consider when sending this batch.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
///
/// <returns>
/// A task to be resolved on when the operation has completed; if no exception is thrown when awaited, the
/// Event Hubs service has acknowledge receipt and assumed responsibility for delivery of the event.
/// </returns>
///
/// <seealso cref="SendAsync(EventData, CancellationToken)" />
/// <seealso cref="SendAsync(IEnumerable{EventData}, CancellationToken)" />
/// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)" />
/// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
///
internal virtual async Task SendAsync(EventData eventData,
SendEventOptions options,
CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(eventData, nameof(eventData));
await SendAsync(new[] { eventData }, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Sends a set of events to the associated Event Hub as a single operation. To avoid the
/// overhead associated with measuring and validating the size in the client, validation will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task EventHubProducerCreatesDiagnosticScopeOnSend()
var producer = new EventHubProducerClient(fakeConnection, transportMock.Object);

var eventData = new EventData(ReadOnlyMemory<byte>.Empty);
await producer.SendAsync(eventData);
await producer.SendAsync(new[] { eventData });

activity.Stop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,48 +229,6 @@ public async Task ProducerCanSendAnEventBatchUsingAPartitionHashKey()
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
/// </summary>
///
[Test]
public async Task ProducerCanSendSingleZeroLengthEvent()
{
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

await using (var producer = new EventHubProducerClient(connectionString))
{
var singleEvent = new EventData(Array.Empty<byte>());
Assert.That(async () => await producer.SendAsync(singleEvent), Throws.Nothing);
}
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
/// </summary>
///
[Test]
public async Task ProducerCanSendSingleLargeEvent()
{
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

await using (var producer = new EventHubProducerClient(connectionString, new EventHubProducerClientOptions { RetryOptions = new EventHubsRetryOptions { TryTimeout = TimeSpan.FromMinutes(5) } }))
{
// Actual limit is 1046520 for a single event.

var singleEvent = new EventData(new byte[100000]);
Assert.That(async () => await producer.SendAsync(singleEvent), Throws.Nothing);
}
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
Expand All @@ -293,31 +251,6 @@ public async Task ProducerCanSendSingleLargeEventInASet()
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
/// </summary>
///
[Test]
public async Task ProducerCannotSendSingleEventLargerThanMaximumSize()
{
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

await using (var producer = new EventHubProducerClient(connectionString))
{
// Actual limit is 1046520 for a single event.

var singleEvent = new EventData(new byte[1500000]);
EventData[] eventBatch = new[] { new EventData(new byte[1500000]) };

Assert.That(async () => await producer.SendAsync(singleEvent), Throws.InstanceOf<EventHubsException>().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.MessageSizeExceeded));
Assert.That(async () => await producer.SendAsync(eventBatch), Throws.InstanceOf<EventHubsException>().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.MessageSizeExceeded));
}
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
Expand Down Expand Up @@ -1011,7 +944,7 @@ public async Task ProducerSendsEventsWithTheSamePartitionHashKeyToTheSamePartiti

for (var index = 0; index < batches; index++)
{
await producer.SendAsync(new EventData(Encoding.UTF8.GetBytes($"Just a few messages ({ index })")), batchOptions);
await producer.SendAsync(new[] { new EventData(Encoding.UTF8.GetBytes($"Just a few messages ({ index })")) }, batchOptions);
}

// Read the events.
Expand Down Expand Up @@ -1076,7 +1009,7 @@ public async Task ProducerCannotSendWhenProxyIsInvalid()

await using (var invalidProxyProducer = new EventHubProducerClient(connectionString, producerOptions))
{
Assert.That(async () => await invalidProxyProducer.SendAsync(new EventData(new byte[1])), Throws.InstanceOf<WebSocketException>().Or.InstanceOf<TimeoutException>());
Assert.That(async () => await invalidProxyProducer.SendAsync(new[] { new EventData(new byte[1]) }), Throws.InstanceOf<WebSocketException>().Or.InstanceOf<TimeoutException>());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,68 +514,6 @@ public async Task ReadPartitionPublishingPropertiesAsyncReturnsEmptyPartitionSta
"Partition state should not have been initialized.");
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
/// </summary>
///
[Test]
public void SendSingleWithoutOptionsRequiresAnEvent()
{
var producer = new EventHubProducerClient(new MockConnection());
Assert.That(async () => await producer.SendAsync(default(EventData)), Throws.ArgumentNullException);
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
/// </summary>
///
[Test]
public void SendSingleRequiresAnEvent()
{
var producer = new EventHubProducerClient(new MockConnection());
Assert.That(async () => await producer.SendAsync(default(EventData), new SendEventOptions()), Throws.ArgumentNullException);
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
/// </summary>
///
[Test]
public async Task SendSingleWithoutOptionsDelegatesToBatchSend()
{
var transportProducer = new ObservableTransportProducerMock();
var producer = new Mock<EventHubProducerClient> { CallBase = true };

producer
.Setup(instance => instance.SendAsync(It.Is<IEnumerable<EventData>>(value => value.Count() == 1), It.IsAny<SendEventOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask)
.Verifiable("The single send should delegate to the batch send.");

await producer.Object.SendAsync(new EventData(new byte[] { 0x22 }));
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
/// </summary>
///
[Test]
public async Task SendSingleWitOptionsDelegatesToBatchSend()
{
var transportProducer = new ObservableTransportProducerMock();
var producer = new Mock<EventHubProducerClient> { CallBase = true };

producer
.Setup(instance => instance.SendAsync(It.Is<IEnumerable<EventData>>(value => value.Count() == 1), It.IsAny<SendEventOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask)
.Verifiable("The single send should delegate to the batch send.");

await producer.Object.SendAsync(new EventData(new byte[] { 0x22 }), new SendEventOptions());
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
Expand Down

0 comments on commit f449415

Please sign in to comment.