Skip to content

Commit

Permalink
ADD Transport selector
Browse files Browse the repository at this point in the history
  • Loading branch information
dmoranj committed Jun 2, 2016
1 parent 6e7afc6 commit 93dd167
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 19 deletions.
5 changes: 3 additions & 2 deletions lib/bindings/MQTTBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,9 @@ function stop(callback) {
], callback);
}

function executeCommand(apiKey, device, serializedPayload) {
return mqttClient.publish.bind(mqttClient, '/' + apiKey + '/' + device.id + '/cmd', serializedPayload, null);
function executeCommand(apiKey, device, serializedPayload, callback) {
mqttClient.publish('/' + apiKey + '/' + device.id + '/cmd', serializedPayload, null);
callback();
}

exports.start = start;
Expand Down
18 changes: 14 additions & 4 deletions lib/commandHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var async = require('async'),
logger = require('logops'),
iotAgentLib = require('iotagent-node-lib'),
constants = require('./constants'),
mqttBinding = require('./bindings/MQTTBinding'),
transportSelector = require('./transportSelector'),
config = require('./configService'),
context = {
op: 'IoTAgentMQTT.Commands'
Expand All @@ -43,15 +43,17 @@ var async = require('async'),
*/
function generateCommandExecution(apiKey, device, attribute) {
var payload = {},
serialized;
serialized,
executions;

payload[attribute.name] = attribute.value;
serialized = JSON.stringify(payload);

logger.debug('Sending command execution to device [%s] with apikey [%s] and payload [%s] ',
apiKey, device.id, attribute);

return mqttBinding.executeCommand(apiKey, device, serialized);
executions = transportSelector.createExecutionsForBinding([apiKey, device, serialized], 'executeCommand', null);
return executions;
}

/**
Expand All @@ -69,6 +71,11 @@ function generateCommandExecution(apiKey, device, attribute) {
function commandHandler(id, type, service, subservice, attributes, callback) {
logger.debug('Handling MQTT command for device [%s] in service [%s - %s]', id, service, subservice);

function concat(previous, current) {
previous = previous.concat(current);
return previous;
}

iotAgentLib.getDeviceByName(id, service, subservice, function(error, device) {
if (error) {
logger.error('Command execution could not be handled, as device for entity [%s] [%s] wasn\'t found',
Expand All @@ -79,7 +86,10 @@ function commandHandler(id, type, service, subservice, attributes, callback) {
if (error) {
callback(error);
} else {
async.series(attributes.map(generateCommandExecution.bind(null, apiKey, device)), callback);
async.series(attributes
.map(generateCommandExecution.bind(null, apiKey, device))
.reduce(concat, []),
callback);
}
});
}
Expand Down
31 changes: 19 additions & 12 deletions lib/transportSelector.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,8 @@ function startTransportBindings(newConfig, callback) {
});
}

/**
* Execute the function given by the 'functionName' parameter for all the transport bindings, with the arguments
* given in the 'argument' array. If the optional parameter protocol is not null, the function will only be
* executed in the plugin of the selected protocol.
*
* @param {Array} argument Array of arguments to call the function with.
* @param {String} functionName Name of the function to call in every transport plugin.
* @param {String} protocol Transport protocol where the function must be executed.
*/
function applyFunctionFromBinding(argument, functionName, protocol, callback) {
logger.debug('Looking for bindings for the function [%s] and protocol [%s]', functionName, protocol);
function createExecutionsForBinding(argument, functionName, protocol) {
logger.debug('Creating execution for function [%s] and protocol [%s]', functionName, protocol);

function addHandler(current, binding) {
if (binding[functionName] && (!protocol || binding.protocol === protocol)) {
Expand All @@ -74,7 +65,22 @@ function applyFunctionFromBinding(argument, functionName, protocol, callback) {
return current;
}

async.series(transportBindings.reduce(addHandler, []), callback);
return transportBindings.reduce(addHandler, []);
}

/**
* Execute the function given by the 'functionName' parameter for all the transport bindings, with the arguments
* given in the 'argument' array. If the optional parameter protocol is not null, the function will only be
* executed in the plugin of the selected protocol.
*
* @param {Array} argument Array of arguments to call the function with.
* @param {String} functionName Name of the function to call in every transport plugin.
* @param {String} protocol Transport protocol where the function must be executed.
*/
function applyFunctionFromBinding(argument, functionName, protocol, callback) {
logger.debug('Looking for bindings for the function [%s] and protocol [%s]', functionName, protocol);

async.series(createExecutionsForBinding(argument, functionName, protocol), callback);
}

/**
Expand All @@ -91,3 +97,4 @@ function stopTransportBindings(callback) {
exports.stopTransportBindings = stopTransportBindings;
exports.applyFunctionFromBinding = applyFunctionFromBinding;
exports.startTransportBindings = startTransportBindings;
exports.createExecutionsForBinding = createExecutionsForBinding;
3 changes: 2 additions & 1 deletion test/config-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ config.iota = {
deviceRegistrationDuration: 'P1M',
defaultType: 'Thing',
defaultResource: '',
defaultKey: '1234'
defaultKey: '1234',
defaultTransport: 'MQTT'
};

module.exports = config;

0 comments on commit 93dd167

Please sign in to comment.