Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,24 @@ Returns `null` if committing failed.

Same as [`core.session(options)`](#const-session--coresessionoptions), but backed by a storage snapshot so will not truncate nor append.

#### `const readBatch = core.read()`

Create a read batch.

Reads made on the batch shall be condensed into a single read transaction on the underlying storage.

Any blocks that are not in storage shall be fetched via replication, unless otherwise specified.

#### `const promise = readBatch.get(index, opts)`

Request a block from the read batch. `promise` shall resolve with the block or `null` after the batch is flushed.

`opts` are identical to the `get` API.

#### `readBatch.tryFlush()`

Flush the read batch, any gets made on the batch will only resolve _after_ calling `tryFlush`.

#### `const info = await core.info([options])`

Get information about this core, such as its total size in bytes.
Expand Down
32 changes: 29 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const Info = require('./lib/info')
const Download = require('./lib/download')
const DefaultEncryption = require('./lib/default-encryption')
const caps = require('./lib/caps')
const ReadBatch = require('./lib/read-batch')
const Replicator = require('./lib/replicator')
const { manifestHash, createManifest } = require('./lib/verifier')
const { ReadStream, WriteStream, ByteStream } = require('./lib/streams')
Expand Down Expand Up @@ -87,6 +88,8 @@ class Hypercore extends EventEmitter {
this._findingPeers = 0
this._active = opts.weak ? !!opts.active : opts.active !== false

this._readBatches = []

this._sessionIndex = -1
this._stateIndex = -1 // maintained by session state
this._monitorIndex = -1 // maintained by replication state
Expand Down Expand Up @@ -449,6 +452,10 @@ class Hypercore extends EventEmitter {

if (this.closed === true) return

for (const batch of this._readBatches.slice()) {
batch.destroy()
}

this.core.removeMonitor(this)
this.state.removeSession(this)
this._removeSession()
Expand Down Expand Up @@ -755,25 +762,44 @@ class Hypercore extends EventEmitter {
return count === end - start
}

read() {
const read = new ReadBatch(this)
read.index = this._readBatches.push(read) - 1

return read
}

_removeReadBatch(batch) {
const last = this._readBatches.pop()
if (last === batch) return
this._readBatches[batch.index] = last
last.index = batch.index
}

async get(index, opts) {
if (this.opened === false) await this.opening
if (!isValidIndex(index)) throw ASSERTION('block index is invalid', this.discoveryKey)

if (this.closing !== null)
throw SESSION_CLOSED('cannot get on a closed session', this.discoveryKey)

const encoding =
(opts && opts.valueEncoding && c.from(opts.valueEncoding)) || this.valueEncoding

if (this.onseq !== null) this.onseq(index, this)

const req = this._get(index, opts)

let block = await req

return this._handleBlock(index, block, opts)
}

async _handleBlock(index, block, opts) {
if (!block) return null

if (opts && opts.raw) return block

const encoding =
(opts && opts.valueEncoding && c.from(opts.valueEncoding)) || this.valueEncoding

if (this.encryption && (!opts || opts.decrypt !== false)) {
// Copy the block as it might be shared with other sessions.
block = b4a.from(block)
Expand Down
50 changes: 50 additions & 0 deletions lib/read-batch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
const { ASSERTION } = require('hypercore-errors')

module.exports = class ReadBatch {
constructor(core) {
this.core = core
this.rx = core.state.storage.read()

this.reads = new Map()
this.index = -1

this.destroyed = false
}

async destroy() {
this.core._removeReadBatch(this)
this.destroyed = true
this.rx.destroy()
}

async get(index, opts = {}) {
if (!isValidIndex(index)) throw ASSERTION('block index is invalid', this.discoveryKey)

if (this.core.onseq !== null) this.core.onseq(index, this.core)

const block = await this._get(index)
if (this.destroyed) return null

if (block) return this.core._handleBlock(index, block, opts)

return this.core.get(index, opts)
}

_get(index) {
if (this.reads.has(index)) return this.reads.get(index)

const promise = this.rx.getBlock(index)
this.reads.set(index, promise)

return promise
}

tryFlush() {
this.rx.tryFlush()
this.core._removeReadBatch(this)
}
}

function isValidIndex(index) {
return index === 0 || index > 0
}
96 changes: 96 additions & 0 deletions test/read-batch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
const test = require('brittle')
const b4a = require('b4a')
const HypercoreStorage = require('hypercore-storage')
const crypto = require('hypercore-crypto')

const Hypercore = require('../')
const { create, replicate, eventFlush } = require('./helpers')

test('basic', async function (t) {
const core = await create(t)

await core.append('hello')
await core.append('world')

t.is(core.length, 2)

const read = core.read()

const b0 = read.get(0)
const b1 = read.get(1)

read.tryFlush()

t.alike(await b0, b4a.from('hello'))
t.alike(await b1, b4a.from('world'))
})

test('replication', async function (t) {
const core = await create(t)
const other = await create(t, core.key)

await core.append('hello')
await core.append('world')

t.is(core.length, 2)

replicate(core, other, t)

const read = other.read()

const b0 = read.get(0)
const b1 = read.get(1, { wait: false })

read.tryFlush()

t.alike(await b0, b4a.from('hello'))
t.alike(await b1, null)
})

test('mixed replication', async function (t) {
const core = await create(t)
const other = await create(t, core.key)

await core.append('hello')
await core.append('world')

t.is(core.length, 2)

replicate(core, other, t)

t.alike(await other.get(0), b4a.from('hello'))

const read = other.read()

const b0 = read.get(0)
const b1 = read.get(1)

read.tryFlush()

t.alike(await b0, b4a.from('hello'))
t.alike(await b1, b4a.from('world'))
})

test('destroy', async function (t) {
const core = await create(t)
const other = await create(t, core.key)

await core.append('hello')
await core.append('world')

t.is(core.length, 2)

replicate(core, other, t)

t.alike(await other.get(0), b4a.from('hello'))

const read = other.read()

const exception = t.exception(read.get(0), /Batch is destroyed/)

read.destroy()

await exception

await t.execution(other.close())
})