Skip to content

Commit

Permalink
optimize performance of broker.call
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed Apr 11, 2017
1 parent dc57bc4 commit e99eba1
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 43 deletions.
2 changes: 1 addition & 1 deletion benchmark/perf-runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ let [b1, b2] = createBrokers(Transporters.Fake);
let count = 0;
function doRequest() {
count++;
return b1.call("echo.reply", { a: count }).then(res => {
return b2.call("echo.reply", { a: count }).then(res => {
if (count % 10000) {
// Fast cycle
doRequest();
Expand Down
88 changes: 46 additions & 42 deletions src/service-broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ServiceBroker {
logLevel: "info",

transporter: null,
requestTimeout: 5 * 1000,
requestTimeout: 0 * 1000,
requestRetry: 0,
heartbeatInterval: 10,
heartbeatTimeout: 30,
Expand Down Expand Up @@ -609,7 +609,7 @@ class ServiceBroker {


/**
* Call an action (local or global)
* Call an action (local or remote)
*
* @param {any} actionName name of action
* @param {any} params params of action
Expand Down Expand Up @@ -677,65 +677,69 @@ class ServiceBroker {
if (ctx.metrics || this.statistics) {
// Add after to metrics & statistics
p = p.then(res => {
this.finishCall(ctx, null);
this._metricsCall(ctx, null);
return res;
});
}

if (opts.timeout > 0)
p = p.timeout(opts.timeout);

return p.catch(Promise.TimeoutError, () => {
// Convert timeout error
throw new E.RequestTimeoutError(actionName, nodeID);
}).catch(err => {
if (!(err instanceof Error)) {
err = new E.CustomError(err);
}
return p.catch(err => this._callErrorHandler(err, ctx, opts));
}

err.ctx = ctx;
_callErrorHandler(err, ctx, opts) {
const actionName = ctx.action.name;
const nodeID = ctx.nodeID;

if (isRemoteCall) {
// Remove pending request
this.transit.removePendingRequest(ctx.id);
}
if (!(err instanceof Error)) {
err = new E.CustomError(err);
}
if (err instanceof Promise.TimeoutError)
err = new E.RequestTimeoutError(actionName, nodeID);

if (err instanceof E.RequestTimeoutError) {
// Retry request
if (opts.retryCount-- > 0) {
this.logger.warn(`Action '${actionName}' call timed out on '${nodeID}'!`);
this.logger.warn(`Recall '${actionName}' action (retry: ${opts.retryCount + 1})...`);
err.ctx = ctx;

opts.ctx = ctx; // Reuse this context
return this.call(actionName, params, opts);
}
}
if (nodeID) {
// Remove pending request
this.transit.removePendingRequest(ctx.id);
}

if (err instanceof E.RequestTimeoutError) {
// Retry request
if (opts.retryCount-- > 0) {
this.logger.warn(`Action '${actionName}' call timed out on '${nodeID}'!`);
this.logger.warn(`Recall '${actionName}' action (retry: ${opts.retryCount + 1})...`);

// Set node status to unavailable
if (err.code >= 500) {
const affectedNodeID = err.nodeID || nodeID;
if (affectedNodeID != this.nodeID)
this.nodeUnavailable(affectedNodeID);
opts.ctx = ctx; // Reuse this context
return this.call(actionName, ctx.params, opts);
}
}

// Need it? this.logger.error("Action request error!", err);
// Set node status to unavailable
if (err.code >= 500) {
const affectedNodeID = err.nodeID || nodeID;
if (affectedNodeID != this.nodeID)
this.nodeUnavailable(affectedNodeID);
}

this.finishCall(ctx, err);
// Need it? this.logger.error("Action request error!", err);

// Handle fallback response
if (opts.fallbackResponse) {
this.logger.warn(`Action '${actionName}' returns fallback response!`);
if (isFunction(opts.fallbackResponse))
return opts.fallbackResponse(ctx, ctx.nodeID);
else
return Promise.resolve(opts.fallbackResponse);
}
this._metricsCall(ctx, err);

return Promise.reject(err);
});
// Handle fallback response
if (opts.fallbackResponse) {
this.logger.warn(`Action '${actionName}' returns fallback response!`);
if (isFunction(opts.fallbackResponse))
return opts.fallbackResponse(ctx, ctx.nodeID);
else
return Promise.resolve(opts.fallbackResponse);
}

return Promise.reject(err);
}

finishCall(ctx, err) {
_metricsCall(ctx, err) {
if (ctx.metrics) {
ctx._metricFinish(err);

Expand Down

0 comments on commit e99eba1

Please sign in to comment.