Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection flag #31

Merged
merged 7 commits into from
Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
<a name="v0.1.1"></a>

# [0.4.2](https://github.com/moleculerjs/moleculer-channels/compare/v0.1.0...v0.1.1)

- Added Typescript support
- Added `connection` that prevents publishing events before the adapter is connected

<a name="v0.1.0"></a>

# v0.1.0 (2021-10-17)

First public version.
1,897 changes: 874 additions & 1,023 deletions package-lock.json

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,24 @@
"amqplib": "^0.8.0",
"benchmarkify": "^3.0.0",
"coveralls": "^3.1.1",
"eslint": "^8.0.1",
"eslint": "^8.5.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prettier": "^4.0.0",
"eslint-plugin-promise": "^5.1.0",
"eslint-plugin-promise": "^5.2.0",
"eslint-plugin-security": "^1.4.0",
"ioredis": "^4.28.0",
"jest": "^27.2.5",
"jest-cli": "^27.2.5",
"ioredis": "^4.28.2",
"jest": "^27.4.5",
"jest-cli": "^27.4.5",
"kafkajs": "^1.15.0",
"kleur": "^4.1.4",
"moleculer": "^0.14.17",
"moleculer": "^0.14.18",
"moleculer-repl": "^0.6.6",
"msgpack5": "^5.3.2",
"nats": "^2.2.0",
"nodemon": "^2.0.13",
"nats": "^2.4.0",
"nodemon": "^2.0.15",
"npm-check": "^5.9.2",
"prettier": "^2.4.1"
"prettier": "^2.5.1"
},
"jest": {
"testEnvironment": "node",
Expand Down
9 changes: 7 additions & 2 deletions src/adapters/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

const BaseAdapter = require("./base");
const _ = require("lodash");
const { MoleculerError } = require("moleculer").Errors;
const { MoleculerError, MoleculerRetryableError } = require("moleculer").Errors;
const C = require("../constants");

let Amqplib;
Expand Down Expand Up @@ -94,7 +94,7 @@ class AmqpAdapter extends BaseAdapter {
* @type {Map<string,SubscriptionEntry>}
*/
this.subscriptions = new Map();
this.connected = false;

this.stopping = false;
this.connectAttempt = 0;
this.connectionCount = 0; // To detect reconnections
Expand Down Expand Up @@ -235,6 +235,7 @@ class AmqpAdapter extends BaseAdapter {
.then(() => {
this.connection = null;
this.channel = null;
this.connected = false;
resolve();
})
.catch(reject);
Expand Down Expand Up @@ -507,6 +508,10 @@ class AmqpAdapter extends BaseAdapter {
// Adapter is stopping. Publishing no longer is allowed
if (this.stopping) return;

if (!this.connected) {
icebob marked this conversation as resolved.
Show resolved Hide resolved
throw new MoleculerRetryableError("Adapter not yet connected. Skipping publishing.");
}

// Available options: http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish
const messageOptions = _.defaultsDeep(
{},
Expand Down
5 changes: 4 additions & 1 deletion src/adapters/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class BaseAdapter {
* @type {Map<string, string[]>}
*/
this.activeMessages = new Map();

/** @type {Boolean} Flag indicating the adapter's connection status */
this.connected = false;
}

/**
Expand Down Expand Up @@ -119,7 +122,7 @@ class BaseAdapter {
*/
metricsIncrement(metricName, chan) {
if (!this.broker.isMetricsEnabled()) return;

this.broker.metrics.increment(metricName, {
channel: chan.name,
group: chan.group
Expand Down
13 changes: 11 additions & 2 deletions src/adapters/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

const BaseAdapter = require("./base");
const _ = require("lodash");
const { MoleculerError } = require("moleculer").Errors;
const { MoleculerError, MoleculerRetryableError } = require("moleculer").Errors;
const C = require("../constants");
/** Name of the partition where an error occurred while processing the message */
const HEADER_ORIGINAL_PARTITION = "x-original-partition";
Expand Down Expand Up @@ -171,6 +171,8 @@ class KafkaAdapter extends BaseAdapter {
await this.producer.connect();

this.logger.info("Kafka adapter is connected.");

this.connected = true;
}

/**
Expand Down Expand Up @@ -201,7 +203,10 @@ class KafkaAdapter extends BaseAdapter {
// Release the pointers
this.consumers = new Map();
})
.then(() => resolve())
.then(() => {
this.connected = false;
resolve();
})
.catch(err => reject(err));
} else {
this.logger.warn(
Expand Down Expand Up @@ -498,6 +503,10 @@ class KafkaAdapter extends BaseAdapter {
// Adapter is stopping. Publishing no longer is allowed
if (this.stopping) return;

if (!this.connected) {
throw new MoleculerRetryableError("Adapter not yet connected. Skipping publishing.");
}

this.logger.debug(`Publish a message to '${channelName}' topic...`, payload, opts);

const data = opts.raw ? payload : this.serializer.serialize(payload);
Expand Down
9 changes: 9 additions & 0 deletions src/adapters/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
const BaseAdapter = require("./base");
const _ = require("lodash");
const C = require("../constants");
const { MoleculerRetryableError } = require("moleculer").Errors;

let NATS;

Expand Down Expand Up @@ -130,6 +131,8 @@ class NatsAdapter extends BaseAdapter {
this.manager = await this.connection.jetstreamManager();

this.client = this.connection.jetstream(); // JetStreamOptions

this.connected = true;
}

/**
Expand All @@ -149,6 +152,8 @@ class NatsAdapter extends BaseAdapter {
} catch (error) {
this.logger.error("Error while closing NATS JetStream connection.", error);
}

this.connected = false;
}

/**
Expand Down Expand Up @@ -426,6 +431,10 @@ class NatsAdapter extends BaseAdapter {
// Adapter is stopping. Publishing no longer is allowed
if (this.stopping) return;

if (!this.connected) {
throw new MoleculerRetryableError("Adapter not yet connected. Skipping publishing.");
}

try {
// Remap headers into JetStream format
if (opts.headers) {
Expand Down
13 changes: 11 additions & 2 deletions src/adapters/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

const _ = require("lodash");
const BaseAdapter = require("./base");
const { ServiceSchemaError } = require("moleculer").Errors;
const { ServiceSchemaError, MoleculerRetryableError } = require("moleculer").Errors;
const C = require("../constants");
/** Redis generated ID of the message that was not processed properly*/
const HEADER_ORIGINAL_ID = "x-original-id";
Expand Down Expand Up @@ -149,6 +149,8 @@ class RedisAdapter extends BaseAdapter {
this.nackedName,
await this.createRedisClient(this.nackedName, this.opts.redis)
);

this.connected = true;
}

/**
Expand All @@ -171,7 +173,10 @@ class RedisAdapter extends BaseAdapter {
// Release the pointers
this.clients = new Map();
})
.then(() => resolve())
.then(() => {
this.connected = false;
resolve();
})
.catch(err => reject(err));
} else {
this.logger.warn(
Expand Down Expand Up @@ -670,6 +675,10 @@ class RedisAdapter extends BaseAdapter {
// Adapter is stopping. Publishing no longer is allowed
if (this.stopping) return;

if (!this.connected) {
throw new MoleculerRetryableError("Adapter not yet connected. Skipping publishing.");
}

this.logger.debug(`Publish a message to '${channelName}' channel...`, payload, opts);

const clientPub = this.clients.get(this.pubName);
Expand Down
2 changes: 1 addition & 1 deletion test/integration/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ describe("Integration tests", () => {
};
// ---- ^ SETUP ^ ---

const numMessages = 20
const numMessages = 20;

await Promise.all(
_.times(numMessages, () => broker.sendToChannel("test.balanced.topic", msg))
Expand Down