@@ -5,6 +5,7 @@ const EventEmitter = require('events').EventEmitter
5
5
const Readable = require ( 'readable-stream' )
6
6
const toStream = require ( 'it-to-stream' )
7
7
const mapSeries = require ( 'p-each-series' )
8
+ const { default : PQueue } = require ( 'p-queue' )
8
9
const Log = require ( 'ipfs-log' )
9
10
const Entry = Log . Entry
10
11
const Index = require ( './Index' )
@@ -67,6 +68,9 @@ class Store {
67
68
// Create the operations log
68
69
this . _oplog = new Log ( this . _ipfs , this . identity , { logId : this . id , access : this . access , sortFn : this . options . sortFn } )
69
70
71
+ // _addOperation queue
72
+ this . _opqueue = new PQueue ( { concurrency : 1 } )
73
+
70
74
// Create the index
71
75
this . _index = new this . options . Index ( this . address . root )
72
76
@@ -173,6 +177,8 @@ class Store {
173
177
await this . options . onClose ( this )
174
178
}
175
179
180
+ await this . _opqueue . onIdle ( )
181
+
176
182
// Replicator teardown logic
177
183
this . _replicator . stop ( )
178
184
@@ -501,20 +507,23 @@ class Store {
501
507
}
502
508
503
509
async _addOperation ( data , { onProgressCallback, pin = false } = { } ) {
504
- if ( this . _oplog ) {
505
- // check local cache?
506
- if ( this . options . syncLocal ) {
507
- await this . syncLocal ( )
508
- }
510
+ async function addOperation ( ) {
511
+ if ( this . _oplog ) {
512
+ // check local cache?
513
+ if ( this . options . syncLocal ) {
514
+ await this . syncLocal ( )
515
+ }
509
516
510
- const entry = await this . _oplog . append ( data , this . options . referenceCount , pin )
511
- this . _recalculateReplicationStatus ( this . replicationStatus . progress + 1 , entry . clock . time )
512
- await this . _cache . set ( this . localHeadsPath , [ entry ] )
513
- await this . _updateIndex ( )
514
- this . events . emit ( 'write' , this . address . toString ( ) , entry , this . _oplog . heads )
515
- if ( onProgressCallback ) onProgressCallback ( entry )
516
- return entry . hash
517
+ const entry = await this . _oplog . append ( data , this . options . referenceCount , pin )
518
+ this . _recalculateReplicationStatus ( this . replicationStatus . progress + 1 , entry . clock . time )
519
+ await this . _cache . set ( this . localHeadsPath , [ entry ] )
520
+ await this . _updateIndex ( )
521
+ this . events . emit ( 'write' , this . address . toString ( ) , entry , this . _oplog . heads )
522
+ if ( onProgressCallback ) onProgressCallback ( entry )
523
+ return entry . hash
524
+ }
517
525
}
526
+ return this . _opqueue . add ( addOperation . bind ( this ) )
518
527
}
519
528
520
529
_addOperationBatch ( data , batchOperation , lastOperation , onProgressCallback ) {
0 commit comments