Skip to content

Commit

Permalink
Merge pull request #1313 from JS-AK/fix/transit/timeout-middleware
Browse files Browse the repository at this point in the history
fix: updated moleculer-timeout-middleware with streams
  • Loading branch information
icebob authored Feb 3, 2025
2 parents 13d09d0 + 58b8548 commit ad06130
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/middlewares/timeout.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

"use strict";

const { Stream } = require("stream");

const { RequestTimeoutError } = require("../errors");
const { METRIC } = require("../metrics");

Expand Down Expand Up @@ -38,6 +40,11 @@ module.exports = function (broker) {
nodeID,
timeout: ctx.options.timeout
});

if (ctx.params instanceof Stream) {
ctx.params.emit("moleculer-timeout-middleware", ctx.options.timeout);
}

err = new RequestTimeoutError({ action: actionName, nodeID });

broker.metrics.increment(METRIC.MOLECULER_REQUEST_TIMEOUT_TOTAL, {
Expand Down
22 changes: 22 additions & 0 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,16 @@ class Transit {
pass.$pool = new Map();

this.pendingReqStreams.set(payload.id, { sender: payload.sender, stream: pass });

pass.on("moleculer-timeout-middleware", timeout => {
setTimeout(() => {
this.pendingReqStreams.delete(payload.id);
this._destroyStreamIfPossible(
pass,
`Pending request stream ${payload.id} have been closed by timeout ${timeout} ms`
);
}, 1000);
});
}

if (payload.seq > pass.$prevSeq + 1) {
Expand Down Expand Up @@ -864,6 +874,17 @@ class Transit {
// Add to pendings
this.pendingRequests.set(ctx.id, request);

if (request.stream) {
const pass = request.ctx.params;

pass.on("moleculer-timeout-middleware", timeout => {
this._destroyStreamIfPossible(
pass,
`Request stream ${ctx.id} have been closed by timeout ${timeout} ms`
);
});
}

// Publish request
return this.publish(packet)
.then(() => {
Expand Down Expand Up @@ -1103,6 +1124,7 @@ class Transit {
*/
_destroyStreamIfPossible(stream, errorMessage) {
if (!stream.destroyed && stream.destroy) {
stream.on("error", err => this.logger.error(err.message));
stream.destroy(new Error(errorMessage));
}
}
Expand Down

0 comments on commit ad06130

Please sign in to comment.