@@ -3,24 +3,19 @@
 const pull = require('pull-stream/pull')
 const pullThrough = require('pull-stream/throughs/through')
 const pullAsyncMap = require('pull-stream/throughs/async-map')
-const EventEmitter = require('events')
 const Mutex = require('../../../utils/mutex')
 const log = require('debug')('ipfs:gc:lock')

-class GCLock extends EventEmitter {
+class GCLock {
   constructor (repoOwner) {
-    super()
-
     this.mutex = new Mutex(repoOwner, { log })
   }

   readLock (lockedFn, cb) {
-    this.emit(`readLock request`)
     return this.mutex.readLock(lockedFn, cb)
   }

   writeLock (lockedFn, cb) {
-    this.emit(`writeLock request`)
     return this.mutex.writeLock(lockedFn, cb)
   }
@@ -33,7 +28,7 @@ class GCLock extends EventEmitter {
   }

   pullLock (type, lockedPullFn) {
-    const pullLocker = new PullLocker(this, this.mutex, type, this.lockId++)
+    const pullLocker = new PullLocker(this.mutex, type)

     return pull(
       pullLocker.take(),
@@ -44,8 +39,7 @@ class GCLock extends EventEmitter {
 }

 class PullLocker {
-  constructor (emitter, mutex, type) {
-    this.emitter = emitter
+  constructor (mutex, type) {
     this.mutex = mutex
     this.type = type
@@ -54,26 +48,30 @@ class PullLocker {
   }

   take () {
-    return pull(
-      pullAsyncMap((i, cb) => {
-        if (this.lockRequested) {
-          return cb(null, i)
-        }
-        this.lockRequested = true
-
-        this.emitter.emit(`${this.type} request`)
-
-        this.mutex[this.type]((releaseLock) => {
-          cb(null, i)
-          this.releaseLock = releaseLock
-        })
+    return pullAsyncMap((i, cb) => {
+      // Check if the lock has already been acquired.
+      // Note: new items will only come through the pull stream once the first
+      // item has acquired a lock.
+      if (this.releaseLock) {
+        // The lock has been acquired so return immediately
+        return cb(null, i)
+      }
+
+      // Request the lock
+      this.mutex[this.type]((releaseLock) => {
+        // The lock has been granted, so run the locked piece of code
+        cb(null, i)
+
+        // Save the release function to be called when the stream completes
+        this.releaseLock = releaseLock
+      })
-    )
+    })
   }

   // Releases the lock
   release () {
     return pullThrough(null, (err) => {
       // When the stream completes, release the lock
       this.releaseLock(err)
     })
Comment: The pipeline becomes something like:

pull(
  ...input,
  take,
  process,
  release,
  ...output
)

The problem I see with this is that we may end up holding the lock for significantly longer than we need to, due to slow user pipeline actions. For example, imagine an output action that takes 1s. After we have processed the last chunk of input, we're going to have to wait a further second before the lock is released, since the release happens when the entire pipeline ends, not immediately after we've processed the last chunk. Can we eagerly request the next chunk here so we know the source is drained before the sink requests the last read? This sounds like a module that should already exist! I don't know how we'd mitigate this on the source side; maybe buffer up a certain amount before we start processing it (this won't help with big files). You could imagine a slow source being someone adding something over the HTTP API.

Comment: That's a great point, locking with pull streams is very tricky. Maybe it's better to take out the pull stream locking code and instead have an explicit …

Comment: That's kinda what you're doing, isn't it? ...but in a helper function that adds both … How would you see this working for …?

Comment: You're right, there isn't really another way to do it. I think your suggestion makes the most sense. We'd need to think about how to implement that in a way that doesn't make this even more complex.
   }
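To make the lock-holding concern discussed above concrete, here is a minimal usage sketch. It is not code from this PR: gcLock, blockSource, processBlock and slowSink are hypothetical names, and the shape of the pullLock call is assumed from the signature shown in the diff.

// Hypothetical sketch of a pullLock-wrapped pipeline
const pull = require('pull-stream/pull')
const asyncMap = require('pull-stream/throughs/async-map')
const drain = require('pull-stream/sinks/drain')

pull(
  blockSource,                          // source, e.g. chunks arriving over the HTTP API
  gcLock.pullLock('readLock', () =>
    asyncMap((block, cb) => processBlock(block, cb))  // the locked piece of work
  ),
  drain(slowSink)                       // a slow sink keeps the lock held, because
                                        // release() only fires when the whole pipeline ends
)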
@@ -12,6 +12,28 @@ const pEvent = require('p-event')
 const env = require('ipfs-utils/src/env')
 const IPFS = require('../../src/core')

+// We need to detect when a readLock or writeLock is requested for the tests
+// so we override the Mutex class to emit an event
+const EventEmitter = require('events')
+const Mutex = require('../../src/utils/mutex')
+
+class MutexEmitter extends Mutex {
+  constructor (repoOwner) {
+    super(repoOwner)
+    this.emitter = new EventEmitter()
+  }
+
+  readLock (lockedFn, cb) {
+    this.emitter.emit('readLock request')
+    return super.readLock(lockedFn, cb)
+  }
+
+  writeLock (lockedFn, cb) {
+    this.emitter.emit('writeLock request')
+    return super.writeLock(lockedFn, cb)
+  }
+}
+
 describe('gc', function () {
Comment: A couple of questions: …

Comment: I'm asking this as I expect people to use GC when their repo gets to a significant size (10GB+++) and not when it is small.

Comment: Good point 👍 I have been testing locally with repos a few hundred MB in size: … I haven't performed stress testing with repos of the size you're suggesting. I had a look at go-ipfs; it seems like there are some sharness tests for GC, but I didn't see any stress tests.

Comment: I just tried with an 11GB repo and it ran in a few seconds. I will add a sharness test for stress-testing.

Comment: 11GB repo and how large of a GC? Half of it?

Comment: GC removes all blocks that are not pinned (same as go-ipfs). In this case I added 11GB of unpinned files and then GCed them.

Comment: I added a sharness test for GC, in which the number of files and their size can be easily adjusted. It seems like GC performance is linear: …
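For illustration, a rough sketch of the kind of stress test being discussed, assuming the promise-returning ipfs.add and ipfs.repo.gc used elsewhere in these tests; gcStressTest, fileCount and fileSize are made-up names, and this is not the actual sharness test.

// Sketch: add a configurable amount of unpinned data, then time repo.gc()
const crypto = require('crypto')

async function gcStressTest (ipfs, fileCount, fileSize) {
  // Unpinned blocks are exactly what GC removes
  for (let i = 0; i < fileCount; i++) {
    await ipfs.add(crypto.randomBytes(fileSize), { pin: false })
  }

  const start = Date.now()
  await ipfs.repo.gc()
  console.log(`GC of ${fileCount} files of ${fileSize} bytes took ${Date.now() - start}ms`)
}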
   const fixtures = [{
     path: 'test/my/path1',
@@ -29,6 +51,7 @@ describe('gc', function () {

   let ipfsd
   let ipfs
+  let lockEmitter

   before(function (done) {
     this.timeout(40 * 1000)
@@ -48,6 +71,11 @@ describe('gc', function () {
       ipfsd = node
       ipfs = ipfsd.api

+      // Replace the Mutex with one that emits events when a readLock or
+      // writeLock is requested (needed in the tests below)
+      ipfs._gcLock.mutex = new MutexEmitter(ipfs._options.repoOwner)
+      lockEmitter = ipfs._gcLock.mutex.emitter
+
       done()
     })
   })
@@ -79,13 +107,13 @@ describe('gc', function () {
     it(`garbage collection should wait for pending ${test.name} to finish`, async () => {
       // Add blocks to IPFS
       // Note: add operation will take a read lock
-      const addLockRequested = pEvent(ipfs._gcLock, 'readLock request')
+      const addLockRequested = pEvent(lockEmitter, 'readLock request')
       const add1 = test.add1()

       // Once add lock has been requested, start GC
       await addLockRequested
       // Note: GC will take a write lock
-      const gcStarted = pEvent(ipfs._gcLock, 'writeLock request')
+      const gcStarted = pEvent(lockEmitter, 'writeLock request')
       const gc = ipfs.repo.gc()

       // Once GC has started, start second add
@@ -109,13 +137,13 @@ describe('gc', function () {
     it('garbage collection should wait for pending add + pin to finish', async () => {
       // Add blocks to IPFS
       // Note: add operation will take a read lock
-      const addLockRequested = pEvent(ipfs._gcLock, 'readLock request')
+      const addLockRequested = pEvent(lockEmitter, 'readLock request')
       const add1 = ipfs.add(fixtures[2], { pin: true })

       // Once add lock has been requested, start GC
       await addLockRequested
       // Note: GC will take a write lock
-      const gcStarted = pEvent(ipfs._gcLock, 'writeLock request')
+      const gcStarted = pEvent(lockEmitter, 'writeLock request')
       const gc = ipfs.repo.gc()

       // Once GC has started, start second add
@@ -142,13 +170,13 @@ describe('gc', function () {

       // Remove first block from IPFS
       // Note: block rm will take a write lock
-      const rmLockRequested = pEvent(ipfs._gcLock, 'writeLock request')
+      const rmLockRequested = pEvent(lockEmitter, 'writeLock request')
       const rm1 = ipfs.block.rm(cid1)

       // Once rm lock has been requested, start GC
       await rmLockRequested
       // Note: GC will take a write lock
-      const gcStarted = pEvent(ipfs._gcLock, 'writeLock request')
+      const gcStarted = pEvent(lockEmitter, 'writeLock request')
       const gc = ipfs.repo.gc()

       // Once GC has started, start second rm
@@ -185,7 +213,7 @@ describe('gc', function () {

       // Pin first block
       // Note: pin add will take a read lock
-      const pinLockRequested = pEvent(ipfs._gcLock, 'readLock request')
+      const pinLockRequested = pEvent(lockEmitter, 'readLock request')
       const pin1 = ipfs.pin.add(cid1)

       // Once pin lock has been requested, start GC
@@ -222,13 +250,13 @@ describe('gc', function () {

       // Unpin first block
       // Note: pin rm will take a read lock
-      const pinLockRequested = pEvent(ipfs._gcLock, 'readLock request')
+      const pinLockRequested = pEvent(lockEmitter, 'readLock request')
       const pinRm1 = ipfs.pin.rm(cid1)

       // Once pin lock has been requested, start GC
       await pinLockRequested
       // Note: GC will take a write lock
-      const gcStarted = pEvent(ipfs._gcLock, 'writeLock request')
+      const gcStarted = pEvent(lockEmitter, 'writeLock request')
       const gc = ipfs.repo.gc()

       // Once GC has started, start second pin rm
Comment: This doesn't need to be protected? Like …

Comment: Because release() is called after the stream completes, it shouldn't be possible to get here without a lock having been acquired (and this.releaseLock being set).

Comment: What happens if lockedPullFn() doesn't work properly, throws or breaks the flow?

Comment: lockedPullFn() returns a pull stream, so it depends on what that pull-stream does.

Comment: What I'm asking is: if lockedPullFn() does something wrong, is this code still safe?

Comment: I don't believe that will affect anything in the PullLocker.release() function - the second callback to pullThrough() should only be invoked once the stream ends. If an exception is thrown from within the pull-stream returned by lockedPullFn(), I believe it will be thrown outside the control of the locking class.
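For reference, a small sketch of why the end callback passed to pullThrough() is a safe place to release the lock. This is illustrative code, not part of the PR; it only relies on pull-stream's documented through() and error() behaviour.

// Sketch: through()'s second callback fires once, when the stream ends or errors,
// mirroring how PullLocker.release() forwards the error to releaseLock(err)
const pull = require('pull-stream/pull')
const error = require('pull-stream/sources/error')
const through = require('pull-stream/throughs/through')
const collect = require('pull-stream/sinks/collect')

pull(
  error(new Error('boom')),     // a source that errors immediately
  through(null, (err) => {
    // err is the stream error, or null if the stream ended normally
    console.log('stream ended with', err && err.message)
  }),
  collect(() => {})
)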