diff --git a/package.json b/package.json index 99c2922..af4631c 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "async": "^2.6.1", "datastore-core": "~0.6.0", "interface-datastore": "~0.6.0", + "once": "^1.4.0", "pull-defer": "~0.2.3", "pull-stream": "^3.6.9", "upath": "^1.1.0" diff --git a/src/index.js b/src/index.js index e036000..21cd10a 100644 --- a/src/index.js +++ b/src/index.js @@ -5,6 +5,7 @@ const assert = require('assert') const path = require('upath') const nextTick = require('async/nextTick') +const once = require('once') const each = require('async/each') const waterfall = require('async/series') const asyncFilter = require('interface-datastore').utils.asyncFilter @@ -88,6 +89,7 @@ class S3Datastore { * @returns {void} */ put (key /* : Key */, val /* : Buffer */, callback /* : Callback */) /* : void */ { + callback = once(callback) this.opts.s3.upload({ Key: this._getFullKey(key), Body: val @@ -113,6 +115,7 @@ class S3Datastore { * @returns {void} */ get (key /* : Key */, callback /* : Callback */) /* : void */ { + callback = once(callback) this.opts.s3.getObject({ Key: this._getFullKey(key) }, (err, data) => { @@ -135,6 +138,7 @@ class S3Datastore { * @returns {void} */ has (key /* : Key */, callback /* : Callback */) /* : void */ { + callback = once(callback) this.opts.s3.headObject({ Key: this._getFullKey(key) }, (err, data) => { @@ -156,6 +160,7 @@ class S3Datastore { * @returns {void} */ delete (key /* : Key */, callback /* : Callback */) /* : void */ { + callback = once(callback) this.opts.s3.deleteObject({ Key: this._getFullKey(key) }, (err) => { @@ -182,6 +187,7 @@ class S3Datastore { deletes.push(key) }, commit: (callback /* : (err: ?Error) => void */) => { + callback = once(callback) waterfall([ (cb) => each(puts, (p, _cb) => { this.put(p.key, p.value, _cb) @@ -207,6 +213,7 @@ class S3Datastore { keys = [] } + callback = once(callback) this.opts.s3.listObjectsV2(params, (err, data) => { if (err) { return callback(new Error(err.code)) @@ -241,6 +248,8 @@ class S3Datastore { return { next: (callback/* : Callback */) => { + callback = once(callback) + // Check if we're done if (count >= keys.length) { return callback(null, null, null) @@ -278,6 +287,8 @@ class S3Datastore { // this gets called recursively, the internals need to iterate const rawStream = (end, callback) => { + callback = once(callback) + if (end) { return callback(end) } @@ -346,6 +357,7 @@ class S3Datastore { * @returns {void} */ open (callback /* : Callback */) /* : void */ { + callback = once(callback) this.opts.s3.headObject({ Key: this.path }, (err, data) => {