Skip to content
This repository has been archived by the owner on Oct 14, 2022. It is now read-only.

Commit

Permalink
Migrate from leveldb to sqlite (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
pfrazee committed Nov 18, 2018
1 parent 850ff6c commit 515a1dc
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 271 deletions.
10 changes: 2 additions & 8 deletions lib/apis/archives.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,7 @@ module.exports = class ArchivesAPI {
// this.archivesDB.addHostingUser(key, userRecord.id)
// ])
try {
await this.archivesDB.addHostingUser(key, userRecord.id, {
name,
ownerName: userRecord.username
})
await this.archivesDB.addHostingUser(key, userRecord.id)
} catch (e) {
if (e.alreadyHosted) {
return res.status(422).json({
Expand Down Expand Up @@ -304,10 +301,7 @@ module.exports = class ArchivesAPI {

// update the records
archiveRecord.name = name
await this.archivesDB.addHostingUser(archiveRecord.key, userRecord.id, {
name,
ownerName: userRecord.username
})
await this.archivesDB.addHostingUser(archiveRecord.key, userRecord.id)
await this.usersDB.put(userRecord)
} finally {
release[0]()
Expand Down
3 changes: 2 additions & 1 deletion lib/archiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ module.exports = class Archiver extends EventEmitter {
console.log(figures.pointerSmall, 'START Delete dead archives')
var start = Date.now()
var deadArchiveKeys = await this.cloud.archivesDB.listDeadArchiveKeys()
await Promise.all(deadArchiveKeys.map(async (archiveKey) => {
await Promise.all(deadArchiveKeys.map(async (archiveRecord) => {
var archiveKey = archiveRecord.key
// make sure the archive is closed
this.closeArchive(archiveKey)
// delete files
Expand Down
120 changes: 30 additions & 90 deletions lib/dbs/activity.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
var assert = require('assert')
var sublevel = require('subleveldown')
var collect = require('stream-collector')
var {monotonicTimestamp} = require('../helpers')
var through2 = require('through2')
var SQL = require('sql-template-strings')

// constants
// =

// used in the users-index-db
const SEPARATOR = '!'
const USERKEY = (key, username) => `${username}${SEPARATOR}${key}`

// valid actions
const ACTIONS = [
'add-archive',
Expand All @@ -23,114 +16,61 @@ const ACTIONS = [

class ActivityDB {
constructor (cloud) {
// create levels
this.globalActivityDB = sublevel(cloud.db, 'global-activity', { valueEncoding: 'json' })
this.metaDB = sublevel(cloud.db, 'global-activity-meta', { valueEncoding: 'json' })
this.usersIndexDB = sublevel(cloud.db, 'global-activity-users-index')

// initialize counter
this.createId = null
this.setupPromise = this.metaDB.get('id-counter')
.catch(_ => 1)
.then(v => {
this.createId = monotonicTimestamp(+v)
})
this.sqlite = cloud.db
}

// basic ops
// =

async writeGlobalEvent (record, opts = {}) {
await this.setupPromise
assert(record && typeof record === 'object')
assert(typeof record.userid === 'string', 'Valid userid type')
assert(typeof record.username === 'string', 'Valid username type')
assert(ACTIONS.includes(record.action), 'Valid action type')
if (!opts.doNotModify) {
record = Object.assign({}, ActivityDB.defaults(), record)
record.ts = Date.now()
}
var key = this.createId()
await Promise.all([
this.metaDB.put('id-counter', key),
this.globalActivityDB.put(key, record),
this.usersIndexDB.put(USERKEY(key, record.username), null)
])
var {ts, userid, username, action, params} = record
params = JSON.stringify(params)
await this.sqlite.run(SQL`
INSERT INTO activity
(ts, userid, username, action, params)
VALUES
(${ts}, ${userid}, ${username}, ${action}, ${params})
`)
return record
}

async delGlobalEvent (key) {
await this.setupPromise
assert(typeof key === 'string')
var record = await this.globalActivityDB.get(key)
await Promise.all([
this.globalActivityDB.del(key),
this.usersIndexDB.del(USERKEY(key, record.username))
])
await this.sqlite.run(SQL`DELETE FROM activity WHERE key = ${key}`)
}

// getters
// =

listGlobalEvents (opts) {
return new Promise((resolve, reject) => {
collect(this.globalActivityDB.createReadStream(opts), (err, res) => {
if (err) reject(err)
else resolve(res.map(toNiceObj))
})
})
listGlobalEvents ({limit, lt, gt, lte, gte, reverse} = {}) {
var query = SQL`SELECT * FROM activity`
if (lt) query.append(SQL` WHERE key < ${lt}`)
if (lte) query.append(SQL` WHERE key <= ${lte}`)
if (gt) query.append(SQL` WHERE key > ${gt}`)
if (gte) query.append(SQL` WHERE key >= ${gte}`)
if (!reverse) query.append(SQL` ORDER BY key`)
else query.append(SQL` ORDER BY key DESC`)
if (limit) query.append(SQL` LIMIT ${limit}`)
return this.sqlite.all(query)
}

listUserEvents (username, opts = {}) {
return new Promise((resolve, reject) => {
// update the start/end
if (opts.lt) opts.lt = USERKEY(opts.lt, username)
if (opts.gt) opts.gt = USERKEY(opts.gt, username)
if (opts.lte) opts.lte = USERKEY(opts.lte, username)
if (opts.gte) opts.gte = USERKEY(opts.gte, username)

// set range edges
if (!opts.lt && !opts.lte) {
opts.lte = USERKEY('\xff', username)
}
if (!opts.gt && !opts.gte) {
opts.gt = USERKEY('', username)
}

// fetch the index range
var self = this
var stream = this.usersIndexDB.createReadStream(opts)
.pipe(through2.obj(function (entry, enc, cb) {
// load the record
var key = entry.key.split(SEPARATOR)[1]
self.globalActivityDB.get(key, (_, value) => {
if (value) {
value.key = key
this.push(value)
}
cb()
})
}))
collect(stream, (err, res) => {
if (err) reject(err)
else resolve(res) // no need to use toNiceObj
})
})
var query = SQL`SELECT * FROM activity WHERE username = ${username}`
if (lt) query.append(SQL` AND key < ${lt}`)
if (lte) query.append(SQL` AND key <= ${lte}`)
if (gt) query.append(SQL` AND key > ${gt}`)
if (gte) query.append(SQL` AND key >= ${gte}`)
if (!reverse) query.append(SQL` ORDER BY key`)
else query.append(SQL` ORDER BY key DESC`)
if (limit) query.append(SQL` LIMIT ${limit}`)
return this.sqlite.all(query)
}
}
module.exports = ActivityDB

// default user-record values
ActivityDB.defaults = () => ({
ts: null,
userid: null,
username: null,
action: null,
params: {}
})

// helper to convert {key:, value:} to just {values...}
function toNiceObj (obj) {
obj.value.key = obj.key
return obj.value
}
Loading

0 comments on commit 515a1dc

Please sign in to comment.