From 379a18d2a77025ff85ff23bbb63a4622c244d945 Mon Sep 17 00:00:00 2001 From: chradek <51000525+chradek@users.noreply.github.com> Date: Thu, 21 Jan 2021 09:41:34 -0800 Subject: [PATCH] [event-hubs] fix eslint errors (round 1) (#13285) Round 1 addressing #10777 Replaces #13013 This leaves 5 errors and 15 warnings (excluding TSDoc) after this PR is merged. - 4 'Promise executor functions should not be async' errors. Fixing this will take a lot of care and I would want these to be reviewed on their own so they don't get lost in the noise. - 1 'N is already defined'. This is due to a constant matching an interface name. --- .../event-hubs/src/connectionContext.ts | 33 +- .../event-hubs/src/dataTransformer.ts | 9 +- sdk/eventhub/event-hubs/src/eventData.ts | 3 +- sdk/eventhub/event-hubs/src/eventDataBatch.ts | 9 +- .../event-hubs/src/eventHubConsumerClient.ts | 6 +- .../event-hubs/src/eventHubProducerClient.ts | 19 +- .../event-hubs/src/eventHubReceiver.ts | 10 +- sdk/eventhub/event-hubs/src/eventHubSender.ts | 68 ++-- sdk/eventhub/event-hubs/src/eventPosition.ts | 29 +- sdk/eventhub/event-hubs/src/eventProcessor.ts | 22 +- .../event-hubs/src/impl/partitionGate.ts | 4 +- sdk/eventhub/event-hubs/src/linkEntity.ts | 2 +- sdk/eventhub/event-hubs/src/log.ts | 5 +- .../event-hubs/src/managementClient.ts | 36 +-- .../event-hubs/src/partitionProcessor.ts | 10 +- sdk/eventhub/event-hubs/src/partitionPump.ts | 8 +- .../event-hubs/src/util/delayWithoutThrow.ts | 4 +- sdk/eventhub/event-hubs/src/util/error.ts | 5 +- sdk/eventhub/event-hubs/src/util/retries.ts | 3 +- .../event-hubs/src/util/typeGuards.ts | 54 ++++ sdk/eventhub/event-hubs/test/client.spec.ts | 14 +- .../test/eventHubConsumerClient.spec.ts | 141 +++++---- .../event-hubs/test/eventProcessor.spec.ts | 299 +++++++++++------- .../event-hubs/test/eventdata.spec.ts | 2 +- .../test/loadBalancingStrategy.spec.ts | 26 +- sdk/eventhub/event-hubs/test/misc.spec.ts | 6 - .../event-hubs/test/partitionPump.spec.ts | 12 +- sdk/eventhub/event-hubs/test/receiver.spec.ts | 10 +- sdk/eventhub/event-hubs/test/sender.spec.ts | 106 ++++--- .../utils/fakeSubscriptionEventHandlers.ts | 4 +- .../event-hubs/test/utils/logHelpers.ts | 8 +- .../test/utils/receivedMessagesTester.ts | 12 +- .../test/utils/subscriptionHandlerForTests.ts | 29 +- .../event-hubs/test/utils/testUtils.ts | 3 +- 34 files changed, 588 insertions(+), 423 deletions(-) create mode 100644 sdk/eventhub/event-hubs/src/util/typeGuards.ts diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index efe67cf368c6..5d20810c2aff 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -111,7 +111,7 @@ export interface ConnectionContextOptions extends EventHubClientOptions { /** * Helper type to get the names of all the functions on an object. */ -type FunctionPropertyNames = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T]; +type FunctionPropertyNames = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T]; // eslint-disable-line @typescript-eslint/ban-types /** * Helper type to get the types of all the functions on an object. */ @@ -387,7 +387,7 @@ export namespace ConnectionContext { } }; - function addConnectionListeners(connection: Connection) { + function addConnectionListeners(connection: Connection): void { // Add listeners on the connection object. connection.on(ConnectionEvents.connectionOpen, onConnectionOpen); connection.on(ConnectionEvents.disconnected, onDisconnected); @@ -395,35 +395,32 @@ export namespace ConnectionContext { connection.on(ConnectionEvents.error, error); } - function cleanConnectionContext(connectionContext: ConnectionContext) { + function cleanConnectionContext(context: ConnectionContext): Promise { // Remove listeners from the connection object. - connectionContext.connection.removeListener( - ConnectionEvents.connectionOpen, - onConnectionOpen - ); - connectionContext.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); - connectionContext.connection.removeListener(ConnectionEvents.protocolError, protocolError); - connectionContext.connection.removeListener(ConnectionEvents.error, error); + context.connection.removeListener(ConnectionEvents.connectionOpen, onConnectionOpen); + context.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); + context.connection.removeListener(ConnectionEvents.protocolError, protocolError); + context.connection.removeListener(ConnectionEvents.error, error); // Close the connection - return connectionContext.connection.close(); + return context.connection.close(); } - async function refreshConnection(connectionContext: ConnectionContext) { - const originalConnectionId = connectionContext.connectionId; + async function refreshConnection(context: ConnectionContext): Promise { + const originalConnectionId = context.connectionId; try { - await cleanConnectionContext(connectionContext); + await cleanConnectionContext(context); } catch (err) { logger.verbose( - `[${connectionContext.connectionId}] There was an error closing the connection before reconnecting: %O`, + `[${context.connectionId}] There was an error closing the connection before reconnecting: %O`, err ); } // Create a new connection, id, locks, and cbs client. - connectionContext.refreshConnection(); - addConnectionListeners(connectionContext.connection); + context.refreshConnection(); + addConnectionListeners(context.connection); logger.verbose( - `The connection "${originalConnectionId}" has been updated to "${connectionContext.connectionId}".` + `The connection "${originalConnectionId}" has been updated to "${context.connectionId}".` ); } diff --git a/sdk/eventhub/event-hubs/src/dataTransformer.ts b/sdk/eventhub/event-hubs/src/dataTransformer.ts index f09470d8983a..fc96c95bc08b 100644 --- a/sdk/eventhub/event-hubs/src/dataTransformer.ts +++ b/sdk/eventhub/event-hubs/src/dataTransformer.ts @@ -5,6 +5,7 @@ import { message } from "rhea-promise"; import isBuffer from "is-buffer"; import { Buffer } from "buffer"; import { logErrorStackTrace, logger } from "./log"; +import { isObjectWithProperties } from "./util/typeGuards"; /** * The default data transformer that will be used by the Azure SDK. @@ -23,7 +24,7 @@ export const defaultDataTransformer = { * - content: The given AMQP message body as a Buffer. * - multiple: true | undefined. */ - encode(body: any): any { + encode(body: unknown): any { let result: any; if (isBuffer(body)) { result = message.data_section(body); @@ -31,7 +32,7 @@ export const defaultDataTransformer = { // string, undefined, null, boolean, array, object, number should end up here // coercing undefined to null as that will ensure that null value will be given to the // customer on receive. - if (body === undefined) body = null; // tslint:disable-line + if (body === undefined) body = null; try { const bodyStr = JSON.stringify(body); result = message.data_section(Buffer.from(bodyStr, "utf8")); @@ -56,10 +57,10 @@ export const defaultDataTransformer = { * @param {DataSection} body The AMQP message body * @return {*} decoded body or the given body as-is. */ - decode(body: any): any { + decode(body: unknown): any { let processedBody: any = body; try { - if (body.content && isBuffer(body.content)) { + if (isObjectWithProperties(body, ["content"]) && isBuffer(body.content)) { // This indicates that we are getting the AMQP described type. Let us try decoding it. processedBody = body.content; } diff --git a/sdk/eventhub/event-hubs/src/eventData.ts b/sdk/eventhub/event-hubs/src/eventData.ts index 4cc9f70f1aab..a2a1cd7b6de4 100644 --- a/sdk/eventhub/event-hubs/src/eventData.ts +++ b/sdk/eventhub/event-hubs/src/eventData.ts @@ -3,6 +3,7 @@ import { DeliveryAnnotations, Message as RheaMessage, MessageAnnotations } from "rhea-promise"; import { Constants } from "@azure/core-amqp"; +import { isDefined } from "./util/typeGuards"; /** * Describes the delivery annotations. @@ -203,7 +204,7 @@ export function toRheaMessage(data: EventData, partitionKey?: string): RheaMessa if (data.properties) { msg.application_properties = data.properties; } - if (partitionKey != undefined) { + if (isDefined(partitionKey)) { msg.message_annotations[Constants.partitionKey] = partitionKey; // Event Hub service cannot route messages to a specific partition based on the partition key // if AMQP message header is an empty object. Hence we make sure that header is always present diff --git a/sdk/eventhub/event-hubs/src/eventDataBatch.ts b/sdk/eventhub/event-hubs/src/eventDataBatch.ts index 8734378ab30f..3f85de9a8a89 100644 --- a/sdk/eventhub/event-hubs/src/eventDataBatch.ts +++ b/sdk/eventhub/event-hubs/src/eventDataBatch.ts @@ -9,6 +9,7 @@ import { Span, SpanContext } from "@opentelemetry/api"; import { TRACEPARENT_PROPERTY, instrumentEventData } from "./diagnostics/instrumentEventData"; import { createMessageSpan } from "./diagnostics/messageSpan"; import { defaultDataTransformer } from "./dataTransformer"; +import { isDefined, isObjectWithProperties } from "./util/typeGuards"; /** * The amount of bytes to reserve as overhead for a small message. @@ -29,9 +30,9 @@ const smallMessageMaxBytes = 255; * @internal * @hidden */ -export function isEventDataBatch(eventDataBatch: any): eventDataBatch is EventDataBatch { +export function isEventDataBatch(eventDataBatch: unknown): eventDataBatch is EventDataBatch { return ( - eventDataBatch && + isObjectWithProperties(eventDataBatch, ["count", "sizeInBytes", "tryAdd"]) && typeof eventDataBatch.tryAdd === "function" && typeof eventDataBatch.count === "number" && typeof eventDataBatch.sizeInBytes === "number" @@ -192,8 +193,8 @@ export class EventDataBatchImpl implements EventDataBatch { ) { this._context = context; this._maxSizeInBytes = maxSizeInBytes; - this._partitionKey = partitionKey != undefined ? String(partitionKey) : partitionKey; - this._partitionId = partitionId != undefined ? String(partitionId) : partitionId; + this._partitionKey = isDefined(partitionKey) ? String(partitionKey) : partitionKey; + this._partitionId = isDefined(partitionId) ? String(partitionId) : partitionId; this._sizeInBytes = 0; this._count = 0; } diff --git a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts index 93b1b90112f0..9a92f43bfb1e 100644 --- a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts @@ -533,7 +533,7 @@ export class EventHubConsumerClient { private createEventProcessorForAllPartitions( subscriptionEventHandlers: SubscriptionEventHandlers, options?: SubscribeOptions - ) { + ): { targetedPartitionId: string; eventProcessor: EventProcessor } { this._partitionGate.add("all"); if (this._userChoseCheckpointStore) { @@ -569,7 +569,7 @@ export class EventHubConsumerClient { partitionId: string, eventHandlers: SubscriptionEventHandlers, options?: SubscribeOptions - ) { + ): { targetedPartitionId: string; eventProcessor: EventProcessor } { this._partitionGate.add(partitionId); const subscribeOptions = options as SubscribeOptions | undefined; @@ -607,7 +607,7 @@ export class EventHubConsumerClient { subscriptionEventHandlers: SubscriptionEventHandlers, checkpointStore: CheckpointStore, options: FullEventProcessorOptions - ) { + ): EventProcessor { return new EventProcessor( this._consumerGroup, connectionContext, diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index caffb7603869..b7157c99d4bf 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -21,6 +21,7 @@ import { SendBatchOptions } from "./models/public"; import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error"; +import { isDefined } from "./util/typeGuards"; import { getParentSpan, OperationOptions } from "./util/operationOptions"; /** @@ -80,7 +81,7 @@ export class EventHubProducerClient { * - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets. * - `userAgent` : A string to append to the built in user agent string that is passed to the service. */ - constructor(connectionString: string, options?: EventHubClientOptions); + constructor(connectionString: string, options?: EventHubClientOptions); // eslint-disable-line @azure/azure-sdk/ts-naming-options /** * The `EventHubProducerClient` class is used to send events to an Event Hub. * Use the `options` parmeter to configure retry policy or proxy settings. @@ -94,7 +95,7 @@ export class EventHubProducerClient { * - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets. * - `userAgent` : A string to append to the built in user agent string that is passed to the service. */ - constructor(connectionString: string, eventHubName: string, options?: EventHubClientOptions); + constructor(connectionString: string, eventHubName: string, options?: EventHubClientOptions); // eslint-disable-line @azure/azure-sdk/ts-naming-options /** * The `EventHubProducerClient` class is used to send events to an Event Hub. * Use the `options` parmeter to configure retry policy or proxy settings. @@ -113,13 +114,13 @@ export class EventHubProducerClient { fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, - options?: EventHubClientOptions + options?: EventHubClientOptions // eslint-disable-line @azure/azure-sdk/ts-naming-options ); constructor( fullyQualifiedNamespaceOrConnectionString1: string, eventHubNameOrOptions2?: string | EventHubClientOptions, credentialOrOptions3?: TokenCredential | EventHubClientOptions, - options4?: EventHubClientOptions + options4?: EventHubClientOptions // eslint-disable-line @azure/azure-sdk/ts-naming-options ) { this._context = createConnectionContext( fullyQualifiedNamespaceOrConnectionString1, @@ -174,7 +175,7 @@ export class EventHubProducerClient { async createBatch(options: CreateBatchOptions = {}): Promise { throwErrorIfConnectionClosed(this._context); - if (options.partitionId != undefined && options.partitionKey != undefined) { + if (isDefined(options.partitionId) && isDefined(options.partitionKey)) { throw new Error("partitionId and partitionKey cannot both be set when creating a batch"); } @@ -260,7 +261,7 @@ export class EventHubProducerClient { * @throws MessagingError if an error is encountered while sending a message. * @throws Error if the underlying connection or sender has been closed. */ - async sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise; + async sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise; // eslint-disable-line @azure/azure-sdk/ts-naming-options async sendBatch( batch: EventDataBatch | EventData[], options: SendBatchOptions | OperationOptions = {} @@ -316,16 +317,16 @@ export class EventHubProducerClient { } } } - if (partitionId != undefined && partitionKey != undefined) { + if (isDefined(partitionId) && isDefined(partitionKey)) { throw new Error( `The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.` ); } - if (partitionId != undefined) { + if (isDefined(partitionId)) { partitionId = String(partitionId); } - if (partitionKey != undefined) { + if (isDefined(partitionKey)) { partitionKey = String(partitionKey); } diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index 51c5b4b9d9fb..2b9f11b2256a 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -462,7 +462,10 @@ export class EventHubReceiver extends LinkEntity { await this.abort(); } } catch (err) { - return this._onError === onError && onError(err); + if (this._onError === onError) { + onError(err); + } + return; } } else { logger.verbose( @@ -478,6 +481,7 @@ export class EventHubReceiver extends LinkEntity { 0 ); this._addCredit(creditsToAdd); + return; }) .catch((err) => { // something really unexpected happened, so attempt to call user's error handler @@ -700,13 +704,13 @@ export class EventHubReceiver extends LinkEntity { try { await this.close(); } finally { - return reject(new AbortError("The receive operation has been cancelled by the user.")); + reject(new AbortError("The receive operation has been cancelled by the user.")); } }; // operation has been cancelled, so exit immediately if (abortSignal && abortSignal.aborted) { - return await rejectOnAbort(); + return rejectOnAbort(); } // updates the prefetch count so that the baseConsumer adds diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index cbbaaefadec9..a993b4e3ed40 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -89,8 +89,8 @@ export class EventHubSender extends LinkEntity { this.address = context.config.getSenderAddress(partitionId); this.audience = context.config.getSenderAudience(partitionId); - this._onAmqpError = (context: EventContext) => { - const senderError = context.sender && context.sender.error; + this._onAmqpError = (eventContext: EventContext) => { + const senderError = eventContext.sender && eventContext.sender.error; logger.verbose( "[%s] 'sender_error' event occurred on the sender '%s' with address '%s'. " + "The associated error is: %O", @@ -102,8 +102,8 @@ export class EventHubSender extends LinkEntity { // TODO: Consider rejecting promise in trySendBatch() or createBatch() }; - this._onSessionError = (context: EventContext) => { - const sessionError = context.session && context.session.error; + this._onSessionError = (eventContext: EventContext) => { + const sessionError = eventContext.session && eventContext.session.error; logger.verbose( "[%s] 'session_error' event occurred on the session of sender '%s' with address '%s'. " + "The associated error is: %O", @@ -115,8 +115,8 @@ export class EventHubSender extends LinkEntity { // TODO: Consider rejecting promise in trySendBatch() or createBatch() }; - this._onAmqpClose = async (context: EventContext) => { - const sender = this._sender || context.sender!; + this._onAmqpClose = async (eventContext: EventContext) => { + const sender = this._sender || eventContext.sender!; logger.verbose( "[%s] 'sender_close' event occurred on the sender '%s' with address '%s'. " + "Value for isItselfClosed on the receiver is: '%s' " + @@ -140,8 +140,8 @@ export class EventHubSender extends LinkEntity { } }; - this._onSessionClose = async (context: EventContext) => { - const sender = this._sender || context.sender!; + this._onSessionClose = async (eventContext: EventContext) => { + const sender = this._sender || eventContext.sender!; logger.verbose( "[%s] 'session_close' event occurred on the session of sender '%s' with address '%s'. " + "Value for isSessionItselfClosed on the session is: '%s' " + @@ -226,7 +226,7 @@ export class EventHubSender extends LinkEntity { return this._sender!.maxMessageSize; } return new Promise(async (resolve, reject) => { - const rejectOnAbort = () => { + const rejectOnAbort = (): void => { const desc: string = `[${this._context.connectionId}] The create batch operation has been cancelled by the user.`; // Cancellation is user-intented, so treat as info instead of warning. logger.info(desc); @@ -234,7 +234,7 @@ export class EventHubSender extends LinkEntity { reject(error); }; - const onAbort = () => { + const onAbort = (): void => { if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); } @@ -322,9 +322,9 @@ export class EventHubSender extends LinkEntity { const messages: RheaMessage[] = []; // Convert EventData to RheaMessage. for (let i = 0; i < events.length; i++) { - const message = toRheaMessage(events[i], partitionKey); - message.body = defaultDataTransformer.encode(events[i].body); - messages[i] = message; + const rheaMessage = toRheaMessage(events[i], partitionKey); + rheaMessage.body = defaultDataTransformer.encode(events[i].body); + messages[i] = rheaMessage; } // Encode every amqp message and then convert every encoded message to amqp data section const batchMessage: RheaMessage = { @@ -391,20 +391,20 @@ export class EventHubSender extends LinkEntity { * We have implemented a synchronous send over here in the sense that we shall be waiting * for the message to be accepted or rejected and accordingly resolve or reject the promise. * @hidden - * @param message The message to be sent to EventHub. + * @param rheaMessage The message to be sent to EventHub. * @returns Promise */ private _trySendBatch( - message: RheaMessage | Buffer, + rheaMessage: RheaMessage | Buffer, options: SendOptions & EventHubProducerOptions = {} ): Promise { const abortSignal: AbortSignalLike | undefined = options.abortSignal; const retryOptions = options.retryOptions || {}; const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions); retryOptions.timeoutInMs = timeoutInMs; - const sendEventPromise = () => + const sendEventPromise = (): Promise => new Promise(async (resolve, reject) => { - const rejectOnAbort = () => { + const rejectOnAbort = (): void => { const desc: string = `[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` + `address "${this.address}" has been cancelled by the user.`; @@ -419,13 +419,13 @@ export class EventHubSender extends LinkEntity { } const removeListeners = (): void => { - clearTimeout(waitTimer); + clearTimeout(waitTimer); // eslint-disable-line @typescript-eslint/no-use-before-define if (abortSignal) { - abortSignal.removeEventListener("abort", onAborted); + abortSignal.removeEventListener("abort", onAborted); // eslint-disable-line @typescript-eslint/no-use-before-define } }; - const onAborted = () => { + const onAborted = (): void => { removeListeners(); return rejectOnAbort(); }; @@ -434,7 +434,7 @@ export class EventHubSender extends LinkEntity { abortSignal.addEventListener("abort", onAborted); } - const actionAfterTimeout = () => { + const actionAfterTimeout = (): void => { removeListeners(); const desc: string = `[${this._context.connectionId}] Sender "${this.name}" with ` + @@ -464,15 +464,15 @@ export class EventHubSender extends LinkEntity { }); } catch (err) { removeListeners(); - err = translate(err); + const translatedError = translate(err); logger.warning( "[%s] An error occurred while creating the sender %s: %s", this._context.connectionId, this.name, - `${err?.name}: ${err?.message}` + `${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - return reject(err); + logErrorStackTrace(translatedError); + return reject(translatedError); } } const timeTakenByInit = Date.now() - initStartTime; @@ -496,7 +496,7 @@ export class EventHubSender extends LinkEntity { } try { this._sender!.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000; - const delivery = await this._sender!.send(message, undefined, 0x80013700); + const delivery = await this._sender!.send(rheaMessage, undefined, 0x80013700); logger.info( "[%s] Sender '%s', sent message with delivery id: %d", this._context.connectionId, @@ -505,14 +505,14 @@ export class EventHubSender extends LinkEntity { ); return resolve(); } catch (err) { - err = translate(err.innerError || err); + const translatedError = translate(err.innerError || err); logger.warning( "[%s] An error occurred while sending the message %s", this._context.connectionId, - `${err?.name}: ${err?.message}` + `${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - return reject(err); + logErrorStackTrace(translatedError); + return reject(translatedError); } finally { removeListeners(); } @@ -586,15 +586,15 @@ export class EventHubSender extends LinkEntity { } } catch (err) { this.isConnecting = false; - err = translate(err); + const translatedError = translate(err); logger.warning( "[%s] An error occurred while creating the sender %s: %s", this._context.connectionId, this.name, - `${err?.name}: ${err?.message}` + `${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - throw err; + logErrorStackTrace(translatedError); + throw translatedError; } } diff --git a/sdk/eventhub/event-hubs/src/eventPosition.ts b/sdk/eventhub/event-hubs/src/eventPosition.ts index 201bc3f621e6..95e9a1b0e78b 100644 --- a/sdk/eventhub/event-hubs/src/eventPosition.ts +++ b/sdk/eventhub/event-hubs/src/eventPosition.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. import { Constants, ErrorNameConditionMapper, translate } from "@azure/core-amqp"; +import { isDefined, objectHasProperty } from "./util/typeGuards"; /** * Represents the position of an event in an Event Hub partition, typically used when calling the `subscribe()` @@ -51,15 +52,15 @@ export interface EventPosition { export function getEventPositionFilter(eventPosition: EventPosition): string { let result; // order of preference - if (eventPosition.offset != undefined) { + if (isDefined(eventPosition.offset)) { result = eventPosition.isInclusive ? `${Constants.offsetAnnotation} >= '${eventPosition.offset}'` : `${Constants.offsetAnnotation} > '${eventPosition.offset}'`; - } else if (eventPosition.sequenceNumber != undefined) { + } else if (isDefined(eventPosition.sequenceNumber)) { result = eventPosition.isInclusive ? `${Constants.sequenceNumberAnnotation} >= '${eventPosition.sequenceNumber}'` : `${Constants.sequenceNumberAnnotation} > '${eventPosition.sequenceNumber}'`; - } else if (eventPosition.enqueuedOn != undefined) { + } else if (isDefined(eventPosition.enqueuedOn)) { const time = eventPosition.enqueuedOn instanceof Date ? eventPosition.enqueuedOn.getTime() @@ -113,8 +114,8 @@ export const latestEventPosition: EventPosition = { */ export function validateEventPositions( position: EventPosition | { [partitionId: string]: EventPosition } -) { - if (position == undefined) { +): void { + if (!isDefined(position)) { return; } @@ -146,33 +147,33 @@ export function validateEventPositions( * @hidden * @internal */ -export function isEventPosition(position: any): position is EventPosition { +export function isEventPosition(position: unknown): position is EventPosition { if (!position) { return false; } - if (position.offset != undefined) { + if (objectHasProperty(position, "offset") && isDefined(position.offset)) { return true; } - if (position.sequenceNumber != undefined) { + if (objectHasProperty(position, "sequenceNumber") && isDefined(position.sequenceNumber)) { return true; } - if (position.enqueuedOn != undefined) { + if (objectHasProperty(position, "enqueuedOn") && isDefined(position.enqueuedOn)) { return true; } return false; } -function validateEventPosition(position: EventPosition) { - if (position == undefined) { +function validateEventPosition(position: EventPosition): void { + if (!isDefined(position)) { return; } - const offsetPresent = position.offset != undefined; - const sequenceNumberPresent = position.sequenceNumber != undefined; - const enqueuedOnPresent = position.enqueuedOn != undefined; + const offsetPresent = isDefined(position.offset); + const sequenceNumberPresent = isDefined(position.sequenceNumber); + const enqueuedOnPresent = isDefined(position.enqueuedOn); if ( (offsetPresent && sequenceNumberPresent) || diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index cb51278f0b9c..80c431bf51f0 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -301,7 +301,7 @@ export class EventProcessor { } } - private async _startPump(partitionId: string, abortSignal: AbortSignalLike) { + private async _startPump(partitionId: string, abortSignal: AbortSignalLike): Promise { if (abortSignal.aborted) { logger.verbose( `[${this._id}] The subscription was closed before starting to read from ${partitionId}.` @@ -402,7 +402,7 @@ export class EventProcessor { private async _runLoopWithLoadBalancing( loadBalancingStrategy: LoadBalancingStrategy, abortSignal: AbortSignalLike - ) { + ): Promise { let cancelLoopResolver; // This provides a mechanism for exiting the loop early // if the subscription has had `close` called. @@ -453,7 +453,7 @@ export class EventProcessor { loadBalancingStrategy: LoadBalancingStrategy, partitionIds: string[], abortSignal: AbortSignalLike - ) { + ): Promise { if (abortSignal.aborted) throw new AbortError("The operation was aborted."); // Retrieve current partition ownership details from the datastore. @@ -500,9 +500,7 @@ export class EventProcessor { const uniquePartitionsToClaim = new Set(partitionsToClaim); for (const partitionToClaim of uniquePartitionsToClaim) { - let partitionOwnershipRequest: PartitionOwnership; - - partitionOwnershipRequest = this._createPartitionOwnershipRequest( + const partitionOwnershipRequest = this._createPartitionOwnershipRequest( partitionOwnershipMap, partitionToClaim ); @@ -527,11 +525,13 @@ export class EventProcessor { eventHubName: this._eventHubName, consumerGroup: this._consumerGroup, partitionId: "", - updateCheckpoint: async () => {} + updateCheckpoint: async () => { + /* no-op */ + } }); - } catch (err) { + } catch (errorFromUser) { logger.verbose( - `[${this._id}] An error was thrown from the user's processError handler: ${err}` + `[${this._id}] An error was thrown from the user's processError handler: ${errorFromUser}` ); } } @@ -573,7 +573,7 @@ export class EventProcessor { } } - isRunning() { + isRunning(): boolean { return this._isRunning; } @@ -613,7 +613,7 @@ export class EventProcessor { } } - private async abandonPartitionOwnerships() { + private async abandonPartitionOwnerships(): Promise { logger.verbose(`[${this._id}] Abandoning owned partitions`); const allOwnerships = await this._checkpointStore.listOwnership( this._fullyQualifiedNamespace, diff --git a/sdk/eventhub/event-hubs/src/impl/partitionGate.ts b/sdk/eventhub/event-hubs/src/impl/partitionGate.ts index e5f59173971b..bff956ac0bb2 100644 --- a/sdk/eventhub/event-hubs/src/impl/partitionGate.ts +++ b/sdk/eventhub/event-hubs/src/impl/partitionGate.ts @@ -21,7 +21,7 @@ export class PartitionGate { * * @param partitionId A partition ID or the constant "all" */ - add(partitionId: string | "all") { + add(partitionId: string | "all"): void { if ( (partitionId === "all" && this._partitions.size > 0) || this._partitions.has(partitionId) || @@ -38,7 +38,7 @@ export class PartitionGate { * * @param partitionId A partition ID or the constant "all" */ - remove(partitionId: string | "all") { + remove(partitionId: string | "all"): void { this._partitions.delete(partitionId); } } diff --git a/sdk/eventhub/event-hubs/src/linkEntity.ts b/sdk/eventhub/event-hubs/src/linkEntity.ts index f41395637c3c..8f95b32aabf5 100644 --- a/sdk/eventhub/event-hubs/src/linkEntity.ts +++ b/sdk/eventhub/event-hubs/src/linkEntity.ts @@ -235,7 +235,7 @@ export class LinkEntity { clearTimeout(this._tokenRenewalTimer as NodeJS.Timer); if (link) { try { - // Closing the link and its underlying sessionĀ if the link is open. This should also + // Closing the link and its underlying session if the link is open. This should also // remove them from the internal map. await link.close(); logger.verbose( diff --git a/sdk/eventhub/event-hubs/src/log.ts b/sdk/eventhub/event-hubs/src/log.ts index 587f23af9ddb..8fb9a5680966 100644 --- a/sdk/eventhub/event-hubs/src/log.ts +++ b/sdk/eventhub/event-hubs/src/log.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. import { createClientLogger } from "@azure/logger"; +import { isObjectWithProperties } from "./util/typeGuards"; /** * The @azure/logger configuration for this package. @@ -14,8 +15,8 @@ export const logger = createClientLogger("event-hubs"); * @param error Error containing a stack trace. * @hidden */ -export function logErrorStackTrace(error: any) { - if (error && error.stack) { +export function logErrorStackTrace(error: unknown): void { + if (isObjectWithProperties(error, ["stack"]) && error.stack) { logger.verbose(error.stack); } } diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 824fa18cb4fa..66895209cc87 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -13,6 +13,7 @@ import { retry, translate } from "@azure/core-amqp"; +import { AccessToken } from "@azure/core-auth"; import { EventContext, Message, @@ -141,7 +142,7 @@ export class ManagementClient extends LinkEntity { * @hidden * @internal */ - async getSecurityToken() { + async getSecurityToken(): Promise { if (this._context.tokenCredential instanceof SharedKeyCredential) { // the security_token has the $management address removed from the end of the audience // expected audience: sb://fully.qualified.namespace/event-hub-name/$management @@ -367,12 +368,12 @@ export class ManagementClient extends LinkEntity { await this._ensureTokenRenewal(); } } catch (err) { - err = translate(err); + const translatedError = translate(err); logger.warning( - `[${this._context.connectionId}] An error occured while establishing the $management links: ${err?.name}: ${err?.message}` + `[${this._context.connectionId}] An error occured while establishing the $management links: ${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - throw err; + logErrorStackTrace(translatedError); + throw translatedError; } } @@ -393,14 +394,14 @@ export class ManagementClient extends LinkEntity { try { const abortSignal: AbortSignalLike | undefined = options && options.abortSignal; - const sendOperationPromise = () => + const sendOperationPromise = (): Promise => new Promise(async (resolve, reject) => { let count = 0; const retryTimeoutInMs = getRetryAttemptTimeoutInMs(options.retryOptions); let timeTakenByInit = 0; - const rejectOnAbort = () => { + const rejectOnAbort = (): void => { const requestName = options.requestName; const desc: string = `[${this._context.connectionId}] The request "${requestName}" ` + @@ -428,7 +429,7 @@ export class ManagementClient extends LinkEntity { const initOperationStartTime = Date.now(); - const actionAfterTimeout = () => { + const actionAfterTimeout = (): void => { const desc: string = `The request with message_id "${request.message_id}" timed out. Please try again later.`; const e: Error = { name: "OperationTimeoutError", @@ -473,16 +474,15 @@ export class ManagementClient extends LinkEntity { const result = await this._mgmtReqResLink!.sendRequest(request, sendRequestOptions); resolve(result); } catch (err) { - err = translate(err); + const translatedError = translate(err); logger.warning( - "[%s] An error occurred during send on management request-response link with address " + - "'%s': %s", + "[%s] An error occurred during send on management request-response link with address '%s': %s", this._context.connectionId, this.address, - `${err?.name}: ${err?.message}` + `${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - reject(err); + logErrorStackTrace(translatedError); + reject(translatedError); } }); @@ -495,12 +495,12 @@ export class ManagementClient extends LinkEntity { }; return (await retry(config)).body; } catch (err) { - err = translate(err); + const translatedError = translate(err); logger.warning( - `An error occurred while making the request to $management endpoint: ${err?.name}: ${err?.message}` + `An error occurred while making the request to $management endpoint: ${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - throw err; + logErrorStackTrace(translatedError); + throw translatedError; } } diff --git a/sdk/eventhub/event-hubs/src/partitionProcessor.ts b/sdk/eventhub/event-hubs/src/partitionProcessor.ts index 62b1614c7014..14cca63bb952 100644 --- a/sdk/eventhub/event-hubs/src/partitionProcessor.ts +++ b/sdk/eventhub/event-hubs/src/partitionProcessor.ts @@ -96,7 +96,7 @@ export class PartitionProcessor implements PartitionContext { * @property The fully qualified namespace from where the current partition is being processed. It is set by the `EventProcessor` * @readonly */ - public get fullyQualifiedNamespace() { + public get fullyQualifiedNamespace(): string { return this._context.fullyQualifiedNamespace; } @@ -104,7 +104,7 @@ export class PartitionProcessor implements PartitionContext { * @property The name of the consumer group from where the current partition is being processed. It is set by the `EventProcessor` * @readonly */ - public get consumerGroup() { + public get consumerGroup(): string { return this._context.consumerGroup!; } @@ -112,7 +112,7 @@ export class PartitionProcessor implements PartitionContext { * @property The name of the event hub to which the current partition belongs. It is set by the `EventProcessor` * @readonly */ - public get eventHubName() { + public get eventHubName(): string { return this._context.eventHubName; } @@ -120,14 +120,14 @@ export class PartitionProcessor implements PartitionContext { * @property The identifier of the Event Hub partition that is being processed. It is set by the `EventProcessor` * @readonly */ - public get partitionId() { + public get partitionId(): string { return this._context.partitionId; } /** * @property The unique identifier of the `EventProcessor` that has spawned the current instance of `PartitionProcessor`. This is set by the `EventProcessor` */ - public get eventProcessorId() { + public get eventProcessorId(): string { return this._context.eventProcessorId; } diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index 95b1e2872cde..a1ba0a852157 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -155,9 +155,9 @@ export class PartitionPump { // forward error to user's processError and swallow errors they may throw try { await this._partitionProcessor.processError(err); - } catch (err) { + } catch (errorFromUser) { // Using verbose over warning because this error is swallowed. - logger.verbose("An error was thrown by user's processError method: ", err); + logger.verbose("An error was thrown by user's processError method: ", errorFromUser); } // close the partition processor if a non-retryable error was encountered @@ -170,11 +170,11 @@ export class PartitionPump { } // this will close the pump and will break us out of the while loop return await this.stop(CloseReason.Shutdown); - } catch (err) { + } catch (errorFromStop) { // Using verbose over warning because this error is swallowed. logger.verbose( `An error occurred while closing the receiver with reason ${CloseReason.Shutdown}: `, - err + errorFromStop ); } } diff --git a/sdk/eventhub/event-hubs/src/util/delayWithoutThrow.ts b/sdk/eventhub/event-hubs/src/util/delayWithoutThrow.ts index 0b4dfcc4e6e9..a37d0c73b8e8 100644 --- a/sdk/eventhub/event-hubs/src/util/delayWithoutThrow.ts +++ b/sdk/eventhub/event-hubs/src/util/delayWithoutThrow.ts @@ -16,5 +16,7 @@ export async function delayWithoutThrow( ): Promise { try { await delay(delayInMs, abortSignal); - } catch {} // swallow AbortError + } catch { + /* no-op to swallow AbortError */ + } } diff --git a/sdk/eventhub/event-hubs/src/util/error.ts b/sdk/eventhub/event-hubs/src/util/error.ts index 83ef87678e8a..9bc513d90997 100644 --- a/sdk/eventhub/event-hubs/src/util/error.ts +++ b/sdk/eventhub/event-hubs/src/util/error.ts @@ -3,6 +3,7 @@ import { logErrorStackTrace, logger } from "../log"; import { ConnectionContext } from "../connectionContext"; +import { isDefined } from "./typeGuards"; /** * @internal @@ -33,9 +34,9 @@ export function throwTypeErrorIfParameterMissing( connectionId: string, methodName: string, parameterName: string, - parameterValue: any + parameterValue: unknown ): void { - if (parameterValue === undefined || parameterValue === null) { + if (!isDefined(parameterValue)) { const error = new TypeError( `${methodName} called without required argument "${parameterName}"` ); diff --git a/sdk/eventhub/event-hubs/src/util/retries.ts b/sdk/eventhub/event-hubs/src/util/retries.ts index e4d69b513f5e..281723fbaaea 100644 --- a/sdk/eventhub/event-hubs/src/util/retries.ts +++ b/sdk/eventhub/event-hubs/src/util/retries.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. import { Constants, RetryOptions } from "@azure/core-amqp"; +import { isDefined } from "./typeGuards"; /** * @internal @@ -9,7 +10,7 @@ import { Constants, RetryOptions } from "@azure/core-amqp"; */ export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number { const timeoutInMs = - retryOptions == undefined || + !isDefined(retryOptions) || typeof retryOptions.timeoutInMs !== "number" || !isFinite(retryOptions.timeoutInMs) || retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInMs diff --git a/sdk/eventhub/event-hubs/src/util/typeGuards.ts b/sdk/eventhub/event-hubs/src/util/typeGuards.ts new file mode 100644 index 000000000000..9394c1d4c583 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/util/typeGuards.ts @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/** + * Helper TypeGuard that checks if something is defined or not. + * @param thing Anything + * @internal + * @hidden + */ +export function isDefined(thing: T | undefined | null): thing is T { + return typeof thing !== "undefined" && thing !== null; +} + +/** + * Helper TypeGuard that checks if the input is an object with the specified properties. + * @param thing - Anything. + * @param properties - The name of the properties that should appear in the object. + * @internal + * @hidden + */ +export function isObjectWithProperties( + thing: Thing, + properties: PropertyName[] +): thing is Thing & Record { + if (!isDefined(thing) || typeof thing !== "object") { + return false; + } + + for (const property of properties) { + if (!objectHasProperty(thing, property)) { + return false; + } + } + + return true; +} + +/** + * Helper TypeGuard that checks if the input is an object with the specified property. + * @param thing - Any object. + * @param property - The name of the property that should appear in the object. + * @internal + * @hidden + */ +export function objectHasProperty( + thing: Thing, + property: PropertyName +): thing is Thing & Record { + if (!(property in thing)) { + return false; + } + + return true; +} diff --git a/sdk/eventhub/event-hubs/test/client.spec.ts b/sdk/eventhub/event-hubs/test/client.spec.ts index 5d6f6319e5ef..567440c02619 100644 --- a/sdk/eventhub/event-hubs/test/client.spec.ts +++ b/sdk/eventhub/event-hubs/test/client.spec.ts @@ -283,7 +283,9 @@ describe("EventHubConsumerClient with non existent namespace", function(): void let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = client.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } @@ -426,7 +428,9 @@ describe("EventHubConsumerClient with non existent event hub", function(): void let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = client.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } @@ -586,7 +590,7 @@ describe("EventHubProducerClient User Agent String", function(): void { }); }); -function testUserAgentString(context: ConnectionContext, customValue?: string) { +function testUserAgentString(context: ConnectionContext, customValue?: string): void { const packageVersion = packageJsonInfo.version; const properties = context.connection.options.properties; properties!["user-agent"].should.startWith( @@ -673,7 +677,9 @@ describe("EventHubConsumerClient after close()", function(): void { let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = client.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } diff --git a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts index 74eee6afa19c..e1ff120d4570 100644 --- a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts @@ -45,13 +45,17 @@ describe("EventHubConsumerClient", () => { describe("unit tests", () => { it("isCheckpointStore", () => { isCheckpointStore({ - processEvents: async () => {}, - processClose: async () => {} - }).should.not.ok; + processEvents: async () => { + /* no-op */ + }, + processClose: async () => { + /* no-op */ + } + }).should.not.equal(true); - isCheckpointStore("hello").should.not.ok; + isCheckpointStore("hello").should.not.equal(true); - isCheckpointStore(new InMemoryCheckpointStore()).should.ok; + isCheckpointStore(new InMemoryCheckpointStore()).should.equal(true); }); describe("subscribe() overloads route properly", () => { @@ -59,23 +63,22 @@ describe("EventHubConsumerClient", () => { let clientWithCheckpointStore: EventHubConsumerClient; let subscriptionHandlers: SubscriptionEventHandlers; let fakeEventProcessor: SinonStubbedInstance; + let validateOptions: (options: FullEventProcessorOptions) => void; const fakeEventProcessorConstructor = ( connectionContext: ConnectionContext, subscriptionEventHandlers: SubscriptionEventHandlers, checkpointStore: CheckpointStore, options: FullEventProcessorOptions - ) => { + ): SinonStubbedInstance => { subscriptionEventHandlers.should.equal(subscriptionHandlers); should.exist(connectionContext.managementSession); - isCheckpointStore(checkpointStore).should.be.ok; + isCheckpointStore(checkpointStore).should.equal(true); validateOptions(options); return fakeEventProcessor; }; - let validateOptions: (options: FullEventProcessorOptions) => void; - beforeEach(() => { fakeEventProcessor = createStubInstance(EventProcessor); @@ -94,8 +97,12 @@ describe("EventHubConsumerClient", () => { ); subscriptionHandlers = { - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }; (client as any)["_createEventProcessor"] = fakeEventProcessorConstructor; @@ -103,7 +110,9 @@ describe("EventHubConsumerClient", () => { }); it("conflicting subscribes", () => { - validateOptions = () => {}; + validateOptions = () => { + /* no-op */ + }; client.subscribe(subscriptionHandlers); // invalid - we're already subscribed to a conflicting partition @@ -475,7 +484,7 @@ describe("EventHubConsumerClient", () => { let clients: EventHubConsumerClient[]; let producerClient: EventHubProducerClient; let partitionIds: string[]; - const subscriptions: Subscription[] = []; + let subscriptions: Subscription[]; beforeEach(async () => { producerClient = new EventHubProducerClient(service.connectionString!, service.path!, {}); @@ -486,6 +495,7 @@ describe("EventHubConsumerClient", () => { partitionIds.length.should.gte(2); clients = []; + subscriptions = []; }); afterEach(async () => { @@ -503,7 +513,6 @@ describe("EventHubConsumerClient", () => { describe("#close()", function(): void { it("stops any actively running subscriptions", async function(): Promise { - const subscriptions: Subscription[] = []; const client = new EventHubConsumerClient( EventHubConsumerClient.defaultConsumerGroupName, service.connectionString, @@ -632,46 +641,54 @@ describe("EventHubConsumerClient", () => { ); clients.push(consumerClient1, consumerClient2); + let subscription2: Subscription | undefined; + const subscriptionHandlers2: SubscriptionEventHandlers = { + async processError() { + /* no-op */ + }, + async processEvents() { + // stop this subscription since it already should have forced the 1st subscription to have an error. + await subscription2!.close(); + } + }; // keep track of the handlers called on subscription 1 const handlerCalls = { initialize: 0, close: 0 }; - const subscriptionHandlers1: SubscriptionEventHandlers = { - async processError() {}, - async processEvents() { - if (!handlerCalls.close) { - // start the 2nd subscription that will kick the 1st subscription off - subscription2 = consumerClient2.subscribe(partitionId, subscriptionHandlers2, { - ownerLevel: 1, - maxBatchSize: 1, - maxWaitTimeInSeconds: 1 - }); - } else { - // stop this subscription, we know close was called so we've restarted - await subscription1.close(); + + const subscription1 = consumerClient1.subscribe( + partitionId, + { + async processError() { + /* no-op */ + }, + async processEvents() { + if (!handlerCalls.close) { + // start the 2nd subscription that will kick the 1st subscription off + subscription2 = consumerClient2.subscribe(partitionId, subscriptionHandlers2, { + ownerLevel: 1, + maxBatchSize: 1, + maxWaitTimeInSeconds: 1 + }); + } else { + // stop this subscription, we know close was called so we've restarted + await subscription1.close(); + } + }, + async processClose() { + handlerCalls.close++; + }, + async processInitialize() { + handlerCalls.initialize++; } }, - async processClose() { - handlerCalls.close++; - }, - async processInitialize() { - handlerCalls.initialize++; - } - }; - const subscriptionHandlers2: SubscriptionEventHandlers = { - async processError() {}, - async processEvents() { - // stop this subscription since it already should have forced the 1st subscription to have an error. - await subscription2!.close(); + { + maxBatchSize: 1, + maxWaitTimeInSeconds: 1 } - }; - let subscription2: Subscription | undefined; - const subscription1 = consumerClient1.subscribe(partitionId, subscriptionHandlers1, { - maxBatchSize: 1, - maxWaitTimeInSeconds: 1 - }); + ); await loopUntil({ maxTimes: 10, @@ -703,8 +720,6 @@ describe("EventHubConsumerClient", () => { clients.push(consumerClient1, consumerClient2); - const partitionIds = await consumerClient1.getPartitionIds(); - const partitionHandlerCalls: { [partitionId: string]: { initialize: number; @@ -719,7 +734,9 @@ describe("EventHubConsumerClient", () => { } const subscriptionHandlers1: SubscriptionEventHandlers = { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(_, context) { partitionHandlerCalls[context.partitionId].processEvents = true; }, @@ -754,7 +771,9 @@ describe("EventHubConsumerClient", () => { const partitionsReadFromSub2 = new Set(); const subscriptionHandlers2: SubscriptionEventHandlers = { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(_, context) { partitionsReadFromSub2.add(context.partitionId); } @@ -1071,8 +1090,12 @@ describe("EventHubConsumerClient", () => { let closeCalled = 0; const subscription = client.subscribe(partitionId, { - async processError() {}, - async processEvents() {}, + async processError() { + /* no-op */ + }, + async processEvents() { + /* no-op */ + }, async processClose() { closeCalled++; }, @@ -1115,8 +1138,12 @@ describe("EventHubConsumerClient", () => { let closeCalled = 0; const subscription = client.subscribe({ - async processError() {}, - async processEvents() {}, + async processError() { + /* no-op */ + }, + async processEvents() { + /* no-op */ + }, async processClose() { closeCalled++; }, @@ -1158,7 +1185,9 @@ describe("EventHubConsumerClient", () => { let subscription: Subscription; const caughtErr: Error = await new Promise((resolve) => { subscription = client.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err, context) => { if (!context.partitionId) { await subscription.close(); @@ -1187,7 +1216,9 @@ describe("EventHubConsumerClient", () => { const caughtErr: Error = await new Promise((resolve) => { // Subscribe to an invalid partition id to trigger a partition-specific error. subscription = client.subscribe("-1", { - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err, context) => { if (context.partitionId) { await subscription.close(); diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index b7ee6c80f8bb..cee10f1b397e 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -83,6 +83,62 @@ describe("Event Processor", function(): void { describe("unit tests", () => { describe("_getStartingPosition", () => { + function createEventProcessor( + checkpointStore: CheckpointStore, + startPosition?: FullEventProcessorOptions["startPosition"] + ): EventProcessor { + return new EventProcessor( + EventHubConsumerClient.defaultConsumerGroupName, + consumerClient["_context"], + { + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } + }, + checkpointStore, + { + startPosition, + maxBatchSize: 1, + maxWaitTimeInSeconds: 1, + loadBalancingStrategy: defaultOptions.loadBalancingStrategy, + loopIntervalInMs: defaultOptions.loopIntervalInMs + } + ); + } + + const emptyCheckpointStore = createCheckpointStore([]); + + function createCheckpointStore( + checkpointsForTest: Pick[] + ): CheckpointStore { + return { + claimOwnership: async () => { + return []; + }, + listCheckpoints: async () => { + return checkpointsForTest.map((cp) => { + return { + fullyQualifiedNamespace: "not-used-for-this-test", + consumerGroup: "not-used-for-this-test", + eventHubName: "not-used-for-this-test", + offset: cp.offset, + sequenceNumber: cp.sequenceNumber, + partitionId: cp.partitionId + }; + }); + }, + listOwnership: async () => { + return []; + }, + updateCheckpoint: async () => { + /* no-op */ + } + }; + } + before(() => { consumerClient["_context"].managementSession!.getEventHubProperties = async () => { return Promise.resolve({ @@ -97,7 +153,7 @@ describe("Event Processor", function(): void { const processor = createEventProcessor(emptyCheckpointStore); const eventPosition = await processor["_getStartingPosition"]("0"); - isLatestPosition(eventPosition).should.be.ok; + should.equal(isLatestPosition(eventPosition), true); }); it("has a checkpoint", async () => { @@ -158,58 +214,8 @@ describe("Event Processor", function(): void { should.not.exist(eventPositionForPartitionZero!.sequenceNumber); const eventPositionForPartitionOne = await processor["_getStartingPosition"]("1"); - isLatestPosition(eventPositionForPartitionOne).should.be.ok; + should.equal(isLatestPosition(eventPositionForPartitionOne), true); }); - - function createEventProcessor( - checkpointStore: CheckpointStore, - startPosition?: FullEventProcessorOptions["startPosition"] - ) { - return new EventProcessor( - EventHubConsumerClient.defaultConsumerGroupName, - consumerClient["_context"], - { - processEvents: async () => {}, - processError: async () => {} - }, - checkpointStore, - { - startPosition, - maxBatchSize: 1, - maxWaitTimeInSeconds: 1, - loadBalancingStrategy: defaultOptions.loadBalancingStrategy, - loopIntervalInMs: defaultOptions.loopIntervalInMs - } - ); - } - - const emptyCheckpointStore = createCheckpointStore([]); - - function createCheckpointStore( - checkpointsForTest: Pick[] - ): CheckpointStore { - return { - claimOwnership: async () => { - return []; - }, - listCheckpoints: async () => { - return checkpointsForTest.map((cp) => { - return { - fullyQualifiedNamespace: "not-used-for-this-test", - consumerGroup: "not-used-for-this-test", - eventHubName: "not-used-for-this-test", - offset: cp.offset, - sequenceNumber: cp.sequenceNumber, - partitionId: cp.partitionId - }; - }); - }, - listOwnership: async () => { - return []; - }, - updateCheckpoint: async () => {} - }; - } }); describe("_handleSubscriptionError", () => { @@ -229,7 +235,9 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err, context) => { // simulate the user messing up and accidentally throwing an error // we should just log it and not kill anything. @@ -284,7 +292,9 @@ describe("Event Processor", function(): void { async listOwnership(): Promise { return []; }, - async updateCheckpoint(): Promise {}, + async updateCheckpoint(): Promise { + /* no-op */ + }, async listCheckpoints(): Promise { return []; } @@ -297,7 +307,9 @@ describe("Event Processor", function(): void { pumpManager.createPumpCalled = true; }, - async removeAllPumps() {}, + async removeAllPumps() { + /* no-op */ + }, isReceivingFromPartition() { return false; @@ -312,8 +324,12 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }, checkpointStore, { @@ -335,10 +351,10 @@ describe("Event Processor", function(): void { // when we fail to claim a partition we should _definitely_ // not attempt to start a pump. - pumpManager.createPumpCalled.should.be.false; + should.equal(pumpManager.createPumpCalled, false); // we'll attempt to claim a partition (but won't succeed) - checkpointStore.claimOwnershipCalled.should.be.true; + should.equal(checkpointStore.claimOwnershipCalled, true); }); it("abandoned claims are treated as unowned claims", async () => { @@ -387,8 +403,12 @@ describe("Event Processor", function(): void { loopIntervalInMs: 1, maxWaitTimeInSeconds: 1, pumpManager: { - async createPump() {}, - async removeAllPumps(): Promise {}, + async createPump() { + /* no-op */ + }, + async removeAllPumps(): Promise { + /* no-op */ + }, isReceivingFromPartition() { return false; } @@ -416,7 +436,7 @@ describe("Event Processor", function(): void { triggerAbortedSignalAfterNumCalls(partitionIds.length * numTimesAbortedIsCheckedInLoop) ); - handlers.errors.should.be.empty; + handlers.errors.should.deep.equal([]); const currentOwnerships = await checkpointStore.listOwnership( commonFields.fullyQualifiedNamespace, @@ -505,7 +525,9 @@ describe("Event Processor", function(): void { claimOwnership: async () => { throw new Error("Some random failure!"); }, - updateCheckpoint: async () => {}, + updateCheckpoint: async () => { + /* no-op */ + }, listCheckpoints: async () => [] }; @@ -513,7 +535,9 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err, _) => { errors.push(err); } @@ -617,8 +641,12 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }, new InMemoryCheckpointStore(), { @@ -636,8 +664,12 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }, new InMemoryCheckpointStore(), { ...defaultOptions, ownerId: "hello", startPosition: latestEventPosition } @@ -680,8 +712,8 @@ describe("Event Processor", function(): void { receivedEvents.should.deep.equal(expectedMessages); - subscriptionEventHandler.hasErrors(partitionIds).should.be.false; - subscriptionEventHandler.allShutdown(partitionIds).should.be.true; + subscriptionEventHandler.hasErrors(partitionIds).should.equal(false); + subscriptionEventHandler.allShutdown(partitionIds).should.equal(true); }); it("should not throw if stop is called without start", async function(): Promise { @@ -694,8 +726,12 @@ describe("Event Processor", function(): void { processInitialize: async () => { didPartitionProcessorStart = true; }, - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }, new InMemoryCheckpointStore(), { @@ -707,7 +743,7 @@ describe("Event Processor", function(): void { // shutdown the processor await processor.stop(); - didPartitionProcessorStart.should.be.false; + didPartitionProcessorStart.should.equal(false); }); it("should support start after stopping", async function(): Promise { @@ -743,8 +779,8 @@ describe("Event Processor", function(): void { receivedEvents.should.deep.equal(expectedMessages); - subscriptionEventHandler.hasErrors(partitionIds).should.be.false; - subscriptionEventHandler.allShutdown(partitionIds).should.be.true; + subscriptionEventHandler.hasErrors(partitionIds).should.equal(false); + subscriptionEventHandler.allShutdown(partitionIds).should.equal(true); // validate correct events captured for each partition @@ -759,8 +795,8 @@ describe("Event Processor", function(): void { loggerForTest(`Stopping processor again`); await processor.stop(); - subscriptionEventHandler.hasErrors(partitionIds).should.be.false; - subscriptionEventHandler.allShutdown(partitionIds).should.be.true; + subscriptionEventHandler.hasErrors(partitionIds).should.equal(false); + subscriptionEventHandler.allShutdown(partitionIds).should.equal(true); }); describe("Partition processor", function(): void { @@ -792,8 +828,8 @@ describe("Event Processor", function(): void { // shutdown the processor await processor.stop(); - subscriptionEventHandler.hasErrors(partitionIds).should.be.false; - subscriptionEventHandler.allShutdown(partitionIds).should.be.true; + subscriptionEventHandler.hasErrors(partitionIds).should.equal(false); + subscriptionEventHandler.allShutdown(partitionIds).should.equal(true); receivedEvents.should.deep.equal(expectedMessages); }); @@ -871,12 +907,13 @@ describe("Event Processor", function(): void { let partionCount: { [x: string]: number } = {}; class FooPartitionProcessor { - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { processedAtLeastOneEvent.add(context.partitionId); - !partionCount[context.partitionId] - ? (partionCount[context.partitionId] = 1) - : partionCount[context.partitionId]++; + if (!partionCount[context.partitionId]) { + partionCount[context.partitionId] = 0; + } + partionCount[context.partitionId]++; const existingEvents = checkpointMap.get(context.partitionId)!; @@ -890,7 +927,7 @@ describe("Event Processor", function(): void { } } } - async processError() { + async processError(): Promise { didError = true; } } @@ -983,7 +1020,7 @@ describe("Event Processor", function(): void { firstEventsReceivedFromProcessor2[index++] = receivedEvents[0]; } - didError.should.be.false; + didError.should.equal(false); index = 0; // validate correct events captured for each partition using checkpoint for (const partitionId of partitionIds) { @@ -999,7 +1036,7 @@ describe("Event Processor", function(): void { const checkpointStore = new InMemoryCheckpointStore(); const allObjects = new Set(); - const assertUnique = (...objects: any[]) => { + const assertUnique = (...objects: any[]): void => { const size = allObjects.size; for (const obj of objects) { @@ -1101,20 +1138,20 @@ describe("Event Processor", function(): void { // The partitionProcess will need to add events to the partitionResultsMap as they are received class FooPartitionProcessor implements Required { - async processInitialize(context: PartitionContext) { + async processInitialize(context: PartitionContext): Promise { loggerForTest(`processInitialize(${context.partitionId})`); partitionResultsMap.get(context.partitionId)!.initialized = true; } - async processClose(reason: CloseReason, context: PartitionContext) { + async processClose(reason: CloseReason, context: PartitionContext): Promise { loggerForTest(`processClose(${context.partitionId})`); partitionResultsMap.get(context.partitionId)!.closeReason = reason; } - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { partitionOwnershipArr.add(context.partitionId); const existingEvents = partitionResultsMap.get(context.partitionId)!.events; existingEvents.push(...events.map((event) => event.body)); } - async processError(err: Error, context: PartitionContext) { + async processError(err: Error, context: PartitionContext): Promise { loggerForTest(`processError(${context.partitionId})`); const errorName = (err as any).code; if (errorName === "ReceiverDisconnectedError") { @@ -1207,11 +1244,11 @@ describe("Event Processor", function(): void { // if stealing has occurred we just want to make sure that _all_ // the stealing has completed. - const isBalanced = (friendlyName: string) => { + const isBalanced = (friendlyName: string): boolean => { const n = Math.floor(partitionIds.length / 2); const numPartitions = partitionOwnershipMap.get(processorByName[friendlyName].id)! .length; - return numPartitions == n || numPartitions == n + 1; + return numPartitions === n || numPartitions === n + 1; }; if (!isBalanced(`processor-1`) || !isBalanced(`processor-2`)) { @@ -1234,8 +1271,8 @@ describe("Event Processor", function(): void { for (const partitionId of partitionIds) { const results = partitionResultsMap.get(partitionId)!; results.events.length.should.be.gte(1); - results.initialized.should.be.true; - (results.closeReason === CloseReason.Shutdown).should.be.true; + results.initialized.should.equal(true); + (results.closeReason === CloseReason.Shutdown).should.equal(true); } }); @@ -1258,20 +1295,20 @@ describe("Event Processor", function(): void { // The partitionProcess will need to add events to the partitionResultsMap as they are received class FooPartitionProcessor implements Required { - async processInitialize(context: PartitionContext) { + async processInitialize(context: PartitionContext): Promise { loggerForTest(`processInitialize(${context.partitionId})`); partitionResultsMap.get(context.partitionId)!.initialized = true; } - async processClose(reason: CloseReason, context: PartitionContext) { + async processClose(reason: CloseReason, context: PartitionContext): Promise { loggerForTest(`processClose(${context.partitionId})`); partitionResultsMap.get(context.partitionId)!.closeReason = reason; } - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { partitionOwnershipArr.add(context.partitionId); const existingEvents = partitionResultsMap.get(context.partitionId)!.events; existingEvents.push(...events.map((event) => event.body)); } - async processError(err: Error, context: PartitionContext) { + async processError(err: Error, context: PartitionContext): Promise { loggerForTest(`processError(${context.partitionId})`); const errorName = (err as any).code; if (errorName === "ReceiverDisconnectedError") { @@ -1364,11 +1401,11 @@ describe("Event Processor", function(): void { // if stealing has occurred we just want to make sure that _all_ // the stealing has completed. - const isBalanced = (friendlyName: string) => { + const isBalanced = (friendlyName: string): boolean => { const n = Math.floor(partitionIds.length / 2); const numPartitions = partitionOwnershipMap.get(processorByName[friendlyName].id)! .length; - return numPartitions == n || numPartitions == n + 1; + return numPartitions === n || numPartitions === n + 1; }; if (!isBalanced(`processor-1`) || !isBalanced(`processor-2`)) { @@ -1391,8 +1428,8 @@ describe("Event Processor", function(): void { for (const partitionId of partitionIds) { const results = partitionResultsMap.get(partitionId)!; results.events.length.should.be.gte(1); - results.initialized.should.be.true; - (results.closeReason === CloseReason.Shutdown).should.be.true; + results.initialized.should.equal(true); + (results.closeReason === CloseReason.Shutdown).should.equal(true); } }); @@ -1407,10 +1444,13 @@ describe("Event Processor", function(): void { // The partitionProcess will need to add events to the partitionResultsMap as they are received class FooPartitionProcessor { - async processEvents(_events: ReceivedEventData[], context: PartitionContext) { + async processEvents( + _events: ReceivedEventData[], + context: PartitionContext + ): Promise { partitionOwnershipArr.add(context.partitionId); } - async processError() { + async processError(): Promise { didError = true; } } @@ -1471,7 +1511,7 @@ describe("Event Processor", function(): void { } } - didError.should.be.false; + didError.should.equal(false); const n = Math.floor(partitionIds.length / 2); partitionOwnershipMap.get(processorByName[`processor-0`].id)!.length.should.oneOf([n, n + 1]); partitionOwnershipMap.get(processorByName[`processor-1`].id)!.length.should.oneOf([n, n + 1]); @@ -1487,10 +1527,15 @@ describe("Event Processor", function(): void { // The partitionProcess will need to add events to the partitionResultsMap as they are received class FooPartitionProcessor { - async processEvents(_events: ReceivedEventData[], context: PartitionContext) { + async processEvents( + _events: ReceivedEventData[], + context: PartitionContext + ): Promise { partitionOwnershipArr.add(context.partitionId); } - async processError() {} + async processError(): Promise { + /* no-op */ + } } // create messages @@ -1582,8 +1627,12 @@ describe("Event Processor", function(): void { claimedPartitions.add(partitionId); claimedPartitionsMap[eventProcessorId] = claimedPartitions; }, - async processEvents() {}, - async processError() {}, + async processEvents() { + /* no-op */ + }, + async processError() { + /* no-op */ + }, async processClose(reason, context) { const eventProcessorId: string = (context as any).eventProcessorId; const partitionId = context.partitionId; @@ -1672,10 +1721,10 @@ describe("Event Processor", function(): void { } // All partitions must be claimed. - const allPartitionsClaimed = + const innerAllPartitionsClaimed = aProcessorPartitions.size + bProcessorPartitions.size === partitionIds.length; - if (!allPartitionsClaimed) { + if (!innerAllPartitionsClaimed) { lastLoopError = { reason: "All partitions not claimed", partitionIds, @@ -1685,7 +1734,7 @@ describe("Event Processor", function(): void { }; } - return allPartitionsClaimed; + return innerAllPartitionsClaimed; } }); } catch (err) { @@ -1745,8 +1794,12 @@ describe("Event Processor", function(): void { claimedPartitions.add(partitionId); claimedPartitionsMap[eventProcessorId] = claimedPartitions; }, - async processEvents() {}, - async processError() {}, + async processEvents() { + /* no-op */ + }, + async processError() { + /* no-op */ + }, async processClose(reason, context) { const eventProcessorId: string = (context as any).eventProcessorId; const partitionId = context.partitionId; @@ -1835,10 +1888,10 @@ describe("Event Processor", function(): void { } // All partitions must be claimed. - const allPartitionsClaimed = + const innerAllPartitionsClaimed = aProcessorPartitions.size + bProcessorPartitions.size === partitionIds.length; - if (!allPartitionsClaimed) { + if (!innerAllPartitionsClaimed) { lastLoopError = { reason: "All partitions not claimed", partitionIds, @@ -1848,7 +1901,7 @@ describe("Event Processor", function(): void { }; } - return allPartitionsClaimed; + return innerAllPartitionsClaimed; } }); } catch (err) { @@ -1911,9 +1964,15 @@ function triggerAbortedSignalAfterNumCalls(maxCalls: number): AbortSignal { return false; }, - addEventListener: () => {}, - removeEventListener: () => {}, - onabort: () => {}, + addEventListener: () => { + /* no-op */ + }, + removeEventListener: () => { + /* no-op */ + }, + onabort: () => { + /* no-op */ + }, dispatchEvent: () => true }; diff --git a/sdk/eventhub/event-hubs/test/eventdata.spec.ts b/sdk/eventhub/event-hubs/test/eventdata.spec.ts index 66ce16c9e79b..d9aaf851d3d6 100644 --- a/sdk/eventhub/event-hubs/test/eventdata.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventdata.spec.ts @@ -35,12 +35,12 @@ const testSourceEventData: EventData = { properties: properties }; -const testEventData = fromRheaMessage(testMessage); const messageFromED = toRheaMessage(testSourceEventData); describe("EventData", function(): void { describe("fromRheaMessage", function(): void { it("populates body with the message body", function(): void { + const testEventData = fromRheaMessage(testMessage); testEventData.body.should.equal(testBody); }); diff --git a/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts index 927376da4d2b..2e862d818566 100644 --- a/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts +++ b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts @@ -1,10 +1,12 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +import chai from "chai"; import { PartitionOwnership } from "../src/eventProcessor"; import { BalancedLoadBalancingStrategy } from "../src/loadBalancerStrategies/balancedStrategy"; import { GreedyLoadBalancingStrategy } from "../src/loadBalancerStrategies/greedyStrategy"; import { UnbalancedLoadBalancingStrategy } from "../src/loadBalancerStrategies/unbalancedStrategy"; +const should = chai.should(); describe("LoadBalancingStrategy", () => { function createOwnershipMap( @@ -33,7 +35,7 @@ describe("LoadBalancingStrategy", () => { const lb = new UnbalancedLoadBalancingStrategy(); lb.getPartitionsToCliam("ownerId", m, ["1", "2", "3"]).should.deep.eq(["1", "2", "3"]); - m.should.be.empty; + should.equal(m.size, 0); }); it("claim partitions we already own", () => { @@ -201,7 +203,7 @@ describe("LoadBalancingStrategy", () => { // meet the minimum. const partitions = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]; - const lb = new BalancedLoadBalancingStrategy(1000 * 60); + const lbs = new BalancedLoadBalancingStrategy(1000 * 60); // we'll do 4 consumers const initialOwnershipMap = createOwnershipMap({ @@ -220,7 +222,7 @@ describe("LoadBalancingStrategy", () => { "9": "d" }); - const requestedPartitions = lb.getPartitionsToCliam("c", initialOwnershipMap, partitions); + const requestedPartitions = lbs.getPartitionsToCliam("c", initialOwnershipMap, partitions); requestedPartitions.sort(); requestedPartitions.should.deep.equal( @@ -297,7 +299,7 @@ describe("LoadBalancingStrategy", () => { it("honors the partitionOwnershipExpirationIntervalInMs", () => { const intervalInMs = 1000; - const lb = new BalancedLoadBalancingStrategy(intervalInMs); + const lbs = new BalancedLoadBalancingStrategy(intervalInMs); const allPartitions = ["0", "1"]; const ownershipMap = createOwnershipMap({ "0": "b", @@ -305,14 +307,14 @@ describe("LoadBalancingStrategy", () => { }); // At this point, 'a' has its fair share of partitions, and none should be returned. - let partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + let partitionsToOwn = lbs.getPartitionsToCliam("a", ownershipMap, allPartitions); partitionsToOwn.length.should.equal(0, "Expected to not claim any new partitions."); // Change the ownership of partition "0" so it is older than the interval. const ownership = ownershipMap.get("0")!; ownership.lastModifiedTimeInMs = Date.now() - (intervalInMs + 1); // Add 1 to the interval to ensure it has just expired. - partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + partitionsToOwn = lbs.getPartitionsToCliam("a", ownershipMap, allPartitions); partitionsToOwn.should.deep.equal(["0"]); }); }); @@ -441,7 +443,7 @@ describe("LoadBalancingStrategy", () => { allPartitions.push(`${i}`); } - let partitionsToOwn = lb.getPartitionsToCliam( + const partitionsToOwn = lb.getPartitionsToCliam( "a", createOwnershipMap({ "0": "", @@ -481,7 +483,7 @@ describe("LoadBalancingStrategy", () => { // meet the minimum. const partitions = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]; - const lb = new BalancedLoadBalancingStrategy(1000 * 60); + const lbs = new BalancedLoadBalancingStrategy(1000 * 60); // we'll do 4 consumers const initialOwnershipMap = createOwnershipMap({ @@ -500,7 +502,7 @@ describe("LoadBalancingStrategy", () => { "9": "d" }); - const requestedPartitions = lb.getPartitionsToCliam("c", initialOwnershipMap, partitions); + const requestedPartitions = lbs.getPartitionsToCliam("c", initialOwnershipMap, partitions); requestedPartitions.sort(); requestedPartitions.should.deep.equal( @@ -577,7 +579,7 @@ describe("LoadBalancingStrategy", () => { it("honors the partitionOwnershipExpirationIntervalInMs", () => { const intervalInMs = 1000; - const lb = new GreedyLoadBalancingStrategy(intervalInMs); + const lbs = new GreedyLoadBalancingStrategy(intervalInMs); const allPartitions = ["0", "1", "2", "3"]; const ownershipMap = createOwnershipMap({ "0": "b", @@ -585,7 +587,7 @@ describe("LoadBalancingStrategy", () => { }); // At this point, "a" should only grab 1 partition since both "a" and "b" should end up with 2 partitions each. - let partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + let partitionsToOwn = lbs.getPartitionsToCliam("a", ownershipMap, allPartitions); partitionsToOwn.length.should.equal(1, "Expected to claim 1 new partitions."); // Change the ownership of partition "0" so it is older than the interval. @@ -595,7 +597,7 @@ describe("LoadBalancingStrategy", () => { // At this point, "a" should grab partitions 0, 2, and 3. // This is because "b" only owned 1 partition and that claim is expired, // so "a" as treated as if it is the only owner. - partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + partitionsToOwn = lbs.getPartitionsToCliam("a", ownershipMap, allPartitions); partitionsToOwn.sort(); partitionsToOwn.should.deep.equal(["0", "2", "3"]); }); diff --git a/sdk/eventhub/event-hubs/test/misc.spec.ts b/sdk/eventhub/event-hubs/test/misc.spec.ts index ba0fb25f7125..f693c0a92059 100644 --- a/sdk/eventhub/event-hubs/test/misc.spec.ts +++ b/sdk/eventhub/event-hubs/test/misc.spec.ts @@ -335,12 +335,6 @@ describe("Misc tests", function(): void { it("should consistently send messages with partitionkey to a partitionId", async function(): Promise< void > { - const consumerClient = new EventHubConsumerClient( - EventHubConsumerClient.defaultConsumerGroupName, - service.connectionString!, - service.path - ); - const { subscriptionEventHandler, startPosition diff --git a/sdk/eventhub/event-hubs/test/partitionPump.spec.ts b/sdk/eventhub/event-hubs/test/partitionPump.spec.ts index 1a2ec1c262b8..d88a4cd402be 100644 --- a/sdk/eventhub/event-hubs/test/partitionPump.spec.ts +++ b/sdk/eventhub/event-hubs/test/partitionPump.spec.ts @@ -22,10 +22,6 @@ describe("PartitionPump", () => { public spanOptions: SpanOptions | undefined; public spanName: string | undefined; - constructor() { - super(); - } - startSpan(nameArg: string, optionsArg?: SpanOptions): TestSpan { this.spanName = nameArg; this.spanOptions = optionsArg; @@ -105,10 +101,12 @@ describe("PartitionPump", () => { const tracer = new TestTracer(); const span = tracer.startSpan("whatever"); - await trace(async () => {}, span); + await trace(async () => { + /* no-op */ + }, span); span.status!.code.should.equal(CanonicalCode.OK); - span.endCalled.should.be.ok; + should.equal(span.endCalled, true); }); it("trace - throws", async () => { @@ -121,7 +119,7 @@ describe("PartitionPump", () => { span.status!.code.should.equal(CanonicalCode.UNKNOWN); span.status!.message!.should.equal("error thrown from fn"); - span.endCalled.should.be.ok; + should.equal(span.endCalled, true); }); }); }); diff --git a/sdk/eventhub/event-hubs/test/receiver.spec.ts b/sdk/eventhub/event-hubs/test/receiver.spec.ts index f20d4f9b9d16..971a4be50e9b 100644 --- a/sdk/eventhub/event-hubs/test/receiver.spec.ts +++ b/sdk/eventhub/event-hubs/test/receiver.spec.ts @@ -63,7 +63,7 @@ describe("EventHubConsumerClient", function(): void { let subscription: Subscription | undefined; await new Promise((resolve, reject) => { subscription = consumerClient.subscribe( - // @ts-expect-error + // @ts-expect-error Testing the value 0 can be provided as a number for JS users. 0, { processEvents: async () => { @@ -704,7 +704,9 @@ describe("EventHubConsumerClient", function(): void { let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = badConsumerClient.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } @@ -723,7 +725,9 @@ describe("EventHubConsumerClient", function(): void { let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = consumerClient.subscribe("boo", { - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 04260cc55c45..b4de88c43614 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -92,7 +92,7 @@ describe("EventHub Sender", function(): void { it("partitionId is set as expected when it is 0 i.e. falsy", async () => { const batch = await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionId: 0 }); should.equal(batch.partitionId, "0"); @@ -107,7 +107,7 @@ describe("EventHub Sender", function(): void { it("partitionKey is set as expected when it is 0 i.e. falsy", async () => { const batch = await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionKey: 0 }); should.equal(batch.partitionKey, "0"); @@ -129,18 +129,15 @@ describe("EventHub Sender", function(): void { should.not.exist(batch.partitionKey); batch.maxSizeInBytes.should.be.gt(0); - batch.tryAdd({ body: list[0] }).should.be.ok; - batch.tryAdd({ body: list[1] }).should.not.be.ok; // The Mike message will be rejected - it's over the limit. - batch.tryAdd({ body: list[2] }).should.be.ok; // Marie should get added"; + should.equal(batch.tryAdd({ body: list[0] }), true); + should.equal(batch.tryAdd({ body: list[1] }), false); // The Mike message will be rejected - it's over the limit. + should.equal(batch.tryAdd({ body: list[2] }), true); // Marie should get added"; - const { - subscriptionEventHandler, - startPosition - } = await SubscriptionHandlerForTests.startingFromHere(producerClient); + const { subscriptionEventHandler } = await SubscriptionHandlerForTests.startingFromHere( + producerClient + ); - const subscriber = consumerClient.subscribe("0", subscriptionEventHandler, { - startPosition - }); + const subscriber = consumerClient.subscribe("0", subscriptionEventHandler, { startPosition }); await producerClient.sendBatch(batch); let receivedEvents; @@ -165,7 +162,7 @@ describe("EventHub Sender", function(): void { const list = ["Albert", "Marie"]; const batch = await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionId: 0 }); @@ -173,17 +170,14 @@ describe("EventHub Sender", function(): void { should.not.exist(batch.partitionKey); batch.maxSizeInBytes.should.be.gt(0); - batch.tryAdd({ body: list[0] }).should.be.ok; - batch.tryAdd({ body: list[1] }).should.be.ok; + should.equal(batch.tryAdd({ body: list[0] }), true); + should.equal(batch.tryAdd({ body: list[1] }), true); - const { - subscriptionEventHandler, - startPosition - } = await SubscriptionHandlerForTests.startingFromHere(producerClient); + const { subscriptionEventHandler } = await SubscriptionHandlerForTests.startingFromHere( + producerClient + ); - const subscriber = consumerClient.subscribe("0", subscriptionEventHandler, { - startPosition - }); + const subscriber = consumerClient.subscribe("0", subscriptionEventHandler, { startPosition }); await producerClient.sendBatch(batch); let receivedEvents; @@ -206,7 +200,7 @@ describe("EventHub Sender", function(): void { const list = ["Albert", "Marie"]; const batch = await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionKey: 0 }); @@ -214,13 +208,12 @@ describe("EventHub Sender", function(): void { should.not.exist(batch.partitionId); batch.maxSizeInBytes.should.be.gt(0); - batch.tryAdd({ body: list[0] }).should.be.ok; - batch.tryAdd({ body: list[1] }).should.be.ok; + should.equal(batch.tryAdd({ body: list[0] }), true); + should.equal(batch.tryAdd({ body: list[1] }), true); - const { - subscriptionEventHandler, - startPosition - } = await SubscriptionHandlerForTests.startingFromHere(producerClient); + const { subscriptionEventHandler } = await SubscriptionHandlerForTests.startingFromHere( + producerClient + ); const subscriber = consumerClient.subscribe(subscriptionEventHandler, { startPosition @@ -255,13 +248,15 @@ describe("EventHub Sender", function(): void { batch.maxSizeInBytes.should.be.gt(0); - batch.tryAdd(list[0]).should.be.ok; - batch.tryAdd(list[1]).should.be.ok; - batch.tryAdd(list[2]).should.be.ok; + should.equal(batch.tryAdd(list[0]), true); + should.equal(batch.tryAdd(list[1]), true); + should.equal(batch.tryAdd(list[2]), true); const receivedEvents: ReceivedEventData[] = []; - let waitUntilEventsReceivedResolver: Function; - const waitUntilEventsReceived = new Promise((r) => (waitUntilEventsReceivedResolver = r)); + let waitUntilEventsReceivedResolver: (value?: any) => void; + const waitUntilEventsReceived = new Promise( + (resolve) => (waitUntilEventsReceivedResolver = resolve) + ); const sequenceNumber = (await consumerClient.getPartitionProperties("0")) .lastEnqueuedSequenceNumber; @@ -269,7 +264,9 @@ describe("EventHub Sender", function(): void { const subscriber = consumerClient.subscribe( "0", { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(events) { receivedEvents.push(...events); if (receivedEvents.length >= 3) { @@ -507,10 +504,9 @@ describe("EventHub Sender", function(): void { describe("Multiple sendBatch calls", function(): void { it("should be sent successfully in parallel", async function(): Promise { - const { - subscriptionEventHandler, - startPosition - } = await SubscriptionHandlerForTests.startingFromHere(consumerClient); + const { subscriptionEventHandler } = await SubscriptionHandlerForTests.startingFromHere( + consumerClient + ); const promises = []; for (let i = 0; i < 5; i++) { @@ -740,12 +736,14 @@ describe("EventHub Sender", function(): void { it("should be sent successfully", async () => { const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; const receivedEvents: ReceivedEventData[] = []; - let receivingResolver: Function; + let receivingResolver: (value?: unknown) => void; - const receivingPromise = new Promise((r) => (receivingResolver = r)); + const receivingPromise = new Promise((resolve) => (receivingResolver = resolve)); const subscription = consumerClient.subscribe( { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(events) { receivedEvents.push(...events); receivingResolver(); @@ -769,11 +767,13 @@ describe("EventHub Sender", function(): void { it("should be sent successfully with partitionKey", async () => { const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; const receivedEvents: ReceivedEventData[] = []; - let receivingResolver: Function; - const receivingPromise = new Promise((r) => (receivingResolver = r)); + let receivingResolver: (value?: unknown) => void; + const receivingPromise = new Promise((resolve) => (receivingResolver = resolve)); const subscription = consumerClient.subscribe( { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(events) { receivedEvents.push(...events); receivingResolver(); @@ -801,12 +801,14 @@ describe("EventHub Sender", function(): void { const partitionId = "0"; const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; const receivedEvents: ReceivedEventData[] = []; - let receivingResolver: Function; - const receivingPromise = new Promise((r) => (receivingResolver = r)); + let receivingResolver: (value?: unknown) => void; + const receivingPromise = new Promise((resolve) => (receivingResolver = resolve)); const subscription = consumerClient.subscribe( partitionId, { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(events) { receivedEvents.push(...events); receivingResolver(); @@ -1033,7 +1035,7 @@ describe("EventHub Sender", function(): void { it("throws an error if partitionId and partitionKey are set and partitionId is 0 i.e. falsy", async () => { try { await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionId: 0, partitionKey: "boo" }); @@ -1049,7 +1051,7 @@ describe("EventHub Sender", function(): void { try { await producerClient.createBatch({ partitionId: "1", - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionKey: 0 }); throw new Error("Test failure"); @@ -1202,7 +1204,7 @@ describe("EventHub Sender", function(): void { it("throws an error if partitionId and partitionKey are set with partitionId set to 0 i.e. falsy", async () => { const badOptions: SendBatchOptions = { partitionKey: "foo", - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionId: 0 }; const batch = [{ body: "Hello 1" }, { body: "Hello 2" }]; @@ -1217,7 +1219,7 @@ describe("EventHub Sender", function(): void { }); it("throws an error if partitionId and partitionKey are set with partitionKey set to 0 i.e. falsy", async () => { const badOptions: SendBatchOptions = { - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionKey: 0, partitionId: "0" }; diff --git a/sdk/eventhub/event-hubs/test/utils/fakeSubscriptionEventHandlers.ts b/sdk/eventhub/event-hubs/test/utils/fakeSubscriptionEventHandlers.ts index fe486579be14..6d49eb8f7bfb 100644 --- a/sdk/eventhub/event-hubs/test/utils/fakeSubscriptionEventHandlers.ts +++ b/sdk/eventhub/event-hubs/test/utils/fakeSubscriptionEventHandlers.ts @@ -7,7 +7,7 @@ export class FakeSubscriptionEventHandlers implements SubscriptionEventHandlers public events: Map = new Map(); public errors: Error[] = []; - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { for (const event of events) { let receivedEvents = this.events.get(context.partitionId); @@ -20,7 +20,7 @@ export class FakeSubscriptionEventHandlers implements SubscriptionEventHandlers } } - async processError(err: Error) { + async processError(err: Error): Promise { this.errors.push(err); } } diff --git a/sdk/eventhub/event-hubs/test/utils/logHelpers.ts b/sdk/eventhub/event-hubs/test/utils/logHelpers.ts index b45e3925f41a..b83c64767e93 100644 --- a/sdk/eventhub/event-hubs/test/utils/logHelpers.ts +++ b/sdk/eventhub/event-hubs/test/utils/logHelpers.ts @@ -24,7 +24,7 @@ export class LogTester { debugModule.enable(loggers.map((logger) => logger.namespace).join(",")); } - assert() { + assert(): void { this.close(); if (this._expectedMessages.length > 0) { @@ -32,7 +32,7 @@ export class LogTester { } } - private check(message: string) { + private check(message: string): void { for (let i = 0; i < this._expectedMessages.length; ++i) { const expectedMessage = this._expectedMessages[i]; if (typeof expectedMessage === "string") { @@ -49,7 +49,7 @@ export class LogTester { } } - private close() { + private close(): void { for (const logger of this._attachedLoggers) { logger.logger.enabled = logger.wasEnabled; logger.logger.log = logger.previousLogFunction; @@ -59,7 +59,7 @@ export class LogTester { debugModule.disable(); } - private attach(logger: debugModule.Debugger) { + private attach(logger: debugModule.Debugger): void { this._attachedLoggers.push({ logger, wasEnabled: logger.enabled, diff --git a/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts b/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts index 0604dbcc4a5c..1b6239b0167c 100644 --- a/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts +++ b/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts @@ -85,7 +85,7 @@ export class ReceivedMessagesTester implements Required { const expectedMessagePrefix = `EventHubConsumerClient test - ${Date.now().toString()}`; const messagesToSend = []; @@ -162,15 +162,15 @@ export class ReceivedMessagesTester implements Required { this.data.set(context.partitionId, {}); } - async processClose(reason: CloseReason, context: PartitionContext) { + async processClose(reason: CloseReason, context: PartitionContext): Promise { this.data.get(context.partitionId)!.closeReason = reason; } - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { // by default we don't fill out the lastEnqueuedEventInfo field (they have to enable it // explicitly in the options for the processor). should.not.exist(context.lastEnqueuedEventProperties); @@ -79,7 +79,7 @@ export class SubscriptionHandlerForTests implements Required { loggerForTest(`Error in partition ${context.partitionId}: ${err}`); should.exist( context.partitionId, @@ -110,7 +110,8 @@ export class SubscriptionHandlerForTests implements Required { - const akey = `${a.partitionId}:${a.event.body}`; - const bkey = `${b.partitionId}:${b.event.body}`; - return akey.localeCompare(bkey); - }); - - return this.events; + isWaiting = false; } } + + this.events.sort((a, b) => { + const akey = `${a.partitionId}:${a.event.body}`; + const bkey = `${b.partitionId}:${b.event.body}`; + return akey.localeCompare(bkey); + }); + + return this.events; } async waitForEvents( @@ -147,7 +150,7 @@ export class SubscriptionHandlerForTests implements Required