Skip to content

Commit

Permalink
Add support for MQTT event handler (#31072)
Browse files Browse the repository at this point in the history
Add support for MQTT event handler

---------

Co-authored-by: chuongnguyen <chuongnguyen@microsoft.com>
  • Loading branch information
cqnguy23 and chuongnguyen authored Oct 1, 2024
1 parent 642b48c commit 377c46b
Show file tree
Hide file tree
Showing 15 changed files with 1,228 additions and 24 deletions.
2 changes: 2 additions & 0 deletions sdk/web-pubsub/web-pubsub-express/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Add support for MQTT event handling

### Breaking Changes

### Bugs Fixed
Expand Down
165 changes: 165 additions & 0 deletions sdk/web-pubsub/web-pubsub-express/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,171 @@ app.listen(3000, () =>
);
```

### Handle the `connect` request and reject the connection if auth failed
```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
handleConnect: (req, res) => {
// auth the connection and reject the connection if auth failed
res.fail(401, "Unauthorized");
// the following method is also a valid approach
// res.failWith({ code: 401, detail: "Unauthorized" });
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `connected` request

```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
onConnected: (connectedRequest) => {
// Your onConnected logic goes here
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `onDisconnected` request

```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
onDisconnected: (disconnectedRequest) => {
// Your onDisconnected logic goes here
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `connect` request for mqtt and assign `<userId>` and `<mqtt>` properties
```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
handleConnect: (req, res) => {
if (req.context.clientProtocol === "mqtt") { // return mqtt response when request is of MQTT kind
// get connect request as mqtt request and print it
const mqttRequest = req as MqttConnectRequest;
console.log(mqttRequest);

// auth the connection and return mqtt response
res.success({
userId: "user1",
mqtt: { userProperties: [{ name: "a", value: "b" }] },
});
} else {
res.success({
userId: "user1",
});
}
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `connect` request for mqtt and reject the connection if auth failed
```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
handleConnect: (req, res) => {
// auth the connection and reject the connection if auth failed
if (req.context.clientProtocol === "mqtt") { // return mqtt error response when request is of MQTT kind
// get connect request as mqtt request and print it
const mqttRequest = req as MqttConnectRequest;
console.log(mqttRequest);

// auth the connection and return mqtt failure response
res.fail(401, "Not Authorized");

// Or use below method for more fine-grained control over the MQTT return code
// res.failWith({ mqtt: { code: MqttV500ConnectReasonCode.NotAuthorized } });
} else res.success();
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `onDisconnected` for mqtt request

```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
onDisconnected: (disconnectedRequest) => {
if (disconnectedRequest.context.clientProtocol === "mqtt") {
// get disconnect request as mqtt request and print it
const mqttRequest = disconnectedRequest as MqttDisconnectedRequest;
console.log(mqttRequest.mqtt);
// Your onDisconnected logic goes here
} else {
console.log(disconnectedRequest);
// Your onDisconnected logic goes here
}
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Only allow specified endpoints

```js
Expand Down
146 changes: 145 additions & 1 deletion sdk/web-pubsub/web-pubsub-express/review/web-pubsub-express.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@ export interface ConnectedRequest {
context: ConnectionContext;
}

// @public
export interface ConnectErrorResponse {
code: 400 | 401 | 500;
detail?: string;
}

// @public
export interface ConnectionContext {
clientProtocol: WebPubSubClientProtocol;
connectionId: string;
eventName: string;
hub: string;
mqtt?: MqttConnectionContextProperties;
origin: string;
signature: string;
states: Record<string, any>;
subprotocol?: string;
userId?: string;
Expand Down Expand Up @@ -50,8 +59,9 @@ export interface ConnectResponse {
// @public
export interface ConnectResponseHandler {
fail(code: 400 | 401 | 500, detail?: string): void;
failWith(response: ConnectErrorResponse | MqttConnectErrorResponse): void;
setState(name: string, value: unknown): void;
success(response?: ConnectResponse): void;
success(response?: ConnectResponse | MqttConnectResponse): void;
}

// @public
Expand All @@ -60,6 +70,137 @@ export interface DisconnectedRequest {
reason?: string;
}

// @public
export interface MqttConnectErrorResponse {
mqtt: MqttConnectErrorResponseProperties;
}

// @public
export interface MqttConnectErrorResponseProperties {
code: MqttV311ConnectReturnCode | MqttV500ConnectReasonCode;
reason?: string;
userProperties?: MqttUserProperty[];
}

// @public
export interface MqttConnectionContextProperties {
physicalConnectionId: string;
sessionId?: string;
}

// @public
export interface MqttConnectProperties {
password?: string;
protocolVersion: number;
username?: string;
userProperties?: MqttUserProperty[];
}

// @public
export interface MqttConnectRequest extends ConnectRequest {
mqtt: MqttConnectProperties;
}

// @public
export interface MqttConnectResponse extends ConnectResponse {
mqtt?: MqttConnectResponseProperties;
}

// @public
export interface MqttConnectResponseProperties {
userProperties?: MqttUserProperty[];
}

// @public
export interface MqttDisconnectedProperties {
disconnectPacket: MqttDisconnectPacket;
initiatedByClient: boolean;
}

// @public
export interface MqttDisconnectedRequest extends DisconnectedRequest {
mqtt: MqttDisconnectedProperties;
}

// @public
export interface MqttDisconnectPacket {
code: MqttDisconnectReasonCode;
userProperties?: MqttUserProperty[];
}

// @public
export enum MqttDisconnectReasonCode {
AdministrativeAction = 152,
ConnectionRateExceeded = 159,
DisconnectWithWillMessage = 4,
ImplementationSpecificError = 131,
KeepAliveTimeout = 141,
MalformedPacket = 129,
MaximumConnectTime = 160,
MessageRateTooHigh = 150,
NormalDisconnection = 0,
NotAuthorized = 135,
PacketTooLarge = 149,
PayloadFormatInvalid = 153,
ProtocolError = 130,
QosNotSupported = 155,
QuotaExceeded = 151,
ReceiveMaximumExceeded = 147,
RetainNotSupported = 154,
ServerBusy = 137,
ServerMoved = 157,
ServerShuttingDown = 139,
SessionTakenOver = 142,
SharedSubscriptionsNotSupported = 158,
SubscriptionIdentifiersNotSupported = 161,
TopicAliasInvalid = 148,
TopicFilterInvalid = 143,
TopicNameInvalid = 144,
UnspecifiedError = 128,
UseAnotherServer = 156,
WildcardSubscriptionsNotSupported = 162
}

// @public
export interface MqttUserProperty {
name: string;
value: string;
}

// @public
export enum MqttV311ConnectReturnCode {
BadUsernameOrPassword = 4,
IdentifierRejected = 2,
NotAuthorized = 5,
ServerUnavailable = 3,
UnacceptableProtocolVersion = 1
}

// @public
export enum MqttV500ConnectReasonCode {
BadAuthenticationMethod = 140,
BadUserNameOrPassword = 134,
Banned = 138,
ClientIdentifierNotValid = 133,
ConnectionRateExceeded = 159,
ImplementationSpecificError = 131,
MalformedPacket = 129,
NotAuthorized = 135,
PacketTooLarge = 149,
PayloadFormatInvalid = 153,
ProtocolError = 130,
QosNotSupported = 155,
QuotaExceeded = 151,
RetainNotSupported = 154,
ServerBusy = 137,
ServerMoved = 157,
ServerUnavailable = 136,
TopicNameInvalid = 144,
UnspecifiedError = 128,
UnsupportedProtocolVersion = 132,
UseAnotherServer = 156
}

// @public
export type UserEventRequest = {
context: ConnectionContext;
Expand All @@ -82,6 +223,9 @@ export interface UserEventResponseHandler {
success(data?: string | ArrayBuffer, dataType?: "binary" | "text" | "json"): void;
}

// @public
export type WebPubSubClientProtocol = "default" | "mqtt";

// @public
export class WebPubSubEventHandler {
constructor(hub: string, options?: WebPubSubEventHandlerOptions);
Expand Down
Loading

0 comments on commit 377c46b

Please sign in to comment.