Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Formatting Pass #18356

Merged
merged 1 commit into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,10 @@ public virtual void UpdateCheckpointStart(string partitionId,
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
///
[Event(33, Level = EventLevel.Informational, Message = "Completed the attempt to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'.")]
public virtual void UpdateCheckpointComplete(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup)
public virtual void UpdateCheckpointComplete(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup)
{
if (IsEnabled())
{
Expand All @@ -363,11 +363,11 @@ public virtual void UpdateCheckpointComplete(string partitionId,
/// <param name="errorMessage">The message for the exception that occurred.</param>
///
[Event(34, Level = EventLevel.Error, Message = "An exception occurred when creating/updating a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'. ErrorMessage: '{4}'.")]
public virtual void UpdateCheckpointError(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string errorMessage)
public virtual void UpdateCheckpointError(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string errorMessage)
{
if (IsEnabled())
{
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ public async Task GetCheckpointLogsStartAndComplete()
await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken());

mockLog.Verify(m => m.GetCheckpointStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0"));
mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0"));
mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0"));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Azure.Messaging.EventHubs.Processor.Tests
/// being run, offering access to information such as environment variables.
/// </summary>
///
public class StorageTestEnvironment: TestEnvironment
public class StorageTestEnvironment : TestEnvironment
{
/// <summary>The singleton instance of the <see cref="StorageTestEnvironment" />, lazily created.</summary>
private static readonly Lazy<StorageTestEnvironment> Singleton = new Lazy<StorageTestEnvironment>(() => new StorageTestEnvironment(), LazyThreadSafetyMode.ExecutionAndPublication);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async Task EventsCanBeReadByOneProcessorClient(LoadBalancingStrategy load
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a set of events.

Expand Down Expand Up @@ -79,7 +79,7 @@ public async Task EventsCanBeReadByOneProcessorClient(LoadBalancingStrategy load
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand All @@ -97,7 +97,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingAnIdentityCredential()
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a set of events.

Expand Down Expand Up @@ -129,7 +129,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingAnIdentityCredential()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand All @@ -148,7 +148,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingTheSharedKeyCredential
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a set of events.

Expand Down Expand Up @@ -180,7 +180,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingTheSharedKeyCredential
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand Down Expand Up @@ -244,7 +244,7 @@ public async Task EventsCanBeReadByMultipleProcessorClients()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand Down Expand Up @@ -288,7 +288,7 @@ public async Task ProcessorClientCreatesOwnership()

var processedEvents = new ConcurrentDictionary<string, EventData>();
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var storageManager = new InMemoryStorageManager(_ => {});
var storageManager = new InMemoryStorageManager(_ => { });
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(250) };
var processor = CreateProcessorWithIdentity(scope.ConsumerGroups.First(), scope.EventHubName, storageManager, options);

Expand Down Expand Up @@ -346,7 +346,7 @@ public async Task ProcessorClientCanStartFromAnInitialPosition()

await using (var consumer = new EventHubConsumerClient(scope.ConsumerGroups.First(), connectionString))
{
await foreach (var partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = null }, cancellationSource.Token))
await foreach (var partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = null }, cancellationSource.Token))
{
if (partitionEvent.Data.IsEquivalentTo(lastSourceEvent))
{
Expand Down Expand Up @@ -393,7 +393,7 @@ public async Task ProcessorClientCanStartFromAnInitialPosition()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand Down Expand Up @@ -438,7 +438,7 @@ public async Task ProcessorClientBeginsWithTheNextEventAfterCheckpointing()
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var beforeCheckpointProcessHandler = CreateEventTrackingHandler(segmentEventCount, processedEvents, completionSource, cancellationSource.Token, processedEventCallback);
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(250) };
var storageManager = new InMemoryStorageManager(_ => {});
var storageManager = new InMemoryStorageManager(_ => { });
var processor = CreateProcessor(scope.ConsumerGroups.First(), connectionString, storageManager, options);

processor.ProcessErrorAsync += CreateAssertingErrorHandler();
Expand Down Expand Up @@ -476,7 +476,7 @@ public async Task ProcessorClientBeginsWithTheNextEventAfterCheckpointing()
foreach (var sourceEvent in afterCheckpointEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand All @@ -500,7 +500,7 @@ private EventProcessorClient CreateProcessor(string consumerGroup,
{
EventHubConnection createConnection() => new EventHubConnection(connectionString);

storageManager ??= new InMemoryStorageManager(_=> {});
storageManager ??= new InMemoryStorageManager(_ => { });
return new TestEventProcessorClient(storageManager, consumerGroup, "fakeNamespace", "fakeEventHub", Mock.Of<TokenCredential>(), createConnection, options);
}

Expand All @@ -524,7 +524,7 @@ private EventProcessorClient CreateProcessorWithIdentity(string consumerGroup,
var credential = EventHubsTestEnvironment.Instance.Credential;
EventHubConnection createConnection() => new EventHubConnection(EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential);

storageManager ??= new InMemoryStorageManager(_=> {});
storageManager ??= new InMemoryStorageManager(_ => { });
return new TestEventProcessorClient(storageManager, consumerGroup, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential, createConnection, options);
}

Expand All @@ -548,7 +548,7 @@ private EventProcessorClient CreateProcessorWithSharedAccessKey(string consumerG
var credential = new EventHubsSharedAccessKeyCredential(EventHubsTestEnvironment.Instance.SharedAccessKeyName, EventHubsTestEnvironment.Instance.SharedAccessKey);
EventHubConnection createConnection() => null; //new EventHubConnection(EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential);

storageManager ??= new InMemoryStorageManager(_=> {});
storageManager ??= new InMemoryStorageManager(_ => { });
return new TestEventProcessorClient(storageManager, consumerGroup, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential, createConnection, options);
}

Expand Down Expand Up @@ -621,7 +621,7 @@ private Func<ProcessEventArgs, Task> CreateEventTrackingHandler(int targetCount,

if (processedEvents.Count >= targetCount)
{
completionSource.TrySetResult(true);
completionSource.TrySetResult(true);
}
}
}
Expand Down
Loading