This repository has been archived by the owner on Aug 11, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
index.js
61 lines (51 loc) · 1.8 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
var changesStream = require('changes-stream')
var pressureStream = require('pressure-stream')
var concurrentSeq = require('concurrent-seq-file')
var undef
module.exports = function (handler, config) {
if (!handler || typeof handler !== 'function') throw new Error('first argument should be function to use as a document handler')
if (!config) throw new Error('config object required. fn(handler,config)')
if (typeof config === 'string') config = {db: config}
if (!config.db) throw new Error('missing required "db" key in config.')
// include_docs by default
if (config.include_docs === undef) config.include_docs = true
var seq
if(typeof config.sequence === 'function'){
if(config.since === undefined) {
throw new Error("config.since must be set and must be the last value you saved in your database if using custom persist backend. if you dont have one set this to 0")
}
seq = concurrentSeq.starter(config.sequence,{savedValue:config.since})
seq.value = config.since
} else {
seq = concurrentSeq(config.sequence || '.sequence')
}
config.since = seq.value
var firstStart = (seq.value === 0)
if (firstStart) {
config.since = (config.now === true) ? 'now' : 0
}
var changes = changesStream(config)
var pressure = pressureStream(function (change, next) {
var saveSeq = seq(change.seq)
handler(change, function (err, data) {
saveSeq()
next(err, data)
})
}, config.concurrency || 4)
changes.on('error', function (err) {
pressure.emit('error', err)
})
changes.pipe(pressure)
// just for logging. =)
pressure.sequence = function () {
return seq.value
}
// hack to make ending the streams work-ish
// i only end in tests anyway.
pressure.end = function(){
//
changes.destroy()
this.destroy()
}
return pressure
}