Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for MQTT event handler #31072

Merged
merged 24 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/web-pubsub/web-pubsub-express/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Add support for MQTT event handling

### Breaking Changes

### Bugs Fixed
Expand Down
165 changes: 165 additions & 0 deletions sdk/web-pubsub/web-pubsub-express/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,171 @@ app.listen(3000, () =>
);
```

### Handle the `connect` request and reject the connection if auth failed
```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
handleConnect: (req, res) => {
// auth the connection and reject the connection if auth failed
res.fail(401, "Unauthorized");
// the following method is also a valid approach
// res.failWith({ code: 401, detail: "Unauthorized" });
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `connected` request

```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
onConnected: (connectedRequest) => {
// Your onConnected logic goes here
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `onDisconnected` request
cqnguy23 marked this conversation as resolved.
Show resolved Hide resolved

```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
onDisconnected: (disconnectedRequest) => {
// Your onDisconnected logic goes here
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `connect` request for mqtt and assign `<userId>` and `<mqtt>` properties
```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
handleConnect: (req, res) => {
if (req.context.clientProtocol === "mqtt") { // return mqtt response when request is of MQTT kind
cqnguy23 marked this conversation as resolved.
Show resolved Hide resolved
// get connect request as mqtt request and print it
const mqttRequest = req as MqttConnectRequest;
console.log(mqttRequest);

// auth the connection and return mqtt response
res.success({
userId: "user1",
mqtt: { userProperties: [{ name: "a", value: "b" }] },
});
} else {
res.success({
userId: "user1",
});
}
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `connect` request for mqtt and reject the connection if auth failed
```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
handleConnect: (req, res) => {
// auth the connection and reject the connection if auth failed
if (req.context.clientProtocol === "mqtt") { // return mqtt error response when request is of MQTT kind
cqnguy23 marked this conversation as resolved.
Show resolved Hide resolved
// get connect request as mqtt request and print it
const mqttRequest = req as MqttConnectRequest;
console.log(mqttRequest);

// auth the connection and return mqtt failure response
res.fail(401, "Not Authorized");

// Or use below method for more fine-grained control over the MQTT return code
// res.failWith({ mqtt: { code: MqttV500ConnectReasonCode.NotAuthorized } });
} else res.success();
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Handle the `onDisconnected` for mqtt request

```js
const express = require("express");

const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const handler = new WebPubSubEventHandler("chat", {
onDisconnected: (disconnectedRequest) => {
if (disconnectedRequest.context.clientProtocol === "mqtt") {
// get disconnect request as mqtt request and print it
const mqttRequest = disconnectedRequest as MqttDisconnectedRequest;
console.log(mqttRequest.mqtt);
// Your onDisconnected logic goes here
} else {
console.log(disconnectedRequest);
// Your onDisconnected logic goes here
}
cqnguy23 marked this conversation as resolved.
Show resolved Hide resolved
},
allowedEndpoints: ["https://<yourAllowedService>.webpubsub.azure.com"]
});

const app = express();

app.use(handler.getMiddleware());

app.listen(3000, () =>
console.log(`Azure WebPubSub Upstream ready at http://localhost:3000${handler.path}`)
);
```

### Only allow specified endpoints

```js
Expand Down
146 changes: 145 additions & 1 deletion sdk/web-pubsub/web-pubsub-express/review/web-pubsub-express.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@ export interface ConnectedRequest {
context: ConnectionContext;
}

// @public
export interface ConnectErrorResponse {
code: 400 | 401 | 500;
detail?: string;
}

// @public
export interface ConnectionContext {
clientProtocol: WebPubSubClientProtocol;
connectionId: string;
eventName: string;
hub: string;
mqtt?: MqttConnectionContextProperties;
origin: string;
signature: string;
states: Record<string, any>;
subprotocol?: string;
userId?: string;
Expand Down Expand Up @@ -50,8 +59,9 @@ export interface ConnectResponse {
// @public
export interface ConnectResponseHandler {
fail(code: 400 | 401 | 500, detail?: string): void;
jeremymeng marked this conversation as resolved.
Show resolved Hide resolved
failWith(response: ConnectErrorResponse | MqttConnectErrorResponse): void;
setState(name: string, value: unknown): void;
success(response?: ConnectResponse): void;
success(response?: ConnectResponse | MqttConnectResponse): void;
}

// @public
Expand All @@ -60,6 +70,137 @@ export interface DisconnectedRequest {
reason?: string;
}

// @public
export interface MqttConnectErrorResponse {
mqtt: MqttConnectErrorResponseProperties;
}

// @public
export interface MqttConnectErrorResponseProperties {
code: MqttV311ConnectReturnCode | MqttV500ConnectReasonCode;
reason?: string;
userProperties?: MqttUserProperty[];
}

// @public
export interface MqttConnectionContextProperties {
physicalConnectionId: string;
sessionId?: string;
}

// @public
export interface MqttConnectProperties {
cqnguy23 marked this conversation as resolved.
Show resolved Hide resolved
password?: string;
protocolVersion: number;
username?: string;
userProperties?: MqttUserProperty[];
}

// @public
export interface MqttConnectRequest extends ConnectRequest {
mqtt: MqttConnectProperties;
}

// @public
export interface MqttConnectResponse extends ConnectResponse {
mqtt?: MqttConnectResponseProperties;
}

// @public
export interface MqttConnectResponseProperties {
userProperties?: MqttUserProperty[];
}

// @public
export interface MqttDisconnectedProperties {
disconnectPacket: MqttDisconnectPacket;
initiatedByClient: boolean;
}

// @public
export interface MqttDisconnectedRequest extends DisconnectedRequest {
mqtt: MqttDisconnectedProperties;
}

// @public
export interface MqttDisconnectPacket {
code: MqttDisconnectReasonCode;
userProperties?: MqttUserProperty[];
}

// @public
export enum MqttDisconnectReasonCode {
AdministrativeAction = 152,
ConnectionRateExceeded = 159,
DisconnectWithWillMessage = 4,
ImplementationSpecificError = 131,
KeepAliveTimeout = 141,
MalformedPacket = 129,
MaximumConnectTime = 160,
MessageRateTooHigh = 150,
NormalDisconnection = 0,
NotAuthorized = 135,
PacketTooLarge = 149,
PayloadFormatInvalid = 153,
ProtocolError = 130,
QosNotSupported = 155,
QuotaExceeded = 151,
ReceiveMaximumExceeded = 147,
RetainNotSupported = 154,
ServerBusy = 137,
ServerMoved = 157,
ServerShuttingDown = 139,
SessionTakenOver = 142,
SharedSubscriptionsNotSupported = 158,
SubscriptionIdentifiersNotSupported = 161,
TopicAliasInvalid = 148,
TopicFilterInvalid = 143,
TopicNameInvalid = 144,
UnspecifiedError = 128,
UseAnotherServer = 156,
WildcardSubscriptionsNotSupported = 162
}

// @public
export interface MqttUserProperty {
name: string;
value: string;
}

// @public
export enum MqttV311ConnectReturnCode {
BadUsernameOrPassword = 4,
IdentifierRejected = 2,
NotAuthorized = 5,
ServerUnavailable = 3,
UnacceptableProtocolVersion = 1
}

// @public
export enum MqttV500ConnectReasonCode {
BadAuthenticationMethod = 140,
BadUserNameOrPassword = 134,
Banned = 138,
ClientIdentifierNotValid = 133,
ConnectionRateExceeded = 159,
ImplementationSpecificError = 131,
MalformedPacket = 129,
NotAuthorized = 135,
PacketTooLarge = 149,
PayloadFormatInvalid = 153,
ProtocolError = 130,
QosNotSupported = 155,
QuotaExceeded = 151,
RetainNotSupported = 154,
ServerBusy = 137,
ServerMoved = 157,
ServerUnavailable = 136,
TopicNameInvalid = 144,
UnspecifiedError = 128,
UnsupportedProtocolVersion = 132,
UseAnotherServer = 156
}

// @public
export type UserEventRequest = {
context: ConnectionContext;
Expand All @@ -82,6 +223,9 @@ export interface UserEventResponseHandler {
success(data?: string | ArrayBuffer, dataType?: "binary" | "text" | "json"): void;
}

// @public
export type WebPubSubClientProtocol = "default" | "mqtt";

// @public
export class WebPubSubEventHandler {
constructor(hub: string, options?: WebPubSubEventHandlerOptions);
Expand Down
Loading
Loading