-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
102 lines (86 loc) · 2.86 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
'use strict'
const {
url, token, org, bucket,
dest_url, dest_token, dest_org, dest_bucket
} = require( './env.json' );
const { InfluxDB, FluxTableMetaData, Point, HttpError } = require( '@influxdata/influxdb-client' );
// Columns of a query row that must not be copied onto the written point as tags.
const delKeys = [
  'result',
  'table',
  '_start',
  '_stop',
  '_time',
  '_value',
  '_field',
  '_measurement'
];
// Field name -> name of the Point method used to write that field's value.
// Only the fields listed here are known; there is no automatic type detection yet.
const dataTypes = {
  auth_result: 'stringField',
  suspicious: 'booleanField'
};
/**
 * Returns a promise that settles after roughly `ms` milliseconds.
 * The caller has to await (or chain on) the result for any delay to happen.
 *
 * @param {number} ms - delay handed to setTimeout.
 * @returns {Promise<undefined>} resolves with no value once the timer fires.
 */
function sleep( ms ) {
  return new Promise( function( wake ) {
    setTimeout( wake, ms );
  } );
}
let wfunc; // cached write API of the destination, created on first use

/**
 * Returns the write API for the destination org/bucket, building it from
 * dest_url/dest_token the first time it is requested and reusing that
 * instance afterwards. The writer is created with 'ms' precision.
 */
function writeAPI() {
  if ( wfunc ) {
    return wfunc;
  }
  const destination = new InfluxDB( { url: dest_url, token: dest_token } );
  wfunc = destination.getWriteApi( dest_org, dest_bucket, 'ms' );
  return wfunc;
}
let rfunc; // cached query API of the source, created on first use

/**
 * Returns the query API for the source org, building it from url/token the
 * first time it is requested and reusing that instance afterwards.
 */
function readAPI() {
  if ( rfunc ) {
    return rfunc;
  }
  const source = new InfluxDB( { url, token } );
  rfunc = source.getQueryApi( org );
  return rfunc;
}
/**
 * Removes a set of keys from a record, in place, and returns the same record.
 *
 * This function is synchronous and has no callback, so a missing record is
 * reported by throwing (the previous code called an undefined `cb` here and
 * died with a ReferenceError instead of the intended message).
 *
 * @param {object} obj - record to strip; it is modified, not copied.
 * @param {string[]} [keys=delKeys] - keys to remove; absent keys are ignored.
 * @returns {object} the same `obj`, without the listed keys.
 * @throws {TypeError} when `obj` is missing.
 */
function clearObject( obj, keys = delKeys ) {
  if ( !obj ) {
    throw new TypeError( "Error clearing the object: NO OBJECT!" );
  }
  // `delete` on a key that is not present is a no-op, so no existence check is needed.
  for ( const key of keys ) {
    delete obj[key];
  }
  return obj;
}
/**
 * Converts one query row (as a plain object) into a Point for the destination.
 *
 * The measurement, timestamp and single field come from the row's
 * `_measurement`, `_time`, `_field` and `_value`; every remaining key that is
 * not listed in `delKeys` is written as a tag. The row itself is left
 * untouched: tags are taken from a shallow copy.
 *
 * All failures are delivered through `cb` rather than thrown, including a
 * field that has no entry in `dataTypes` (which previously raised a TypeError
 * from `point[undefined](...)`) and a `_time` that cannot be parsed.
 *
 * @param {object} obj - row object containing the `_`-prefixed columns above.
 * @param {function(*, Point=)} cb - called once with `(err)` or `(null, point)`.
 * @returns {*} whatever `cb` returns.
 */
function buildPoint( obj, cb ) {
  if ( !obj ) return cb( "Error building the point: NO OBJECT!" );

  // Only string entries count, so names inherited from Object.prototype are rejected too.
  const fieldSetter = dataTypes[obj._field];
  if ( typeof fieldSetter !== 'string' ) {
    return cb( `Error building the point: no data type known for field "${obj._field}"` );
  }

  const timestamp = new Date( obj._time ).getTime();
  if ( Number.isNaN( timestamp ) ) {
    return cb( `Error building the point: invalid _time "${obj._time}"` );
  }

  let point;
  try {
    point = new Point( obj._measurement );
    point.timestamp( timestamp );
    point[fieldSetter]( obj._field, obj._value );
    // Strip the bookkeeping columns from a copy so the caller's row is not mutated.
    const tags = clearObject( { ...obj } );
    for ( const [ name, value ] of Object.entries( tags ) ) {
      point.tag( name, value );
    }
  } catch ( err ) {
    return cb( err );
  }
  // Invoked outside the try block so an exception thrown by cb is not fed back into cb.
  return cb( null, point );
}
/**
 * Hands one point to the destination write API and reports the outcome.
 *
 * A synchronous exception from `writePoint` is passed to `cb` as its error
 * argument instead of escaping past the error-first callback.
 *
 * @param {Point} rec - point to write.
 * @param {function(*=)} cb - called once: with the error on failure, with no arguments otherwise.
 * @returns {*} whatever `cb` returns.
 */
function writeRecord( rec, cb ) {
  try {
    writeAPI().writePoint( rec );
  } catch ( err ) {
    return cb( err );
  }
  // Invoked outside the try block so an exception thrown by cb is not fed back into cb.
  return cb();
}
/**
 * Turns one row object into a point and writes it.
 *
 * @param {object} rec - row object to transfer.
 * @param {function(*=)} cb - receives the error from building or writing, if any.
 */
function processRec( rec, cb ) {
  if ( !rec ) {
    return cb( "Error processing record: NO RECORD!" );
  }

  const onPointBuilt = ( buildErr, point ) => {
    if ( buildErr ) {
      return cb( buildErr );
    }
    writeRecord( point, ( writeErr ) => cb( writeErr ) );
  };

  buildPoint( rec, onPointBuilt );
}
/**
 * Reads every "auth_activity" row of the last 200 days from the source bucket,
 * passes each one to processRec(), and closes the destination writer once the
 * query has ended and every record has reported back.
 *
 * Changes from the previous version:
 *  - completion is detected from the record callbacks themselves; the old
 *    `while ( count > 0 ) { sleep( 1 ); }` never awaited and would have spun
 *    forever while blocking the event loop;
 *  - the first per-record error is kept and reported instead of being dropped
 *    (the remaining records are still processed);
 *  - the writer is closed on the query-error path as well; close() already
 *    flushes, so no separate flush() is issued;
 *  - the row count is passed to `cb`, because it is local to this function.
 *
 * @param {function(*, number=)} cb - called once with `(err, totalc)`;
 *   `err` is the query error, else the first record error, else the close
 *   error, else null; `totalc` is the number of rows received.
 */
function getData( cb ) {
  const fluxQuery = `from( bucket:"${bucket}" ) |> range( start: -200d ) |> filter( fn: (r) => r._measurement == "auth_activity" )`;
  let pending = 0;        // records handed to processRec() that have not reported back yet
  let totalc = 0;         // rows received from the query
  let firstError = null;  // first error reported for an individual record
  let queryDone = false;  // set once the query signalled complete or error
  let finished = false;   // guards against closing / calling cb twice

  // Closes the writer and reports the final outcome exactly once.
  const finish = ( queryErr ) => {
    if ( finished ) return;
    finished = true;
    const err = queryErr || firstError;
    // Starting from a resolved promise turns a synchronous throw in writeAPI() into a rejection.
    Promise.resolve()
      .then( () => writeAPI().close() )
      .then(
        () => cb( err, totalc ),
        ( closeErr ) => cb( err || closeErr, totalc )
      );
  };

  readAPI().queryRows( fluxQuery, {
    next( row, tableMeta ) {
      // if throttling is needed, here is a good place to do that
      const rec = tableMeta.toObject( row );
      let settled = false;
      const onRecordDone = ( err ) => {
        if ( settled ) return;
        settled = true;
        pending--;
        if ( err && !firstError ) firstError = err;
        if ( queryDone && pending === 0 ) finish();
      };
      pending++;
      totalc++;
      try {
        processRec( rec, onRecordDone );
      } catch ( err ) {
        // An exception thrown while processing still has to release its pending slot.
        onRecordDone( err );
      }
    },
    error( error ) {
      queryDone = true;
      finish( error );
    },
    complete() {
      queryDone = true;
      if ( pending === 0 ) finish();
    }
  } );
}

// Entry point: run the transfer and print either the error or the number of rows handled.
getData( ( err, totalc ) => {
  if ( err ) return console.log( err );
  console.log( `Complete transfer of ${totalc} records.` );
} );