Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: load config from pino on startup #90

Merged
merged 4 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ stream, it emits the following events:

* `parseLine(line)` a function that is used to parse line received from `pino`.

* `expectPinoConfig` a boolean that indicates if the transport expects a pino to add some of its configuration to the stream. Default: `false`.
10xLaCroixDrinker marked this conversation as resolved.
Show resolved Hide resolved

## Example

### custom parseLine
Expand Down
68 changes: 59 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,22 @@
const metadata = Symbol.for('pino.metadata')
const split = require('split2')
const { Duplex } = require('readable-stream')
const { parentPort, workerData } = require('worker_threads')

function createDeferred () {
let resolve
let reject
const promise = new Promise((_resolve, _reject) => {
resolve = _resolve
reject = _reject
})
promise.resolve = resolve
promise.reject = reject
return promise
}

module.exports = function build (fn, opts = {}) {
const waitForConfig = opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig === true
const parseLines = opts.parse === 'lines'
const parseLine = typeof opts.parseLine === 'function' ? opts.parseLine : JSON.parse
const close = opts.close || defaultClose
Expand Down Expand Up @@ -50,27 +64,63 @@ module.exports = function build (fn, opts = {}) {
}
}

if (opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig !== true) {
setImmediate(() => {
stream.emit('error', new Error('This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.'))
})
}

if (opts.metadata !== false) {
stream[metadata] = true
stream.lastTime = 0
stream.lastLevel = 0
stream.lastObj = null
}

let res = fn(stream)
if (waitForConfig) {
let pinoConfig = {}
const configReceived = createDeferred()
parentPort.on('message', function handleMessage (message) {
if (message.code === 'PINO_CONFIG') {
pinoConfig = message.config
configReceived.resolve()
parentPort.off('message', handleMessage)
}
})

if (res && typeof res.catch === 'function') {
res.catch((err) => {
stream.destroy(err)
Object.defineProperties(stream, {
levels: {
get () { return pinoConfig.levels }
},
messageKey: {
get () { return pinoConfig.messageKey }
},
errorKey: {
get () { return pinoConfig.errorKey }
}
})

// set it to null to not retain a reference to the promise
res = null
} else if (opts.enablePipelining && res) {
return Duplex.from({ writable: stream, readable: res })
return configReceived.then(finish)
}

return stream
return finish()

function finish () {
let res = fn(stream)

if (res && typeof res.catch === 'function') {
res.catch((err) => {
stream.destroy(err)
})

// set it to null to not retain a reference to the promise
res = null
} else if (opts.enablePipelining && res) {
return Duplex.from({ writable: stream, readable: res })
}

return stream
}
}

function defaultClose (err, cb) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"snazzy": "^9.0.0",
"standard": "^17.0.0",
"tap": "^16.0.0",
"thread-stream": "^2.4.1",
"tsd": "^0.31.0"
},
"tsd": {
Expand Down
7 changes: 7 additions & 0 deletions test/base.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ test('rejecting errors the stream', async ({ same, plan }) => {
same(err.message, 'kaboom')
})

test('emits an error if the transport expects pino to send the config, but pino is not going to', async function ({ plan, same }) {
plan(1)
const stream = build(() => {}, { expectPinoConfig: true })
const [err] = await once(stream, 'error')
same(err.message, 'This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.')
})

test('set metadata', ({ same, plan, equal }) => {
plan(9)

Expand Down
22 changes: 22 additions & 0 deletions test/fixtures/transport-async-iteration.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const build = require('../..')

module.exports = async function (threadStreamOpts) {
const { port, opts = {} } = threadStreamOpts
return build(
async function (source) {
for await (const obj of source) {
port.postMessage({
data: obj,
pinoConfig: {
levels: source.levels,
messageKey: source.messageKey,
errorKey: source.errorKey
}
})
}
},
opts
)
}
22 changes: 22 additions & 0 deletions test/fixtures/transport-on-data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const build = require('../..')

module.exports = async function (threadStreamOpts) {
const { port, opts = {} } = threadStreamOpts
return build(
function (source) {
source.on('data', function (line) {
port.postMessage({
data: line,
pinoConfig: {
levels: source.levels,
messageKey: source.messageKey,
errorKey: source.errorKey
}
})
})
},
opts
)
}
24 changes: 24 additions & 0 deletions test/fixtures/transport-transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const { Transform, pipeline } = require('stream')
const build = require('../..')

module.exports = function (threadStreamOpts) {
const { opts = {} } = threadStreamOpts
return build(function (source) {
const transform = new Transform({
objectMode: true,
autoDestroy: true,
transform (chunk, enc, cb) {
chunk.service = 'from transform'
chunk.level = `${source.levels.labels[chunk.level]}(${chunk.level})`
chunk[source.messageKey] = chunk[source.messageKey].toUpperCase()
cb(null, JSON.stringify(chunk) + '\n')
}
})

pipeline(source, transform, () => {})

return transform
}, { ...opts, enablePipelining: true })
}
15 changes: 15 additions & 0 deletions test/fixtures/worker-pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict'

const { pipeline, PassThrough } = require('stream')

module.exports = async function ({ targets }) {
const streams = await Promise.all(targets.map(async (t) => {
const fn = require(t.target)
const stream = await fn(t.options)
return stream
}))

const stream = new PassThrough()
pipeline(stream, ...streams, () => {})
return stream
}
Loading
Loading