Skip to content

Commit 1da8edc

Browse files
authored
Merge 0ec755b into eb73e4c
2 parents eb73e4c + 0ec755b commit 1da8edc

File tree

4 files changed

+125
-16
lines changed

4 files changed

+125
-16
lines changed

src/binding/transaction.cpp

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <chrono>
12
#include <sstream>
23
#include <thread>
34
#include "database.h"
@@ -140,7 +141,12 @@ napi_value Transaction::Abort(napi_env env, napi_callback_info info) {
140141
/**
141142
* State for the `Commit` async work.
142143
*/
143-
typedef BaseAsyncState<std::shared_ptr<TransactionHandle>> TransactionCommitState;
144+
struct TransactionCommitState final : BaseAsyncState<std::shared_ptr<TransactionHandle>> {
145+
TransactionCommitState(napi_env env, std::shared_ptr<TransactionHandle> txnHandle)
146+
: BaseAsyncState<std::shared_ptr<TransactionHandle>>(env, txnHandle), timestamp(0) {}
147+
148+
uint64_t timestamp;
149+
};
144150

145151
/**
146152
* Commits the transaction.
@@ -186,8 +192,14 @@ napi_value Transaction::Commit(napi_env env, napi_callback_info info) {
186192
if (!state->handle || !state->handle->dbHandle || !state->handle->dbHandle->opened() || state->handle->dbHandle->isCancelled()) {
187193
state->status = rocksdb::Status::Aborted("Database closed during transaction commit operation");
188194
} else {
195+
// set current timestamp before committing
196+
auto now = std::chrono::system_clock::now();
197+
auto timestamp = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
198+
state->handle->txn->SetCommitTimestamp(timestamp);
199+
189200
state->status = state->handle->txn->Commit();
190201
if (state->status.ok()) {
202+
state->timestamp = timestamp;
191203
DEBUG_LOG("Transaction::Commit emitted committed event\n")
192204
state->handle->state = TransactionState::Committed;
193205
state->handle->dbHandle->descriptor->notify(env, "committed", nullptr);
@@ -207,17 +219,20 @@ napi_value Transaction::Commit(napi_env env, napi_callback_info info) {
207219
if (state->status.ok()) {
208220
DEBUG_LOG("Transaction::Commit complete closing handle=%p\n", state->handle.get())
209221

210-
// BUG!
211222
if (state->handle) {
212223
state->handle->close();
213224
} else {
214225
DEBUG_LOG("Transaction::Commit complete, but handle is null!\n")
215226
}
216227

228+
napi_value result;
229+
double milliseconds = static_cast<double>(state->timestamp) / 1000.0;
230+
NAPI_STATUS_THROWS_VOID(::napi_create_double(env, milliseconds, &result))
231+
217232
DEBUG_LOG("Transaction::Commit complete calling resolve\n")
218233
napi_value resolve;
219234
NAPI_STATUS_THROWS_VOID(::napi_get_reference_value(env, state->resolveRef, &resolve))
220-
NAPI_STATUS_THROWS_VOID(::napi_call_function(env, global, resolve, 0, nullptr, nullptr))
235+
NAPI_STATUS_THROWS_VOID(::napi_call_function(env, global, resolve, 1, &result, nullptr))
221236
} else {
222237
napi_value reject;
223238
napi_value error;
@@ -257,6 +272,11 @@ napi_value Transaction::CommitSync(napi_env env, napi_callback_info info) {
257272
}
258273
(*txnHandle)->state = TransactionState::Committing;
259274

275+
// set current timestamp before committing
276+
auto now = std::chrono::system_clock::now();
277+
auto timestamp = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
278+
(*txnHandle)->txn->SetCommitTimestamp(timestamp);
279+
260280
rocksdb::Status status = (*txnHandle)->txn->Commit();
261281
if (status.ok()) {
262282
DEBUG_LOG("Transaction::CommitSync emitted committed event\n")
@@ -265,13 +285,18 @@ napi_value Transaction::CommitSync(napi_env env, napi_callback_info info) {
265285

266286
DEBUG_LOG("Transaction::CommitSync closing txnHandle=%p\n", (*txnHandle).get())
267287
(*txnHandle)->close();
268-
} else {
269-
napi_value error;
270-
ROCKSDB_CREATE_ERROR_LIKE_VOID(error, status, "Transaction commit failed")
271-
NAPI_STATUS_THROWS(::napi_throw(env, error))
288+
289+
// Return the timestamp as milliseconds (convert from microseconds)
290+
napi_value result;
291+
double milliseconds = static_cast<double>(timestamp) / 1000.0;
292+
NAPI_STATUS_THROWS(::napi_create_double(env, milliseconds, &result))
293+
return result;
272294
}
273295

274-
NAPI_RETURN_UNDEFINED()
296+
napi_value error;
297+
ROCKSDB_CREATE_ERROR_LIKE_VOID(error, status, "Transaction commit failed")
298+
NAPI_STATUS_THROWS(::napi_throw(env, error))
299+
return nullptr;
275300
}
276301

277302
/**

src/load-binding.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ export type NativeTransaction = {
1919
id: number;
2020
new(context: NativeDatabase, options?: TransactionOptions): NativeTransaction;
2121
abort(): void;
22-
commit(resolve: () => void, reject: (err: Error) => void): void;
23-
commitSync(): void;
22+
commit(resolve: (timestamp: number) => void, reject: (err: Error) => void): void;
23+
commitSync(): number;
2424
get(key: Key, resolve: (value: Buffer) => void, reject: (err: Error) => void): number;
2525
getCount(options?: RangeOptions): number;
2626
getSync(key: Key): Buffer;

src/transaction.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ export class Transaction extends DBI {
3030
/**
3131
* Commit the transaction.
3232
*/
33-
async commit(): Promise<void> {
33+
async commit(): Promise<number> {
3434
try {
35-
await new Promise<void>((resolve, reject) => {
35+
return await new Promise<number>((resolve, reject) => {
3636
this.notify('beforecommit');
3737
this.#txn.commit(resolve, reject);
3838
});
@@ -45,10 +45,10 @@ export class Transaction extends DBI {
4545
}
4646
}
4747

48-
commitSync(): void {
48+
commitSync(): number {
4949
try {
5050
this.notify('beforecommit');
51-
this.#txn.commitSync();
51+
return this.#txn.commitSync();
5252
} finally {
5353
this.notify('aftercommit', {
5454
next: null,

test/transactions.test.ts

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import { assert, describe, expect, it } from 'vitest';
22
import { rimraf } from 'rimraf';
3-
import { RocksDatabase } from '../src/index.js';
3+
import { RocksDatabase, Transaction } from '../src/index.js';
44
import { generateDBPath } from './lib/util.js';
5-
import type { Transaction } from '../src/transaction.js';
65
import { withResolvers } from '../src/util.js';
76

87
const testOptions = [
@@ -476,6 +475,21 @@ for (const { name, options, txnOptions } of testOptions) {
476475
await rimraf(dbPath);
477476
}
478477
});
478+
479+
it(`${name} async should return the callback's value`, async () => {
480+
let db: RocksDatabase | null = null;
481+
const dbPath = generateDBPath();
482+
483+
try {
484+
db = RocksDatabase.open(dbPath, options);
485+
await expect(db.transaction(async (_txn: Transaction) => {
486+
return 'foo'
487+
}, txnOptions)).resolves.toBe('foo');
488+
} finally {
489+
db?.close();
490+
await rimraf(dbPath);
491+
}
492+
});
479493
});
480494

481495
describe(`transactionSync() (${name})`, () => {
@@ -804,6 +818,36 @@ for (const { name, options, txnOptions } of testOptions) {
804818
await rimraf(dbPath);
805819
}
806820
});
821+
822+
it(`${name} sync should return the callback's value`, async () => {
823+
let db: RocksDatabase | null = null;
824+
const dbPath = generateDBPath();
825+
826+
try {
827+
db = RocksDatabase.open(dbPath, options);
828+
expect(db.transactionSync((_txn: Transaction) => {
829+
return 'foo'
830+
}, txnOptions)).toBe('foo');
831+
} finally {
832+
db?.close();
833+
await rimraf(dbPath);
834+
}
835+
});
836+
837+
it(`${name} sync should return the callback's promise`, async () => {
838+
let db: RocksDatabase | null = null;
839+
const dbPath = generateDBPath();
840+
841+
try {
842+
db = RocksDatabase.open(dbPath, options);
843+
await expect(db.transactionSync(async (_txn: Transaction) => {
844+
return 'foo'
845+
}, txnOptions)).resolves.toBe('foo');
846+
} finally {
847+
db?.close();
848+
await rimraf(dbPath);
849+
}
850+
});
807851
});
808852

809853
describe(`Error handling (${name})`, () => {
@@ -834,3 +878,43 @@ for (const { name, options, txnOptions } of testOptions) {
834878
});
835879
});
836880
}
881+
882+
describe('Transaction', () => {
883+
it('should return the commit timestamp sync', async () => {
884+
let db: RocksDatabase | null = null;
885+
const dbPath = generateDBPath();
886+
887+
try {
888+
db = RocksDatabase.open(dbPath);
889+
const start = Date.now();
890+
const txn = new Transaction(db.store);
891+
txn.putSync('foo', 'bar');
892+
const ts = txn.commitSync();
893+
expect(ts).toBeGreaterThan(start);
894+
expect(ts).toBeLessThanOrEqual(Date.now() + 1);
895+
expect(db.getSync('foo')).toBe('bar');
896+
} finally {
897+
db?.close();
898+
await rimraf(dbPath);
899+
}
900+
});
901+
902+
it('should resolve the commit timestamp async', async () => {
903+
let db: RocksDatabase | null = null;
904+
const dbPath = generateDBPath();
905+
906+
try {
907+
db = RocksDatabase.open(dbPath);
908+
const start = Date.now();
909+
const txn = new Transaction(db.store);
910+
txn.putSync('foo', 'bar');
911+
const ts = await txn.commit();
912+
expect(ts).toBeGreaterThan(start);
913+
expect(ts).toBeLessThanOrEqual(Date.now() + 1);
914+
expect(db.getSync('foo')).toBe('bar');
915+
} finally {
916+
db?.close();
917+
await rimraf(dbPath);
918+
}
919+
});
920+
});

0 commit comments

Comments
 (0)