
Stream speed management using async/await (cursor) #1

Closed
@Blitzsturm

Description

The current stream mechanics make it easy to process rows as they are read from the query rather than returning them in a block; however, with ultra-large result sets it's possible for rows to be read faster than they can be processed. See this Reddit thread for more comprehensive specifics. An example use case would be executing a query that yields 1 million+ rows, then sending them in blocks of 200 to an external web service that takes several seconds to process each request. In this case it would be possible to send hundreds or thousands of concurrent outbound requests, overwhelming heap space and the external service.

There are two potential fixes:

  1. Allow the iterator function to return a promise which is then awaited, ensuring more data is not sent to the iterator function than it can handle. The following is a hypothetical use case that buffers up to 200 rows at a time, then sends them to an external service with limited speed:
	var rowBuffer = [];
	await sql`select created_at, name from events`.stream(async row => {
		if (rowBuffer.push(row) >= 200) await transformAndSend(rowBuffer.splice(0,200));
	});
	if (rowBuffer.length > 0) await transformAndSend(rowBuffer.splice(0,200));

From a syntax standpoint, all that would change from existing functionality is the addition of the optional async. On the backend, the return value of the iterator callback would need to be checked to see whether it is a promise and, if so, awaited. This is an overly simplified example:

	while (moreRows) {
		var row = await getRow();
		var P = StreamIterator(row);
		if (P instanceof Promise) await P;
	}

I'm not sure how the rows are received, whether each is explicitly read or they come in as events which can be paused. If they are explicitly read then this would be pretty easy: you'd just await the callback and then get the next row. If they come in as events, you'd need to buffer them to be sent to the callback and pause the incoming results if that buffer gets too large.

Ideally, the promise resolving to true (or to a specified enum value) should stop the query from completing. So if, for example, the user executed a query that botches a join and results in 10 billion rows of nonsense, or the destination service won't accept the results in the callback, it would be nice to have a means to gracefully and forcefully stop it from reading more rows. A rough sketch covering both points follows.
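
Very roughly, and assuming the rows arrive as events from a source exposing pause(), resume() and cancel() (hypothetical names, since I don't know the library's internals), the consuming loop could look something like this:

	// Hypothetical sketch: rows arrive as 'row' events from a source that
	// supports pause(), resume() and cancel() (names assumed, not real API).
	async function pumpRows(source, callback, highWaterMark = 100) {
		var buffer = [];
		var finished = false;
		var wake = () => {};

		source.on('row', row => {
			buffer.push(row);
			if (buffer.length >= highWaterMark) source.pause(); // backpressure
			wake();
		});
		source.on('end', () => { finished = true; wake(); });

		while (true) {
			// Wait until a row is buffered or the query has ended.
			while (buffer.length === 0 && !finished)
				await new Promise(resolve => wake = resolve);
			if (buffer.length === 0) break;

			var result = callback(buffer.shift());
			// Only slow down if the callback opted in by returning a promise.
			if (result instanceof Promise) result = await result;

			// Resolving to true (or a designated enum) aborts the query early.
			if (result === true) { source.cancel(); break; }

			if (buffer.length < highWaterMark) source.resume();
		}
	}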

  2. Alternatively (or additionally), making use of Symbol.asyncIterator would allow a standard "for await" loop as a means of asynchronously processing data from an async source. This would be (very) easy to add after altering the existing stream functionality to await returned promises, and it could be officially exposed on the query result as an .iterate() method that returns a Symbol.asyncIterator object to manage this flow for the end user. That would look something like the following in practice:
	var rowBuffer = [];
	for await (let row of sql`select created_at, name from events`.iterate()) {
		if (rowBuffer.push(row) >= 200) await transformAndSend(rowBuffer.splice(0,200));
	}
	if (rowBuffer.length > 0) await transformAndSend(rowBuffer.splice(0,200));
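
For reference, here's a rough sketch (not a proposed final implementation) of how .iterate() could be layered on top of the awaited-callback version of .stream() from the first option; the buffer/waiter bookkeeping is only there to bridge the push-style callback to the pull-style async iterator:

	// Sketch only: bridges the callback-based .stream() (assuming it awaits
	// promises returned by the callback, as in option 1) to for await...of.
	function iterate(query) {
		var buffer = [];    // rows waiting to be pulled by the consumer
		var waiters = [];   // consumers waiting for the next row
		var done = false;
		var error = null;

		query.stream(row => new Promise(release => {
			buffer.push({ row, release });
			if (waiters.length) waiters.shift()();
		})).then(
			() => { done = true; },
			err => { error = err; done = true; }
		).finally(() => { while (waiters.length) waiters.shift()(); });

		return {
			[Symbol.asyncIterator]() {
				return {
					async next() {
						while (buffer.length === 0 && !done)
							await new Promise(resolve => waiters.push(resolve));
						if (buffer.length === 0) {
							if (error) throw error;
							return { done: true, value: undefined };
						}
						var next = buffer.shift();
						next.release(); // let .stream() fetch the following row
						return { done: false, value: next.row };
					}
				};
			}
		};
	}

A real implementation would also want a return() handler on the iterator so that breaking out of the loop cancels the query instead of leaving it hanging, which ties back to the early-abort point above.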

I'm at your disposal if you need assistance implementing this feature or need to test it once complete. If you can make efficiently iterating through large queries as easy as a "for await" loop, you'll have introduced a quality-of-life improvement well ahead of the curve in terms of future-proofing.
