Skip to content

Commit

Permalink
add timeout & retryCount to Context
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed Apr 11, 2017
1 parent 057bc00 commit 0235b7c
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 120 deletions.
5 changes: 4 additions & 1 deletion src/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class Context {

this.setParams(opts.params);

this.timeout = opts.timeout || 0;
this.retryCount = opts.retryCount || 0;

if (opts.parent && opts.parent.meta) {
// Merge metadata
this.meta = _.assign({}, opts.parent.meta, opts.meta);
Expand All @@ -45,7 +48,7 @@ class Context {

// Generate ID for context
if (this.nodeID || opts.metrics)
this.id = utils.generateToken();
this.id = opts.id || utils.generateToken();

// Initialize metrics properties
if (this.metrics) {
Expand Down
17 changes: 10 additions & 7 deletions src/service-broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ class ServiceBroker {

if (opts.retryCount == null)
opts.retryCount = this.options.requestRetry || 0;

// Find action by name
let actions = this.actions.get(actionName);
if (!actions) {
Expand Down Expand Up @@ -662,7 +662,10 @@ class ServiceBroker {
requestID: opts.requestID,
metrics: this.shouldMetric(),
parent: opts.parentCtx,
meta: opts.meta
meta: opts.meta,

timeout: opts.timeout,
retryCount: opts.retryCount
});
}

Expand All @@ -675,7 +678,7 @@ class ServiceBroker {
if (!isRemoteCall) {
p = action.handler(ctx);
} else {
p = this.transit.request(ctx, opts);
p = this.transit.request(ctx);
}

if (ctx.metrics || this.statistics) {
Expand All @@ -687,8 +690,8 @@ class ServiceBroker {
}

// Timeout handler
if (opts.timeout > 0)
p = p.timeout(opts.timeout);
if (ctx.timeout > 0)
p = p.timeout(ctx.timeout);

// Error handler
return p.catch(err => this._callErrorHandler(err, ctx, opts));
Expand All @@ -713,9 +716,9 @@ class ServiceBroker {

if (err instanceof E.RequestTimeoutError) {
// Retry request
if (opts.retryCount-- > 0) {
if (ctx.retryCount-- > 0) {
this.logger.warn(`Action '${actionName}' call timed out on '${nodeID}'!`);
this.logger.warn(`Recall '${actionName}' action (retry: ${opts.retryCount + 1})...`);
this.logger.warn(`Recall '${actionName}' action (retry: ${ctx.retryCount + 1})...`);

opts.ctx = ctx; // Reuse this context
return this.call(actionName, ctx.params, opts);
Expand Down
9 changes: 3 additions & 6 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,31 +240,28 @@ class Transit {
* what will be resolved when the response received.
*
* @param {Context} ctx Context of request
* @param {any} opts Options of request
* @returns {Promise}
*
* @memberOf Transit
*/
request(ctx, opts = {}) {
request(ctx) {
// Expanded the code that v8 can optimize it. (TryCatchStatement disable optimizing)
return new Promise((resolve, reject) => this._doRequest(ctx, opts, resolve, reject));
return new Promise((resolve, reject) => this._doRequest(ctx, resolve, reject));
}

/**
* Do a remote request
*
* @param {Context} ctx Context of request
* @param {any} opts Options of request
* @param {Function} resolve Resolve of Promise
* @param {Function} reject Reject of Promise
*
* @memberOf Transit
*/
_doRequest(ctx, opts, resolve, reject) {
_doRequest(ctx, resolve, reject) {
const request = {
nodeID: ctx.nodeID,
//ctx,
//opts,
resolve,
reject
};
Expand Down
123 changes: 27 additions & 96 deletions test/unit/context.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ describe("Test Context", () => {
expect(ctx.user).not.toBeDefined();
expect(ctx.parentID).toBeNull();
expect(ctx.level).toBe(1);

expect(ctx.timeout).toBe(0);
expect(ctx.retryCount).toBe(0);

expect(ctx.params).toEqual({});

expect(ctx.meta).toEqual({});

expect(ctx.id).not.toBeDefined();

expect(ctx.metrics).toBe(false);
Expand Down Expand Up @@ -53,7 +58,9 @@ describe("Test Context", () => {
nodeID: "node-1",
meta: {
user: 1
}
},
timeout: 2000,
retryCount: 2
};
let ctx = new Context(opts);

Expand All @@ -65,6 +72,9 @@ describe("Test Context", () => {
expect(ctx.parentID).toBeDefined();
expect(ctx.level).toBe(2);

expect(ctx.timeout).toBe(2000);
expect(ctx.retryCount).toBe(2);

expect(ctx.params).toEqual({ b: 5 });

expect(ctx.id).toBeDefined();
Expand Down Expand Up @@ -121,6 +131,7 @@ describe("Test Context", () => {
let broker = new ServiceBroker({ metrics: true });

let opts = {
id: "12345",
parent: {
id: "parent123",
meta: {
Expand All @@ -133,10 +144,12 @@ describe("Test Context", () => {
meta: {
b: "Hi",
c: 100
}
},
metrics: true
};
let ctx = new Context(opts);

expect(ctx.id).toBe("12345");
expect(ctx.meta).toEqual({
a: 5,
b: "Hi",
Expand Down Expand Up @@ -197,6 +210,18 @@ describe("Test call method", () => {
expect(broker.call).toHaveBeenCalledTimes(1);
expect(broker.call).toHaveBeenCalledWith("posts.find", p, { parentCtx: ctx });
});

it("should call broker.call method with options", () => {
broker.call.mockClear();

let ctx = new Context({ broker });

let p = { id: 5 };
ctx.call("posts.find", p, { timeout: 2500 });

expect(broker.call).toHaveBeenCalledTimes(1);
expect(broker.call).toHaveBeenCalledWith("posts.find", p, { parentCtx: ctx, timeout: 2500 });
});
});

describe("Test emit method", () => {
Expand All @@ -220,100 +245,6 @@ describe("Test emit method", () => {
expect(broker.emit).toHaveBeenCalledWith("request.rest", "string-data");
});
});
/*
describe("Test invoke method", () => {
let broker = new ServiceBroker();
let ctx = new Context();
ctx.logger = broker.getLogger();
ctx._startInvoke = jest.fn();
ctx._finishInvoke = jest.fn();
let response = { id: 5 };
it("should call start & finishInvoke method", () => {
let handler = jest.fn(() => response);
let p = ctx.invoke(handler);
expect(p).toBeDefined();
expect(p.then).toBeDefined();
return p.then((data) => {
expect(ctx._startInvoke).toHaveBeenCalledTimes(1);
expect(ctx._finishInvoke).toHaveBeenCalledTimes(1);
expect(data).toBe(response);
});
});
it("should call start & finishInvoke method if handler return Promise", () => {
ctx._startInvoke.mockClear();
ctx._finishInvoke.mockClear();
let handler = jest.fn(() => Promise.resolve(response));
let p = ctx.invoke(handler);
expect(p).toBeDefined();
expect(p.then).toBeDefined();
return p.then((data) => {
expect(ctx._startInvoke).toHaveBeenCalledTimes(1);
expect(ctx._finishInvoke).toHaveBeenCalledTimes(1);
expect(data).toBe(response);
});
});
it("should call closeContext method if error catched", () => {
ctx._startInvoke.mockClear();
ctx._finishInvoke.mockClear();
let handler = jest.fn(() => Promise.reject(new ServiceNotFoundError("Something happened")));
let p = ctx.invoke(handler);
expect(p).toBeDefined();
expect(p.then).toBeDefined();
return p.catch((err) => {
expect(ctx._startInvoke).toHaveBeenCalledTimes(1);
expect(ctx._finishInvoke).toHaveBeenCalledTimes(1);
expect(ctx.error).toBe(err);
expect(err).toBeDefined();
expect(err).toBeInstanceOf(Error);
expect(err).toBeInstanceOf(ServiceNotFoundError);
expect(err.ctx).toBe(ctx);
});
});
it("should return Error if handler reject a string", () => {
ctx._startInvoke.mockClear();
ctx._finishInvoke.mockClear();
let handler = jest.fn(() => Promise.reject("Some error message"));
let p = ctx.invoke(handler);
expect(p).toBeDefined();
expect(p.then).toBeDefined();
return p.catch((err) => {
expect(ctx._startInvoke).toHaveBeenCalledTimes(1);
expect(ctx._finishInvoke).toHaveBeenCalledTimes(1);
expect(ctx.error).toBe(err);
expect(err).toBeDefined();
expect(err).toBeInstanceOf(Error);
expect(err.ctx).toBe(ctx);
});
});
it("should return Error if handler throw exception", () => {
ctx._startInvoke.mockClear();
ctx._finishInvoke.mockClear();
let handler = jest.fn(() => { throw new Error("Exception"); });
let p = ctx.invoke(handler);
expect(p).toBeDefined();
expect(p.then).toBeDefined();
return p.catch((err) => {
expect(ctx._startInvoke).toHaveBeenCalledTimes(1);
expect(ctx._finishInvoke).toHaveBeenCalledTimes(1);
expect(ctx.error).toBe(err);
expect(err).toBeDefined();
expect(err).toBeInstanceOf(Error);
expect(err.ctx).toBe(ctx);
});
});
});
*/

describe("Test _metricStart method", () => {
let broker = new ServiceBroker({ metrics: true });
Expand Down
24 changes: 14 additions & 10 deletions test/unit/service-broker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -875,10 +875,10 @@ describe("Test broker.call method", () => {
metrics: true
});
broker.registerAction({ name: "user.create" }, "server-2");
broker.transit.request = jest.fn((ctx, opts) => Promise.resolve({ ctx, opts }));
broker.transit.request = jest.fn((ctx) => Promise.resolve({ ctx }));

it("should call transit.request with new Context without params", () => {
return broker.call("user.create").then(({ ctx, opts}) => {
return broker.call("user.create").then(({ ctx }) => {
expect(ctx).toBeDefined();
expect(ctx.broker).toBe(broker);
expect(ctx.nodeID).toBe("server-2");
Expand All @@ -889,25 +889,27 @@ describe("Test broker.call method", () => {
expect(ctx.params).toEqual({});
expect(ctx.metrics).toBe(true);

expect(opts).toEqual({"retryCount": 0, "timeout": 0});
expect(ctx.timeout).toBe(0);
expect(ctx.retryCount).toBe(0);

expect(broker.transit.request).toHaveBeenCalledTimes(1);
expect(broker.transit.request).toHaveBeenCalledWith(ctx, opts);
expect(broker.transit.request).toHaveBeenCalledWith(ctx);
});
});

it("should call transit.request with new Context with params & opts", () => {
broker.transit.request.mockClear();
let params = { limit: 5, search: "John" };
return broker.call("user.create", params, { timeout: 1000 }).then(({ ctx, opts}) => {
return broker.call("user.create", params, { timeout: 1000 }).then(({ ctx }) => {
expect(ctx).toBeDefined();
expect(ctx.action.name).toBe("user.create");
expect(ctx.params).toEqual(params);

expect(opts).toEqual({"retryCount": 0, "timeout": 1000});
expect(ctx.timeout).toBe(1000);
expect(ctx.retryCount).toBe(0);

expect(broker.transit.request).toHaveBeenCalledTimes(1);
expect(broker.transit.request).toHaveBeenCalledWith(ctx, opts);
expect(broker.transit.request).toHaveBeenCalledWith(ctx);

// expect(ctx._metricStart).toHaveBeenCalledTimes(1);
// expect(ctx._metricFinish).toHaveBeenCalledTimes(1);
Expand All @@ -926,7 +928,7 @@ describe("Test broker._callErrorHandler", () => {

let ctx = new Context({ broker, nodeID: "server-2", action: { name: "user.create" }, metrics: true});
let customErr = new CustomError("Error", 400);
let timeoutErr = new RequestTimeoutError({ action: "user.create" }, "server-2");
let timeoutErr = new RequestTimeoutError("user.create", "server-2");
ctx._metricFinish = jest.fn();
transit.removePendingRequest = jest.fn();

Expand Down Expand Up @@ -977,12 +979,14 @@ describe("Test broker._callErrorHandler", () => {
it("should retry call if retryCount > 0", () => {
ctx._metricFinish.mockClear();
broker.nodeUnavailable.mockClear();
ctx.retryCount = 2;

return broker._callErrorHandler(timeoutErr, ctx, { retryCount: 1}).then(() => {
return broker._callErrorHandler(timeoutErr, ctx, {}).then(() => {
expect(broker.call).toHaveBeenCalledTimes(1);
expect(broker.call).toHaveBeenCalledWith("user.create", {}, { retryCount: 0, ctx });
expect(broker.call).toHaveBeenCalledWith("user.create", {}, { ctx });
expect(broker.nodeUnavailable).toHaveBeenCalledTimes(0);

expect(ctx.retryCount).toBe(1);
expect(ctx._metricFinish).toHaveBeenCalledTimes(0);
});
});
Expand Down

0 comments on commit 0235b7c

Please sign in to comment.