Skip to content

Commit

Permalink
feat: add support for catch-all listeners
Browse files Browse the repository at this point in the history
Inspired from EventEmitter2 [1]

```js
io.on("connect", socket => {

  socket.onAny((event, ...args) => {});

  socket.prependAny((event, ...args) => {});

  socket.offAny(); // remove all listeners

  socket.offAny(listener);

  const listeners = socket.listenersAny();
});
```

Breaking change: the socket.use() method is removed

This method was introduced in [2] for the same feature (having a
catch-all listener), but there were two issues:

- the API is not very user-friendly, since the user has to know the structure of the packet argument
- it uses an ERROR packet, which is reserved for Namespace authentication issues (see [3])

[1]: https://github.com/EventEmitter2/EventEmitter2
[2]: #434
[3]: https://github.com/socketio/socket.io-protocol
  • Loading branch information
darrachequesne committed Oct 25, 2020
1 parent 129c641 commit 5c73733
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 117 deletions.
112 changes: 60 additions & 52 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export class Socket extends EventEmitter {
> = [];
private flags: BroadcastFlags = {};
private _rooms: Set<Room> = new Set();
private _anyListeners: Array<(...args: any[]) => void>;

/**
* Interface to a `Client` for a given `Namespace`.
Expand Down Expand Up @@ -335,7 +336,13 @@ export class Socket extends EventEmitter {
args.push(this.ack(packet.id));
}

this.dispatch(args);
if (this._anyListeners && this._anyListeners.length) {
const listeners = this._anyListeners.slice();
for (const listener of listeners) {
listener.apply(this, args);
}
}
super.emit.apply(this, args);
}

/**
Expand Down Expand Up @@ -502,86 +509,87 @@ export class Socket extends EventEmitter {
}

/**
* Dispatch incoming event to socket listeners.
* A reference to the request that originated the underlying Engine.IO Socket.
*
* @param {Array} event - event that will get emitted
* @private
* @public
*/
private dispatch(event: Array<string>): void {
debug("dispatching an event %j", event);
this.run(event, err => {
process.nextTick(() => {
if (err) {
return this._error(err.message);
}
super.emit.apply(this, event);
});
});
public get request(): IncomingMessage {
return this.client.request;
}

/**
* Sets up socket middleware.
* A reference to the underlying Client transport connection (Engine.IO Socket object).
*
* @param {Function} fn - middleware function (event, next)
* @return {Socket} self
* @public
*/
public use(
fn: (event: Array<any>, next: (err: Error) => void) => void
): Socket {
this.fns.push(fn);
return this;
public get conn() {
return this.client.conn;
}

/**
* Executes the middleware for an incoming event.
*
* @param {Array} event - event that will get emitted
* @param {Function} fn - last fn call in the middleware
* @private
* @public
*/
private run(event: Array<any>, fn: (err: Error) => void) {
const fns = this.fns.slice(0);
if (!fns.length) return fn(null);

function run(i) {
fns[i](event, function(err) {
// upon error, short-circuit
if (err) return fn(err);

// if no middleware left, summon callback
if (!fns[i + 1]) return fn(null);

// go on to next
run(i + 1);
});
}
public get rooms(): Set<Room> {
return this.adapter.socketRooms(this.id) || new Set();
}

run(0);
/**
* Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the
* callback.
*
* @param listener
* @public
*/
public onAny(listener: (...args: any[]) => void): Socket {
this._anyListeners = this._anyListeners || [];
this._anyListeners.push(listener);
return this;
}

/**
* A reference to the request that originated the underlying Engine.IO Socket.
* Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the
* callback. The listener is added to the beginning of the listeners array.
*
* @param listener
* @public
*/
public get request(): IncomingMessage {
return this.client.request;
public prependAny(listener: (...args: any[]) => void): Socket {
this._anyListeners = this._anyListeners || [];
this._anyListeners.unshift(listener);
return this;
}

/**
* A reference to the underlying Client transport connection (Engine.IO Socket object).
* Removes the listener that will be fired when any event is emitted.
*
* @param listener
* @public
*/
public get conn() {
return this.client.conn;
public offAny(listener?: (...args: any[]) => void): Socket {
if (!this._anyListeners) {
return this;
}
if (listener) {
const listeners = this._anyListeners;
for (let i = 0; i < listeners.length; i++) {
if (listener === listeners[i]) {
listeners.splice(i, 1);
return this;
}
}
} else {
this._anyListeners = [];
}
return this;
}

/**
* Returns an array of listeners that are listening for any event that is specified. This array can be manipulated,
* e.g. to remove listeners.
*
* @public
*/
public get rooms(): Set<Room> {
return this.adapter.socketRooms(this.id) || new Set();
public listenersAny() {
return this._anyListeners || [];
}
}
137 changes: 72 additions & 65 deletions test/socket.io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1719,6 +1719,78 @@ describe("socket.io", () => {
});
});
});

describe("onAny", () => {
it("should call listener", done => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(() => {
const socket = client(srv, { multiplex: false });

socket.emit("my-event", "123");

sio.on("connection", (socket: Socket) => {
socket.onAny((event, arg1) => {
expect(event).to.be("my-event");
expect(arg1).to.be("123");
done();
});
});
});
});

it("should prepend listener", done => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(() => {
const socket = client(srv, { multiplex: false });

socket.emit("my-event", "123");

sio.on("connection", (socket: Socket) => {
let count = 0;

socket.onAny((event, arg1) => {
expect(count).to.be(2);
done();
});

socket.prependAny(() => {
expect(count++).to.be(1);
});

socket.prependAny(() => {
expect(count++).to.be(0);
});
});
});
});

it("should remove listener", done => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(() => {
const socket = client(srv, { multiplex: false });

socket.emit("my-event", "123");

sio.on("connection", (socket: Socket) => {
const fail = () => done(new Error("fail"));

socket.onAny(fail);
socket.offAny(fail);
expect(socket.listenersAny.length).to.be(0);

socket.onAny(() => {
done();
});
});
});
});
});
});

describe("messaging many", () => {
Expand Down Expand Up @@ -2226,69 +2298,4 @@ describe("socket.io", () => {
});
});
});

describe("socket middleware", () => {
const { Socket } = require("../dist/socket");

it("should call functions", done => {
const srv = createServer();
const sio = new Server(srv);
let run = 0;

srv.listen(() => {
const socket = client(srv, { multiplex: false });

socket.emit("join", "woot");

sio.on("connection", socket => {
socket.use((event, next) => {
expect(event).to.eql(["join", "woot"]);
event.unshift("wrap");
run++;
next();
});
socket.use((event, next) => {
expect(event).to.eql(["wrap", "join", "woot"]);
run++;
next();
});
socket.on("wrap", (data1, data2) => {
expect(data1).to.be("join");
expect(data2).to.be("woot");
expect(run).to.be(2);
done();
});
});
});
});

it("should pass errors", done => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(() => {
const clientSocket = client(srv, { multiplex: false });

clientSocket.emit("join", "woot");

clientSocket.on("error", err => {
expect(err).to.be("Authentication error");
done();
});

sio.on("connection", socket => {
socket.use((event, next) => {
next(new Error("Authentication error"));
});
socket.use((event, next) => {
done(new Error("nope"));
});

socket.on("join", () => {
done(new Error("nope"));
});
});
});
});
});
});

2 comments on commit 5c73733

@p3x-robot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why you removed the socket middleware? now many things are not possible anymore!!!!!!!!!!!!!!

@darrachequesne
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's continue the discussion here: #3678

Please sign in to comment.