Skip to content

Commit

Permalink
Merge pull request #685 from jason-fox/feature/mqtt
Browse files Browse the repository at this point in the history
Add flags to disable MQTT and AMQP
  • Loading branch information
fgalan authored Nov 4, 2022
2 parents 5a70f66 + 04e88ac commit 8cf9b5f
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
- Add IOTA_MQTT_DISABLED (mqtt.disabled) and IOTA_AMQP_DISABLED (amqp.disabled) flags to disable MQTT and AMQP (#685)
- Set Nodejs 14 as minimum version in packages.json (effectively removing Nodev12 from supported versions)
14 changes: 12 additions & 2 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ config.mqtt = {
/**
* Whether to use slashes at the beginning of topic when sending or not
*/
avoidLeadingSlash: false
avoidLeadingSlash: false,

/**
* Flag to disable the MQTT transport. (default is false).
*/
disabled: false
};

/**
Expand Down Expand Up @@ -131,7 +136,12 @@ config.amqp = {
/**
* durable queue flag (default is false).
*/
options: { durable: true }
options: { durable: true },

/**
* Flag to disable the AMQP transport. (default is false).
*/
disabled: false
};

/**
Expand Down
2 changes: 2 additions & 0 deletions docs/installationguide.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ The ones relating specific JSON bindings are described in the following table.
| IOTA_MQTT_AVOID_LEADING_SLASH | mqtt.avoidLeadingSlash |
| IOTA_MQTT_CLEAN | mqtt.clean |
| IOTA_MQTT_CLIENT_ID | mqtt.clientId |
| IOTA_MQTT_DISABLED | mqtt.disabled |
| IOTA_AMQP_HOST | amqp.host |
| IOTA_AMQP_PORT | amqp.port |
| IOTA_AMQP_USERNAME | amqp.username |
Expand All @@ -211,6 +212,7 @@ The ones relating specific JSON bindings are described in the following table.
| IOTA_AMQP_DURABLE | amqp.durable |
| IOTA_AMQP_RETRIES | amqp.retries |
| IOTA_AMQP_RETRY_TIME | amqp.retryTime |
| IOTA_AMQP_DISABLED | amqp.disabled |
| IOTA_HTTP_HOST | http.host |
| IOTA_HTTP_PORT | http.port |
| IOTA_HTTP_TIMEOUT | http.timeout |
Expand Down
50 changes: 26 additions & 24 deletions lib/bindings/AMQPBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,60 +70,62 @@ function queueListener(msg) {
function start(callback) {
let exchange;
let queue;
const amqpConfig = config.getConfig().amqp;
if (!amqpConfig) {
return config.getLogger().error(context, 'Error AMPQ is not configured');
}
if (amqpConfig.disabled) {
return config.getLogger().warn(context, 'AMPQ is disabled');
}

if (config.getConfig() && config.getConfig().amqp && config.getConfig().amqp.exchange) {
exchange = config.getConfig().amqp.exchange;
if (amqpConfig.exchange) {
exchange = amqpConfig.exchange;
} else {
exchange = constants.AMQP_DEFAULT_EXCHANGE;
}

if (config.getConfig() && config.getConfig().amqp && config.getConfig().amqp.queue) {
queue = config.getConfig().amqp.queue;
if (amqpConfig.queue) {
queue = amqpConfig.queue;
} else {
queue = constants.AMQP_DEFAULT_QUEUE;
}

let durable;

if (
config.getConfig() &&
config.getConfig().amqp &&
config.getConfig().amqp.options &&
config.getConfig().amqp.options.durable
amqpConfig.options && amqpConfig.options.durable
) {
durable = config.getConfig().amqp.options.durable;
durable = amqpConfig.options.durable;
} else {
durable = constants.AMQP_DEFAULT_DURABLE;
}

let retries;
let retryTime;

if (config.getConfig() && config.getConfig().amqp && config.getConfig().amqp.retries) {
retries = config.getConfig().amqp.retries;
if (amqpConfig.retries) {
retries = amqpConfig.retries;
} else {
retries = constants.AMQP_DEFAULT_RETRIES;
}
if (config.getConfig() && config.getConfig().amqp && config.getConfig().amqp.retrytime) {
retryTime = config.getConfig().amqp.retryTime;
if (amqpConfig.retrytime) {
retryTime = amqpConfig.retryTime;
} else {
retryTime = constants.AMQP_DEFAULT_RETRY_TIME;
}

let uri = 'amqp://';
if (config.getConfig().amqp) {
if (config.getConfig().amqp.username && config.getConfig().amqp.password) {
uri += config.getConfig().amqp.username + ':' + config.getConfig().amqp.password + '@';
}
if (config.getConfig().amqp.host) {
uri += config.getConfig().amqp.host;
if (config.getConfig().amqp.port) {
uri += ':' + config.getConfig().amqp.port;
}

if (amqpConfig.username && amqpConfig.password) {
uri += amqpConfig.username + ':' + amqpConfig.password + '@';
}
if (amqpConfig.host) {
uri += amqpConfig.host;
if (amqpConfig.port) {
uri += ':' + amqpConfig.port;
}
} else {
return config.getLogger().error(context, 'Error AMQP is not configured');
}

let isConnecting = false;
let numRetried = 0;

Expand Down
3 changes: 3 additions & 0 deletions lib/bindings/MQTTBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ function start(callback) {
if (!mqttConfig) {
return config.getLogger().error(context, 'Error MQTT is not configured');
}
if (mqttConfig.disabled) {
return config.getLogger().warn(context, 'MQTT is disabled');
}
const rejectUnauthorized =
typeof mqttConfig.rejectUnauthorized === 'boolean' ? mqttConfig.rejectUnauthorized : true;
let rndSuffix = '_' + Math.random().toString(16).substr(2, 8);
Expand Down
20 changes: 16 additions & 4 deletions lib/configService.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ function processEnvironmentVariables() {
'IOTA_MQTT_AVOID_LEADING_SLASH',
'IOTA_MQTT_CLEAN',
'IOTA_MQTT_CLIENT_ID',
'IOTA_MQTT_DISABLED',
'IOTA_AMQP_HOST',
'IOTA_AMQP_PORT',
'IOTA_AMQP_USERNAME',
Expand All @@ -79,6 +80,7 @@ function processEnvironmentVariables() {
'IOTA_AMQP_DURABLE',
'IOTA_AMQP_RETRIES',
'IOTA_AMQP_RETRY_TIME',
'IOTA_AMQP_DISABLED',
'IOTA_HTTP_HOST',
'IOTA_HTTP_PORT',
'IOTA_HTTP_TIMEOUT',
Expand All @@ -102,7 +104,8 @@ function processEnvironmentVariables() {
'IOTA_MQTT_KEEPALIVE',
'IOTA_MQTT_AVOID_LEADING_SLASH',
'IOTA_MQTT_CLEAN',
'IOTA_MQTT_CLIENT_ID'
'IOTA_MQTT_CLIENT_ID',
'IOTA_MQTT_DISABLED'
];
const amqpVariables = [
'IOTA_AMQP_HOST',
Expand All @@ -113,7 +116,8 @@ function processEnvironmentVariables() {
'IOTA_AMQP_QUEUE',
'IOTA_AMQP_DURABLE',
'IOTA_AMQP_RETRIES',
'IOTA_AMQP_RETRY_TIME'
'IOTA_AMQP_RETRY_TIME',
'IOTA_AMQP_DISABLED'
];
const httpVariables = ['IOTA_HTTP_HOST', 'IOTA_HTTP_PORT', 'IOTA_HTTP_TIMEOUT', 'IOTA_HTTP_KEY', 'IOTA_HTTP_CERT'];

Expand Down Expand Up @@ -149,7 +153,7 @@ function processEnvironmentVariables() {
}

if (anyIsSet(mqttVariables)) {
config.mqtt = {};
config.mqtt = config.mqtt || {};
}

if (process.env.IOTA_MQTT_PROTOCOL) {
Expand Down Expand Up @@ -225,8 +229,12 @@ function processEnvironmentVariables() {
config.mqtt.clientId = process.env.IOTA_MQTT_CLIENT_ID;
}

if (process.env.IOTA_MQTT_DISABLED && process.env.IOTA_MQTT_DISABLED.trim().toLowerCase() === 'true'){
config.mqtt.disabled = true;
}

if (anyIsSet(amqpVariables)) {
config.amqp = {};
config.amqp = config.amqp || {};
}

if (process.env.IOTA_AMQP_HOST) {
Expand Down Expand Up @@ -266,6 +274,10 @@ function processEnvironmentVariables() {
config.amqp.retryTime = process.env.IOTA_AMQP_RETRY_TIME;
}

if (process.env.IOTA_AMQP_DISABLED && process.env.IOTA_AMQP_DISABLED.trim().toLowerCase() === 'true'){
config.amqp.disabled = true;
}

if (anyIsSet(httpVariables)) {
config.http = {};
}
Expand Down
6 changes: 6 additions & 0 deletions test/unit/startup-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ describe('Startup tests', function () {
process.env.IOTA_MQTT_RETRIES = '2';
process.env.IOTA_MQTT_RETRY_TIME = '5';
process.env.IOTA_MQTT_KEEPALIVE = '0';
process.env.IOTA_MQTT_DISABLED = 'true';
});

afterEach(function () {
Expand All @@ -63,6 +64,7 @@ describe('Startup tests', function () {
delete process.env.IOTA_MQTT_RETRIES;
delete process.env.IOTA_MQTT_RETRY_TIME;
delete process.env.IOTA_MQTT_KEEPALIVE;
delete process.env.IOTA_MQTT_DISABLED;
});

it('should load the MQTT environment variables in the internal configuration', function (done) {
Expand All @@ -80,6 +82,7 @@ describe('Startup tests', function () {
config.getConfig().mqtt.retries.should.equal('2');
config.getConfig().mqtt.retryTime.should.equal('5');
config.getConfig().mqtt.keepalive.should.equal('0');
config.getConfig().mqtt.disabled.should.equal(true);
done();
});
});
Expand All @@ -95,6 +98,7 @@ describe('Startup tests', function () {
process.env.IOTA_AMQP_DURABLE = 'true';
process.env.IOTA_AMQP_RETRIES = '0';
process.env.IOTA_AMQP_RETRY_TIME = '5';
process.env.IOTA_AMQP_DISABLED = 'true';
});

afterEach(function () {
Expand All @@ -107,6 +111,7 @@ describe('Startup tests', function () {
delete process.env.IOTA_AMQP_DURABLE;
delete process.env.IOTA_AMQP_RETRIES;
delete process.env.IOTA_AMQP_RETRY_TIME;
delete process.env.IOTA_AMQP_DISABLED;
});

it('should load the AMQP environment variables in the internal configuration', function (done) {
Expand All @@ -120,6 +125,7 @@ describe('Startup tests', function () {
config.getConfig().amqp.options.durable.should.equal(true);
config.getConfig().amqp.retries.should.equal('0');
config.getConfig().amqp.retryTime.should.equal('5');
config.getConfig().amqp.disabled.should.equal(true);
done();
});
});
Expand Down

0 comments on commit 8cf9b5f

Please sign in to comment.