@@ -45,13 +45,15 @@ function InfluxdbBackend(startupTime, config, events) {
45
45
46
46
self . defaultHost = '127.0.0.1' ;
47
47
self . defaultPort = 8086 ;
48
+ self . defaultVersion = 0.8 ;
48
49
self . defaultFlushEnable = true ;
49
50
self . defaultProxyEnable = false ;
50
51
self . defaultProxySuffix = 'raw' ;
51
52
self . defaultProxyFlushInterval = 1000 ;
52
53
53
54
self . host = self . defaultHost ;
54
55
self . port = self . defaultPort ;
56
+ self . version = self . defaultVersion ;
55
57
self . protocol = http ;
56
58
self . flushEnable = self . defaultFlushEnable ;
57
59
self . proxyEnable = self . defaultProxyEnable ;
@@ -66,6 +68,7 @@ function InfluxdbBackend(startupTime, config, events) {
66
68
if ( config . influxdb ) {
67
69
self . host = config . influxdb . host || self . defaultHost ;
68
70
self . port = config . influxdb . port || self . defaultPort ;
71
+ self . version = config . influxdb . version || self . defaultVersion ;
69
72
self . user = config . influxdb . username ;
70
73
self . pass = config . influxdb . password ;
71
74
self . database = config . influxdb . database ;
@@ -87,6 +90,14 @@ function InfluxdbBackend(startupTime, config, events) {
87
90
}
88
91
}
89
92
93
+ if ( self . version >= 0.9 ) {
94
+ self . assembleEvent = self . assembleEvent_v09 ;
95
+ self . httpPOST = self . httpPOST_v09 ;
96
+ } else {
97
+ self . assembleEvent = self . assembleEvent_v08 ;
98
+ self . httpPOST = self . httpPOST_v08 ;
99
+ }
100
+
90
101
if ( self . proxyEnable ) {
91
102
self . log ( 'Starting the buffer flush interval. (every ' + self . proxyFlushInterval + 'ms)' ) ;
92
103
setInterval ( function ( ) {
@@ -355,7 +366,7 @@ InfluxdbBackend.prototype.clearRegistry = function () {
355
366
return registry ;
356
367
}
357
368
358
- InfluxdbBackend . prototype . assembleEvent = function ( name , events ) {
369
+ InfluxdbBackend . prototype . assembleEvent_v08 = function ( name , events ) {
359
370
var self = this ;
360
371
361
372
var payload = {
@@ -380,7 +391,19 @@ InfluxdbBackend.prototype.assembleEvent = function (name, events) {
380
391
return payload ;
381
392
}
382
393
383
- InfluxdbBackend . prototype . httpPOST = function ( points ) {
394
+ InfluxdbBackend . prototype . assembleEvent_v09 = function ( name , events ) {
395
+ var self = this ;
396
+
397
+ var payload = {
398
+ name : name ,
399
+ timestamp : events [ 0 ] [ 'time' ] ,
400
+ fields : { value : events [ 0 ] [ 'value' ] }
401
+ }
402
+
403
+ return payload ;
404
+ }
405
+
406
+ InfluxdbBackend . prototype . httpPOST_v08 = function ( points ) {
384
407
/* Do not send if there are no points. */
385
408
if ( ! points . length ) { return ; }
386
409
@@ -392,7 +415,7 @@ InfluxdbBackend.prototype.httpPOST = function (points) {
392
415
self . logDebug ( function ( ) {
393
416
return 'Sending ' + points . length + ' different points via ' + protocolName ;
394
417
} ) ;
395
-
418
+
396
419
self . influxdbStats . numStats = points . length ;
397
420
398
421
var options = {
@@ -435,6 +458,66 @@ InfluxdbBackend.prototype.httpPOST = function (points) {
435
458
req . end ( ) ;
436
459
}
437
460
461
+ InfluxdbBackend . prototype . httpPOST_v09 = function ( points ) {
462
+ /* Do not send if there are no points. */
463
+ if ( ! points . length ) { return ; }
464
+
465
+ var self = this ,
466
+ query = { u : self . user , p : self . pass } ,
467
+ protocolName = self . protocol == http ? 'HTTP' : 'HTTPS' ,
468
+ startTime ;
469
+
470
+ self . logDebug ( function ( ) {
471
+ return 'Sending ' + points . length + ' different points via ' + protocolName ;
472
+ } ) ;
473
+
474
+ self . influxdbStats . numStats = points . length ;
475
+
476
+ var options = {
477
+ hostname : self . host ,
478
+ port : self . port ,
479
+ path : '/write?' + querystring . stringify ( query ) ,
480
+ method : 'POST' ,
481
+ agent : false // Is it okay to use "undefined" here? (keep-alive)
482
+ } ;
483
+
484
+ var req = self . protocol . request ( options ) ;
485
+
486
+ req . on ( 'socket' , function ( res ) {
487
+ startTime = process . hrtime ( ) ;
488
+ } ) ;
489
+
490
+ req . on ( 'response' , function ( res ) {
491
+ var status = res . statusCode ;
492
+
493
+ self . influxdbStats . httpResponseTime = millisecondsSince ( startTime ) ;
494
+
495
+ if ( status !== 200 ) {
496
+ self . log ( protocolName + ' Error: ' + status ) ;
497
+ }
498
+ } ) ;
499
+
500
+ req . on ( 'error' , function ( e , i ) {
501
+ self . log ( e ) ;
502
+ } ) ;
503
+
504
+ var payload = JSON . stringify ( {
505
+ database : self . database ,
506
+ time_precision : 'ms' ,
507
+ points : points
508
+ } ) ;
509
+
510
+ self . influxdbStats . payloadSize = Buffer . byteLength ( payload ) ;
511
+
512
+ self . logDebug ( function ( ) {
513
+ var size = ( self . influxdbStats . payloadSize / 1024 ) . toFixed ( 2 ) ;
514
+ return 'Payload size ' + size + ' KB' ;
515
+ } ) ;
516
+
517
+ req . write ( payload ) ;
518
+ req . end ( ) ;
519
+ }
520
+
438
521
InfluxdbBackend . prototype . configCheck = function ( ) {
439
522
var self = this ,
440
523
success = true ;
0 commit comments