diff --git a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts index ca3ba4445f940a4..c3098aec005b2ff 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts @@ -10,8 +10,9 @@ import type { Logger } from '@kbn/core/server'; import moment from 'moment'; import * as Rx from 'rxjs'; import { timeout } from 'rxjs/operators'; -import { finished, Writable } from 'stream'; -import { promisify } from 'util'; +import { Writable } from 'stream'; +import { finished } from 'stream/promises'; +import { setImmediate } from 'timers/promises'; import type { RunContext, TaskManagerStartContract, @@ -377,7 +378,16 @@ export class ExecuteReportTask implements ReportingTask { stream.end(); - await promisify(finished)(stream, { readable: false }); + // FIXME: Wait for the stream to end so that we don't trigger `ERR_STREAM_PREMATURE_CLOSE` errors when calling `finished` below (see https://github.com/nodejs/node/issues/45281). Before merging this should be fixed! + await (async function pendingCallbacks() { + if ((stream as any)._writableState.pendingcb > 0) { + await setImmediate(); + await pendingCallbacks(); + } + })(); + + // FIXME: This is where the "Premature close" error is thrown! + await finished(stream, { readable: false }); report._seq_no = stream.getSeqNo()!; report._primary_term = stream.getPrimaryTerm()!;