Skip to content

Commit

Permalink
[event-hubs] fix eslint errors (round 1) (#13285)
Browse files Browse the repository at this point in the history
Round 1 addressing #10777
Replaces #13013 

This leaves 5 errors and 15 warnings (excluding TSDoc) after this PR is merged.

- 4 'Promise executor functions should not be async' errors. Fixing this will take a lot of care and I would want these to be reviewed on their own so they don't get lost in the noise.
- 1 'N is already defined'. This is due to a constant matching an interface name.
  • Loading branch information
chradek authored Jan 21, 2021
1 parent 3f04d2f commit 379a18d
Show file tree
Hide file tree
Showing 34 changed files with 588 additions and 423 deletions.
33 changes: 15 additions & 18 deletions sdk/eventhub/event-hubs/src/connectionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export interface ConnectionContextOptions extends EventHubClientOptions {
/**
* Helper type to get the names of all the functions on an object.
*/
type FunctionPropertyNames<T> = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T];
type FunctionPropertyNames<T> = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T]; // eslint-disable-line @typescript-eslint/ban-types
/**
* Helper type to get the types of all the functions on an object.
*/
Expand Down Expand Up @@ -387,43 +387,40 @@ export namespace ConnectionContext {
}
};

function addConnectionListeners(connection: Connection) {
function addConnectionListeners(connection: Connection): void {
// Add listeners on the connection object.
connection.on(ConnectionEvents.connectionOpen, onConnectionOpen);
connection.on(ConnectionEvents.disconnected, onDisconnected);
connection.on(ConnectionEvents.protocolError, protocolError);
connection.on(ConnectionEvents.error, error);
}

function cleanConnectionContext(connectionContext: ConnectionContext) {
function cleanConnectionContext(context: ConnectionContext): Promise<void> {
// Remove listeners from the connection object.
connectionContext.connection.removeListener(
ConnectionEvents.connectionOpen,
onConnectionOpen
);
connectionContext.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
connectionContext.connection.removeListener(ConnectionEvents.protocolError, protocolError);
connectionContext.connection.removeListener(ConnectionEvents.error, error);
context.connection.removeListener(ConnectionEvents.connectionOpen, onConnectionOpen);
context.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
context.connection.removeListener(ConnectionEvents.protocolError, protocolError);
context.connection.removeListener(ConnectionEvents.error, error);
// Close the connection
return connectionContext.connection.close();
return context.connection.close();
}

async function refreshConnection(connectionContext: ConnectionContext) {
const originalConnectionId = connectionContext.connectionId;
async function refreshConnection(context: ConnectionContext): Promise<void> {
const originalConnectionId = context.connectionId;
try {
await cleanConnectionContext(connectionContext);
await cleanConnectionContext(context);
} catch (err) {
logger.verbose(
`[${connectionContext.connectionId}] There was an error closing the connection before reconnecting: %O`,
`[${context.connectionId}] There was an error closing the connection before reconnecting: %O`,
err
);
}

// Create a new connection, id, locks, and cbs client.
connectionContext.refreshConnection();
addConnectionListeners(connectionContext.connection);
context.refreshConnection();
addConnectionListeners(context.connection);
logger.verbose(
`The connection "${originalConnectionId}" has been updated to "${connectionContext.connectionId}".`
`The connection "${originalConnectionId}" has been updated to "${context.connectionId}".`
);
}

Expand Down
9 changes: 5 additions & 4 deletions sdk/eventhub/event-hubs/src/dataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { message } from "rhea-promise";
import isBuffer from "is-buffer";
import { Buffer } from "buffer";
import { logErrorStackTrace, logger } from "./log";
import { isObjectWithProperties } from "./util/typeGuards";

/**
* The default data transformer that will be used by the Azure SDK.
Expand All @@ -23,15 +24,15 @@ export const defaultDataTransformer = {
* - content: The given AMQP message body as a Buffer.
* - multiple: true | undefined.
*/
encode(body: any): any {
encode(body: unknown): any {
let result: any;
if (isBuffer(body)) {
result = message.data_section(body);
} else {
// string, undefined, null, boolean, array, object, number should end up here
// coercing undefined to null as that will ensure that null value will be given to the
// customer on receive.
if (body === undefined) body = null; // tslint:disable-line
if (body === undefined) body = null;
try {
const bodyStr = JSON.stringify(body);
result = message.data_section(Buffer.from(bodyStr, "utf8"));
Expand All @@ -56,10 +57,10 @@ export const defaultDataTransformer = {
* @param {DataSection} body The AMQP message body
* @return {*} decoded body or the given body as-is.
*/
decode(body: any): any {
decode(body: unknown): any {
let processedBody: any = body;
try {
if (body.content && isBuffer(body.content)) {
if (isObjectWithProperties(body, ["content"]) && isBuffer(body.content)) {
// This indicates that we are getting the AMQP described type. Let us try decoding it.
processedBody = body.content;
}
Expand Down
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/src/eventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { DeliveryAnnotations, Message as RheaMessage, MessageAnnotations } from "rhea-promise";
import { Constants } from "@azure/core-amqp";
import { isDefined } from "./util/typeGuards";

/**
* Describes the delivery annotations.
Expand Down Expand Up @@ -203,7 +204,7 @@ export function toRheaMessage(data: EventData, partitionKey?: string): RheaMessa
if (data.properties) {
msg.application_properties = data.properties;
}
if (partitionKey != undefined) {
if (isDefined(partitionKey)) {
msg.message_annotations[Constants.partitionKey] = partitionKey;
// Event Hub service cannot route messages to a specific partition based on the partition key
// if AMQP message header is an empty object. Hence we make sure that header is always present
Expand Down
9 changes: 5 additions & 4 deletions sdk/eventhub/event-hubs/src/eventDataBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Span, SpanContext } from "@opentelemetry/api";
import { TRACEPARENT_PROPERTY, instrumentEventData } from "./diagnostics/instrumentEventData";
import { createMessageSpan } from "./diagnostics/messageSpan";
import { defaultDataTransformer } from "./dataTransformer";
import { isDefined, isObjectWithProperties } from "./util/typeGuards";

/**
* The amount of bytes to reserve as overhead for a small message.
Expand All @@ -29,9 +30,9 @@ const smallMessageMaxBytes = 255;
* @internal
* @hidden
*/
export function isEventDataBatch(eventDataBatch: any): eventDataBatch is EventDataBatch {
export function isEventDataBatch(eventDataBatch: unknown): eventDataBatch is EventDataBatch {
return (
eventDataBatch &&
isObjectWithProperties(eventDataBatch, ["count", "sizeInBytes", "tryAdd"]) &&
typeof eventDataBatch.tryAdd === "function" &&
typeof eventDataBatch.count === "number" &&
typeof eventDataBatch.sizeInBytes === "number"
Expand Down Expand Up @@ -192,8 +193,8 @@ export class EventDataBatchImpl implements EventDataBatch {
) {
this._context = context;
this._maxSizeInBytes = maxSizeInBytes;
this._partitionKey = partitionKey != undefined ? String(partitionKey) : partitionKey;
this._partitionId = partitionId != undefined ? String(partitionId) : partitionId;
this._partitionKey = isDefined(partitionKey) ? String(partitionKey) : partitionKey;
this._partitionId = isDefined(partitionId) ? String(partitionId) : partitionId;
this._sizeInBytes = 0;
this._count = 0;
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ export class EventHubConsumerClient {
private createEventProcessorForAllPartitions(
subscriptionEventHandlers: SubscriptionEventHandlers,
options?: SubscribeOptions
) {
): { targetedPartitionId: string; eventProcessor: EventProcessor } {
this._partitionGate.add("all");

if (this._userChoseCheckpointStore) {
Expand Down Expand Up @@ -569,7 +569,7 @@ export class EventHubConsumerClient {
partitionId: string,
eventHandlers: SubscriptionEventHandlers,
options?: SubscribeOptions
) {
): { targetedPartitionId: string; eventProcessor: EventProcessor } {
this._partitionGate.add(partitionId);

const subscribeOptions = options as SubscribeOptions | undefined;
Expand Down Expand Up @@ -607,7 +607,7 @@ export class EventHubConsumerClient {
subscriptionEventHandlers: SubscriptionEventHandlers,
checkpointStore: CheckpointStore,
options: FullEventProcessorOptions
) {
): EventProcessor {
return new EventProcessor(
this._consumerGroup,
connectionContext,
Expand Down
19 changes: 10 additions & 9 deletions sdk/eventhub/event-hubs/src/eventHubProducerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
SendBatchOptions
} from "./models/public";
import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error";
import { isDefined } from "./util/typeGuards";
import { getParentSpan, OperationOptions } from "./util/operationOptions";

/**
Expand Down Expand Up @@ -80,7 +81,7 @@ export class EventHubProducerClient {
* - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets.
* - `userAgent` : A string to append to the built in user agent string that is passed to the service.
*/
constructor(connectionString: string, options?: EventHubClientOptions);
constructor(connectionString: string, options?: EventHubClientOptions); // eslint-disable-line @azure/azure-sdk/ts-naming-options
/**
* The `EventHubProducerClient` class is used to send events to an Event Hub.
* Use the `options` parmeter to configure retry policy or proxy settings.
Expand All @@ -94,7 +95,7 @@ export class EventHubProducerClient {
* - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets.
* - `userAgent` : A string to append to the built in user agent string that is passed to the service.
*/
constructor(connectionString: string, eventHubName: string, options?: EventHubClientOptions);
constructor(connectionString: string, eventHubName: string, options?: EventHubClientOptions); // eslint-disable-line @azure/azure-sdk/ts-naming-options
/**
* The `EventHubProducerClient` class is used to send events to an Event Hub.
* Use the `options` parmeter to configure retry policy or proxy settings.
Expand All @@ -113,13 +114,13 @@ export class EventHubProducerClient {
fullyQualifiedNamespace: string,
eventHubName: string,
credential: TokenCredential,
options?: EventHubClientOptions
options?: EventHubClientOptions // eslint-disable-line @azure/azure-sdk/ts-naming-options
);
constructor(
fullyQualifiedNamespaceOrConnectionString1: string,
eventHubNameOrOptions2?: string | EventHubClientOptions,
credentialOrOptions3?: TokenCredential | EventHubClientOptions,
options4?: EventHubClientOptions
options4?: EventHubClientOptions // eslint-disable-line @azure/azure-sdk/ts-naming-options
) {
this._context = createConnectionContext(
fullyQualifiedNamespaceOrConnectionString1,
Expand Down Expand Up @@ -174,7 +175,7 @@ export class EventHubProducerClient {
async createBatch(options: CreateBatchOptions = {}): Promise<EventDataBatch> {
throwErrorIfConnectionClosed(this._context);

if (options.partitionId != undefined && options.partitionKey != undefined) {
if (isDefined(options.partitionId) && isDefined(options.partitionKey)) {
throw new Error("partitionId and partitionKey cannot both be set when creating a batch");
}

Expand Down Expand Up @@ -260,7 +261,7 @@ export class EventHubProducerClient {
* @throws MessagingError if an error is encountered while sending a message.
* @throws Error if the underlying connection or sender has been closed.
*/
async sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>;
async sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>; // eslint-disable-line @azure/azure-sdk/ts-naming-options
async sendBatch(
batch: EventDataBatch | EventData[],
options: SendBatchOptions | OperationOptions = {}
Expand Down Expand Up @@ -316,16 +317,16 @@ export class EventHubProducerClient {
}
}
}
if (partitionId != undefined && partitionKey != undefined) {
if (isDefined(partitionId) && isDefined(partitionKey)) {
throw new Error(
`The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.`
);
}

if (partitionId != undefined) {
if (isDefined(partitionId)) {
partitionId = String(partitionId);
}
if (partitionKey != undefined) {
if (isDefined(partitionKey)) {
partitionKey = String(partitionKey);
}

Expand Down
10 changes: 7 additions & 3 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,10 @@ export class EventHubReceiver extends LinkEntity {
await this.abort();
}
} catch (err) {
return this._onError === onError && onError(err);
if (this._onError === onError) {
onError(err);
}
return;
}
} else {
logger.verbose(
Expand All @@ -478,6 +481,7 @@ export class EventHubReceiver extends LinkEntity {
0
);
this._addCredit(creditsToAdd);
return;
})
.catch((err) => {
// something really unexpected happened, so attempt to call user's error handler
Expand Down Expand Up @@ -700,13 +704,13 @@ export class EventHubReceiver extends LinkEntity {
try {
await this.close();
} finally {
return reject(new AbortError("The receive operation has been cancelled by the user."));
reject(new AbortError("The receive operation has been cancelled by the user."));
}
};

// operation has been cancelled, so exit immediately
if (abortSignal && abortSignal.aborted) {
return await rejectOnAbort();
return rejectOnAbort();
}

// updates the prefetch count so that the baseConsumer adds
Expand Down
Loading

0 comments on commit 379a18d

Please sign in to comment.