From 93dd167adbbd4ee1a61f054897b799ea5f728945 Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Thu, 2 Jun 2016 17:16:26 +0200 Subject: [PATCH] ADD Transport selector --- lib/bindings/MQTTBinding.js | 5 +++-- lib/commandHandler.js | 18 ++++++++++++++---- lib/transportSelector.js | 31 +++++++++++++++++++------------ test/config-test.js | 3 ++- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/lib/bindings/MQTTBinding.js b/lib/bindings/MQTTBinding.js index 18d076f5..1ae4e961 100644 --- a/lib/bindings/MQTTBinding.js +++ b/lib/bindings/MQTTBinding.js @@ -347,8 +347,9 @@ function stop(callback) { ], callback); } -function executeCommand(apiKey, device, serializedPayload) { - return mqttClient.publish.bind(mqttClient, '/' + apiKey + '/' + device.id + '/cmd', serializedPayload, null); +function executeCommand(apiKey, device, serializedPayload, callback) { + mqttClient.publish('/' + apiKey + '/' + device.id + '/cmd', serializedPayload, null); + callback(); } exports.start = start; diff --git a/lib/commandHandler.js b/lib/commandHandler.js index db5e9879..346df9c5 100644 --- a/lib/commandHandler.js +++ b/lib/commandHandler.js @@ -27,7 +27,7 @@ var async = require('async'), logger = require('logops'), iotAgentLib = require('iotagent-node-lib'), constants = require('./constants'), - mqttBinding = require('./bindings/MQTTBinding'), + transportSelector = require('./transportSelector'), config = require('./configService'), context = { op: 'IoTAgentMQTT.Commands' @@ -43,7 +43,8 @@ var async = require('async'), */ function generateCommandExecution(apiKey, device, attribute) { var payload = {}, - serialized; + serialized, + executions; payload[attribute.name] = attribute.value; serialized = JSON.stringify(payload); @@ -51,7 +52,8 @@ function generateCommandExecution(apiKey, device, attribute) { logger.debug('Sending command execution to device [%s] with apikey [%s] and payload [%s] ', apiKey, device.id, attribute); - return mqttBinding.executeCommand(apiKey, device, serialized); + executions = transportSelector.createExecutionsForBinding([apiKey, device, serialized], 'executeCommand', null); + return executions; } /** @@ -69,6 +71,11 @@ function generateCommandExecution(apiKey, device, attribute) { function commandHandler(id, type, service, subservice, attributes, callback) { logger.debug('Handling MQTT command for device [%s] in service [%s - %s]', id, service, subservice); + function concat(previous, current) { + previous = previous.concat(current); + return previous; + } + iotAgentLib.getDeviceByName(id, service, subservice, function(error, device) { if (error) { logger.error('Command execution could not be handled, as device for entity [%s] [%s] wasn\'t found', @@ -79,7 +86,10 @@ function commandHandler(id, type, service, subservice, attributes, callback) { if (error) { callback(error); } else { - async.series(attributes.map(generateCommandExecution.bind(null, apiKey, device)), callback); + async.series(attributes + .map(generateCommandExecution.bind(null, apiKey, device)) + .reduce(concat, []), + callback); } }); } diff --git a/lib/transportSelector.js b/lib/transportSelector.js index c8a6199f..6c682c80 100644 --- a/lib/transportSelector.js +++ b/lib/transportSelector.js @@ -50,17 +50,8 @@ function startTransportBindings(newConfig, callback) { }); } -/** - * Execute the function given by the 'functionName' parameter for all the transport bindings, with the arguments - * given in the 'argument' array. If the optional parameter protocol is not null, the function will only be - * executed in the plugin of the selected protocol. - * - * @param {Array} argument Array of arguments to call the function with. - * @param {String} functionName Name of the function to call in every transport plugin. - * @param {String} protocol Transport protocol where the function must be executed. - */ -function applyFunctionFromBinding(argument, functionName, protocol, callback) { - logger.debug('Looking for bindings for the function [%s] and protocol [%s]', functionName, protocol); +function createExecutionsForBinding(argument, functionName, protocol) { + logger.debug('Creating execution for function [%s] and protocol [%s]', functionName, protocol); function addHandler(current, binding) { if (binding[functionName] && (!protocol || binding.protocol === protocol)) { @@ -74,7 +65,22 @@ function applyFunctionFromBinding(argument, functionName, protocol, callback) { return current; } - async.series(transportBindings.reduce(addHandler, []), callback); + return transportBindings.reduce(addHandler, []); +} + +/** + * Execute the function given by the 'functionName' parameter for all the transport bindings, with the arguments + * given in the 'argument' array. If the optional parameter protocol is not null, the function will only be + * executed in the plugin of the selected protocol. + * + * @param {Array} argument Array of arguments to call the function with. + * @param {String} functionName Name of the function to call in every transport plugin. + * @param {String} protocol Transport protocol where the function must be executed. + */ +function applyFunctionFromBinding(argument, functionName, protocol, callback) { + logger.debug('Looking for bindings for the function [%s] and protocol [%s]', functionName, protocol); + + async.series(createExecutionsForBinding(argument, functionName, protocol), callback); } /** @@ -91,3 +97,4 @@ function stopTransportBindings(callback) { exports.stopTransportBindings = stopTransportBindings; exports.applyFunctionFromBinding = applyFunctionFromBinding; exports.startTransportBindings = startTransportBindings; +exports.createExecutionsForBinding = createExecutionsForBinding; diff --git a/test/config-test.js b/test/config-test.js index 0b94c813..b24ae932 100644 --- a/test/config-test.js +++ b/test/config-test.js @@ -48,7 +48,8 @@ config.iota = { deviceRegistrationDuration: 'P1M', defaultType: 'Thing', defaultResource: '', - defaultKey: '1234' + defaultKey: '1234', + defaultTransport: 'MQTT' }; module.exports = config;