Skip to content
This repository was archived by the owner on Aug 11, 2022. It is now read-only.

Commit cfd43b4

Browse files
aredridel authored and zkat committed
search: first stab at streaming search
Credit: @aredridel
PR-URL: #13746
Reviewed-By: @othiym23
1 parent 643dae2 commit cfd43b4

File tree

5 files changed

+182
-85
lines changed

5 files changed

+182
-85
lines changed

lib/cache/update-index.js

+142-54
Original file line number | Diff line number | Diff line change
@@ -11,14 +11,18 @@ var cacheFile = require('npm-cache-filename')
1111
var getCacheStat = require('./get-stat.js')
1212
var mapToRegistry = require('../utils/map-to-registry.js')
1313
var pulseTillDone = require('../utils/pulse-till-done.js')
14-
var parseJSON = require('../utils/parse-json.js')
14+
var jsonstream = require('JSONStream')
15+
var asyncMap = require('slide').asyncMap
16+
var writeStreamAtomic = require('fs-write-stream-atomic')
17+
var once = require('once')
1518

1619
/* /-/all is special.
1720
* It uses timestamp-based caching and partial updates,
1821
* because it is a monster.
1922
*/
20-
function updateIndex (staleness, cb) {
21-
assert(typeof cb === 'function', 'must pass callback to updateIndex')
23+
function updateIndex (staleness, args, notArgs, filter, cb) {
24+
assert(typeof filter === 'function', 'must pass filter callback to updateIndex')
25+
assert(typeof cb === 'function', 'must pass final callback to updateIndex')
2226

2327
mapToRegistry('-/all', npm.config, function (er, uri, auth) {
2428
if (er) return cb(er)
@@ -27,79 +31,163 @@ function updateIndex (staleness, cb) {
2731
timeout: staleness,
2832
follow: true,
2933
staleOk: true,
30-
auth: auth
34+
auth: auth,
35+
streaming: true
3136
}
32-
var cacheBase = cacheFile(npm.config.get('cache'))(uri)
33-
var cachePath = path.join(cacheBase, '.cache.json')
34-
log.info('updateIndex', cachePath)
37+
var cacheBase = path.join(cacheFile(npm.config.get('cache'))(uri), '_search')
38+
log.info('updateIndex', cacheBase)
3539

3640
getCacheStat(function (er, st) {
3741
if (er) return cb(er)
3842

3943
mkdir(cacheBase, function (er, made) {
4044
if (er) return cb(er)
4145

42-
fs.readFile(cachePath, function (er, data) {
43-
if (er) {
44-
log.warn('', 'Building the local index for the first time, please be patient')
45-
return updateIndex_(uri, params, {}, cachePath, cb)
46-
}
46+
chownr(made || cacheBase, st.uid, st.gid, function (er) {
47+
if (er) return cb(er)
4748

48-
chownr(made || cachePath, st.uid, st.gid, function (er) {
49+
fs.readdir(cacheBase, function (er, cacheFiles) {
4950
if (er) return cb(er)
5051

51-
data = parseJSON.noExceptions(data)
52-
if (!data) {
53-
fs.writeFile(cachePath, '{}', function (er) {
54-
if (er) return cb(new Error('Broken cache.'))
55-
56-
log.warn('', 'Building the local index for the first time, please be patient')
57-
return updateIndex_(uri, params, {}, cachePath, cb)
58-
})
59-
}
60-
61-
var t = +data._updated || 0
62-
// use the cache and update in the background if it's not too old
63-
if (Date.now() - t < 60000) {
64-
cb(null, data)
65-
cb = function () {}
66-
}
67-
68-
if (t === 0) {
69-
log.warn('', 'Building the local index for the first time, please be patient')
70-
} else {
71-
log.verbose('updateIndex', 'Cached search data present with timestamp', t)
72-
uri += '/since?stale=update_after&startkey=' + t
73-
}
74-
updateIndex_(uri, params, data, cachePath, cb)
52+
cacheFiles.sort()
53+
54+
var latest = 0
55+
asyncMap(cacheFiles, function (file, cb) {
56+
log.silly('search', 'reading cache ' + file)
57+
cb = once(cb)
58+
var m = /^(\d+)-(\d+)[.]json/.exec(file)
59+
if (m) {
60+
latest = Number(m[2])
61+
var cacheFile = path.join(cacheBase, file)
62+
63+
fs.stat(cacheFile, function (er, stat) {
64+
if (er) return cb(er)
65+
var r = fs.createReadStream(cacheFile).pipe(log.newStream('readCache', stat.size))
66+
var f = r.pipe(collectResults(filter, args, notArgs, cb))
67+
f.once('error', cb)
68+
})
69+
} else {
70+
cb(null, {})
71+
}
72+
}, function (err, data) {
73+
if (err) return cb(err)
74+
75+
data = data.reduce(function (a, e) {
76+
Object.keys(e).forEach(function (k) {
77+
a[k] = e[k]
78+
})
79+
return a
80+
}, {})
81+
82+
// use the cache and make no request if it's not too old
83+
if (Date.now() - latest < 60000) {
84+
finish(data, cb)
85+
} else {
86+
if (latest === 0) {
87+
log.warn('', 'Building the local index for the first time, please be patient')
88+
} else {
89+
log.verbose('updateIndex', 'Cached search data present with timestamp', latest)
90+
uri += '/since?stale=update_after&startkey=' + latest
91+
}
92+
93+
updateIndex_(uri, params, latest, filter, args, notArgs, cacheBase, function (err, updated) {
94+
if (err) return cb(err)
95+
96+
Object.keys(updated).forEach(function (k) {
97+
data[k] = updated[k]
98+
})
99+
100+
finish(data, cb)
101+
})
102+
}
103+
})
75104
})
76105
})
77106
})
78107
})
79108
})
80109
}
81110

82-
function updateIndex_ (all, params, data, cachePath, cb) {
83-
log.silly('update-index', 'fetching', all)
84-
npm.registry.request(all, params, pulseTillDone('updateIndex', function (er, updates, _, res) {
85-
if (er) return cb(er, data)
111+
function finish (data, cb) {
112+
var keys = Object.keys(data)
113+
keys.sort()
114+
var results = keys.map(function (k) {
115+
return data[k]
116+
})
86117

87-
var headers = res.headers
88-
var updated = updates._updated || Date.parse(headers.date)
118+
cb(null, results)
119+
}
89120

90-
Object.keys(updates).forEach(function (p) { data[p] = updates[p] })
121+
function updateIndex_ (all, params, latest, filter, args, notArgs, cacheBase, cb) {
122+
cb = once(cb)
123+
log.silly('update-index', 'fetching', all)
124+
npm.registry.request(all, params, pulseTillDone('updateIndex', function (er, res) {
125+
if (er) return cb(er)
91126

92-
data._updated = updated
93-
getCacheStat(function (er, st) {
94-
if (er) return cb(er)
127+
var results = null
128+
var updated = null
129+
var wroteUpdate = false
130+
131+
var trackerStream = log.newStream('updateIndex')
132+
133+
var tmpName = path.join(cacheBase, latest + '-next.json')
134+
var writeStream = writeStreamAtomic(tmpName)
135+
res.setMaxListeners(20) // node 0.8 has a lower margin
136+
res.pipe(writeStream)
137+
res.pipe(trackerStream)
138+
writeStream.once('error', cb)
139+
writeStream.once('close', function () {
140+
wroteUpdate = true
141+
maybeFinishUpdateIndex()
142+
})
95143

96-
fs.writeFile(cachePath, JSON.stringify(data), function (er) {
97-
delete data._updated
98-
if (er) return cb(er)
99-
chownr(cachePath, st.uid, st.gid, function (er) {
100-
cb(er, data)
144+
res.pipe(collectResults(filter, args, notArgs, function (err, results_, updated_) {
145+
if (err) return cb(err)
146+
results = results_
147+
updated = updated_
148+
maybeFinishUpdateIndex()
149+
}))
150+
151+
function maybeFinishUpdateIndex () {
152+
if (results && wroteUpdate) {
153+
var finalName = path.join(cacheBase, latest + '-' + updated + '.json')
154+
log.silly('update-index', 'moving final cache file into place', finalName)
155+
fs.rename(tmpName, finalName, function (err) {
156+
if (err) return cb(err)
157+
cb(null, results)
101158
})
102-
})
103-
})
159+
}
160+
}
104161
}))
105162
}
163+
164+
function collectResults (filter, args, notArgs, cb) {
165+
cb = once(cb)
166+
167+
var results = {}
168+
var updated = null
169+
var stream = jsonstream.parse('*', function (pkg, key) {
170+
if (key[0] === '_updated') {
171+
updated = pkg
172+
return
173+
}
174+
if (key[0][0] !== '_') {
175+
if (filter(pkg, args, notArgs)) {
176+
log.verbose('search', 'matched ' + pkg.name)
177+
results[pkg.name] = pkg
178+
} else {
179+
log.silly('search', 'not matched ' + pkg.name)
180+
}
181+
} else {
182+
log.silly('search', 'skipping ' + key)
183+
}
184+
})
185+
186+
stream.once('error', cb)
187+
188+
stream.once('end', function () {
189+
cb(null, results, updated)
190+
})
191+
192+
return stream
193+
}

lib/search.js

+13-20
Original file line number | Diff line number | Diff line change
@@ -6,12 +6,16 @@ var columnify = require('columnify')
66
var updateIndex = require('./cache/update-index.js')
77
var usage = require('./utils/usage')
88
var output = require('./utils/output.js')
9+
var log = require('npmlog')
910

1011
search.usage = usage(
1112
'search',
1213
'npm search [--long] [search terms ...]'
1314
)
1415

16+
search.usage = 'npm search [--long] [search terms ...]' +
17+
'\n\naliases: s, se'
18+
1519
search.completion = function (opts, cb) {
1620
var compl = {}
1721
var partial = opts.partialWord
@@ -66,35 +70,24 @@ function search (args, silent, staleness, cb) {
6670
return s.toLowerCase()
6771
})
6872

69-
getFilteredData(staleness, opts, searchexclude, function (er, data) {
73+
updateIndex(staleness, opts, searchexclude, filter, function (er, data) {
7074
// now data is the list of data that we want to show.
7175
// prettify and print it, and then provide the raw
7276
// data to the cb.
7377
if (er || silent) return cb(er, data)
78+
log.clearProgress()
7479
output(prettify(data, args))
7580
cb(null, data)
7681
})
7782
}
7883

79-
function getFilteredData (staleness, args, notArgs, cb) {
80-
updateIndex(staleness, function (er, data) {
81-
if (er) return cb(er)
82-
return cb(null, filter(data, args, notArgs))
83-
})
84-
}
85-
8684
function filter (data, args, notArgs) {
87-
// data={<name>:{package data}}
88-
return Object.keys(data).map(function (d) {
89-
return data[d]
90-
}).filter(function (d) {
91-
return typeof d === 'object'
92-
}).map(stripData).map(getWords).filter(function (data) {
93-
return filterWords(data, args, notArgs)
94-
}).reduce(function (l, r) {
95-
l[r.name] = r
96-
return l
97-
}, {})
85+
if (typeof data !== 'object') return false
86+
87+
data = getWords(stripData(data))
88+
if (filterWords(data, args, notArgs)) return true
89+
90+
return false
9891
}
9992

10093
function stripData (data) {
@@ -163,7 +156,7 @@ function prettify (data, args) {
163156
var lines = Object.keys(data).map(function (d) {
164157
// strip keyname
165158
return data[d]
166-
}).map(function (dat) {
159+
}).map(stripData).map(function (dat) {
167160
dat.author = dat.maintainers
168161
delete dat.maintainers
169162
dat.date = dat.time

package.json

+1
Original file line number | Diff line number | Diff line change
@@ -131,6 +131,7 @@
131131
"inherits",
132132
"ini",
133133
"init-package-json",
134+
"jsonstream",
134135
"lockfile",
135136
"lodash._baseindexof",
136137
"lodash._baseuniq",

test/tap/search.js

+18-8
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ var common = require('../common-tap.js')
1212
var pkg = path.resolve(__dirname, 'search')
1313
var cache = path.resolve(pkg, 'cache')
1414
var registryCache = path.resolve(cache, 'localhost_1337', '-', 'all')
15-
var cacheJsonFile = path.resolve(registryCache, '.cache.json')
15+
var searchDir = path.resolve(registryCache, '_search')
1616

1717
var timeMock = {
1818
epoch: 1411727900,
@@ -53,15 +53,18 @@ test('No previous cache, init cache triggered by first search', function (t) {
5353
'search', 'do not do extra search work on my behalf',
5454
'--registry', common.registry,
5555
'--cache', cache,
56-
'--loglevel', 'silent',
57-
'--color', 'always'
56+
'--loglevel', 'error',
57+
'--color', 'never'
5858
],
5959
EXEC_OPTS,
60-
function (err, code) {
60+
function (err, code, stdout, stderr) {
6161
s.close()
62+
t.equal(stderr, '', 'had no error output')
6263
t.equal(code, 0, 'search finished successfully')
6364
t.ifErr(err, 'search finished successfully')
6465

66+
var cacheJsonFile = path.resolve(searchDir, '0-' + timeMock.future + '.json')
67+
6568
t.ok(
6669
fs.existsSync(cacheJsonFile),
6770
cacheJsonFile + ' expected to have been created'
@@ -102,15 +105,18 @@ test('previous cache, _updated set, should trigger since request', function (t)
102105
'search', 'do not do extra search work on my behalf',
103106
'--registry', common.registry,
104107
'--cache', cache,
105-
'--loglevel', 'silly',
108+
'--loglevel', 'error',
106109
'--color', 'always'
107110
],
108111
EXEC_OPTS,
109-
function (err, code) {
112+
function (err, code, stdout, stderr) {
110113
s.close()
114+
t.equal(stderr, '', 'no error output')
111115
t.equal(code, 0, 'search finished successfully')
112116
t.ifErr(err, 'search finished successfully')
113117

118+
var cacheJsonFile = path.resolve(searchDir, '0-' + timeMock.epoch + '.json')
119+
114120
var cacheData = JSON.parse(fs.readFileSync(cacheJsonFile, 'utf8'))
115121
t.equal(
116122
cacheData._updated,
@@ -143,12 +149,14 @@ searches.forEach(function (search) {
143149
'search', search.term,
144150
'--registry', common.registry,
145151
'--cache', cache,
146-
'--loglevel', 'silent',
152+
'--loglevel', 'error',
147153
'--color', 'always'
148154
],
149155
EXEC_OPTS,
150-
function (err, code, stdout) {
156+
function (err, code, stdout, stderr) {
151157
s.close()
158+
t.equal(stderr, '', 'no error output')
159+
t.notEqual(stdout, '', 'got output')
152160
t.equal(code, 0, 'search finished successfully')
153161
t.ifErr(err, 'search finished successfully')
154162
// \033 == \u001B
@@ -183,6 +191,8 @@ function setupCache () {
183191
cleanup()
184192
mkdirp.sync(cache)
185193
mkdirp.sync(registryCache)
194+
mkdirp.sync(searchDir)
195+
var cacheJsonFile = path.resolve(searchDir, '0-' + timeMock.epoch + '.json')
186196
var res = fs.writeFileSync(cacheJsonFile, stringifyUpdated(timeMock.epoch))
187197
if (res) throw new Error('Creating cache file failed')
188198
}

0 commit comments

Comments
 (0)