Skip to content

Commit 503ae24

Browse files
authored
Merge pull request #14 from oslabs-beta/james/kafka-npm-package
James/kafka npm package
2 parents 0bc7114 + 891b3b7 commit 503ae24

File tree

7 files changed

+200
-19
lines changed

7 files changed

+200
-19
lines changed

chronos_npm_package/chronos.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const MongoServerWrapper = require('./wrappers/MongoServerWrapper.js');
66
const PostgresClientWrapper = require('./wrappers/PostgresClientWrapper.js');
77
const PostgresServerWrapper = require('./wrappers/PostgresServerWrapper.js');
88
const { validateInput, addNotifications } = require('./controllers/helpers');
9+
const { initialFetch } = require('./controllers/kafkaHelpers');
910

1011
let userConfig = {};
1112
const chronos = {};
@@ -101,6 +102,35 @@ chronos.track = () => {
101102
}
102103
return null;
103104
};
105+
106+
/**
107+
* **********************************************
108+
* COLLECT KAFKA METRICS
109+
* Only supports MongoDB and PostgreSQL for now!
110+
* **********************************************
111+
*/
112+
113+
chronos.kafka = function () {
114+
const { database, jmxuri } = userConfig;
115+
if (jmxuri === undefined){
116+
console.log('No specified URI for a JMX Exporter');
117+
return;
118+
}
119+
120+
// Ensures that the provided URI returns correctly formatted data.
121+
initialFetch(jmxuri);
122+
123+
if (database.type === 'MongoDB') {
124+
mongo.connect(userConfig);
125+
mongo.kafka(userConfig);
126+
}
127+
128+
if (database.type === 'PostgreSQL') {
129+
postgres.connect(userConfig);
130+
postgres.kafka(userConfig);
131+
}
132+
};
133+
104134
/**
105135
* Wraps the gRPC server object to automatically write logs to user configed DB
106136
*

chronos_npm_package/controllers/helpers.js

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,25 @@ const helpers = {
44
* Throws an error on input valid data types or on missing fields
55
* Sets the default interval to 5 seconds and dockerized to false
66
*/
7+
8+
/*
9+
User Config object:
10+
{
11+
microservice: string - Name of the microservice. Will be used as a table name in the chronos's db
12+
interval: number - The number of milliseconds between every collection of data
13+
dockerized: boolean - Should be set to true if the service is running inside of a container
14+
NEW! -> jmxuri: string - (optional) The address exposed by the JMX Exporter for collecting Kafka metrics <- NEW!
15+
database: {
16+
connection: should be a string and only supports 'REST' and 'gRPC'
17+
type: should be a string and only supports 'MongoDB' and 'PostgreSQL'.
18+
URI: should be a connection string to the database where you intend Chronos to record metrics
19+
}
20+
notifications: an array - optional for configuring slack or email notifications
21+
}
22+
23+
*/
724
validateInput(config) {
8-
const { microservice, database, interval, dockerized, connection } = config;
25+
const { microservice, database, interval, dockerized, jmxuri } = config;
926

1027
// Validate all required fields exist and are valid input types
1128
if (!microservice || typeof microservice !== 'string') {
@@ -25,6 +42,13 @@ const helpers = {
2542
);
2643
}
2744

45+
// validate that the jmxuri is a string
46+
if (jmxuri && typeof jmxuri !== 'string') {
47+
throw new Error(
48+
'Invalid input for "jmxuri" in chronos-config.js: Please provide the address of the JMX Exporter'
49+
);
50+
}
51+
2852
// Validate database type
2953
if (database.type !== 'PostgreSQL' && database.type !== 'MongoDB') {
3054
throw new Error(
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
const fetch = require('node-fetch');
2+
3+
const kafkaHelpers = {};
4+
5+
// Creates an array full of objects that store values for various Kafka metrics
6+
kafkaHelpers.extractWord = function (str) {
7+
const res = [];
8+
const arr = str.split('\n'); // `/\n/`
9+
for (const element of arr) {
10+
if (
11+
element &&
12+
element.length !== 0 &&
13+
element[0] !== '#' &&
14+
element.substring(0, 3) !== 'jmx' &&
15+
element.substring(0, 4) !== "'jmx"
16+
) {
17+
const metric = element.split(' ')[0];
18+
const value = Number(element.split(' ')[1]);
19+
const time = Date.now();
20+
const category = 'Event';
21+
res.push({ metric, value, time, category });
22+
}
23+
}
24+
return res;
25+
};
26+
27+
// executes first to ensure that the provided jmxuri provides legitimate data.
28+
// if it doesnt then an error is thrown
29+
kafkaHelpers.initialFetch = function (jmxuri) {
30+
fetch(jmxuri)
31+
.then(response => {
32+
if (response.status !== 200) {
33+
throw new Error(`Error: The provided URI for the JMX exporter is invalid`);
34+
} else {
35+
console.log('Initial fetch to JMX Exporter was successful.');
36+
}
37+
return response.text();
38+
})
39+
.then(text => {
40+
console.log('\nInitial Fetch Response Text:\n', text);
41+
});
42+
};
43+
44+
// fetches kafka metrics from the user-specified location of JMX prometheus and returns the processed result
45+
kafkaHelpers.kafkaFetch = function (config) {
46+
return fetch(config.jmxuri)
47+
.then(data => data.text())
48+
.then(data => kafkaHelpers.extractWord(data))
49+
.catch(err => console.log(err));
50+
};
51+
52+
module.exports = kafkaHelpers;

chronos_npm_package/controllers/mongo.js

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ const CommunicationModel = require('../models/CommunicationModel');
55
const ServicesModel = require('../models/ServicesModel');
66
const HealthModelFunc = require('../models/HealthModel');
77
const ContainerInfoFunc = require('../models/ContainerInfo');
8+
const KafkaModel = require('../models/KafkaModel');
9+
const { kafkaFetch } = require('./kafkaHelpers.js');
810
require('../models/ContainerInfo');
911

1012
// Handle deprecation warnings
@@ -209,6 +211,7 @@ chronos.health = ({ microservice, interval }) => {
209211
})
210212
.catch(err => console.log('Error saving health data: ', err.message));
211213
}, interval);
214+
console.log('Interval set, recording health data');
212215
};
213216

214217
/**
@@ -302,6 +305,29 @@ chronos.docker = ({ microservice, interval }) => {
302305
});
303306
};
304307

308+
/*
309+
This function takes as a parameter the promise returned from the kafkaFetch().
310+
It then takes the returned array of metrics, turns them into documents based on
311+
KafkaModel.js, and inserts them into the db at the provided uri with insertMany()
312+
*/
313+
chronos.kafka = function (userConfig) {
314+
// fetch the data from Kafka with kafkaFetch()
315+
// then take turn each result in the returned array into a kafkaModel doc
316+
// insertMany into the the KafkaModel
317+
console.log('Starting Kafka Collection');
318+
setInterval(() => {
319+
kafkaFetch(userConfig)
320+
.then(parsedArray => {
321+
const documents = [];
322+
for (const metric of parsedArray) {
323+
documents.push(KafkaModel(metric));
324+
}
325+
return KafkaModel.insertMany(documents);
326+
})
327+
.catch(err => console.log('Error inserting kafka documents: ', err));
328+
}, userConfig.interval);
329+
};
330+
305331
// // grabs container data for multiple containers info - TBD
306332
// chronos.dockerInfo = ({ microservice, interval }) => {
307333
// si.dockerInfo()
@@ -311,17 +337,4 @@ chronos.docker = ({ microservice, interval }) => {
311337
// .catch(err => console.log('Error saving health data: ', err.message));
312338
// };
313339

314-
// import data from JMX Exporter port
315-
// chronos.kafka = () => {
316-
// // write fetch request to localhost:12345/metrics
317-
// // then store metrics in database
318-
// fetch('localhost:12345/metrics')
319-
// .then(data => {
320-
// console.log(data);
321-
// const parsedData = parser.toTree(data);
322-
// console.log(parsedData);
323-
// })
324-
// .catch(err => console.log(err));
325-
// };
326-
327340
module.exports = chronos;

chronos_npm_package/controllers/postgres.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
const si = require('systeminformation');
33
const { Client } = require('pg');
44
const alert = require('./alert');
5+
const { kafkaFetch } = require('./kafkaHelpers');
56

67
let client;
78

@@ -377,4 +378,63 @@ chronos.docker = function ({ microservice, interval }) {
377378
});
378379
};
379380

381+
// Constructs a parameterized query string for inserting multiple data points into
382+
// the kafkametrics db based on the number of data points;
383+
function createQueryString(numRows) {
384+
let query = `
385+
INSERT INTO
386+
kafkametrics (metric, value, category, time)
387+
VALUES
388+
`;
389+
for (let i = 0; i < numRows; i++) {
390+
const newRow = `($${4 * i + 1}, $${4 * i + 2}, $${4 * i + 3}, TO_TIMESTAMP($${4 * i + 4}))`;
391+
query = query.concat(newRow);
392+
if (i !== numRows - 1) query = query.concat(',');
393+
}
394+
query = query.concat(';');
395+
return query;
396+
}
397+
398+
// Places the values being inserted into postgres into an array that will eventually
399+
// hydrate the parameterized query
400+
function createQueryArray(dataPointsArray) {
401+
const queryArray = [];
402+
for (const element of dataPointsArray) {
403+
queryArray.push(element.metric);
404+
queryArray.push(element.value);
405+
queryArray.push(element.category);
406+
queryArray.push(element.time / 1000); // Converts milliseconds to seconds to work with postgres
407+
}
408+
return queryArray;
409+
}
410+
411+
chronos.kafka = function (userConfig) {
412+
// create kafkametrics table if it does not exist
413+
const createTableQuery = `
414+
CREATE TABLE IF NOT EXISTS kafkametrics (
415+
_id SERIAL PRIMARY KEY,
416+
metric VARCHAR(200),
417+
value FLOAT DEFAULT 0.0,
418+
category VARCHAR(200) DEFAULT 'event',
419+
time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
420+
);`;
421+
422+
client
423+
.query(createTableQuery)
424+
.catch(err => console.log('Error creating kafkametrics table in PostgreSQL:\n', err));
425+
426+
setInterval(() => {
427+
kafkaFetch(userConfig)
428+
.then(parsedArray => {
429+
const numDataPoints = parsedArray.length;
430+
const queryString = createQueryString(numDataPoints);
431+
const queryArray = createQueryArray(parsedArray);
432+
//console.log('POSTGRES QUERY STRING: ', queryString);
433+
//console.log('POSTGRES QUERY ARRAY', queryArray);
434+
return client.query(queryString, queryArray);
435+
})
436+
.catch(err => console.log('Error inserting kafka metrics into PostgreSQL:\n', err));
437+
}, userConfig.interval);
438+
};
439+
380440
module.exports = chronos;

chronos_npm_package/models/KafkaModel.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@ const KafkaSchema = new Schema({
77
type: Date,
88
default: Date.now(),
99
},
10-
activecontrollercount: {
11-
type: Number,
10+
metric: {
11+
type: String,
1212
},
13-
offlinepartitionscount: {
13+
value: {
1414
type: Number,
1515
},
16-
uncleanleaderelectionspersec: {
17-
type: Number,
16+
category: {
17+
type: String,
18+
default: '',
1819
},
1920
});
2021

chronos_npm_package/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"axios": "^0.21.1",
1515
"hpropagate": "0.0.7",
1616
"mongoose": "^5.11.13",
17+
"node-fetch": "^2.6.7",
1718
"nodemailer": "^6.4.17",
1819
"pg": "^8.5.1",
1920
"systeminformation": "^4.34.7"

0 commit comments

Comments
 (0)