From eb7d29c9426e6d3671d11c4e082e058e79196647 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Wed, 3 Apr 2024 17:30:53 -0500 Subject: [PATCH] feat: bulk helper improvements (#51) Applies patches from elastic/elasticsearch-js#2199 and elastic/elasticsearch-js#2027, adding support for an onSuccess callback and fixing a bug that would cause the helper to hang when the flushInterval was lower than the request timeout. --------- Co-authored-by: JoshMock <160161+JoshMock@users.noreply.github.com> --- src/helpers.ts | 134 +++++--- test/unit/helpers/bulk.test.ts | 578 +++++++++++++++++++++++++++++++-- 2 files changed, 656 insertions(+), 56 deletions(-) diff --git a/src/helpers.ts b/src/helpers.ts index 1144891..035b3c8 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -74,11 +74,11 @@ export interface BulkStats { aborted: boolean } -interface IndexAction { +interface IndexActionOperation { index: T.BulkIndexOperation } -interface CreateAction { +interface CreateActionOperation { create: T.BulkCreateOperation } @@ -90,7 +90,9 @@ interface DeleteAction { delete: T.BulkDeleteOperation } -type UpdateAction = [UpdateActionOperation, Record<string, unknown>] +type CreateAction = CreateActionOperation | [CreateActionOperation, unknown] +type IndexAction = IndexActionOperation | [IndexActionOperation, unknown] +type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction] type Action = IndexAction | CreateAction | UpdateAction | DeleteAction export interface OnDropDocument<TDocument = unknown> { @@ -101,6 +103,24 @@ export interface OnDropDocument<TDocument = unknown> { retried: boolean } +type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>> + +export interface OnSuccessDocument<TDocument = unknown> { + result: BulkResponseItem + document?: TDocument +} + +interface ZippedResult<TDocument = unknown> { + result: BulkResponseItem + raw: { + action: string + document?: string + } + // this is a function so that deserialization is only done when needed + // to avoid a performance hit + document?: () => TDocument +} + export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest { datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument> onDocument: (doc: TDocument) => Action @@ -110,6 +130,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest { retries?: number wait?: number onDrop?: (doc: OnDropDocument<TDocument>) => void + onSuccess?: (doc: OnSuccessDocument<TDocument>) => void } export interface BulkHelper<T> extends Promise<T> { @@ -379,7 +400,7 @@ export default class Helpers { clearTimeout(timeoutRef) } - // In some cases the previos http call does not have finished, + // In some cases the previous http call has not finished, // or we didn't reach the flush bytes threshold, so we force one last operation. if (loadedOperations > 0) { const send = await semaphore() @@ -415,8 +436,8 @@ export default class Helpers { // to guarantee that no more than the number of operations // allowed to run at the same time are executed. // It returns a semaphore function which resolves in the next tick - // if we didn't reach the maximim concurrency yet, otherwise it returns - // a promise that resolves as soon as one of the running request has finshed. + // if we didn't reach the maximum concurrency yet, otherwise it returns + // a promise that resolves as soon as one of the running requests has finished. // The semaphore function resolves a send function, which will be used // to send the actual msearch request.
// It also returns a finish function, which returns a promise that is resolved @@ -548,6 +569,9 @@ export default class Helpers { retries = this[kMaxRetries], wait = 5000, onDrop = noop, + // onSuccess does not default to noop, to avoid the performance hit + // of deserializing every document in the bulk request + onSuccess, ...bulkOptions } = options @@ -620,26 +644,25 @@ export default class Helpers { let chunkBytes = 0 timeoutRef = setTimeout(onFlushTimeout, flushInterval) // eslint-disable-line - // @ts-expect-error datasoruce is an iterable + // @ts-expect-error datasource is an iterable for await (const chunk of datasource) { if (shouldAbort) break timeoutRef.refresh() - const action = onDocument(chunk) - const operation = Array.isArray(action) - ? Object.keys(action[0])[0] - : Object.keys(action)[0] + const result = onDocument(chunk) + const [action, payload] = Array.isArray(result) ? result : [result, chunk] + const operation = Object.keys(action)[0] if (operation === 'index' || operation === 'create') { actionBody = serializer.serialize(action) - payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk) + payloadBody = typeof payload === 'string' + ? payload + : serializer.serialize(payload) chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) bulkBody.push(actionBody, payloadBody) } else if (operation === 'update') { - // @ts-expect-error in case of update action is an array - actionBody = serializer.serialize(action[0]) + actionBody = serializer.serialize(action) payloadBody = typeof chunk === 'string' ? `{"doc":${chunk}}` - // @ts-expect-error in case of update action is an array - : serializer.serialize({ doc: chunk, ...action[1] }) + : serializer.serialize({ doc: chunk, ...payload }) chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) bulkBody.push(actionBody, payloadBody) } else if (operation === 'delete') { @@ -653,15 +676,16 @@ export default class Helpers { if (chunkBytes >= flushBytes) { stats.bytes += chunkBytes - const send = await semaphore() - send(bulkBody.slice()) + const bulkBodyCopy = bulkBody.slice() bulkBody.length = 0 chunkBytes = 0 + const send = await semaphore() + send(bulkBodyCopy) } } clearTimeout(timeoutRef) - // In some cases the previos http call does not have finished, + // In some cases the previous http call has not finished, // or we didn't reach the flush bytes threshold, so we force one last operation. if (!shouldAbort && chunkBytes > 0) { const send = await semaphore() @@ -697,8 +721,8 @@ export default class Helpers { // to guarantee that no more than the number of operations // allowed to run at the same time are executed. // It returns a semaphore function which resolves in the next tick - // if we didn't reach the maximim concurrency yet, otherwise it returns - // a promise that resolves as soon as one of the running request has finshed. + // if we didn't reach the maximum concurrency yet, otherwise it returns + // a promise that resolves as soon as one of the running requests has finished. // The semaphore function resolves a send function, which will be used // to send the actual bulk request. // It also returns a finish function, which returns a promise that is resolved @@ -805,57 +829,93 @@ export default class Helpers { callback() } + /** + * Zips bulk response items (the action's result) with the original document body. + * The raw string version of action and document lines are also included. 
+ */ + function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] { + const zipped = [] + let indexSlice = 0 + for (let i = 0, len = responseItems.length; i < len; i++) { + const result = responseItems[i] + const operation = Object.keys(result)[0] + let zipResult + + if (operation === 'delete') { + zipResult = { + result, + raw: { action: bulkBody[indexSlice] } + } + indexSlice += 1 + } else { + const document = bulkBody[indexSlice + 1] + zipResult = { + result, + raw: { action: bulkBody[indexSlice], document }, + // this is a function so that deserialization is only done when needed + // to avoid a performance hit + document: () => serializer.deserialize(document) + } + indexSlice += 2 + } + + zipped.push(zipResult as ZippedResult) + } + + return zipped + } + function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void { if (shouldAbort) return callback(null, []) client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta) .then(response => { const result = response.body + const results = zipBulkResults(result.items, bulkBody) + if (!result.errors) { stats.successful += result.items.length - for (const item of result.items) { - if (item.update?.result === 'noop') { + for (const item of results) { + const { result, document = noop } = item + if (result.update?.result === 'noop') { stats.noop++ } + if (onSuccess != null) onSuccess({ result, document: document() }) } return callback(null, []) } const retry = [] - const { items } = result - let indexSlice = 0 - for (let i = 0, len = items.length; i < len; i++) { - const action = items[i] - const operation = Object.keys(action)[0] + for (const item of results) { + const { result, raw, document = noop } = item + const operation = Object.keys(result)[0] // @ts-expect-error - const responseItem = action[operation as keyof T.BulkResponseItemContainer] + const responseItem = result[operation as keyof T.BulkResponseItemContainer] assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report') if (responseItem.status >= 400) { // 429 is the only status code where we might want to retry // a document, because it was not an error in the document itself, - // but the ES node were handling too many operations. + // but the ES node was handling too many operations. if (responseItem.status === 429) { - retry.push(bulkBody[indexSlice]) + retry.push(raw.action) /* istanbul ignore next */ if (operation !== 'delete') { - retry.push(bulkBody[indexSlice + 1]) + retry.push(raw.document ?? '') } } else { onDrop({ status: responseItem.status, error: responseItem.error ?? null, - operation: serializer.deserialize(bulkBody[indexSlice]), + operation: serializer.deserialize(raw.action), // @ts-expect-error - document: operation !== 'delete' - ? serializer.deserialize(bulkBody[indexSlice + 1]) - : null, + document: document(), retried: isRetrying }) stats.failed += 1 } } else { stats.successful += 1 + if (onSuccess != null) onSuccess({ result, document: document() }) } - operation === 'delete' ? indexSlice += 1 : indexSlice += 2 } callback(null, retry) }) diff --git a/test/unit/helpers/bulk.test.ts b/test/unit/helpers/bulk.test.ts index 87a794c..2f776c7 100644 --- a/test/unit/helpers/bulk.test.ts +++ b/test/unit/helpers/bulk.test.ts @@ -17,15 +17,17 @@ * under the License. 
*/ +import FakeTimers from '@sinonjs/fake-timers' import { AssertionError } from 'assert' -import * as http from 'http' import { createReadStream } from 'fs' +import * as http from 'http' import { join } from 'path' import split from 'split2' -import FakeTimers from '@sinonjs/fake-timers' +import { Readable } from 'stream' import { test } from 'tap' import { Client, errors } from '../../../' import { buildServer, connection } from '../../utils' +const { sleep } = require('../../integration/helper') let clientVersion: string = require('../../../package.json').version // eslint-disable-line if (clientVersion.includes('-')) { @@ -188,6 +190,98 @@ test('bulk index', t => { }) }) + t.test('refreshOnCompletion', async t => { + let count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + if (params.method === 'GET') { + t.equal(params.path, '/_all/_refresh') + return { body: { acknowledged: true } } + } else { + t.equal(params.path, '/_bulk') + t.match(params.headers, { 'content-type': 'application/x-ndjson' }) + // @ts-expect-error + const [action, payload] = params.body.split('\n') + t.same(JSON.parse(action), { index: { _index: 'test' } }) + t.same(JSON.parse(payload), dataset[count++]) + return { body: { errors: false, items: [{}] } } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + compression: false + }) + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + refreshOnCompletion: true, + onDocument (_doc) { + return { + index: { _index: 'test' } + } + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + + t.test('refreshOnCompletion custom index', async t => { + let count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + if (params.method === 'GET') { + t.equal(params.path, '/test/_refresh') + return { body: { acknowledged: true } } + } else { + t.equal(params.path, '/_bulk') + t.match(params.headers, { 'content-type': 'application/x-ndjson' }) + // @ts-expect-error + const [action, payload] = params.body.split('\n') + t.same(JSON.parse(action), { index: { _index: 'test' } }) + t.same(JSON.parse(payload), dataset[count++]) + return { body: { errors: false, items: [{}] } } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + compression: false + }) + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + refreshOnCompletion: 'test', + onDocument (_doc) { + return { + index: { _index: 'test' } + } + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + t.test('Should perform a bulk request (custom action)', async t => { let count = 0 const MockConnection = connection.buildMockConnection({ @@ -431,7 +525,7 @@ test('bulk index', t => { t.test('Server error', async t => { const MockConnection = connection.buildMockConnection({ - onRequest (params) { + onRequest (_params) { return { statusCode: 500, body: { somothing: 'went wrong' } @@ -447,12 +541,12 @@ test('bulk index', t => { datasource: dataset.slice(), flushBytes: 1, concurrency: 1, - onDocument (doc) { + onDocument (_doc) { return { index: { _index: 'test' } } }, - onDrop (doc) { + onDrop (_doc) { t.fail('This should never be called') } }) @@ -467,7 +561,7
@@ test('bulk index', t => { t.test('Server error (high flush size, to trigger the finish error)', async t => { const MockConnection = connection.buildMockConnection({ - onRequest (params) { + onRequest (_params) { return { statusCode: 500, body: { somothing: 'went wrong' } @@ -483,12 +577,12 @@ test('bulk index', t => { datasource: dataset.slice(), flushBytes: 5000000, concurrency: 1, - onDocument (doc) { + onDocument (_doc) { return { index: { _index: 'test' } } }, - onDrop (doc) { + onDrop (_doc) { t.fail('This should never be called') } }) @@ -545,12 +639,12 @@ test('bulk index', t => { flushBytes: 1, concurrency: 1, wait: 10, - onDocument (doc) { + onDocument (_doc) { return { index: { _index: 'test' } } }, - onDrop (doc) { + onDrop (_doc) { b.abort() } }) @@ -571,7 +665,7 @@ test('bulk index', t => { t.test('Invalid operation', t => { t.plan(2) const MockConnection = connection.buildMockConnection({ - onRequest (params) { + onRequest (_params) { return { body: { errors: false, items: [{}] } } } }) @@ -586,7 +680,7 @@ test('bulk index', t => { flushBytes: 1, concurrency: 1, // @ts-expect-error - onDocument (doc) { + onDocument (_doc) { return { foo: { _index: 'test' } } @@ -598,6 +692,44 @@ test('bulk index', t => { }) }) + t.test('should call onSuccess callback for each indexed document', async t => { + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + // @ts-expect-error + let [action] = params.body.split('\n') + action = JSON.parse(action) + return { body: { errors: false, items: [action] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + compression: false + }) + + let count = 0 + await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument (_doc) { + return { + index: { _index: 'test' } + } + }, + onSuccess ({ result, document }) { + t.same(result, { index: { _index: 'test' }}) + t.same(document, dataset[count++]) + }, + onDrop (_doc) { + t.fail('This should never be called') + } + }) + t.equal(count, 3) + t.end() + }) + t.end() }) @@ -652,6 +784,45 @@ test('bulk index', t => { }) }) + t.test('onSuccess is called for each indexed document', async t => { + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + // @ts-expect-error + let [action] = params.body.split('\n') + action = JSON.parse(action) + return { body: { errors: false, items: [action] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + compression: false + }) + const stream = createReadStream(join(__dirname, '..', '..', 'fixtures', 'small-dataset.ndjson'), 'utf8') + + let count = 0 + await client.helpers.bulk({ + datasource: stream.pipe(split()), + flushBytes: 1, + concurrency: 1, + onDocument (_doc) { + return { + index: { _index: 'test' } + } + }, + onSuccess ({ result, document }) { + t.same(result, { index: { _index: 'test' }}) + t.same(document, dataset[count++]) + }, + onDrop (_doc) { + t.fail('This should never be called') + } + }) + t.equal(count, 3) + t.end() + }) + t.end() }) @@ -707,9 +878,108 @@ test('bulk index', t => { aborted: false }) }) + + t.test('onSuccess is called for each indexed document', async t => { + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + // @ts-expect-error + let [action] = params.body.split('\n') + action = JSON.parse(action) + return { body: { errors: false, items: [action] } } + } + }) + + const client = new Client({ + node: 
'http://localhost:9200', + Connection: MockConnection, + compression: false + }) + + async function * generator () { + const data = dataset.slice() + for (const doc of data) { + yield doc + } + } + + let count = 0 + await client.helpers.bulk({ + datasource: generator(), + flushBytes: 1, + concurrency: 1, + onDocument (_doc) { + return { + index: { _index: 'test' } + } + }, + onSuccess ({ result, document }) { + t.same(result, { index: { _index: 'test' }}) + t.same(document, dataset[count++]) + }, + onDrop (_doc) { + t.fail('This should never be called') + } + }) + t.equal(count, 3) + t.end() + }) t.end() }) + t.test('Should use payload returned by `onDocument`', async t => { + let count = 0 + const updatedAt = '1970-01-01T12:00:00.000Z' + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + t.equal(params.path, '/_bulk') + t.match(params.headers, { + 'content-type': 'application/x-ndjson', + 'x-elastic-client-meta': `esv=${clientVersionNoMeta},js=${nodeVersion},t=${transportVersion},hc=${nodeVersion},h=bp` + }) + // @ts-expect-error + const [action, payload] = params.body.split('\n') + t.same(JSON.parse(action), { index: { _index: 'test' } }) + t.same(JSON.parse(payload), { ...dataset[count++], updatedAt }) + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + compression: false + }) + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument (doc) { + t.type(doc.user, 'string') // testing that doc is type of Document + return [ + { + index: { + _index: 'test' + } + }, + { ...doc, updatedAt } + ] + }, + onDrop (_doc) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + t.end() }) @@ -761,6 +1031,59 @@ test('bulk create', t => { aborted: false }) }) + + t.test('Should use payload returned by `onDocument`', async t => { + let count = 0 + const updatedAt = '1970-01-01T12:00:00.000Z' + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + t.equal(params.path, '/_bulk') + t.match(params.headers, { 'content-type': 'application/x-ndjson' }) + // @ts-expect-error + const [action, payload] = params.body.split('\n') + t.same(JSON.parse(action), { create: { _index: 'test', _id: count } }) + t.same(JSON.parse(payload), { ...dataset[count++], updatedAt }) + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + compression: false + }) + let id = 0 + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument (doc) { + return [ + { + create: { + _index: 'test', + _id: String(id++) + } + }, + { ...doc, updatedAt } + ] + }, + onDrop (_doc) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + t.end() }) @@ -1043,7 +1366,7 @@ test('bulk delete', t => { t.test('Should call onDrop on the correct document when doing a mix of operations that includes deletes', async t => { // checks to ensure onDrop doesn't provide the wrong document when some operations are deletes // see 
https://github.com/elastic/elasticsearch-js/issues/1751 - async function handler (req: http.IncomingMessage, res: http.ServerResponse) { + async function handler (_req: http.IncomingMessage, res: http.ServerResponse) { res.setHeader('content-type', 'application/json') res.end(JSON.stringify({ took: 0, @@ -1057,14 +1380,17 @@ test('bulk delete', t => { } const [{ port }, server] = await buildServer(handler) - const client = new Client({ node: `http://localhost:${port}` }) + const client = new Client({ + node: `http://localhost:${port}`, + compression: false + }) let counter = 0 const result = await client.helpers.bulk({ datasource: dataset.slice(), concurrency: 1, wait: 10, retries: 0, - onDocument (doc) { + onDocument (_doc) { counter++ if (counter === 1) { return { @@ -1104,6 +1430,64 @@ test('bulk delete', t => { server.stop() }) + t.test('should call onSuccess callback with delete action object', async t => { + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + // @ts-expect-error + let [action, payload] = params.body.split('\n') + action = JSON.parse(action) + return { body: { errors: false, items: [action] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + compression: false + }) + + let docCount = 0 + let successCount = 0 + await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument (_doc) { + if (docCount++ === 1) { + return { + delete: { + _index: 'test', + _id: String(docCount) + } + } + } else { + return { + index: { _index: 'test' } + } + } + }, + onSuccess ({ result, document }) { + const item = dataset[successCount] + if (successCount++ === 1) { + t.same(result, { + delete: { + _index: 'test', + _id: String(successCount) + } + }) + } else { + t.same(result, { index: { _index: 'test' }}) + t.same(document, item) + } + }, + onDrop (_doc) { + t.fail('This should never be called') + } + }) + + t.end() + }) + t.end() }) @@ -1286,12 +1670,12 @@ test('Flush interval', t => { })(), flushBytes: 5000000, concurrency: 1, - onDocument (doc) { + onDocument (_doc) { return { index: { _index: 'test' } } }, - onDrop (doc) { + onDrop (_doc) { t.fail('This should never be called') } }) @@ -1347,12 +1731,12 @@ test('Flush interval', t => { })(), flushBytes: 5000000, concurrency: 1, - onDocument (doc) { + onDocument (_doc) { return { index: { _index: 'test' } } }, - onDrop (doc) { + onDrop (_doc) { t.fail('This should never be called') } }) @@ -1419,5 +1803,161 @@ test('Flush interval', t => { }) }) + test(`flush timeout does not lock process when flushInterval is less than server timeout`, async t => { + const flushInterval = 500 + + async function handler (req: http.IncomingMessage, res: http.ServerResponse) { + setTimeout(() => { + res.writeHead(200, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ errors: false, items: [{}] })) + }, 1000) + } + + const [{ port }, server] = await buildServer(handler) + const client = new Client({ + node: `http://localhost:${port}`, + compression: false + }) + + async function * generator () { + const data = dataset.slice() + for (const doc of data) { + await sleep(flushInterval) + yield doc + } + } + + const result = await client.helpers.bulk({ + datasource: Readable.from(generator()), + flushBytes: 1, + flushInterval: flushInterval, + concurrency: 1, + onDocument (_) { + return { + index: { _index: 'test' } + } + }, + onDrop (_) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 
'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + + server.stop() + }) + + test(`flush timeout does not lock process when flushInterval is greater than server timeout`, async t => { + const flushInterval = 500 + + async function handler (req: http.IncomingMessage, res: http.ServerResponse) { + setTimeout(() => { + res.writeHead(200, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ errors: false, items: [{}] })) + }, 250) + } + + const [{ port }, server] = await buildServer(handler) + const client = new Client({ + node: `http://localhost:${port}`, + compression: false + }) + + async function * generator () { + const data = dataset.slice() + for (const doc of data) { + await sleep(flushInterval) + yield doc + } + } + + const result = await client.helpers.bulk({ + datasource: Readable.from(generator()), + flushBytes: 1, + flushInterval: flushInterval, + concurrency: 1, + onDocument (_) { + return { + index: { _index: 'test' } + } + }, + onDrop (_) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + + server.stop() + }) + + test(`flush timeout does not lock process when flushInterval is equal to server timeout`, async t => { + const flushInterval = 500 + + async function handler (req: http.IncomingMessage, res: http.ServerResponse) { + setTimeout(() => { + res.writeHead(200, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ errors: false, items: [{}] })) + }, flushInterval) + } + + const [{ port }, server] = await buildServer(handler) + const client = new Client({ + node: `http://localhost:${port}`, + compression: false + }) + + async function * generator () { + const data = dataset.slice() + for (const doc of data) { + await sleep(flushInterval) + yield doc + } + } + + const result = await client.helpers.bulk({ + datasource: Readable.from(generator()), + flushBytes: 1, + flushInterval: flushInterval, + concurrency: 1, + onDocument (_) { + return { + index: { _index: 'test' } + } + }, + onDrop (_) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + + server.stop() + }) + t.end() })
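Usage sketch for the new onSuccess callback (illustrative only: the index name, dataset, and logging are hypothetical, and the import path assumes the standard client package in an async context):

    import { Client } from '@elastic/elasticsearch'

    const client = new Client({ node: 'http://localhost:9200' })
    const dataset = [
      { user: 'jon', age: 23 },
      { user: 'arya', age: 18 },
      { user: 'tyrion', age: 39 }
    ]

    await client.helpers.bulk({
      datasource: dataset.slice(),
      onDocument (_doc) {
        return { index: { _index: 'my-index' } }
      },
      // invoked once per document successfully written; documents are only
      // deserialized for this purpose when an onSuccess callback is provided
      onSuccess ({ result, document }) {
        console.log('indexed with status', result.index?.status, document)
      },
      // still invoked for documents that cannot be written
      onDrop (doc) {
        console.error('dropped:', doc.status, doc.error)
      }
    })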
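In the same vein, onDocument can now return an [action, payload] tuple for index and create operations when the document written to Elasticsearch should differ from the datasource item, as the 'Should use payload returned by `onDocument`' tests above exercise. Another sketch, reusing client and dataset from above; the updatedAt enrichment is hypothetical:

    await client.helpers.bulk({
      datasource: dataset.slice(),
      onDocument (doc) {
        // the second tuple element is serialized and sent as the request body
        // in place of the original datasource document
        return [
          { index: { _index: 'my-index' } },
          { ...doc, updatedAt: new Date().toISOString() }
        ]
      }
    })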