7 changes: 7 additions & 0 deletions create.js
@@ -102,8 +102,15 @@ module.exports = function (path, opts, keys) {

db.append({ content: content, keys: keys }, cb)
}

function addBulk (messages, cb) {
if (!cb) throw new Error('addBulk expects a callback')
db.appendAll({ messages: messages, keys: keys }, cb)
}

return {
add: add,
addBulk: addBulk,
publish: add,
id: keys.id,
keys: keys
3 changes: 3 additions & 0 deletions index.js
@@ -110,6 +110,9 @@ module.exports = {
close : close,
del: valid.async(ssb.del, 'msgLink'),
publish : valid.async(feed.add, 'string|msgContent'),

// An atomic append of many messages at once
publishAll : valid.async(feed.addBulk, 'object'),
add : valid.async(ssb.add, 'msg'),
queue : valid.async(ssb.queue, 'msg'),
get : valid.async(ssb.get, 'msgLink|number|object'),
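
For reference, a minimal usage sketch of the new API (not part of this diff), assuming ssb is an ssb-db instance and using hypothetical post contents:

// publishAll appends every message in one atomic write
ssb.publishAll([
  { type: 'post', text: 'first' },
  { type: 'post', text: 'second' }
], function (err, data) {
  if (err) throw err
  // data is the last value flushed from the write queue,
  // mirroring the callback of publish
})
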
1 change: 0 additions & 1 deletion indexes/clock.js
@@ -3,7 +3,6 @@ var ltgt = require('ltgt')
// 53 bit integer
var MAX_INT = 0x1fffffffffffff
var u = require('../util')

var ViewLevel = require('flumeview-level')

module.exports = function (db, opts) {
2 changes: 1 addition & 1 deletion indexes/feed.js
@@ -2,10 +2,10 @@
var pull = require('pull-stream')
var ltgt = require('ltgt')
var u = require('../util')

var ViewLevel = require('flumeview-level')

function resolveTimestamp (msg) {

// fallback to sync time if no user timestamp or timestamp is after sync time
if (!msg.value.timestamp || msg.timestamp < msg.value.timestamp) {
return msg.timestamp
177 changes: 159 additions & 18 deletions minimal.js
@@ -11,12 +11,15 @@ var ssbKeys = require('ssb-keys')
var box = ssbKeys.box
var u = require('./util')
var isFeed = require('ssb-ref').isFeed
var pull = require('pull-stream')
var asyncMap = require('pull-stream/throughs/async-map')

var isArray = Array.isArray
function isFunction (f) { return typeof f === 'function' }

function unbox (data, unboxers, key) {
var plaintext

if (data && isString(data.value.content)) {
for (var i = 0; i < unboxers.length; i++) {
var unboxer = unboxers[i]
@@ -52,6 +55,7 @@ function unbox (data, unboxers, key) {
}
}
}

return data
}

Expand Down Expand Up @@ -117,14 +121,68 @@ module.exports = function (dirname, keys, opts) {
var queue = AsyncWrite(function (_, cb) {
var batch = state.queue
state.queue = []

var hasBatchAppends = batch.some(isArray)

if (!hasBatchAppends) {
append(batch, function (err, v) {
handlePost(batch)
cb(err, v)
})
} else {
// If there are batch appends, each one must be written as a single 'append'
// operation so that the append is done atomically, and the appropriate callback
// is called via the flush queue to signal the write has completed

var batchIndexes = findBatchIndexRanges(batch)

var finalResult = null

pull(
pull.values(batchIndexes),
asyncMap(function (item, mapCb) {
var startIndex = item[0]
var endIndex = item[1]
var slice = batch.slice(startIndex, endIndex)

// a range covering one nested array is a bulk append:
// flatten it so its messages are written as one atomic unit
if (slice.length === 1 && isArray(slice[0])) {
slice = slice[0]
}

append(slice, function (err, v) {
handlePost(slice)
mapCb(err, v)
})
}),
pull.drain(function (result) {
finalResult = result
}, function (err) {
cb(err, finalResult)
})
)

}

function handlePost(d) {
d.forEach(function (data) {
if (!isArray(data)) {
db.post.set(u.originalData(data))
} else {
data.forEach(d => db.post.set(u.originalData(d)))
}
})
}


}, function reduce (_, msg) {
if (isArray(msg)) {
return V.appendBulk(state, hmacKey, msg)
} else {
return V.append(state, hmacKey, msg)
}
}, function (_state) {
return state.queue.length > 1000
}, function isEmpty (_state) {
@@ -139,6 +197,51 @@
}
}

/**
* Takes an array of single messages and arrays (bulk messages) and
* returns a list of [start, end) slice ranges over that array, such that
* each bulk append is performed in one operation and the runs of single
* messages between them are performed in chunks.
*
* e.g. [single_message1, single_message2, [bulk_messages], single_message3, [bulk_messages]]
* would return [[0, 2], [2, 3], [3, 4], [4, 5]]
*
* @param {Array} batch the array of writes to be performed.
*/
function findBatchIndexRanges(batch) {

var batchIndexes = batch.map(function (elem, index) {
return isArray(elem) ? index : null
}).filter(function (elem) {
return elem !== null
})

var start = 0
var result = []
batchIndexes.forEach(function (batchIndex) {

if (start < batchIndex) {
result.push([start, batchIndex])
}

result.push([batchIndex, batchIndex + 1])
start = batchIndex + 1
})

var lastBatchIndex = batchIndexes[batchIndexes.length - 1]

if (lastBatchIndex < (batch.length - 1)) {
result.push([lastBatchIndex + 1, batch.length])
}

return result
}
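
To make the slicing concrete, a small worked example under the same assumptions as the doc comment above (the message values are hypothetical placeholders):

var m1 = {}, m2 = {}, m3 = {}
var b1 = {}, b2 = {}, b3 = {}

// one bulk append surrounded by runs of single messages
var batch = [m1, m2, [b1, b2, b3], m3]

findBatchIndexRanges(batch)
// => [[0, 2], [2, 3], [3, 4]]
// batch.slice(0, 2) -> [m1, m2], appended as one chunk of singles
// batch.slice(2, 3) -> [[b1, b2, b3]], flattened and appended atomically
// batch.slice(3, 4) -> [m3], appended as its own chunk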

db.last.get(function (_, last) {
// copy to so we avoid weirdness, because this object
// tracks the state coming in to the database.
@@ -179,18 +282,7 @@ module.exports = function (dirname, keys, opts) {
db.append = wait(function (opts, cb) {
try {
var content = opts.content
content = boxOrThrow(content)
var msg = V.create(
state.feeds[opts.keys.id],
opts.keys, opts.hmacKey || hmacKey,
@@ -211,6 +303,39 @@
})
})

db.appendAll = wait(function (opts, cb) {
try {
var messages = opts.messages
messages = messages.map(boxOrThrow).map(function (message) {
return {
content: message,
timestamp: timestamp()
}
})

var validatedMessages = V.createAll(
state.feeds[opts.keys.id],
opts.keys,
opts.hmacKey || hmacKey,
messages
)

queue(validatedMessages, function (err) {
if (err) return cb(err)
var data = state.queue[state.queue.length - 1]
flush.push(function () {
cb(null, data)
})
})

} catch (err) {
cb(err)
return
}

})
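
A sketch of calling this low-level entry point directly (publishAll reaches it via feed.addBulk; keys is assumed to be an ssb-keys keypair, and hmacKey may be passed to override the db default):

db.appendAll({
  messages: [
    { type: 'post', text: 'one' },
    { type: 'post', text: 'two' }
  ],
  keys: keys
}, function (err, data) {
  if (err) throw err
  // data is the last entry queued for this atomic write
})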

db.buffer = function () {
return queue.buffer
}
@@ -232,6 +357,22 @@
maps.push(fn)
}

function boxOrThrow(content) {
var recps = content.recps
if (recps) {
const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0
if (isFeed(recps) || isNonEmptyArrayOfFeeds) {
recps = content.recps = [].concat(recps) // force to array
return box(content, recps)
} else {
const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps)
throw new Error(errMsg)
}
}

return content
}
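
A quick sketch of the three behaviours of this helper (the recipient id below is a hypothetical placeholder in valid feed-id format):

// content without recps passes through unchanged
boxOrThrow({ type: 'post', text: 'public' })

// valid recps are normalised to an array and the content is boxed
// into a '.box' ciphertext string via ssb-keys
boxOrThrow({
  type: 'post',
  text: 'secret',
  recps: ['@hxGxqPrplLjRG2vtjQL87abX4QKqeLgCwQpS730nNwE=.ed25519']
})

// invalid recipients throw rather than publishing in the clear
boxOrThrow({ type: 'post', text: 'oops', recps: 'not-a-feed' }) // throws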

return db
}

3 changes: 2 additions & 1 deletion package.json
@@ -28,12 +28,13 @@
"ssb-keys": "^7.1.3",
"ssb-msgs": "^5.0.0",
"ssb-ref": "^2.12.0",
"ssb-validate": "^4.0.0",
"ssb-validate": "file:../ssb-validate",
"typewiselite": "^1.0.0",
"zerr": "^1.0.0"
},
"devDependencies": {
"hexpp": "^2.0.0",
"multicb": "^1.2.2",
"pull-abortable": "~4.1.0",
"ssb-feed": "^2.2.1",
"tape": "^4.8.0",