Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit eae37e1

Browse files
committed
test: add gc locking tests
1 parent ff28923 commit eae37e1

File tree

7 files changed

+300
-31
lines changed

7 files changed

+300
-31
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@
166166
"multihashes": "~0.4.14",
167167
"multihashing-async": "~0.6.0",
168168
"node-fetch": "^2.3.0",
169+
"p-event": "^4.1.0",
169170
"peer-book": "~0.9.0",
170171
"peer-id": "~0.12.0",
171172
"peer-info": "~0.15.0",

src/core/components/block.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ module.exports = function block (self) {
8181
cb(null, new Block(block, cid))
8282
})
8383
},
84-
(block, cb) => self._gcLock.writeLock((_cb) => {
84+
(block, cb) => self._gcLock.readLock((_cb) => {
8585
self._blockService.put(block, (err) => {
8686
if (err) {
8787
return _cb(err)
@@ -103,7 +103,7 @@ module.exports = function block (self) {
103103
return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID')))
104104
}
105105

106-
self._gcLock.readLock((cb) => self._blockService.delete(cid, cb), callback)
106+
self._gcLock.writeLock((cb) => self._blockService.delete(cid, cb), callback)
107107
}),
108108
stat: promisify((cid, options, callback) => {
109109
if (typeof options === 'function') {

src/core/components/files-regular/add-pull-stream.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ function pinFile (file, self, opts, cb) {
112112
const isRootDir = !file.path.includes('/')
113113
const shouldPin = pin && isRootDir && !opts.onlyHash && !opts.hashAlg
114114
if (shouldPin) {
115-
return self.pin.add(file.hash, { preload: false }, err => cb(err, file))
115+
return self.pin.add(file.hash, { preload: false, lock: false }, err => cb(err, file))
116116
} else {
117117
cb(null, file)
118118
}

src/core/components/gc-lock.js

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
const mortice = require('mortice')
44
const pull = require('pull-stream')
5-
const log = require('debug')('ipfs:repo:gc:lock')
5+
const EventEmitter = require('events')
6+
const log = require('debug')('ipfs:gc:lock')
67

7-
class GCLock {
8+
class GCLock extends EventEmitter {
89
constructor () {
10+
super()
911
this.mutex = mortice()
12+
this.lockId = 0
1013
}
1114

1215
readLock (lockedFn, cb) {
@@ -18,16 +21,28 @@ class GCLock {
1821
}
1922

2023
lock (type, lockedFn, cb) {
21-
log(`${type} requested`)
24+
if (typeof lockedFn !== 'function') {
25+
throw new Error(`first argument to ${type} must be a function`)
26+
}
27+
if (typeof cb !== 'function') {
28+
throw new Error(`second argument to ${type} must be a callback function`)
29+
}
30+
31+
const lockId = this.lockId++
32+
log(`[${lockId}] ${type} requested`)
33+
this.emit(`${type} request`, lockId)
2234
const locked = () => new Promise((resolve, reject) => {
23-
log(`${type} started`)
24-
lockedFn((err, res) => err ? reject(err) : resolve(res))
35+
this.emit(`${type} start`, lockId)
36+
log(`[${lockId}] ${type} started`)
37+
lockedFn((err, res) => {
38+
this.emit(`${type} release`, lockId)
39+
log(`[${lockId}] ${type} released`)
40+
err ? reject(err) : resolve(res)
41+
})
2542
})
2643

2744
const lock = this.mutex[type](locked)
28-
return lock.then(res => cb(null, res)).catch(cb).finally(() => {
29-
log(`${type} released`)
30-
})
45+
return lock.then(res => cb(null, res)).catch(cb)
3146
}
3247

3348
pullReadLock (lockedPullFn) {
@@ -39,7 +54,7 @@ class GCLock {
3954
}
4055

4156
pullLock (type, lockedPullFn) {
42-
const pullLocker = new PullLocker(this.mutex, type)
57+
const pullLocker = new PullLocker(this, this.mutex, type, this.lockId++)
4358

4459
return pull(
4560
pullLocker.take(),
@@ -50,9 +65,11 @@ class GCLock {
5065
}
5166

5267
class PullLocker {
53-
constructor (mutex, type) {
68+
constructor (emitter, mutex, type, lockId) {
69+
this.emitter = emitter
5470
this.mutex = mutex
5571
this.type = type
72+
this.lockId = lockId
5673

5774
// This Promise resolves when the mutex gives us permission to start
5875
// running the locked piece of code
@@ -65,7 +82,8 @@ class PullLocker {
6582
locked () {
6683
return new Promise((resolve) => {
6784
this.releaseLock = resolve
68-
log(`${this.type} (pull) started`)
85+
log(`[${this.lockId}] ${this.type} (pull) started`)
86+
this.emitter.emit(`${this.type} start`, this.lockId)
6987

7088
// The locked piece of code is ready to start, so resolve the
7189
// this.lockReady Promise (created in the constructor)
@@ -79,7 +97,8 @@ class PullLocker {
7997
return pull(
8098
pull.asyncMap((i, cb) => {
8199
if (!this.lock) {
82-
log(`${this.type} (pull) requested`)
100+
log(`[${this.lockId}] ${this.type} (pull) requested`)
101+
this.emitter.emit(`${this.type} request`, this.lockId)
83102
// Request the lock
84103
this.lock = this.mutex[this.type](() => this.locked())
85104
}
@@ -93,7 +112,8 @@ class PullLocker {
93112
// Releases the lock
94113
release () {
95114
return pull.through(null, () => {
96-
log(`${this.type} (pull) released`)
115+
log(`[${this.lockId}] ${this.type} (pull) released`)
116+
this.emitter.emit(`${this.type} release`, this.lockId)
97117
this.releaseLock()
98118
})
99119
}

src/core/components/gc.js

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const base32 = require('base32.js')
66
const parallel = require('async/parallel')
77
const mapLimit = require('async/mapLimit')
88
const { Key } = require('interface-datastore')
9+
const log = require('debug')('ipfs:gc')
910

1011
// Limit on the number of parallel block remove operations
1112
const BLOCK_RM_CONCURRENCY = 256
@@ -16,7 +17,7 @@ const MFS_ROOT_DS_KEY = new Key('/local/filesroot')
1617
module.exports = function gc (self) {
1718
return promisify(async (callback) => {
1819
const start = Date.now()
19-
self.log(`GC: Creating set of marked blocks`)
20+
log(`Creating set of marked blocks`)
2021

2122
self._gcLock.writeLock((lockCb) => {
2223
parallel([
@@ -26,14 +27,14 @@ module.exports = function gc (self) {
2627
(cb) => createMarkedSet(self, cb)
2728
], (err, [blocks, markedSet]) => {
2829
if (err) {
29-
self.log(`GC: Error - ${err.message}`)
30+
log(`Error - ${err.message}`)
3031
return lockCb(err)
3132
}
3233

3334
// Delete blocks that are not being used
3435
deleteUnmarkedBlocks(self, markedSet, blocks, start, (err, res) => {
3536
if (err) {
36-
self.log(`GC: Error - ${err.message}`)
37+
log(`Error - ${err.message}`)
3738
return lockCb(err)
3839
}
3940
lockCb(null, res)
@@ -55,15 +56,15 @@ function createMarkedSet (ipfs, callback) {
5556
if (err) {
5657
return cb(new Error(`Could not list pinned blocks: ${err.message}`))
5758
}
58-
ipfs.log(`GC: Found ${pins.length} pinned blocks`)
59+
log(`Found ${pins.length} pinned blocks`)
5960
cb(null, pins.map(p => new CID(p.hash)))
6061
}),
6162

6263
// Blocks used internally by the pinner
6364
(cb) => ipfs._repo.datastore.get(PIN_DS_KEY, (err, mh) => {
6465
if (err) {
6566
if (err.code === 'ERR_NOT_FOUND') {
66-
ipfs.log(`GC: No pinned blocks`)
67+
log(`No pinned blocks`)
6768
return cb(null, [])
6869
}
6970
return cb(new Error(`Could not get pin sets root from datastore: ${err.message}`))
@@ -86,7 +87,7 @@ function createMarkedSet (ipfs, callback) {
8687
(cb) => ipfs._repo.datastore.get(MFS_ROOT_DS_KEY, (err, mh) => {
8788
if (err) {
8889
if (err.code === 'ERR_NOT_FOUND') {
89-
ipfs.log(`GC: No blocks in MFS`)
90+
log(`No blocks in MFS`)
9091
return cb(null, [])
9192
}
9293
return cb(new Error(`Could not get MFS root from datastore: ${err.message}`))
@@ -110,7 +111,7 @@ function getDescendants (ipfs, cid, callback) {
110111
if (err) {
111112
return callback(new Error(`Could not get MFS root descendants from store: ${err.message}`))
112113
}
113-
ipfs.log(`GC: Found ${refs.length} MFS blocks`)
114+
log(`Found ${refs.length} MFS blocks`)
114115
callback(null, [cid, ...refs.map(r => new CID(r.ref))])
115116
})
116117
}
@@ -132,9 +133,9 @@ function deleteUnmarkedBlocks (ipfs, markedSet, blocks, start, callback) {
132133
}
133134
}
134135

135-
const msg = `GC: Marked set has ${markedSet.size} blocks. Blockstore has ${blocks.length} blocks. ` +
136+
const msg = `Marked set has ${markedSet.size} blocks. Blockstore has ${blocks.length} blocks. ` +
136137
`Deleting ${unreferenced.length} blocks.`
137-
ipfs.log(msg)
138+
log(msg)
138139

139140
mapLimit(unreferenced, BLOCK_RM_CONCURRENCY, (cid, cb) => {
140141
// Delete blocks from blockstore
@@ -146,7 +147,7 @@ function deleteUnmarkedBlocks (ipfs, markedSet, blocks, start, callback) {
146147
cb(null, res)
147148
})
148149
}, (_, delRes) => {
149-
ipfs.log(`GC: Complete (${Date.now() - start}ms)`)
150+
log(`Complete (${Date.now() - start}ms)`)
150151

151152
callback(null, res.concat(delRes))
152153
})

src/core/components/pin.js

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ module.exports = (self) => {
182182
resolvePath(self.object, paths, (err, mhs) => {
183183
if (err) { return callback(err) }
184184

185-
self._gcLock.readLock((lockCb) => {
185+
const pin = (pinComplete) => {
186186
// verify that each hash can be pinned
187187
map(mhs, (multihash, cb) => {
188188
const key = toB58String(multihash)
@@ -217,19 +217,28 @@ module.exports = (self) => {
217217
})
218218
}
219219
}, (err, results) => {
220-
if (err) { return lockCb(err) }
220+
if (err) { return pinComplete(err) }
221221

222222
// update the pin sets in memory
223223
const pinset = recursive ? recursivePins : directPins
224224
results.forEach(key => pinset.add(key))
225225

226226
// persist updated pin sets to datastore
227227
flushPins((err, root) => {
228-
if (err) { return lockCb(err) }
229-
lockCb(null, results.map(hash => ({ hash })))
228+
if (err) { return pinComplete(err) }
229+
pinComplete(null, results.map(hash => ({ hash })))
230230
})
231231
})
232-
}, callback)
232+
}
233+
234+
// When adding a file, we take a lock that gets released after pinning
235+
// is complete, so don't take a second lock here
236+
const lock = options.lock !== false
237+
if (lock) {
238+
self._gcLock.readLock(pin, callback)
239+
} else {
240+
pin(callback)
241+
}
233242
})
234243
}),
235244

0 commit comments

Comments
 (0)