Skip to content

Commit

Permalink
ADD Transport selector
Browse files Browse the repository at this point in the history
  • Loading branch information
dmoranj committed Jun 2, 2016
1 parent ba0c688 commit 6e7afc6
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 16 deletions.
7 changes: 6 additions & 1 deletion lib/bindings/MQTTBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ function start(callback) {

mqttClient.on('connect', function() {
logger.info(context, 'MQTT Client connected');
commandHandler.init(mqttClient);
recreateSubscriptions(callback);
});
}
Expand All @@ -348,7 +347,13 @@ function stop(callback) {
], callback);
}

function executeCommand(apiKey, device, serializedPayload) {
return mqttClient.publish.bind(mqttClient, '/' + apiKey + '/' + device.id + '/cmd', serializedPayload, null);
}

exports.start = start;
exports.stop = stop;
exports.unsubscribeAll = unsubscribeAll;
exports.sendConfigurationToDevice = sendConfigurationToDevice;
exports.executeCommand = executeCommand;
exports.protocol = 'MQTT';
9 changes: 2 additions & 7 deletions lib/commandHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var async = require('async'),
logger = require('logops'),
iotAgentLib = require('iotagent-node-lib'),
constants = require('./constants'),
mqttClient,
mqttBinding = require('./bindings/MQTTBinding'),
config = require('./configService'),
context = {
op: 'IoTAgentMQTT.Commands'
Expand All @@ -51,7 +51,7 @@ function generateCommandExecution(apiKey, device, attribute) {
logger.debug('Sending command execution to device [%s] with apikey [%s] and payload [%s] ',
apiKey, device.id, attribute);

return mqttClient.publish.bind(mqttClient, '/' + apiKey + '/' + device.id + '/cmd', serialized, null);
return mqttBinding.executeCommand(apiKey, device, serialized);
}

/**
Expand Down Expand Up @@ -123,10 +123,5 @@ function updateCommand(apiKey, deviceId, device, messageObj) {
});
}

function init(newMqttClient) {
mqttClient = newMqttClient;
}

exports.updateCommand = updateCommand;
exports.handler = commandHandler;
exports.init = init;
15 changes: 7 additions & 8 deletions lib/iotagent-json.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
'use strict';

var iotAgentLib = require('iotagent-node-lib'),
mqttBinding = require('./bindings/MQTTBinding'),
transportSelector = require('./transportSelector'),
commandHandler = require('./commandHandler'),
iotaUtils = require('./iotaUtils'),
logger = require('logops'),
Expand All @@ -38,7 +38,6 @@ var iotAgentLib = require('iotagent-node-lib'),
},
config = require('./configService');


/**
* Handler for incoming notifications (for the configuration subscription mechanism).
*
Expand All @@ -47,14 +46,14 @@ var iotAgentLib = require('iotagent-node-lib'),
*/
function notificationHandler(device, updates, callback) {
function getParameters(apiKey, callback) {
callback(null, apiKey, device.id, updates);
function invokeConfiguration(apiKey, callback) {
transportSelector.applyFunctionFromBinding(
[apiKey, device.id, updates], 'sendConfigurationToDevice', null, callback);
}

async.waterfall([
apply(iotaUtils.getEffectiveApiKey, device.service, device.subservice),
getParameters,
mqttBinding.sendConfigurationToDevice
invokeConfiguration
], callback);

}
Expand Down Expand Up @@ -109,7 +108,7 @@ function start(newConfig, callback) {

iotAgentLib.addQueryMiddleware(iotAgentLib.dataPlugins.compressTimestamp.query);

mqttBinding.start(callback);
transportSelector.startTransportBindings(newConfig, callback);
}
});
}
Expand All @@ -121,7 +120,7 @@ function start(newConfig, callback) {
function stop(callback) {
logger.info(context, 'Stopping IoT Agent');
async.series([
mqttBinding.stop,
transportSelector.stopTransportBindings,
iotAgentLib.resetMiddlewares,
iotAgentLib.deactivate
], callback);
Expand Down
93 changes: 93 additions & 0 deletions lib/transportSelector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2016 Telefonica Investigación y Desarrollo, S.A.U
*
* This file is part of iotagent-json
*
* iotagent-json is free software: you can redistribute it and/or
* modify it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* iotagent-json is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public
* License along with iotagent-json.
* If not, seehttp://www.gnu.org/licenses/.
*
* For those usages not covered by the GNU Affero General Public License
* please contact with::[contacto@tid.es]
*/

'use strict';

var path = require('path'),
fs = require('fs'),
logger = require('logops'),
async = require('async'),
transportBindings = [];

/**
* Start all the transport protocol bindings found in the bindings directory.
*
* @param {Object} newConfig Configuration object to start the bindings
*/
function startTransportBindings(newConfig, callback) {
function invokeBinding(binding, callback) {
binding.start(callback);
}

var bindings = fs.readdirSync(path.join(__dirname, './bindings'));

transportBindings = bindings.map(function(item) {
return require('./bindings/' + item);
});

async.map(transportBindings, invokeBinding, function(error) {
callback();
});
}

/**
* Execute the function given by the 'functionName' parameter for all the transport bindings, with the arguments
* given in the 'argument' array. If the optional parameter protocol is not null, the function will only be
* executed in the plugin of the selected protocol.
*
* @param {Array} argument Array of arguments to call the function with.
* @param {String} functionName Name of the function to call in every transport plugin.
* @param {String} protocol Transport protocol where the function must be executed.
*/
function applyFunctionFromBinding(argument, functionName, protocol, callback) {
logger.debug('Looking for bindings for the function [%s] and protocol [%s]', functionName, protocol);

function addHandler(current, binding) {
if (binding[functionName] && (!protocol || binding.protocol === protocol)) {
var args = [binding[functionName]].concat(argument),
boundFunction = binding[functionName].bind.apply(binding[functionName], args);

logger.debug('Binding found for function [%s] and protocol [%s]', functionName, protocol);
current.push(boundFunction);
}

return current;
}

async.series(transportBindings.reduce(addHandler, []), callback);
}

/**
* Stop all the transport protocol bindings of the agent.
*/
function stopTransportBindings(callback) {
function invokeBinding(binding, callback) {
binding.stop(callback);
}

async.map(transportBindings, invokeBinding, callback);
}

exports.stopTransportBindings = stopTransportBindings;
exports.applyFunctionFromBinding = applyFunctionFromBinding;
exports.startTransportBindings = startTransportBindings;

0 comments on commit 6e7afc6

Please sign in to comment.