Skip to content

Commit

Permalink
Add opt-in multithreading (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewkeil authored Nov 26, 2023
1 parent 3b1a6f2 commit 7d497a5
Show file tree
Hide file tree
Showing 6 changed files with 406 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ The optional `options` object may contain:
- `createIfMissing` (boolean, default: `true`): If `true`, create an empty database if one doesn't already exist. If `false` and the database doesn't exist, opening will fail.
- `errorIfExists` (boolean, default: `false`): If `true` and the database already exists, opening will fail.
- `passive` (boolean, default: `false`): Wait for, but do not initiate, opening of the database.
- `multithreading` (boolean, default: `false`): Allow multiple threads to access the database. This is only relevant when using [worker threads](https://nodejs.org/api/worker_threads.html).

For advanced performance tuning, the `options` object may also contain the following. Modify these options only if you can prove actual benefit for your particular application.

Expand Down
103 changes: 93 additions & 10 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,36 @@

#include <map>
#include <vector>
#include <mutex>

/**
* Forward declarations.
*/
struct Database;
struct Iterator;
static void iterator_close_do (napi_env env, Iterator* iterator, napi_value cb);
static leveldb::Status threadsafe_open(const leveldb::Options &options,
bool multithreading,
Database &db_instance);
static leveldb::Status threadsafe_close(Database &db_instance);

/**
* Macros.
* Global declarations for multi-threaded access. These are not context-aware
* by definition and exist specifically to allow cross-thread access to the
* single database handle.
*/
/**
 * Per-location record of a shared LevelDB handle and the number of Database
 * instances that currently hold it open.
 */
struct LevelDbHandle
{
  // Raw LevelDB handle shared by every multithreading-enabled Database
  // instance opened at the same location.
  leveldb::DB *db;
  // Count of open Database instances referencing `db`; the handle is deleted
  // when this drops to zero (see threadsafe_close).
  size_t open_handle_count;
};
// Guards all access to db_handles below.
static std::mutex handles_mutex;
// only access this when protected by the handles_mutex!
static std::map<std::string, LevelDbHandle> db_handles;

/**
* Macros.
*/
#define NAPI_DB_CONTEXT() \
Database* database = NULL; \
NAPI_STATUS_THROWS(napi_get_value_external(env, argv[0], (void**)&database));
Expand Down Expand Up @@ -495,19 +513,21 @@ struct Database {

~Database () {
if (db_ != NULL) {
delete db_;
db_ = NULL;
threadsafe_close(*this);
}
}

leveldb::Status Open (const leveldb::Options& options,
const char* location) {
return leveldb::DB::Open(options, location, &db_);
const std::string &location,
bool multithreading) {
location_ = location;
return threadsafe_open(options, multithreading, *this);
}

void CloseDatabase () {
delete db_;
db_ = NULL;
if (db_ != NULL) {
threadsafe_close(*this);
}
if (blockCache_) {
delete blockCache_;
blockCache_ = NULL;
Expand Down Expand Up @@ -600,8 +620,66 @@ struct Database {

private:
uint32_t priorityWork_;
std::string location_;

// for separation of concerns the threadsafe functionality was kept at the global
// level and made a friend so it is explicit where the threadsafe boundary exists
friend leveldb::Status threadsafe_open(const leveldb::Options &options,
bool multithreading,
Database &db_instance);
friend leveldb::Status threadsafe_close(Database &db_instance);
};


/**
 * Opens the underlying LevelDB database for `db_instance`.
 *
 * With `multithreading` disabled this is a plain leveldb::DB::Open and the
 * instance owns the handle exclusively. With it enabled, instances sharing a
 * location also share one leveldb::DB handle, tracked in the global
 * `db_handles` registry under `handles_mutex`.
 *
 * @return the status of leveldb::DB::Open, or OK when an existing shared
 *         handle was reused.
 */
leveldb::Status threadsafe_open(const leveldb::Options &options,
                                bool multithreading,
                                Database &db_instance) {
  // Bypass the lock and the registry entirely if multithreading is disabled.
  if (!multithreading) {
    return leveldb::DB::Open(options, db_instance.location_, &db_instance.db_);
  }

  std::unique_lock<std::mutex> lock(handles_mutex);

  auto it = db_handles.find(db_instance.location_);
  if (it != db_handles.end()) {
    // A shared handle already exists for this location: reuse it.
    ++(it->second.open_handle_count);
    db_instance.db_ = it->second.db;
    return leveldb::Status::OK();
  }

  // First open for this location. If it was previously opened with
  // multithreading disabled, we're expected to fail here (LevelDB's lock).
  LevelDbHandle handle = {nullptr, 0};
  leveldb::Status status =
      leveldb::DB::Open(options, db_instance.location_, &handle.db);

  if (status.ok()) {
    handle.open_handle_count = 1;
    db_instance.db_ = handle.db;
    // emplace avoids the second lookup (and default-construction of a
    // LevelDbHandle) that operator[] assignment would perform.
    db_handles.emplace(db_instance.location_, handle);
  }

  return status;
}

/**
 * Closes the LevelDB handle held by `db_instance` and clears its pointer.
 *
 * When the location is tracked in the shared-handle registry, the handle is
 * deleted only after the last holder closes; otherwise this instance owns
 * the handle outright and it is deleted immediately.
 */
leveldb::Status threadsafe_close(Database &db_instance) {
  std::lock_guard<std::mutex> guard(handles_mutex);

  auto entry = db_handles.find(db_instance.location_);
  if (entry == db_handles.end()) {
    // Not in the registry: the db was opened with multithreading disabled,
    // so this instance is the sole owner of the handle.
    delete db_instance.db_;
  } else {
    // Drop this holder's reference; tear the shared handle down once the
    // final reference is gone.
    if (--(entry->second.open_handle_count) == 0) {
      delete entry->second.db;
      db_handles.erase(entry);
    }
  }

  // Never leave a dangling pointer behind in the Database instance.
  db_instance.db_ = NULL;
  return leveldb::Status::OK();
}

/**
* Base worker class for doing async work that defers closing the database.
*/
Expand Down Expand Up @@ -974,13 +1052,15 @@ struct OpenWorker final : public BaseWorker {
const bool createIfMissing,
const bool errorIfExists,
const bool compression,
const bool multithreading,
const uint32_t writeBufferSize,
const uint32_t blockSize,
const uint32_t maxOpenFiles,
const uint32_t blockRestartInterval,
const uint32_t maxFileSize)
: BaseWorker(env, database, callback, "classic_level.db.open"),
location_(location) {
location_(location),
multithreading_(multithreading) {
options_.block_cache = database->blockCache_;
options_.filter_policy = database->filterPolicy_;
options_.create_if_missing = createIfMissing;
Expand All @@ -998,11 +1078,12 @@ struct OpenWorker final : public BaseWorker {
~OpenWorker () {}

void DoExecute () override {
SetStatus(database_->Open(options_, location_.c_str()));
SetStatus(database_->Open(options_, location_, multithreading_));
}

leveldb::Options options_;
std::string location_;
bool multithreading_;
};

/**
Expand All @@ -1017,6 +1098,7 @@ NAPI_METHOD(db_open) {
const bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true);
const bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false);
const bool compression = BooleanProperty(env, options, "compression", true);
const bool multithreading = BooleanProperty(env, options, "multithreading", false);

const uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20);
const uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20);
Expand All @@ -1031,7 +1113,8 @@ NAPI_METHOD(db_open) {
napi_value callback = argv[3];
OpenWorker* worker = new OpenWorker(env, database, callback, location,
createIfMissing, errorIfExists,
compression, writeBufferSize, blockSize,
compression, multithreading,
writeBufferSize, blockSize,
maxOpenFiles, blockRestartInterval,
maxFileSize);
worker->Queue(env);
Expand Down
8 changes: 8 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ export interface OpenOptions extends AbstractOpenOptions {
* @defaultValue `2 * 1024 * 1024`
*/
maxFileSize?: number | undefined

/**
* Allows multi-threaded access to a single DB instance for sharing a DB
* across multiple worker threads within the same process.
*
* @defaultValue `false`
*/
multithreading?: boolean | undefined
}

/**
Expand Down
185 changes: 185 additions & 0 deletions test/multithreading-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
'use strict'

const test = require('tape')
const tempy = require('tempy')
const path = require('path')
const { Worker } = require('worker_threads')
const { ClassicLevel } = require('..')
const {
MIN_KEY,
MID_KEY,
MAX_KEY,
CLOSED_DB_MESSAGE,
WORKER_CREATING_KEYS_MESSAGE,
WORKER_READY_TO_READ_MESSAGE,
WORKER_ERROR_MESSAGE,
START_READING_MESSAGE,
createRandomKeys,
getRandomKeys
} = require('./worker-utils')

/**
* Makes sure that the multithreading flag is working as expected
*/
test('check multithreading flag works as expected', async function (t) {
  t.plan(9)
  const location = tempy.directory()
  const db1 = new ClassicLevel(location)
  const db2 = new ClassicLevel(location)

  // Every instance must opt in: db1 was opened without the flag, so db2
  // cannot join it even with multithreading enabled.
  await db1.open()
  t.is(db1.location, location)
  try {
    await db2.open({ multithreading: true })
    t.fail('second instance should not have opened')
  } catch (err) {
    t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'second instance failed to open')
    t.is(err.cause.code, 'LEVEL_LOCKED', 'second instance got lock error')
  }
  await db1.close()

  // With the flag set on both instances, both opens succeed.
  await db1.open({ multithreading: true })
  t.is(db1.location, location)
  await db2.open({ multithreading: true })
  t.is(db2.location, location)
  // test that passing to the constructor works
  const db3 = new ClassicLevel(location, { multithreading: true })
  await db3.open()
  t.is(db3.location, location)
  // An instance that does not opt in cannot join the shared handle.
  const db4 = new ClassicLevel(location)
  try {
    await db4.open({ multithreading: false })
    t.fail('fourth instance should not have opened')
  } catch (err) {
    t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'fourth instance failed to open')
    t.is(err.cause.code, 'LEVEL_LOCKED', 'fourth instance got lock error')
  }
  await db1.close()
  await db2.close()
  await db3.close()

  // Once all shared handles are closed, a regular single-threaded open works.
  const db5 = new ClassicLevel(location)
  await db5.open({ multithreading: false })
  t.is(db5.location, location)
  await db5.close()
})

/**
* Tests for interleaved opening and closing of the database to check
* that the mutex for guarding the handles is working as expected. Creates
* many workers that only open and then close the db after a random delay. Goal
* is to interleave the open and close processes to ensure that the mutex is
* guarding the handles correctly. After all workers have completed the main
* thread closes the db and then opens it again as a non-multi-threaded instance
* to make sure the handle was deleted correctly.
*/
test('open/close mutex works as expected', async function (t) {
  t.plan(3)
  const location = tempy.directory()
  const db1 = new ClassicLevel(location)
  await db1.open({ multithreading: true })
  t.is(db1.location, location)

  // Spawn many workers that each open and then close the db after a random
  // delay, and collect one settlement promise per worker.
  const workerDone = []
  for (let i = 0; i < 100; i++) {
    const worker = new Worker(path.join(__dirname, 'worker.js'), {
      workerData: { location, workerStartup: true }
    })

    const done = new Promise((resolve, reject) => {
      worker.once('message', ({ message, error }) => {
        if (message === CLOSED_DB_MESSAGE) {
          resolve()
        } else if (message === WORKER_ERROR_MESSAGE) {
          reject(error)
        } else {
          reject(new Error('unexpected error\n>>> ' + error))
        }
      })
    })
    workerDone.push(done)
  }

  // Every worker must have opened and closed cleanly.
  const settled = await Promise.allSettled(workerDone)
  const failures = settled.filter(({ status }) => status === 'rejected')
  t.is(failures.length, 0)
  await db1.close()

  // reopen the db non-multithreaded to check that the handle record was fully
  // deleted from the handle map
  await db1.open({ multithreading: false })
  t.is(db1.location, location)
  await db1.close()
})

/**
* Tests for reading and writing to a single db from multiple threads.
*
* Starts by setting up worker and then worker reports its ready and immediately
* starts writing to the database. Main thread gets message and also writes to
* the same db but to a different key space. Goal is to concurrently write
* consecutively numbered records. Once records are all written the worker
* reports to the main thread and the main thread waits until both threads are
* complete with the writing process. When both are ready they concurrently read
* random records from the full key space for a set interval.
*/
test('allow multi-threading by same process', async function (t) {
  const location = tempy.directory()
  const db = new ClassicLevel(location, { multithreading: true })
  let worker

  try {
    await db.open()

    worker = new Worker(path.join(__dirname, 'worker.js'), {
      workerData: { location, readWrite: true }
    })

    // Concurrently write keys to the db on both threads and wait until both
    // are ready before attempting to concurrently read keys. Unlike a thrown
    // error inside an event handler (which would escape this try/catch and
    // crash the process), rejecting this promise routes every failure —
    // worker 'error' events, worker-reported errors, and main-thread write
    // failures — into the catch block below. It also prevents the test from
    // hanging forever when the worker dies before reporting ready.
    await new Promise((resolve, reject) => {
      let mainThreadReady = false
      worker.on('error', reject)
      worker.on('message', ({ message, error }) => {
        if (message === WORKER_ERROR_MESSAGE) {
          reject(new Error(error))
        } else if (message === WORKER_CREATING_KEYS_MESSAGE) {
          // Main thread seeds its half of the key space while the worker
          // seeds its own.
          createRandomKeys(db, MID_KEY, MAX_KEY).then(() => {
            mainThreadReady = true
          }, reject)
        } else if (message === WORKER_READY_TO_READ_MESSAGE) {
          // Worker finished seeding; poll until the main thread has too.
          const interval = setInterval(() => {
            if (mainThreadReady) {
              clearInterval(interval)
              resolve()
            }
          }, 100)
        }
      })
    })

    // once db is seeded start reading keys from both threads
    worker.postMessage({ message: START_READING_MESSAGE })
    await getRandomKeys(db, MIN_KEY, MAX_KEY)
  } catch (error) {
    t.fail(error.message)
  } finally {
    // Clean up on success AND failure so a broken run doesn't leak the
    // worker or hold the db open for subsequent tests.
    if (worker) {
      worker.removeAllListeners('message')
      worker.removeAllListeners('error')
      await worker.terminate()
    }
    await db.close()
    t.end()
  }
})
Loading

0 comments on commit 7d497a5

Please sign in to comment.