diff --git a/lib/broadcast-operator.ts b/lib/broadcast-operator.ts index 4348c34a90..015382d8ab 100644 --- a/lib/broadcast-operator.ts +++ b/lib/broadcast-operator.ts @@ -1,5 +1,5 @@ import type { BroadcastFlags, Room, SocketId } from "socket.io-adapter"; -import { RESERVED_EVENTS } from "./socket"; +import { Handshake, RESERVED_EVENTS, Socket } from "./socket"; import { PacketType } from "socket.io-parser"; import type { Adapter } from "socket.io-adapter"; @@ -160,4 +160,142 @@ export class BroadcastOperator { } return this.adapter.sockets(this.rooms); } + + /** + * Returns the matching socket instances + * + * @public + */ + public fetchSockets(): Promise { + return this.adapter + .fetchSockets({ + rooms: this.rooms, + except: this.exceptRooms, + }) + .then((sockets) => { + return sockets.map((socket) => { + if (socket instanceof Socket) { + // FIXME the TypeScript compiler complains about missing private properties + return (socket as unknown) as RemoteSocket; + } else { + return new RemoteSocket(this.adapter, socket as SocketDetails); + } + }); + }); + } + + /** + * Makes the matching socket instances join the specified rooms + * + * @param room + * @public + */ + public socketsJoin(room: Room | Room[]): void { + this.adapter.addSockets( + { + rooms: this.rooms, + except: this.exceptRooms, + }, + Array.isArray(room) ? room : [room] + ); + } + + /** + * Makes the matching socket instances leave the specified rooms + * + * @param room + * @public + */ + public socketsLeave(room: Room | Room[]): void { + this.adapter.delSockets( + { + rooms: this.rooms, + except: this.exceptRooms, + }, + Array.isArray(room) ? room : [room] + ); + } + + /** + * Makes the matching socket instances disconnect + * + * @param close - whether to close the underlying connection + * @public + */ + public disconnectSockets(close: boolean = false): void { + this.adapter.disconnectSockets( + { + rooms: this.rooms, + except: this.exceptRooms, + }, + close + ); + } +} + +/** + * Format of the data when the Socket instance exists on another Socket.IO server + */ +interface SocketDetails { + id: SocketId; + handshake: Handshake; + rooms: Room[]; + data: any; +} + +/** + * Expose of subset of the attributes and methods of the Socket class + */ +export class RemoteSocket { + public readonly id: SocketId; + public readonly handshake: Handshake; + public readonly rooms: Set; + public readonly data: any; + + private readonly operator: BroadcastOperator; + + constructor(adapter: Adapter, details: SocketDetails) { + this.id = details.id; + this.handshake = details.handshake; + this.rooms = new Set(details.rooms); + this.data = details.data; + this.operator = new BroadcastOperator(adapter, new Set([this.id])); + } + + public emit(ev: string, ...args: any[]): boolean { + return this.operator.emit(ev, ...args); + } + + /** + * Joins a room. + * + * @param {String|Array} room - room or array of rooms + * @public + */ + public join(room: Room | Room[]): void { + return this.operator.socketsJoin(room); + } + + /** + * Leaves a room. + * + * @param {String} room + * @public + */ + public leave(room: Room): void { + return this.operator.socketsLeave(room); + } + + /** + * Disconnects this client. + * + * @param {Boolean} close - if `true`, closes the underlying connection + * @return {Socket} self + * + * @public + */ + public disconnect(close = false): this { + this.operator.disconnectSockets(close); + return this; + } } diff --git a/lib/index.ts b/lib/index.ts index 35fe94d96d..8884cf9ffd 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -16,7 +16,7 @@ import debugModule from "debug"; import { Socket } from "./socket"; import type { CookieSerializeOptions } from "cookie"; import type { CorsOptions } from "cors"; -import type { BroadcastOperator } from "./broadcast-operator"; +import type { BroadcastOperator, RemoteSocket } from "./broadcast-operator"; const debug = debugModule("socket.io:server"); @@ -719,6 +719,45 @@ export class Server extends EventEmitter { public get local(): BroadcastOperator { return this.sockets.local; } + + /** + * Returns the matching socket instances + * + * @public + */ + public fetchSockets(): Promise { + return this.sockets.fetchSockets(); + } + + /** + * Makes the matching socket instances join the specified rooms + * + * @param room + * @public + */ + public socketsJoin(room: Room | Room[]): void { + return this.sockets.socketsJoin(room); + } + + /** + * Makes the matching socket instances leave the specified rooms + * + * @param room + * @public + */ + public socketsLeave(room: Room | Room[]): void { + return this.sockets.socketsLeave(room); + } + + /** + * Makes the matching socket instances disconnect + * + * @param close - whether to close the underlying connection + * @public + */ + public disconnectSockets(close: boolean = false): void { + return this.sockets.disconnectSockets(close); + } } /** @@ -740,4 +779,4 @@ emitterMethods.forEach(function (fn) { module.exports = (srv?, opts?) => new Server(srv, opts); module.exports.Server = Server; -export { Socket, ServerOptions, Namespace }; +export { Socket, ServerOptions, Namespace, BroadcastOperator, RemoteSocket }; diff --git a/lib/namespace.ts b/lib/namespace.ts index 0ead94da6a..65417742a5 100644 --- a/lib/namespace.ts +++ b/lib/namespace.ts @@ -4,7 +4,7 @@ import type { Client } from "./client"; import { EventEmitter } from "events"; import debugModule from "debug"; import type { Adapter, Room, SocketId } from "socket.io-adapter"; -import { BroadcastOperator } from "./broadcast-operator"; +import { BroadcastOperator, RemoteSocket } from "./broadcast-operator"; const debug = debugModule("socket.io:namespace"); @@ -257,4 +257,43 @@ export class Namespace extends EventEmitter { public get local(): BroadcastOperator { return new BroadcastOperator(this.adapter).local; } + + /** + * Returns the matching socket instances + * + * @public + */ + public fetchSockets(): Promise { + return new BroadcastOperator(this.adapter).fetchSockets(); + } + + /** + * Makes the matching socket instances join the specified rooms + * + * @param room + * @public + */ + public socketsJoin(room: Room | Room[]): void { + return new BroadcastOperator(this.adapter).socketsJoin(room); + } + + /** + * Makes the matching socket instances leave the specified rooms + * + * @param room + * @public + */ + public socketsLeave(room: Room | Room[]): void { + return new BroadcastOperator(this.adapter).socketsLeave(room); + } + + /** + * Makes the matching socket instances disconnect + * + * @param close - whether to close the underlying connection + * @public + */ + public disconnectSockets(close: boolean = false): void { + return new BroadcastOperator(this.adapter).disconnectSockets(close); + } } diff --git a/lib/socket.ts b/lib/socket.ts index 87273a9fa1..4bae605e47 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -81,6 +81,10 @@ export interface Handshake { export class Socket extends EventEmitter { public readonly id: SocketId; public readonly handshake: Handshake; + /** + * Additional information that can be attached to the Socket instance and which will be used in the fetchSockets method + */ + public data: any = {}; public connected: boolean; public disconnected: boolean; diff --git a/test/socket.io.ts b/test/socket.io.ts index 39f6ad03b3..cf8b647874 100644 --- a/test/socket.io.ts +++ b/test/socket.io.ts @@ -13,6 +13,7 @@ import * as io_v2 from "socket.io-client-v2"; const ioc = require("socket.io-client"); import "./support/util"; +import "./utility-methods"; // Creates a socket.io client for the given server function client(srv, nsp?: string | object, opts?: object) { diff --git a/test/utility-methods.ts b/test/utility-methods.ts new file mode 100644 index 0000000000..05b409f03f --- /dev/null +++ b/test/utility-methods.ts @@ -0,0 +1,176 @@ +import { createServer } from "http"; +import { Server, Socket } from ".."; +import { io as ioc, Socket as ClientSocket } from "socket.io-client"; +import { Adapter, BroadcastOptions } from "socket.io-adapter"; +import expect from "expect.js"; +import type { AddressInfo } from "net"; + +import "./support/util"; + +const SOCKETS_COUNT = 3; + +const createPartialDone = ( + count: number, + done: () => void, + callback?: () => void +) => { + let i = 0; + return () => { + i++; + if (i === count) { + done(); + if (callback) { + callback(); + } + } + }; +}; + +class DummyAdapter extends Adapter { + fetchSockets(opts: BroadcastOptions): Promise { + return Promise.resolve([ + { + id: "42", + handshake: { + headers: { + accept: "*/*", + }, + query: { + transport: "polling", + EIO: "4", + }, + }, + rooms: ["42", "room1"], + data: { + username: "john", + }, + }, + ]); + } +} + +describe("socket.io", () => { + let io: Server, clientSockets: ClientSocket[], serverSockets: Socket[]; + beforeEach((done) => { + const srv = createServer(); + io = new Server(srv); + srv.listen(() => { + const port = (srv.address() as AddressInfo).port; + + clientSockets = []; + for (let i = 0; i < SOCKETS_COUNT; i++) { + clientSockets.push(ioc(`http://localhost:${port}`)); + } + + serverSockets = []; + io.on("connection", (socket: Socket) => { + serverSockets.push(socket); + if (serverSockets.length === SOCKETS_COUNT) { + done(); + } + }); + }); + }); + + afterEach(() => { + io.close(); + clientSockets.forEach((socket) => socket.disconnect()); + }); + + describe("utility methods", () => { + describe("fetchSockets", () => { + it("returns all socket instances", async () => { + const sockets = await io.fetchSockets(); + expect(sockets.length).to.eql(3); + }); + + it("returns all socket instances in the given room", async () => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join("room1"); + serverSockets[2].join("room2"); + const sockets = await io.in("room1").fetchSockets(); + expect(sockets.length).to.eql(2); + }); + + it("works with a custom adapter", async () => { + io.adapter(DummyAdapter); + const sockets = await io.fetchSockets(); + expect(sockets.length).to.eql(1); + const remoteSocket = sockets[0]; + expect(remoteSocket.id).to.eql("42"); + expect(remoteSocket.rooms).to.contain("42", "room1"); + expect(remoteSocket.data).to.eql({ username: "john" }); + }); + }); + + describe("socketsJoin", () => { + it("makes all socket instances join the given room", () => { + io.socketsJoin("room1"); + serverSockets.forEach((socket) => { + expect(socket.rooms).to.contain("room1"); + }); + }); + + it("makes all socket instances in a room join the given room", () => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join("room1"); + serverSockets[2].join("room2"); + io.in("room1").socketsJoin("room3"); + expect(serverSockets[0].rooms).to.contain("room3"); + expect(serverSockets[1].rooms).to.contain("room3"); + expect(serverSockets[2].rooms).to.not.contain("room3"); + }); + }); + + describe("socketsLeave", () => { + it("makes all socket instances leave the given room", () => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join("room1"); + serverSockets[2].join("room2"); + io.socketsLeave("room1"); + expect(serverSockets[0].rooms).to.contain("room2"); + expect(serverSockets[0].rooms).to.not.contain("room1"); + expect(serverSockets[1].rooms).to.not.contain("room1"); + }); + + it("makes all socket instances in a room leave the given room", () => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join("room1"); + serverSockets[2].join("room2"); + io.in("room2").socketsLeave("room1"); + expect(serverSockets[0].rooms).to.contain("room2"); + expect(serverSockets[0].rooms).to.not.contain("room1"); + expect(serverSockets[1].rooms).to.contain("room1"); + }); + }); + + describe("disconnectSockets", () => { + it("makes all socket instances disconnect", (done) => { + io.disconnectSockets(true); + + const partialDone = createPartialDone(3, done); + + clientSockets[0].on("disconnect", partialDone); + clientSockets[1].on("disconnect", partialDone); + clientSockets[2].on("disconnect", partialDone); + }); + + it("makes all socket instances in a room disconnect", (done) => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join("room1"); + serverSockets[2].join("room2"); + io.in("room2").disconnectSockets(true); + + const partialDone = createPartialDone(2, done, () => { + clientSockets[1].off("disconnect"); + }); + + clientSockets[0].on("disconnect", partialDone); + clientSockets[1].on("disconnect", () => { + done(new Error("should not happen")); + }); + clientSockets[2].on("disconnect", partialDone); + }); + }); + }); +});