Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,19 @@ $ npm install

## Configuration

Modify the `config.toml` file to match your configuration. The input section should contain the url of the OPC server (no advanced authentication supported yet).
Modify the `config.toml` file to match your configuration. The input section should contain the url of the OPC server (no advanced authentication supported yet). For each input OPC server you can configure the following entries.

```
[input]
[[input]]
url = "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer"
failoverTimeout = 5000 # time to wait before reconnection in case of failure
```

In the output section, specify the connection details for influxdb:

```
[output]
name = "influx_1"
type = "influxdb"
host = "127.0.0.1"
port = 8086
protocol = "http"
username = ""
password = ""
database = "test"
failoverTimeout = 10000 # Time after which the logger will reconnect
bufferMaxSize = 64 # Max size of the local db in MB. TODO.
writeInterval = 3000 # Interval of batch writes.
writeMaxPoints = 1000 # Max point per POST request.
```

Then, for each OPC value you want to log, repeat the following in the config file, d:
Then, for each OPC value you want to log from this server, repeat the following in the config file:

```
# A polled node:
[[measurements]]
[[input.measurements]]
name = "Int32polled"
tags = { tag1 = "test", tag2 = "AB43" }
nodeId = "ns=2;i=10849"
Expand All @@ -55,14 +37,32 @@ deadbandAbsolute = 0 # Absolute max difference for a value not to be coll
deadbandRelative = 0.0 # Relative max difference for a value not to be collected

# A monitored node
[[measurements]]
[[input.measurements]]
name = "Int32monitored"
tags = { tag1 = "test", tag2 = "AB43" }
nodeId = "ns=2;i=10849"
collectionType = "monitored"
monitorResolution = 1000 # ms
```

In the output section, specify the connection details for influxdb:

```
[output]
name = "influx_1"
type = "influxdb"
host = "127.0.0.1"
port = 8086
protocol = "http"
username = ""
password = ""
database = "test"
failoverTimeout = 10000 # Time after which the logger will reconnect
bufferMaxSize = 64 # Max size of the local db in MB. TODO.
writeInterval = 3000 # Interval of batch writes.
writeMaxPoints = 1000 # Max point per POST request.
```

## Run

```
Expand Down
73 changes: 56 additions & 17 deletions config.toml
Original file line number Diff line number Diff line change
@@ -1,34 +1,59 @@
[input]
[[input]]
url = "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer"
failoverTimeout = 5000 # time to wait before reconnection in case of failure

[output]
name = "influx_1"
type = "influxdb"
host = "128.199.39.148"
port = 8086
protocol = "http"
username = ""
password = ""
database = "test"
failoverTimeout = 10000
bufferMaxSize = 64
writeInterval = 3000
writeMaxPoints = 1000
# A polled node:
[[input.measurements]]
name = "Int32polled"
dataType = "number"
tags = { tag1 = "test", tag2 = "AB43" }
nodeId = "ns=2;i=10849"
collectionType = "polled"
pollRate = 60 # samples / minute.
deadbandAbsolute = 0 # Absolute max difference for a value not to be collected
deadbandRelative = 0.0 # Relative max difference for a value not to be collected

# A monitored node
[[input.measurements]]
name = "Int32monitored"
dataType = "number"
tags = { tag1 = "test", tag2 = "AB43" }
nodeId = "ns=2;i=10849"
collectionType = "monitored"
monitorResolution = 1000 # ms
deadbandAbsolute = 0 # Absolute max difference for a value not to be collected
deadbandRelative = 0 # Relative max difference for a value not to be collected

# A monitored array node. Log element at arrayIndex
[[input.measurements]]
name = "Uint32Array"
dataType = "number"
isArray = true
arrayIndex = 0
tags = { tag1 = "test", tag2 = "AB43" }
nodeId = "ns=2;i=10849"
collectionType = "monitored"
monitorResolution = 1000 # ms
deadbandAbsolute = 0 # Absolute max difference for a value not to be collected
deadbandRelative = 0 # Relative max difference for a value not to be collected

[[input]]
url = "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer"
failoverTimeout = 5000 # time to wait before reconnection in case of failure

# A polled node:
[[measurements]]
[[input.measurements]]
name = "Int32polled"
dataType = "number"
tags = { tag1 = "test", tag2 = "AB43" }
nodeId = "ns=37;s=10849"
nodeId = "ns=2;i=10849"
collectionType = "polled"
pollRate = 60 # samples / minute.
deadbandAbsolute = 0 # Absolute max difference for a value not to be collected
deadbandRelative = 0.0 # Relative max difference for a value not to be collected

# A monitored node
[[measurements]]
[[input.measurements]]
name = "Int32monitored"
dataType = "number"
tags = { tag1 = "test", tag2 = "AB43" }
Expand All @@ -37,3 +62,17 @@ collectionType = "monitored"
monitorResolution = 1000 # ms
deadbandAbsolute = 0 # Absolute max difference for a value not to be collected
deadbandRelative = 0 # Relative max difference for a value not to be collected

[output]
name = "influx_1"
type = "influxdb"
host = "128.199.39.148"
port = 8086
protocol = "http"
username = ""
password = ""
database = "test"
failoverTimeout = 10000
bufferMaxSize = 64
writeInterval = 3000
writeMaxPoints = 1000
34 changes: 23 additions & 11 deletions logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,32 @@ let config = loadConfig();
let wp = new writepump(config.output);
wp.Run();

// get a readpump
var rp = new readpump(config.input, config.measurements, wp);
let rps = [];
// get the readpumps
config.input.forEach(element => {
rps.push(new readpump({url: element.url, failoverTimeout: element.failoverTimeout}, element.measurements, wp))
});

async.forever(
function(forever_next) {
rp.Run(function(err) {
console.log("An error occured in the Readpump:", err)
let wait = config.failoverTimeout || 5000;
console.log("Restarting readpump in", wait, "seconds.")
setTimeout(forever_next, wait)
});
async.each(rps,
function(rp, callback){
async.forever(
function(forever_next) {
rp.Run(function(err) {
console.log("An error occured in the Readpump:", err)
let wait = config.failoverTimeout || 5000;
console.log("Restarting readpump in", wait, "seconds.")
setTimeout(forever_next, wait)
});
},
function(err) {
console.log("Restarting readpump...")
}
);
},
function(err) {
console.log("Restarting readpump...")
if(err) {
console.log("Error")
}
}
);

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
"author": "Jeroen Coussement (@coussej)",
"dependencies": {
"async": "1.5.2",
"influx": "4.1.0",
"node-opcua": "0.0.52",
"influx": "5.0.7",
"node-opcua": "0.4.5",
"node-schedule": "1.1.0",
"toml": "2.3.0",
"nedb": "1.8.0"
Expand Down
57 changes: 48 additions & 9 deletions readpump.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var opcua = require("node-opcua");

function ReadPump(config, measurements, writepump) {
this.uaServerUrl = config.url;
this.uaClient = new opcua.OPCUAClient();
this.uaClient;
this.uaSession;
this.uaSubscription;
this.measurements = measurements;
Expand All @@ -17,6 +17,20 @@ function ReadPump(config, measurements, writepump) {

ReadPump.prototype.ConnectOPCUA = function(callback) {
let self = this;

const options =
{
endpoint_must_exist: false,
keepSessionAlive: true,
connectionStrategy:
{
maxRetry: 10,
initialDelay: 2000,
maxDelay: 10*1000
}
};

this.uaClient = new opcua.OPCUAClient(options);
self.uaClient.connect(self.uaServerUrl, function(err) {
if (err) {
callback(err);
Expand Down Expand Up @@ -63,15 +77,15 @@ ReadPump.prototype.ExecuteOPCUAReadRequest = function(nodes, useSourceTimestamp,
return;
}

self.uaSession.read(nodes, 0, function(err, nodesToRead, dataValues) {
self.uaSession.read(nodes, 0, function(err, dataValues) {
if (err) {
callback(err, []);
return;
}
let results = []
dataValues.forEach(
function(dv, i) {
let res = dataValueToPoint(nodesToRead[i], dv, t)
let res = dataValueToPoint(nodes[i], dv, t)
results.push(res);
}
);
Expand Down Expand Up @@ -216,6 +230,8 @@ ReadPump.prototype.InitializeMeasurements = function() {
self.monitoredMeasurements.push({
name: m.name,
dataType: m.dataType,
isArray: m.isArray ? m.isArray : false,
arrayIndex: m.arrayIndex,
nodeId: m.nodeId,
attributeId: opcua.AttributeIds.Value,
tags: m.tags,
Expand Down Expand Up @@ -312,6 +328,7 @@ ReadPump.prototype.Run = function(callback) {
// declare 2 vars to avoid double callbacks
let monitoringCallbackCalled = false;
let pollingCallbackCalled = false;
let reconnectErrorCalled = false;

async.waterfall([
// connect opc
Expand All @@ -321,6 +338,16 @@ ReadPump.prototype.Run = function(callback) {
// Start both the monitoring and the polling of the measurments.
// In case of an error, close everything.
function(waterfall_next) {
self.uaClient.on("close", function () {
console.log("close and abort");
if (!reconnectErrorCalled) {
reconnectErrorCalled = true;
// close disconnect client
self.monitoredMeasurements = [];
self.polledMeasurements = [];
callback('reconnect failed');
}
});
async.parallel({
monitoring: function(parallel_callback) {
// install the subscription
Expand Down Expand Up @@ -365,12 +392,24 @@ ReadPump.prototype.Run = function(callback) {
}

function dataValueToPoint(measurement, dataValue, customTimestamp) {
let point = {
measurement: measurement,
value: dataValue.value ? dataValue.value.value : 0,
opcstatus: dataValue.statusCode.name,
timestamp: dataValue.sourceTimestamp ? dataValue.sourceTimestamp.getTime() : (new Date()).getTime()
};
let point;
if (measurement.isArray == true) {
if (dataValue.value.value.constructor.name.search("Array") != -1 ) {
point = {
measurement: measurement,
value: dataValue.value ? dataValue.value.value[measurement.arrayIndex] : 0,
opcstatus: dataValue.statusCode.name,
timestamp: dataValue.sourceTimestamp ? dataValue.sourceTimestamp.getTime() : (new Date()).getTime()
};
}
} else {
point = {
measurement: measurement,
value: dataValue.value ? dataValue.value.value : 0,
opcstatus: dataValue.statusCode.name,
timestamp: dataValue.sourceTimestamp ? dataValue.sourceTimestamp.getTime() : (new Date()).getTime()
};
}

if (customTimestamp) point.timestamp = customTimestamp;

Expand Down
Loading