Skip to content

Commit 269c757

Browse files
committed
Fix backpressuring not being maintained
1 parent 13200dc commit 269c757

File tree

1 file changed

+4
-8
lines changed

1 file changed

+4
-8
lines changed

lib/RdfParser.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
} from "@comunica/bus-rdf-parse";
99
import {ActionContext, Actor, Mediator} from "@comunica/core";
1010
import * as RDF from "rdf-js";
11-
import {Readable} from "stream";
11+
import {Readable, PassThrough} from "stream";
1212

1313
/**
1414
* An RdfParser can parse any RDF serialization, based on a given content type.
@@ -81,10 +81,7 @@ export class RdfParser<Q extends RDF.BaseQuad = RDF.Quad> {
8181
}
8282

8383
// Create a new readable
84-
const readable = new Readable({ objectMode: true });
85-
readable._read = () => {
86-
return;
87-
};
84+
const readable = new PassThrough({ objectMode: true });
8885

8986
// Delegate parsing to the mediator
9087
this.mediatorRdfParseHandle.mediate({
@@ -93,10 +90,9 @@ export class RdfParser<Q extends RDF.BaseQuad = RDF.Quad> {
9390
handleMediaType: contentType,
9491
})
9592
.then((output) => {
96-
const quads: RDF.Stream = output.handle.quads;
93+
const quads = <Readable> output.handle.quads;
9794
quads.on('error', (e) => readable.emit('error', e));
98-
quads.on('data', (quad) => readable.push(quad));
99-
quads.on('end', () => readable.push(null));
95+
quads.pipe(readable);
10096
})
10197
.catch((e) => readable.emit('error', e));
10298

0 commit comments

Comments
 (0)