-
Notifications
You must be signed in to change notification settings - Fork 105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: iterable pinning #231
Conversation
It might be desirable to change the default behavior of the first iteration call to allow it to pull all the local blocks without a batch limit. This would allow pins to quickly catch up to where they were if the program was interrupted without having to await the next iteration call, giving the user an easy way to have 'resume download' type behavior. Something like this: const itr = helia.pins.add(cid, { skipLocal: true })
// Fetches all the blocks that are in the local blockstore not limited to batch numbers:
const localCids = await itr.next()
// Fetches the next blocks from the network according to the batch number:
const downloadedCids = await itr.next() |
This is interesting, but I wonder if we couldn't simplify the internal implementation by just having it be a generator that pins blocks as fast as possible, then if the user wants to batch them up (or parallelise, or parallel batch) they can use existing modules like it-batch, it-parallel, it-parallel-batch or a combination of things from streaming-iterables? That would give them a lot more control over the performance characteristics of pinning, and we'll have a simpler codebase to maintain. |
I agree and have updated the code. Adding pins: // Before:
await helia.pins.add(cid)
// After:
await all(parallel(helia.pins.add(cid), { concurrency: 1 })) Removing pins: // Before:
await helia.pins.rm(cid)
// After:
await all(parallel(helia.pins.rm(cid), { concurrency: 1 })) Getting the pinned value: // Before:
const pin = await helia.pins.add(cid)
// After:
const itr = helia.pins.add(cid)
let output = await itr.next()
while (output.done === false) {
await output.value()
output = await itr.next()
}
const pin = next.value One thing this is still missing that I would like to see is being able to continue a pin where it left off or catch up without rate limiting pinning of local blocks. The simple use case that illustrates this would be the rate limited pinning of a large DAG and the program crashes part way and you want it to continue where it left off. Do you think a |
fbeed88
to
9f9bc60
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Self review, plus approving prior code. Should be looked at again by @saul-jb and @achingbrain
/** | ||
* Callback for updating a {@link DatastorePinnedBlock}'s properties when | ||
* calling `#updatePinnedBlock` | ||
* | ||
* The callback should return `false` to prevent any pinning modifications to | ||
* the block, and true in all other cases. | ||
*/ | ||
interface WithPinnedBlockCallback { | ||
(pinnedBlock: DatastorePinnedBlock): boolean | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achingbrain I pulled this out into a locally defined interface and added some context
packages/helia/src/pins.ts
Outdated
await this.#updatePinnedBlock(cid, (pinnedBlock: DatastorePinnedBlock) => { | ||
// do not update pinned block if this block is already pinned by this CID | ||
if (pinnedBlock.pinnedBy.find(c => uint8ArrayEquals(c, cid.bytes)) != null) { | ||
return false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: we return false to say that we do not want to process the pinning of this content any further.
packages/helia/src/pins.ts
Outdated
}) | ||
pinnedBlock.pinCount++ | ||
pinnedBlock.pinnedBy.push(cid.bytes) | ||
return true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should be returning true by default inside a WithPinnedBlockCallback
packages/helia/src/pins.ts
Outdated
const shouldContinue = withPinnedBlock(pinnedBlock) | ||
if (!shouldContinue) { | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achingbrain @saul-jb this is how I've implemented the skipLocal
option that was mentioned. We already had the logic to skip content if it was pinned, we just needed a way to exit early.
All tests pass, but I'm not sure of any implications of returning prior to checking for pinnedBlock.pinCount === 0
. However, since only .add
is returning false when pinnedBlock.pinnedBy.find(c => uint8ArrayEquals(c, cid.bytes)) != null
, and everything else returns true, I believe this should continue working as it had previously.
@@ -85,4 +87,35 @@ describe('pins (recursive)', () => { | |||
} | |||
} | |||
}) | |||
|
|||
it('can resume an interrupted pinning operation', async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a test to simulate an interrupted pinning event by only iterating over it partially, and then attempting to pin from the root again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could optimize this further, but a worthy optimization would likely require a drastic change to the pinning response and input types.
We could try to prevent calls to #updatePinnedBlock
entirely by asking users to provide cids to skip (which would have been returned in the first .add
call). But pinnedBy
is checking the block content explicitly with uint8ArrayEquals
, not just CID strings, so I assume this would cause more work in cases where we're not resuming.
Open to ideas
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few thoughts
packages/helia/src/pins.ts
Outdated
depth | ||
}) | ||
}) | ||
for await (const promise of this.#walkDag(cid, depth, options)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
calling this promise
sounds a bit weird.
packages/helia/src/pins.ts
Outdated
const dagWalker = this.dagWalkers[cid.code] | ||
const enqueue = (cid: CID, depth: number): void => { | ||
queue.push(async () => { | ||
const promise = Promise.resolve().then(async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if I follow, why does this need to be Promise.resolve().then(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is just another way to write ;(async () => {})()
without the need for a semi colon.
packages/helia/src/pins.ts
Outdated
while (queue.length + promises.length !== 0) { | ||
const func = queue.shift() | ||
|
||
if (func == null) { | ||
await promises.shift() | ||
|
||
continue | ||
} | ||
|
||
yield func | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this logic is quite loaded, having an explanation here would help. Under what circumstances would func
be null
? is it when the queue
is exhausted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, when the queue is exhausted but there are still promises running we wait until the next one has resolved before trying to pull more from the queue.
dismissed my review and pointed to regression seen when updating benchmarks/gc in #346 Some tasks for this PR:
|
I'm going to merge this so we have the API changes in and the v3 release is unblocked, we can address the performance characteristics in a follow-up. The existing "resumable pinning" change is useful though it's more "restartable pinning" than "resumable" - I'll open another issue to discuss. |
This PR reworks the pinning to be async generators instead of promises with support for batching, granting users more fine-grained control of the pinning/downloading of blocks.
As expected using a higher batch number gives much better performance than the old pinning system since its doing more fetching in parallel. A batch number of 1 will replicate the old system of a simple queue.
This is a basic example of throttling the pins download, more advanced systems can be made to control how many large pins can be downloaded at once with various speeds by calling
next
on the iterables.Another example, altering the batch size:
Adding pins:
Removing pins:
Getting the pinned value: