From f0fe8b9396cad6709561532b3d5e65916bb770f1 Mon Sep 17 00:00:00 2001 From: Sebastijan K <58827427+sebastijankuzner@users.noreply.github.com> Date: Tue, 7 Jul 2020 09:59:49 +0200 Subject: [PATCH] refactor(core-p2p): remove excess code from hapi-nes (#3878) --- .../src/contracts/p2p/nes-client.ts | 6 - packages/core-p2p/package.json | 2 - packages/core-p2p/src/hapi-nes/client.ts | 154 ------- packages/core-p2p/src/hapi-nes/index.ts | 123 ----- packages/core-p2p/src/hapi-nes/listener.ts | 432 ------------------ packages/core-p2p/src/hapi-nes/socket.ts | 244 +--------- yarn.lock | 2 +- 7 files changed, 2 insertions(+), 961 deletions(-) diff --git a/packages/core-kernel/src/contracts/p2p/nes-client.ts b/packages/core-kernel/src/contracts/p2p/nes-client.ts index 05271d8fc9..c033ec5e82 100644 --- a/packages/core-kernel/src/contracts/p2p/nes-client.ts +++ b/packages/core-kernel/src/contracts/p2p/nes-client.ts @@ -1,11 +1,5 @@ export interface Client { connect(options: any): Promise; - overrideReconnectionAuth(auth: any): boolean; - reauthenticate(auth: any): Promise; disconnect(): Promise; request(options: any): Promise; - message(message: any): Promise; - subscriptions(): string[]; - subscribe(path: any, handler: any): Promise; - unsubscribe(path: any, handler: any): Promise; } diff --git a/packages/core-p2p/package.json b/packages/core-p2p/package.json index 13a3d2479d..64b87feb4d 100644 --- a/packages/core-p2p/package.json +++ b/packages/core-p2p/package.json @@ -33,10 +33,8 @@ "@hapi/joi": "^17.1.1", "@hapi/boom": "^9.0.0", "@hapi/bounce": "2.x.x", - "@hapi/call": "8.x", "@hapi/cryptiles": "5.x.x", "@hapi/hoek": "9.x.x", - "@hapi/iron": "6.x.x", "@hapi/teamwork": "5.x.x", "ws": "7.x", "ajv": "^6.10.2", diff --git a/packages/core-p2p/src/hapi-nes/client.ts b/packages/core-p2p/src/hapi-nes/client.ts index 09d5da22b5..2811adb652 100755 --- a/packages/core-p2p/src/hapi-nes/client.ts +++ b/packages/core-p2p/src/hapi-nes/client.ts @@ -90,7 +90,6 @@ export class Client { private _reconnectionTimer; private _ids; private _requests; - private _subscriptions; private _heartbeat; private _packets; private _disconnectListeners; @@ -122,7 +121,6 @@ export class Client { this._reconnectionTimer = null; this._ids = 0; // Id counter this._requests = {}; // id -> { resolve, reject, timeout } - this._subscriptions = {}; // path -> [callbacks] this._heartbeat = null; this._packets = []; this._disconnectListeners = null; @@ -180,26 +178,6 @@ export class Client { }); } - public overrideReconnectionAuth(auth) { - if (!this._reconnection) { - return false; - } - - this._reconnection.settings.auth = auth; - return true; - } - - public reauthenticate(auth) { - this.overrideReconnectionAuth(auth); - - const request = { - type: "reauth", - auth, - }; - - return this._send(request, true); - } - public disconnect() { return new Promise((resolve) => this._disconnect(resolve, false)); } @@ -223,101 +201,10 @@ export class Client { return this._send(request, true); } - public message(message) { - const request = { - type: "message", - message, - }; - - return this._send(request, true); - } - public _isReady() { return this._ws && this._ws.readyState === WebSocket.OPEN; } - public subscriptions() { - return Object.keys(this._subscriptions); - } - - public subscribe(path, handler) { - if (!path || path[0] !== "/") { - return Promise.reject(NesError("Invalid path", errorTypes.USER)); - } - - const subs = this._subscriptions[path]; - if (subs) { - // Already subscribed - - if (subs.indexOf(handler) === -1) { - subs.push(handler); - } - - return Promise.resolve(); - } - - this._subscriptions[path] = [handler]; - - if (!this._isReady()) { - // Queued subscription - - return Promise.resolve(); - } - - const request = { - type: "sub", - path, - }; - - const promise = this._send(request, true); - promise.catch((ignoreErr) => { - delete this._subscriptions[path]; - }); - - return promise; - } - - public unsubscribe(path, handler) { - if (!path || path[0] !== "/") { - return Promise.reject(NesError("Invalid path", errorTypes.USER)); - } - - const subs = this._subscriptions[path]; - if (!subs) { - return Promise.resolve(); - } - - let sync = false; - if (!handler) { - delete this._subscriptions[path]; - sync = true; - } else { - const pos = subs.indexOf(handler); - if (pos === -1) { - return Promise.resolve(); - } - - subs.splice(pos, 1); - if (!subs.length) { - delete this._subscriptions[path]; - sync = true; - } - } - - if (!sync || !this._isReady()) { - return Promise.resolve(); - } - - const request = { - type: "unsub", - path, - }; - - const promise = this._send(request, true); - promise.catch(ignore); // Ignoring errors as the subscription handlers are already removed - return promise; - } - private _connect(options, initial, next) { const ws = new WebSocket(this._url, this._settings.ws); this._ws = ws; @@ -382,10 +269,6 @@ export class Client { finalize(undefined); }) .catch((err) => { - if (err.path) { - delete this._subscriptions[err.path]; - } - this._disconnect(() => nextTick(finalize)(err), true); // Stop reconnection when the hello message returns error }); }; @@ -570,11 +453,6 @@ export class Client { request.auth = auth; } - const subs = this.subscriptions(); - if (subs.length) { - request.subs = subs; - } - return this._send(request, true); } @@ -628,28 +506,6 @@ export class Client { return this.onUpdate(update.message); } - // Publish or Revoke - - if (update.type === "pub" || update.type === "revoke") { - const handlers = this._subscriptions[update.path]; - if (update.type === "revoke") { - delete this._subscriptions[update.path]; - } - - if (handlers && update.message !== undefined) { - const flags: any = {}; - if (update.type === "revoke") { - flags.revoked = true; - } - - for (let i = 0; i < handlers.length; ++i) { - handlers[i](update.message, flags); - } - } - - return; - } - // Lookup request (message must include an id from this point) const request = this._requests[update.id]; @@ -692,16 +548,6 @@ export class Client { return next(error); } - if (update.type === "reauth") { - return next(error, true); - } - - // Subscriptions - - if (update.type === "sub" || update.type === "unsub") { - return next(error); - } - next(NesError("Received invalid response", errorTypes.PROTOCOL)); return this.onError(NesError("Received unknown response type: " + update.type, errorTypes.PROTOCOL)); } diff --git a/packages/core-p2p/src/hapi-nes/index.ts b/packages/core-p2p/src/hapi-nes/index.ts index a558783b39..f28bf1bd37 100755 --- a/packages/core-p2p/src/hapi-nes/index.ts +++ b/packages/core-p2p/src/hapi-nes/index.ts @@ -1,9 +1,7 @@ /* tslint:disable */ "use strict"; -import Cryptiles from "@hapi/cryptiles"; import Hoek from "@hapi/hoek"; -import Iron from "@hapi/iron"; import Joi from "@hapi/joi"; import { Client } from "./client"; @@ -11,19 +9,6 @@ import { Listener } from "./listener"; const internals: any = { defaults: { - auth: { - endpoint: "/nes/auth", - id: "nes.auth", - type: "direct", - cookie: "nes", - isSecure: true, - isHttpOnly: true, - isSameSite: "Strict", - path: "/", - index: false, - timeout: 5000, // 5 seconds - maxConnectionsPerUser: false, - }, headers: null, payload: { maxChunkChars: false, @@ -40,38 +25,6 @@ internals.schema = Joi.object({ onConnection: Joi.function(), // async function (socket) {} onDisconnection: Joi.function(), // function (socket) {} onMessage: Joi.function(), // async function (socket, message) { return data; } // Or throw errors - auth: Joi.object({ - endpoint: Joi.string().required(), - id: Joi.string(), - type: Joi.valid("cookie", "token", "direct").required(), - route: [Joi.object(), Joi.string()], - cookie: Joi.string().required(), - isSecure: Joi.boolean(), - isHttpOnly: Joi.boolean(), - isSameSite: Joi.valid("Strict", "Lax").allow(false), - path: Joi.string().allow(null), - domain: Joi.string().allow(null), - ttl: Joi.number().allow(null), - iron: Joi.object(), - password: Joi.alternatives([Joi.string(), Joi.binary(), Joi.object()]), - index: Joi.boolean(), - timeout: Joi.number().integer().min(1).allow(false), - maxConnectionsPerUser: Joi.number() - .integer() - .min(1) - .allow(false) - .when("index", { is: true, otherwise: Joi.valid(false) }), - minAuthVerifyInterval: Joi.number() - .integer() - .allow(false) - .when("...heartbeat", { - is: false, - then: Joi.number().min(1), - otherwise: Joi.number().min(Joi.ref("...heartbeat.interval")), - }), - }) - .allow(false) - .required(), headers: Joi.array().items(Joi.string().lowercase()).min(1).allow("*", null), payload: { maxChunkChars: Joi.number().integer().min(1).allow(false), @@ -96,18 +49,8 @@ const plugin = { settings.headers = settings.headers.map((field) => field.toLowerCase()); } - if (settings.auth && settings.auth.minAuthVerifyInterval === undefined) { - settings.auth.minAuthVerifyInterval = settings.heartbeat - ? settings.heartbeat.interval - : internals.defaults.heartbeat.interval; - } - Joi.assert(settings, internals.schema, "Invalid nes configuration"); - // Authentication endpoint - - internals.auth(server, settings); - // Create a listener per connection const listener = new Listener(server, settings); @@ -128,78 +71,12 @@ const plugin = { // Decorate server and request - server.decorate("server", "broadcast", listener.broadcast); - server.decorate("server", "subscription", listener.subscription); - server.decorate("server", "publish", listener.publish); - server.decorate("server", "eachSocket", listener.eachSocket); server.decorate("request", "socket", internals.socket, { apply: true }); }, }; export { plugin, Client }; -internals.auth = function (server, settings) { - const config = settings.auth; - if (!config) { - return; - } - - if (config.type !== "direct" && !config.password) { - config.password = Cryptiles.randomString(32); - } - - if (config.type === "cookie") { - const cookieOptions = { - isSecure: config.isSecure, - isHttpOnly: config.isHttpOnly, - isSameSite: config.isSameSite, - path: config.path, - domain: config.domain, - ttl: config.ttl, - encoding: "iron", - password: config.password, - iron: config.iron, - }; - - server.state(config.cookie, cookieOptions); - } - - server.route({ - method: config.type === "direct" ? "auth" : "GET", - path: config.endpoint, - config: { - id: config.id, - isInternal: config.type === "direct", - auth: config.route, - handler: async (request, h) => { - if (!request.auth.isAuthenticated) { - return { status: "unauthenticated" }; - } - - const credentials = { - credentials: request.auth.credentials, - artifacts: request.auth.artifacts, - strategy: request.auth.strategy, - }; - - if (config.type === "direct") { - return credentials; - } - - const result: any = { status: "authenticated" }; - - if (config.type === "cookie") { - return h.response(result).state(config.cookie, credentials); - } - - const sealed = await Iron.seal(credentials, config.password, config.iron || Iron.defaults); - result.token = sealed; - return result; - }, - }, - }); -}; - internals.socket = function (request) { return request.plugins.nes ? request.plugins.nes.socket : null; }; diff --git a/packages/core-p2p/src/hapi-nes/listener.ts b/packages/core-p2p/src/hapi-nes/listener.ts index d76e5891e4..43eacd614d 100755 --- a/packages/core-p2p/src/hapi-nes/listener.ts +++ b/packages/core-p2p/src/hapi-nes/listener.ts @@ -1,10 +1,6 @@ "use strict"; -import Boom from "@hapi/boom"; -import Bounce from "@hapi/bounce"; -import Call from "@hapi/call"; import Hoek from "@hapi/hoek"; -import Joi from "@hapi/joi"; import Ws from "ws"; import { Socket } from "./socket"; @@ -16,26 +12,12 @@ const internals = { }, }; -const subSchema = Joi.object({ - filter: Joi.func(), // async function (path, update, options), where options: { credentials, params }, returns true, false, { override }, or throws an error - onSubscribe: Joi.func(), // async function (socket, path, params) - onUnsubscribe: Joi.func(), // async function (socket, path, params) - auth: Joi.object({ - mode: Joi.string().valid("required", "optional"), - scope: Joi.array().items(Joi.string()).single().min(1), - entity: Joi.valid("user", "app", "any"), - index: Joi.boolean(), - }).allow(false), -}); - export class Listener { public _stopped; private _server; private _settings; private _sockets; - private _router; - private _authRoute; private _socketCounter; private _heartbeat; private _beatTimeout; @@ -45,8 +27,6 @@ export class Listener { this._server = server; this._settings = settings; this._sockets = new Sockets(this); - this._router = new Call.Router(); - this._authRoute = this._settings.auth && this._server.lookup(this._settings.auth.id); this._socketCounter = internals.counter.min; this._heartbeat = null; this._beatTimeout = null; @@ -91,19 +71,6 @@ export class Listener { this._wss.close(); } - public _authRequired() { - if (!this._authRoute) { - return false; - } - - const auth = this._server.auth.lookup(this._authRoute); - if (!auth) { - return false; - } - - return auth.mode === "required"; - } - public _beat() { if (!this._settings.heartbeat) { return; @@ -142,17 +109,6 @@ export class Listener { }, this._settings.heartbeat.interval); } - public broadcast(message, options) { - options = options || {}; - - const update = { - type: "update", - message, - }; - - return this._broadcast(update, options); - } - public _generateId() { const id = Date.now() + ":" + this._server.info.id + ":" + this._socketCounter++; if (this._socketCounter > internals.counter.max) { @@ -162,219 +118,15 @@ export class Listener { return id; } - public subscription(path, options) { - Hoek.assert(path, "Subscription missing path"); - Joi.assert(options, subSchema, "Invalid subscription options: " + path); - - const settings = Hoek.clone(options || {}); - - // Auth configuration - - const auth = settings.auth; - if (auth) { - if (auth.scope) { - if (typeof auth.scope === "string") { - auth.scope = [auth.scope]; - } - - for (let i = 0; i < auth.scope.length; ++i) { - if (/{([^}]+)}/.test(auth.scope[i])) { - auth.hasScopeParameters = true; - break; - } - } - } - - auth.mode = auth.mode || "required"; - } - - // Path configuration - - const route = { - method: "sub", - path, - }; - - const config = { - subscribers: new Subscribers(this._server, settings), - filter: settings.filter, - auth, - }; - - this._router.add(route, config); - } - - public publish(path, update, options) { - Hoek.assert(path && path[0] === "/", "Missing or invalid subscription path:", path || "empty"); - - options = options || {}; - - const message = { - type: "pub", - path, - message: update, - }; - - return this._publish(path, message, options); - } - - public eachSocket(each, options) { - options = options || {}; - - return this._eachSocket(each, options); - } - - public async _subscribe(path, socket: Socket) { - // Errors include subscription context in messages in case returned as connection errors - - if (path.indexOf("?") !== -1) { - throw Boom.badRequest("Subscription path cannot contain query"); - } - - if (socket._subscriptions[path]) { - return; - } - - const match = this._router.route("sub", path); - if (match.isBoom) { - throw Boom.notFound("Subscription not found"); - } - - const auth = this._server.auth.lookup({ settings: { auth: match.route.auth } }); // Create a synthetic route - if (auth) { - const credentials = socket.auth.credentials; - if (credentials) { - // Check scope - - if (auth.scope) { - let scopes = auth.scope; - if (auth.hasScopeParameters) { - scopes = []; - const context = { params: match.params }; - for (let i = 0; i < auth.scope.length; ++i) { - scopes[i] = Hoek.reachTemplate(context, auth.scope[i]); - } - } - - if ( - !credentials.scope || - (typeof credentials.scope === "string" - ? !scopes.includes(credentials.scope) - : !Hoek.intersect(scopes, credentials.scope).length) - ) { - throw Boom.forbidden("Insufficient scope to subscribe, expected any of: " + scopes); - } - } - - // Check entity - - const entity = auth.entity || "any"; - if (entity === "user" && !credentials.user) { - throw Boom.forbidden("Application credentials cannot be used on a user subscription"); - } - - if (entity === "app" && credentials.user) { - throw Boom.forbidden("User credentials cannot be used on an application subscription"); - } - } else if (auth.mode === "required") { - throw Boom.unauthorized("Authentication required to subscribe"); - } - } - - await match.route.subscribers.add(socket, path, match); - socket._subscriptions[path] = match.route.subscribers; - } - - private _publish(path, _update, options) { - if (this._stopped) { - return; - } - - const match = this._router.route("sub", path); - if (match.isBoom) { - return; - } - - const each = async (socket: Socket) => { - // Filter on path if has parameters - - let update = _update; - - if (route.filter) { - let isMatch; - try { - isMatch = await route.filter(path, update.message, { - socket, - credentials: socket.auth.credentials, - params: match.params, - internal: options.internal, - }); - } catch (err) { - Bounce.rethrow(err, "system"); - } - - if (!isMatch) { - return; - } - - if (isMatch.override) { - update = Object.assign({}, update); // Shallow cloned - update.message = isMatch.override; - } - } - - return socket._send(update).catch(Hoek.ignore); // Ignore errors - }; - - const route = match.route; - return route.subscribers._forEachSubscriber(match.paramsArray.length ? path : null, options, each); - } - - private _eachSocket(each, options) { - if (this._stopped) { - return; - } - - if (!options.subscription) { - Hoek.assert(!options.user, "Cannot specify user filter without a subscription path"); - return this._sockets._forEach(each); - } - - const sub = this._router.route("sub", options.subscription); - if (sub.isBoom) { - return; - } - - const route = sub.route; - return route.subscribers._forEachSubscriber( - sub.paramsArray.length ? options.subscription : null, - options, - each, - ); // Filter on path if has parameters - } - private _add(ws, req) { // Socket object const socket = new Socket(ws, req, this); - // Subscriptions - this._sockets.add(socket); ws.once("close", async (code, message) => { this._sockets.remove(socket); - clearTimeout(socket.auth._initialAuthTimeout); - socket.auth._initialAuthTimeout = null; - - const subs = Object.keys(socket._subscriptions); - for (let i = 0; i < subs.length; ++i) { - const sub = subs[i]; - const subscribers = socket._subscriptions[sub]; - await subscribers.remove(socket); - } - - socket._subscriptions = {}; if (this._settings.onDisconnection) { this._settings.onDisconnection(socket); @@ -383,83 +135,23 @@ export class Listener { socket._removed.attend(); }); } - - private _broadcast(update, options) { - Hoek.assert( - !options.user || (this._settings.auth && this._settings.auth.index), - "Socket auth indexing is disabled", - ); - - if (this._stopped) { - return; - } - - const each = (socket: Socket) => socket._send(update).catch(Hoek.ignore); // Ignore errors - - if (options.user) { - const sockets = this._sockets._byUser[options.user]; - if (!sockets) { - return; - } - - return sockets.forEach(each); - } - - return this._sockets._forEach(each); - } } // Sockets manager class Sockets { - private _listener; private _items; - private _byUser; public constructor(listener) { - this._listener = listener; this._items = {}; - this._byUser = {}; // user -> [sockets] } public add(socket) { this._items[socket.id] = socket; } - public auth(socket) { - if (!this._listener._settings.auth.index) { - return; - } - - if (!socket.auth.credentials.user) { - return; - } - - const user = socket.auth.credentials.user; - if ( - this._listener._settings.auth.maxConnectionsPerUser && - this._byUser[user] && - this._byUser[user].length >= this._listener._settings.auth.maxConnectionsPerUser - ) { - throw Boom.serverUnavailable("Too many connections for the authenticated user"); - } - - this._byUser[user] = this._byUser[user] || []; - this._byUser[user].push(socket); - } - public remove(socket) { delete this._items[socket.id]; - - if (socket.auth.credentials && socket.auth.credentials.user) { - const user = socket.auth.credentials.user; - if (this._byUser[user]) { - this._byUser[user] = this._byUser[user].filter((item) => item !== socket); - if (!this._byUser[user].length) { - delete this._byUser[user]; - } - } - } } public length() { @@ -472,127 +164,3 @@ class Sockets { } } } - -// Subscribers manager - -class Subscribers { - private _server; - private _settings; - private _items; - private _byUser; - - public constructor(server, options) { - this._server = server; - this._settings = options; - this._items = {}; - this._byUser = {}; // user -> [item] - } - - public async add(socket, path, match) { - if (this._settings.onSubscribe) { - await this._settings.onSubscribe(socket, path, match.params); - } - - const item = this._items[socket.id]; - if (item) { - item.paths.push(path); - item.params.push(match.params); - } else { - this._items[socket.id] = { socket, paths: [path], params: [match.params] }; - - if ( - this._settings.auth && - this._settings.auth.index && - socket.auth.credentials && - socket.auth.credentials.user - ) { - const user = socket.auth.credentials.user; - this._byUser[user] = this._byUser[user] || []; - this._byUser[user].push(this._items[socket.id]); - } - } - } - - public async remove(socket, path) { - const item = this._items[socket.id]; - if (!item) { - return; - } - - if (!path) { - this._cleanup(socket, item); - - if (this._settings.onUnsubscribe) { - for (let i = 0; i < item.paths.length; ++i) { - const itemPath = item.paths[i]; - await this._remove(socket, itemPath, item.params[i]); - } - } - - return; - } - - const pos = item.paths.indexOf(path); - const params = item.params[pos]; - - if (item.paths.length === 1) { - this._cleanup(socket, item); - } else { - item.paths.splice(pos, 1); - item.params.splice(pos, 1); - } - - if (this._settings.onUnsubscribe) { - return this._remove(socket, path, params); - } - } - - public async _forEachSubscriber(path, options, each) { - const itemize = async (item) => { - if ( - item && // check item not removed - (!path || item.paths.indexOf(path) !== -1) - ) { - await each(item.socket); - } - }; - - if (options.user) { - Hoek.assert(this._settings.auth && this._settings.auth.index, "Subscription auth indexing is disabled"); - - const items = this._byUser[options.user]; - if (items) { - for (let i = 0; i < items.length; ++i) { - const item = items[i]; - await itemize(item); - } - } - } else { - const items = Object.keys(this._items); - for (let i = 0; i < items.length; ++i) { - const item = this._items[items[i]]; - await itemize(item); - } - } - } - - private async _remove(socket, path, params) { - try { - await this._settings.onUnsubscribe(socket, path, params); - } catch (err) { - this._server.log(["nes", "onUnsubscribe", "error"], err); - } - } - - private _cleanup(socket, item) { - delete this._items[socket.id]; - - if (socket.auth.credentials && socket.auth.credentials.user && this._byUser[socket.auth.credentials.user]) { - const user = socket.auth.credentials.user; - this._byUser[user] = this._byUser[user].filter((record) => record !== item); - if (!this._byUser[user].length) { - delete this._byUser[user]; - } - } - } -} diff --git a/packages/core-p2p/src/hapi-nes/socket.ts b/packages/core-p2p/src/hapi-nes/socket.ts index c016952a9d..f478ad0655 100755 --- a/packages/core-p2p/src/hapi-nes/socket.ts +++ b/packages/core-p2p/src/hapi-nes/socket.ts @@ -4,7 +4,6 @@ import Boom from "@hapi/boom"; import Bounce from "@hapi/bounce"; import Cryptiles from "@hapi/cryptiles"; import Hoek from "@hapi/hoek"; -import Iron from "@hapi/iron"; import Teamwork from "@hapi/teamwork"; const internals = { @@ -15,15 +14,12 @@ export class Socket { public server; public id; public app; - public auth; public info; - public _subscriptions; public _removed; public _pinged; private _ws; - private _cookies; private _listener; private _helloed; private _processingCount; @@ -32,12 +28,10 @@ export class Socket { public constructor(ws, req, listener) { this._ws = ws; - this._cookies = req.headers.cookie; this._listener = listener; this._helloed = false; this._pinged = true; this._processingCount = 0; - this._subscriptions = {}; this._packets = []; this._sending = false; this._removed = new Teamwork.Team(); @@ -45,14 +39,6 @@ export class Socket { this.server = this._listener._server; this.id = this._listener._generateId(); this.app = {}; - this.auth = { - isAuthenticated: false, - credentials: null, - artifacts: null, - _error: null, - _initialAuthTimeout: null, - _verified: 0, - }; this.info = { remoteAddress: req.socket.remoteAddress, @@ -60,17 +46,10 @@ export class Socket { "x-forwarded-for": req.headers["x-forwarded-for"], }; - if (this._listener._settings.auth && this._listener._settings.auth.timeout) { - this.auth._initialAuthTimeout = setTimeout(() => this.disconnect(), this._listener._settings.auth.timeout); - } - this._ws.on("message", (message) => this._onMessage(message)); } public disconnect() { - clearTimeout(this.auth._initialAuthTimeout); - this.auth._initialAuthTimeout = null; - this._ws.close(); return this._removed; } @@ -84,19 +63,7 @@ export class Socket { return this._send(response); } - public publish(path, update) { - const message = { - type: "pub", - path, - message: update, - }; - - return this._send(message); - } - public async revoke(path, update, options: any = {}) { - await this._unsubscribe(path); - const message: any = { type: "revoke", path, @@ -279,8 +246,6 @@ export class Socket { throw Boom.badRequest("Message missing id"); } - await this._verifyAuth(); - // Initialization and Authentication if (request.type === "ping") { @@ -295,32 +260,12 @@ export class Socket { throw Boom.badRequest("Connection is not initialized"); } - if (request.type === "reauth") { - return this._processReauth(request); - } - // Endpoint request if (request.type === "request") { return this._processRequest(request); } - // Custom message request - - if (request.type === "message") { - return this._processMessage(request); - } - - // Subscriptions - - if (request.type === "sub") { - return this._processSubscription(request); - } - - if (request.type === "unsub") { - return this._processUnsub(request); - } - // Unknown throw Boom.badRequest("Unknown message type"); @@ -342,24 +287,11 @@ export class Socket { } this._helloed = true; // Prevents the client from reusing the socket if erred (leaves socket open to ensure client gets the error response) - await this._authenticate(request); if (this._listener._settings.onConnection) { await this._listener._settings.onConnection(this); } - if (request.subs) { - for (let i = 0; i < request.subs.length; ++i) { - const path = request.subs[i]; - try { - await this._listener._subscribe(path, this); - } catch (err) { - err.path = path; - throw err; - } - } - } - const response = { type: "hello", id: request.id, @@ -370,23 +302,6 @@ export class Socket { return { response }; } - private async _processReauth(request) { - try { - await this._authenticate(request); - } catch (err) { - Bounce.rethrow(err, "system"); - setTimeout(() => this.disconnect(), 0); // disconnect after sending the response to the client - throw err; - } - - const response = { - type: "reauth", - id: request.id, - }; - - return { response }; - } - private async _processRequest(request) { let method = request.method; if (!method) { @@ -417,16 +332,12 @@ export class Socket { } } - if (this._listener._settings.auth && path === this._listener._settings.auth.endpoint) { - throw Boom.notFound(); - } - const shot = { method, url: path, payload: request.payload, headers: request.headers, - auth: this.auth.isAuthenticated ? this.auth : null, + auth: null, validate: false, plugins: { nes: { @@ -460,159 +371,6 @@ export class Socket { return { response, options }; } - private async _processMessage(request) { - if (!this._listener._settings.onMessage) { - throw Boom.notImplemented(); - } - - const message = await this._listener._settings.onMessage(this, request.message); - const response = { - type: "message", - id: request.id, - message, - }; - - return { response }; - } - - private async _processSubscription(request) { - await this._listener._subscribe(request.path, this); - - const response = { - type: "sub", - id: request.id, - path: request.path, - }; - - return { response }; - } - - private async _processUnsub(request) { - await this._unsubscribe(request.path); - - const response = { - type: "unsub", - id: request.id, - }; - - return { response }; - } - - private _unsubscribe(path) { - const sub = this._subscriptions[path]; - if (sub) { - delete this._subscriptions[path]; - return sub.remove(this, path); - } - } - - private async _authenticate(request) { - await this._authByCookie(); - - if (!request.auth && !this.auth.isAuthenticated && this._listener._authRequired()) { - throw Boom.unauthorized("Connection requires authentication"); - } - - if (request.auth && request.type !== "reauth" && this.auth.isAuthenticated) { - // Authenticated using a cookie during upgrade - - throw Boom.badRequest("Connection already authenticated"); - } - - clearTimeout(this.auth._initialAuthTimeout); - this.auth._initialAuthTimeout = null; - - if (request.auth) { - const config = this._listener._settings.auth; - if (config.type === "direct") { - const route = this.server.lookup(config.id); - request.auth.headers = request.auth.headers || {}; - request.auth.headers["x-forwarded-for"] = this.info["x-forwarded-for"]; - const res = await this.server.inject({ - url: route.path, - method: "auth", - headers: request.auth.headers, - remoteAddress: this.info.remoteAddress, - allowInternals: true, - validate: false, - }); - if (res.statusCode !== 200) { - throw Boom.unauthorized(res.result.message); - } - - if (!res.result.credentials) { - return; - } - - this._setCredentials(res.result); - return; - } - - try { - const auth = await Iron.unseal(request.auth, config.password, config.iron || Iron.defaults); - this._setCredentials(auth); - } catch (err) { - throw Boom.unauthorized("Invalid token"); - } - } - } - - private async _authByCookie() { - const config = this._listener._settings.auth; - if (!config) { - return; - } - - const cookies = this._cookies; - if (!cookies) { - return; - } - - let states; - try { - states = (await this.server.states.parse(cookies)).states; - } catch (err) { - throw Boom.unauthorized("Invalid nes authentication cookie"); - } - - const auth = states[config.cookie]; - if (auth) { - this._setCredentials(auth); - } - } - - private _setCredentials(auth) { - this.auth.isAuthenticated = true; - this.auth.credentials = auth.credentials; - this.auth.artifacts = auth.artifacts; - this.auth.strategy = auth.strategy; - - this.auth._verified = Date.now(); - - this._listener._sockets.auth(this); - } - - private async _verifyAuth() { - const now = Date.now(); - - if ( - !this._listener._settings.auth.minAuthVerifyInterval || - now - this.auth._verified < this._listener._settings.auth.minAuthVerifyInterval - ) { - return; - } - - this.auth._verified = now; - - try { - await this.server.auth.verify(this); - } catch (err) { - Bounce.rethrow(err, "system"); - setImmediate(() => this.disconnect()); - throw Boom.forbidden("Credential verification failed"); - } - } - private _filterHeaders(headers) { const filter = this._listener._settings.headers; if (!filter) { diff --git a/yarn.lock b/yarn.lock index c1041b8c72..8da01cb363 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1430,7 +1430,7 @@ resolved "https://registry.yarnpkg.com/@hapi/bourne/-/bourne-2.0.0.tgz#5bb2193eb685c0007540ca61d166d4e1edaf918d" integrity sha512-WEezM1FWztfbzqIUbsDzFRVMxSoLy3HugVcux6KDDtTqzPsLE8NDRHfXvev66aH1i2oOKKar3/XDjbvh/OUBdg== -"@hapi/call@8.x", "@hapi/call@8.x.x": +"@hapi/call@8.x.x": version "8.0.0" resolved "https://registry.yarnpkg.com/@hapi/call/-/call-8.0.0.tgz#a5e456b611d848cf5a12662ef120da886041c54d" integrity sha512-4xHIWWqaIDQlVU88XAnomACSoC7iWUfaLfdu2T7I0y+HFFwZUrKKGfwn6ik4kwKsJRMnOliG3UXsF8V/94+Lkg==