Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dev-ops/integration_test.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ APPLICATION_BASE_URL = '/notification/'
API_DOC_URL = 'xx'
INTERNAL_ACCESS_TOKEN = 'internal-access-token'
ACCESS_TOKEN_SECRET = 'ACCESS_TOKEN_SECRET'
ERROR_LOG_LEVEL='silly'
DISABLE_LOG=false
47 changes: 47 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
networks:
- elevate_net
logging:
driver: none
kafka:
image: 'confluentinc/cp-kafka:7.3.0'
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
networks:
- elevate_net
logging:
driver: none
notification:
image: shikshalokamqa/elevate-notification:2.2
ports:
- '3000:3001'
command: ['nodemon', 'app.js']
environment:
- KAFKA_URL=kafka:9092
env_file:
- ${env_file}
depends_on:
- kafka
networks:
- elevate_net
networks:
elevate_net:
external: false
6 changes: 6 additions & 0 deletions src/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ SENDGRID_FROM_MAIL = "example@test.com"

# Api doc url
API_DOC_URL = '/api-doc'

#Winston logging level
ERROR_LOG_LEVEL='silly'

#Disable all logs
DISABLE_LOG=false
42 changes: 27 additions & 15 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ const path = require('path')
require('dotenv').config({ path: './.env' })
require('@configs')

const { elevateLog, correlationIdMiddleware } = require('elevate-logger')
elevateLog.config(process.env.ERROR_LOG_LEVEL, 'notification', process.env.DISABLE_LOG)
const logger = elevateLog.init()

let environmentData = require('./envVariables')()

if (!environmentData.success) {
console.log('Server could not start . Not all environment variable is provided')
logger.error('Server could not start . Not all environment variable is provided', {
triggerNotification: true,
})
process.exit()
}

Expand All @@ -25,6 +31,7 @@ const app = express()
require('@health-checks')(app)

app.use(cors())
app.use(correlationIdMiddleware)

app.use(bodyParser.urlencoded({ extended: true, limit: '50MB' }))
app.use(bodyParser.json({ limit: '50MB' }))
Expand All @@ -34,18 +41,19 @@ app.use(express.static('public'))
app.get(process.env.API_DOC_URL, function (req, res) {
res.sendFile(path.join(__dirname, './api-doc/index.html'))
})

/* Logs request info if environment is not development*/
if (process.env.ENABLE_LOG === 'true') {
app.all('*', (req, res, next) => {
console.log('***Notification Service Logs Starts Here***')
console.log('%s %s on %s from ', req.method, req.url, new Date(), req.headers['user-agent'])
console.log('Request Headers: ', req.headers)
console.log('Request Body: ', req.body)
console.log('Request Files: ', req.files)
console.log('***Notification Service Logs Ends Here***')
next()
app.all('*', (req, res, next) => {
logger.info('***Notification Service Request Log***', {
request: {
requestType: `Request Type ${req.method} for ${req.url} on ${new Date()} from `,
requestHeaders: req.headers,
requestBody: req.body,
requestFiles: req.files,
},
})
}
next()
})

/* Registered routes here */
require('./routes')(app)
Expand All @@ -55,18 +63,22 @@ app.listen(process.env.APPLICATION_PORT, (res, err) => {
if (err) {
onError(err)
}
console.log('Environment: ' + process.env.APPLICATION_ENV)
console.log('Application is running on the port:' + process.env.APPLICATION_PORT)
logger.info('Environment: ' + process.env.APPLICATION_ENV)
logger.info('Application is running on the port:' + process.env.APPLICATION_PORT)
})

// Handles specific listen errors with friendly messages
function onError(error) {
switch (error.code) {
case 'EACCES':
console.log(process.env.APPLICATION_PORT + ' requires elevated privileges')
logger.error(process.env.APPLICATION_PORT + ' requires elevated privileges', {
triggerNotification: true,
})
process.exit(1)
case 'EADDRINUSE':
console.log(process.env.APPLICATION_PORT + ' is already in use')
logger.error(process.env.APPLICATION_PORT + ' is already in use', {
triggerNotification: true,
})
process.exit(1)
default:
throw error
Expand Down
82 changes: 37 additions & 45 deletions src/configs/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,53 @@
*/

//Dependencies
const kafka = require('kafka-node')
const { Kafka } = require('kafkajs')
const emailNotifications = require('@generics/helpers/email-notifications')

module.exports = function (config) {
Producer = kafka.Producer
KeyedMessage = kafka.KeyedMessage
client = new kafka.KafkaClient({
kafkaHost: process.env.KAFKA_HOST,
})

client.on('error', function (error) {
console.error.bind(console, 'kafka connection error!')
})

client.on('connect', () => {
console.log('Connected to kafka client')
const { elevateLog } = require('elevate-logger')
const logger = elevateLog.init()
module.exports = async function (config) {
const kafkaIps = process.env.KAFKA_HOST.split(',')
const KafkaClient = new Kafka({
clientId: 'mentoring',
brokers: kafkaIps,
})

producer = new Producer(client)
const producer = KafkaClient.producer()
const consumer = KafkaClient.consumer({ groupId: process.env.KAFKA_GROUP_ID })

producer.on('ready', function () {
console.log('Connected to Kafka')
})
await producer.connect()
await consumer.connect()

producer.on('error', function (err) {
console.error.bind(console, 'kafka producer creation error!')
producer.on('producer.connect', () => {
logger.info(`KafkaProvider: connected`)
})

const consumer = new kafka.ConsumerGroup(
{
kafkaHost: process.env.KAFKA_HOST,
groupId: process.env.KAFKA_GROUP_ID,
autoCommit: true,
},
process.env.KAFKA_TOPIC
)

consumer.on('message', async function (message) {
try {
let notificationData = JSON.parse(message.value)
if (notificationData.type == 'email' && notificationData.email) {
emailNotifications.sendEmail(notificationData.email)
}
} catch (error) {
console.log('failed', error)
}
producer.on('producer.disconnect', () => {
logger.error(`KafkaProvider: could not connect`, {
triggerNotification: true,
})
})

consumer.on('error', async function (error) {
console.log('kafka consumer intialization error', error)
})
const subscribeToConsumer = async () => {
await consumer.subscribe({ topics: [process.env.KAFKA_TOPIC] })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
let notificationData = JSON.parse(message.value)
if (notificationData.type == 'email' && notificationData.email) {
emailNotifications.sendEmail(notificationData.email)
}
} catch (error) {
logger.error('Subscribe to consumer failed:' + error, {
triggerNotification: true,
})
}
},
})
}
subscribeToConsumer()

global.kafkaClient = {
kafkaProducer: producer,
kafkaClient: client,
kafkaKeyedMessage: KeyedMessage,
kafkaClient: KafkaClient,
}
}
11 changes: 9 additions & 2 deletions src/constants/common.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const { elevateLog, correlationId } = require('elevate-logger')
const logger = elevateLog.init()
/**
* name : constants/common.js
* author : Aman Kumar Gupta
Expand All @@ -15,13 +17,18 @@
* @param {String} result - result
* @returns {JSON} Returns response format
*/
const successResponse = ({ statusCode = 200, responseCode = 'OK', message, result = [] }) => {
return {
const successResponse = ({ statusCode = 200, responseCode = 'OK', message, result = [], meta = {} }) => {
let response = {
statusCode,
responseCode,
message,
result,
meta: { ...meta, correlation: correlationId.getId() },
}

logger.info('Request Response', { response: response })

return response
}

/**
Expand Down
8 changes: 8 additions & 0 deletions src/envVariables.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ let enviromentVariables = {
message: 'Required api doc url',
optional: false,
},
ERROR_LOG_LEVEL: {
message: 'Required Error log level',
optional: false,
},
DISABLE_LOG: {
message: 'Required disable log level',
optional: false,
},
}

let success = true
Expand Down
23 changes: 15 additions & 8 deletions src/health-checks/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,29 @@
*/

// Dependencies
const kafka = require('kafka-node')
const kafka = require('kafkajs')

function health_check() {
return new Promise(async (resolve, reject) => {
const client = new kafka.KafkaClient({
kafkaHost: process.env.KAFKA_URL,
const kafkaIps = process.env.KAFKA_URL.split(',')
const KafkaClient = new Kafka({
clientId: 'mentoring',
brokers: kafkaIps,
})

const producer = new kafka.Producer(client)
const producer = KafkaClient.producer()
await producer.connect()

producer.on('error', function (err) {
return resolve(false)
})
producer.on('ready', function () {
producer.on('producer.connect', async () => {
logger.info(`KafkaProvider: connected`)
await producer.disconnect()
return resolve(true)
})
producer.on('producer.disconnect', async () => {
logger.error(`KafkaProvider: could not connect`)
await producer.disconnect()
return resolve(false)
})
})
}

Expand Down
3 changes: 2 additions & 1 deletion src/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
"cli-table": "^0.3.11",
"cors": "^2.8.5",
"dotenv": "^10.0.0",
"elevate-logger": "^3.1.0",
"express": "^4.17.1",
"express-validator": "^5.3.1",
"jsonwebtoken": "^8.5.1",
"kafka-node": "^5.0.0",
"kafkajs": "^2.2.2",
"module-alias": "^2.2.2",
"require-all": "^3.0.0",
"uuid": "^8.3.2"
Expand Down
10 changes: 10 additions & 0 deletions src/routes/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const authenticator = require('@middlewares/authenticator')
// const pagination = require('@middlewares/pagination')
const expressValidator = require('express-validator')
const fs = require('fs')
const { elevateLog, correlationId } = require('elevate-logger')
const logger = elevateLog.init()

module.exports = (app) => {
app.use(authenticator)
Expand Down Expand Up @@ -100,10 +102,18 @@ module.exports = (app) => {
if (error.data) {
errorData = error.data
}

if (status == 500) {
logger.error('Server error!', { message: error, triggerNotification: true })
} else {
logger.info(message, { message: error })
}

res.status(status).json({
responseCode,
message,
error: errorData,
meta: { correlation: correlationId.getId() },
})
})
}