
Breaking: rename highWaterMark to highWaterMarkBytes #1

Merged
merged 2 commits into from Feb 26, 2022
26 changes: 24 additions & 2 deletions README.md
@@ -33,6 +33,7 @@
- [`db.batch(operations[, options][, callback])`](#dbbatchoperations-options-callback)
- [`db.batch()`](#dbbatch)
- [`iterator = db.iterator([options])`](#iterator--dbiteratoroptions)
- [About high water](#about-high-water)
- [`keyIterator = db.keys([options])`](#keyiterator--dbkeysoptions)
- [`valueIterator = db.values([options])`](#valueiterator--dbvaluesoptions)
- [`db.clear([options][, callback])`](#dbclearoptions-callback)
@@ -367,9 +368,30 @@ The `gte` and `lte` range options take precedence over `gt` and `lt` respectively.
- `values` (boolean, default: `true`): whether to return the value of each entry. If set to `false`, the iterator will yield values that are `undefined`. Prefer to use `db.values()` instead.
- `keyEncoding`: custom key encoding for this iterator, used to encode range options, to encode `seek()` targets and to decode keys.
- `valueEncoding`: custom value encoding for this iterator, used to decode values.
- `fillCache` (boolean, default: `false`): If set to `true`, LevelDB will fill its in-memory [LRU](http://en.wikipedia.org/wiki/Least_Recently_Used) cache with data that was read.
- `fillCache` (boolean, default: `false`): if set to `true`, LevelDB will fill its in-memory [LRU](http://en.wikipedia.org/wiki/Least_Recently_Used) cache with data that was read.
- `highWaterMarkBytes` (number, default: `16 * 1024`): limit the amount of data that the iterator will hold in memory. Explained below.

> :pushpin: To instead consume data using Node.js streams, see [`level-read-stream`](https://github.com/Level/read-stream).
#### About high water

While [`iterator.nextv(size)`](#iteratornextvsize-options-callback) is reading entries from LevelDB into memory, it sums up the byte length of those entries. Once that sum exceeds `highWaterMarkBytes`, reading stops. If `nextv(2)` would normally yield two entries but the first entry is too large, then only one entry is yielded, and further `nextv(size)` calls must be made to get the remaining entries.

If memory usage is less of a concern, increasing `highWaterMarkBytes` can increase the throughput of `nextv(size)`. If set to `0` then `nextv(size)` will never yield more than one entry, as `highWaterMarkBytes` will be exceeded on each call. It cannot be set to `Infinity`. On key- and value iterators (see below) it applies to the byte length of keys or values respectively, rather than the combined byte length of keys _and_ values.
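As an illustration, the selection rule described above can be sketched in plain JavaScript. This is a simplified model, not the actual C++ implementation; the `batch` helper and its byte counting are hypothetical:

```javascript
// Simplified model of the rule: keep reading entries until `size` is met
// or the running byte total *exceeds* `highWaterMarkBytes`. Because the
// limit must be exceeded rather than met, at least one entry is always
// yielded, even with a limit of 0.
function batch (entries, size, highWaterMarkBytes) {
  const result = []
  let bytesRead = 0

  for (const [key, value] of entries) {
    result.push([key, value])
    bytesRead += key.length + value.length // assumes single-byte characters

    if (bytesRead > highWaterMarkBytes || result.length >= size) break
  }

  return result
}

const entries = [['a', 'x'.repeat(100)], ['b', '1'], ['c', '2']]

batch(entries, 2, 16) // one entry: the first alone exceeds 16 bytes
batch(entries, 2, 0) // one entry: a limit of 0 is exceeded immediately
```

Note the strict `>` comparison: a batch whose byte total merely equals the limit is not cut short, matching the behavior tested in `test/iterator-hwm-test.js` below.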

Optimal performance can be achieved by setting `highWaterMarkBytes` to at least `size` multiplied by the expected byte length of an entry, ensuring that `size` is always met, i.e. that `nextv(size)` will not stop reading before `size` entries have been read into memory. If the iterator is wrapped in a [Node.js stream](https://github.com/Level/read-stream) or [Web Stream](https://github.com/Level/web-stream) then the `size` parameter is dictated by the stream's `highWaterMark` option. For example:

```js
const { EntryStream } = require('level-read-stream')

// If an entry is 50 bytes on average
const stream = new EntryStream(db, {
  highWaterMark: 1000,
  highWaterMarkBytes: 1000 * 50
})
```

Side note: the "watermark" analogy makes more sense in Node.js streams, where the internal `highWaterMark` can grow, indicating the highest that the "water" has been. In a `classic-level` iterator, however, `highWaterMarkBytes` is fixed once set; being exceeded does not change it.

The `highWaterMarkBytes` option is also applied to an internal cache that `classic-level` employs for [`next()`](#iteratornextcallback) and [`for await...of`](#for-awaitof-iterator). When `next()` is called, that cache is populated with at most 1000 entries, or fewer if `highWaterMarkBytes` is exceeded by their total byte length. To avoid reading too eagerly, the cache is not populated on the first `next()` call, nor on the first `next()` call after a `seek()`; only on subsequent `next()` calls.
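To make the cache rule concrete, here is a hypothetical model of how many entries a single `next()` call fetches natively. The `fetchCount` helper and its parameters are illustrative only, not part of the API:

```javascript
// Hypothetical model of the next() cache rule: the first call (and the
// first call after a seek) fetches a single entry; later calls prefetch
// up to 1000 entries, stopping once the running byte total exceeds
// highWaterMarkBytes.
function fetchCount (isFirstCall, entryBytes, highWaterMarkBytes) {
  if (isFirstCall) return 1 // no cache population on the first call

  let bytes = 0
  let count = 0

  while (count < 1000) {
    count++
    bytes += entryBytes

    if (bytes > highWaterMarkBytes) break
  }

  return count
}

fetchCount(true, 2, 16 * 1024) // 1
fetchCount(false, 2, 2) // 2: the second entry pushes the total past 2 bytes
```

As with `nextv()`, the limit must be exceeded rather than met, so with 2-byte entries and a limit of 2 bytes, two entries are fetched before the loop stops.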

### `keyIterator = db.keys([options])`

13 changes: 6 additions & 7 deletions binding.cc
@@ -823,14 +823,14 @@ struct Iterator final : public BaseIterator {
const bool fillCache,
const bool keyAsBuffer,
const bool valueAsBuffer,
const uint32_t highWaterMark)
const uint32_t highWaterMarkBytes)
: BaseIterator(database, reverse, lt, lte, gt, gte, limit, fillCache),
id_(id),
keys_(keys),
values_(values),
keyAsBuffer_(keyAsBuffer),
valueAsBuffer_(valueAsBuffer),
highWaterMark_(highWaterMark),
highWaterMarkBytes_(highWaterMarkBytes),
first_(true),
nexting_(false),
isClosing_(false),
@@ -877,7 +877,7 @@ struct Iterator final : public BaseIterator {
bytesRead += v.size();
}

if (bytesRead > highWaterMark_ || cache_.size() >= size) {
if (bytesRead > highWaterMarkBytes_ || cache_.size() >= size) {
return true;
}
}
@@ -890,7 +890,7 @@
const bool values_;
const bool keyAsBuffer_;
const bool valueAsBuffer_;
const uint32_t highWaterMark_;
const uint32_t highWaterMarkBytes_;
bool first_;
bool nexting_;
bool isClosing_;
@@ -1624,8 +1624,7 @@ NAPI_METHOD(iterator_init) {
const bool keyAsBuffer = EncodingIsBuffer(env, options, "keyEncoding");
const bool valueAsBuffer = EncodingIsBuffer(env, options, "valueEncoding");
const int limit = Int32Property(env, options, "limit", -1);
const uint32_t highWaterMark = Uint32Property(env, options, "highWaterMark",
16 * 1024);
const uint32_t highWaterMarkBytes = Uint32Property(env, options, "highWaterMarkBytes", 16 * 1024);

std::string* lt = RangeOption(env, options, "lt");
std::string* lte = RangeOption(env, options, "lte");
@@ -1635,7 +1634,7 @@
const uint32_t id = database->currentIteratorId_++;
Iterator* iterator = new Iterator(database, id, reverse, keys,
values, limit, lt, lte, gt, gte, fillCache,
keyAsBuffer, valueAsBuffer, highWaterMark);
keyAsBuffer, valueAsBuffer, highWaterMarkBytes);
napi_value result;

NAPI_STATUS_THROWS(napi_create_external(env, iterator,
5 changes: 5 additions & 0 deletions index.d.ts
@@ -252,6 +252,11 @@ export interface AdditionalIteratorOptions {
* @defaultValue `false`
*/
fillCache?: boolean | undefined

/**
* Limit the amount of data that the iterator will hold in memory.
*/
highWaterMarkBytes?: number | undefined
}

/**
2 changes: 1 addition & 1 deletion package.json
@@ -26,7 +26,7 @@
"prebuild-win32-x64": "prebuildify -t 8.14.0 --napi --strip"
},
"dependencies": {
"abstract-level": "^1.0.0",
"abstract-level": "^1.0.1",
"catering": "^2.1.0",
"module-error": "^1.0.1",
"napi-macros": "~2.0.0",
18 changes: 9 additions & 9 deletions test/cleanup-hanging-iterators-test.js
@@ -32,10 +32,10 @@ makeTest('test likely-closed iterator', function (db, t, done) {
})

makeTest('test non-closed iterator', function (db, t, done) {
// Same as the test above but with a highWaterMark of 0 so that we don't
// Same as the test above but with a highWaterMarkBytes of 0 so that we don't
// preemptively fetch all records, to ensure that the iterator is still
// active when we (attempt to) close the database.
const it = db.iterator({ highWaterMark: 0 })
const it = db.iterator({ highWaterMarkBytes: 0 })

it.next(function (err, key, value) {
t.ifError(err, 'no error from next()')
@@ -57,10 +57,10 @@ makeTest('test multiple likely-closed iterators', function (db, t, done) {
})

makeTest('test multiple non-closed iterators', function (db, t, done) {
// Same as the test above but with a highWaterMark of 0.
// Same as the test above but with a highWaterMarkBytes of 0.
for (let i = 0; i < repeats; i++) {
db.iterator({ highWaterMark: 0 })
db.iterator({ highWaterMark: 0 }).next(function () {})
db.iterator({ highWaterMarkBytes: 0 })
db.iterator({ highWaterMarkBytes: 0 }).next(function () {})
}

setTimeout(done, Math.floor(Math.random() * 50))
@@ -70,8 +70,8 @@ global.gc && makeTest('test multiple non-closed iterators with forced gc', function (db, t, done) {
// Same as the test above but with forced GC, to test that the lifespan of an
// iterator is tied to *both* its JS object and whether the iterator was closed.
for (let i = 0; i < repeats; i++) {
db.iterator({ highWaterMark: 0 })
db.iterator({ highWaterMark: 0 }).next(function () {})
db.iterator({ highWaterMarkBytes: 0 })
db.iterator({ highWaterMarkBytes: 0 }).next(function () {})
}

setTimeout(function () {
@@ -95,7 +95,7 @@ makeTest('test closing iterators', function (db, t, done) {

makeTest('test recursive next', function (db, t, done) {
// Test that we're able to close when user keeps scheduling work
const it = db.iterator({ highWaterMark: 0 })
const it = db.iterator({ highWaterMarkBytes: 0 })

it.next(function loop (err, key) {
if (err && err.code !== 'LEVEL_ITERATOR_NOT_OPEN') throw err
@@ -107,7 +107,7 @@ makeTest('test recursive next (random)', function (db, t, done) {

makeTest('test recursive next (random)', function (db, t, done) {
// Same as the test above but closing at a random time
const it = db.iterator({ highWaterMark: 0 })
const it = db.iterator({ highWaterMarkBytes: 0 })

it.next(function loop (err, key) {
if (err && err.code !== 'LEVEL_ITERATOR_NOT_OPEN') throw err
6 changes: 3 additions & 3 deletions test/iterator-gc-test.js
@@ -26,8 +26,8 @@ test('db without ref does not get GCed while iterating', function (t) {
db.batch(sourceData.slice(), function (err) {
t.ifError(err, 'no batch error')

// Set highWaterMark to 0 so that we don't preemptively fetch.
const it = db.iterator({ highWaterMark: 0 })
// Set highWaterMarkBytes to 0 so that we don't preemptively fetch.
const it = db.iterator({ highWaterMarkBytes: 0 })

// Remove reference
db = null
@@ -39,7 +39,7 @@
iterate(it)
} else {
// But a timeout usually also allows GC to kick in. If not, the time
// between iterator ticks might. That's when "highWaterMark: 0" helps.
// between iterator ticks might. That's when "highWaterMarkBytes: 0" helps.
setTimeout(iterate.bind(null, it), 1000)
}
})
63 changes: 63 additions & 0 deletions test/iterator-hwm-test.js
@@ -0,0 +1,63 @@
'use strict'

const test = require('tape')
const testCommon = require('./common')

let db

test('highWaterMarkBytes setup', async function (t) {
db = testCommon.factory()

// Write 8 bytes
return db.batch().put('a', '0').put('b', '1').put('c', '2').put('d', '3').write()
})

test('highWaterMarkBytes limits byte length of nextv() entries', async function (t) {
const hwm = async (highWaterMarkBytes) => {
const it = db.iterator({ highWaterMarkBytes })
const entries = await it.nextv(1e3)
await it.close()
return entries
}

t.same(await hwm(0), [['a', '0']], 'accepts 0')
t.same(await hwm(Infinity), [['a', '0']], 'Infinity is interpreted as 0 (by Node-API)')
t.same(await hwm(1), [['a', '0']], 'is limited')
t.same(await hwm(2), [['a', '0'], ['b', '1']], 'highWaterMarkBytes must be exceeded, not met')
})

test('highWaterMarkBytes limits byte length of internal next() cache', async function (t) {
const hwm = async (highWaterMarkBytes) => {
const it = db.iterator({ highWaterMarkBytes })

// Because initial next() calls don't cache, make two calls
await it.next()
await it.next()

const count = 1 + it.cached
await it.close()

// Return how many bytes were retrieved natively by the second call
return count * 2
}

t.is(await hwm(0), 2, 'accepts 0')
t.is(await hwm(Infinity), 2, 'Infinity is interpreted as 0 (by Node-API)')
t.is(await hwm(1), 2, 'is limited')
t.is(await hwm(2), 4, 'highWaterMarkBytes must be exceeded, not met')
t.is(await hwm(9), 6, 'double-check that previous test did apply a limit')
})

test('highWaterMarkBytes does not affect byte length of all() entries', async function (t) {
const hwm = async (highWaterMarkBytes) => {
// Note: setting hwm does make all() slower, as it uses nextv() atm
return db.iterator({ highWaterMarkBytes }).all()
}

t.same(await hwm(0), [['a', '0'], ['b', '1'], ['c', '2'], ['d', '3']])
t.same(await hwm(1), [['a', '0'], ['b', '1'], ['c', '2'], ['d', '3']])
})

test('highWaterMarkBytes teardown', async function (t) {
return db.close()
})
2 changes: 1 addition & 1 deletion test/iterator-recursion-test.js
@@ -64,7 +64,7 @@ test('setUp db', function (t) {

test('iterate over a large iterator with a large watermark', function (t) {
const iterator = db.iterator({
highWaterMark: 10000000
highWaterMarkBytes: 10000000
})
const read = function () {
iterator.next(function (err, key, value) {
6 changes: 3 additions & 3 deletions test/iterator-starvation-test.js
@@ -26,8 +26,8 @@ test('iterator does not starve event loop', function (t) {
db.batch(sourceData.slice(), function (err) {
t.ifError(err, 'no batch error')

// Set a high highWaterMark to fill up the cache entirely
const it = db.iterator({ highWaterMark: Math.pow(1024, 3) })
// Set a high highWaterMarkBytes to fill up the cache entirely
const it = db.iterator({ highWaterMarkBytes: Math.pow(1024, 3) })

let breaths = 0
let entries = 0
@@ -77,7 +77,7 @@ test('iterator with seeks does not starve event loop', function (t) {
db.batch(sourceData.slice(), function (err) {
t.ifError(err, 'no batch error')

const it = db.iterator({ highWaterMark: Math.pow(1024, 3), limit: sourceData.length })
const it = db.iterator({ highWaterMarkBytes: Math.pow(1024, 3), limit: sourceData.length })

let breaths = 0
let entries = 0