8
8
* is useless, but subclasses can easily layer in compression and/or encryption codecs.
9
9
*
10
10
* Uses thread pools so that (i) page encoding and writeback occur on background threads, and (ii)
11
- * page reads and decoding can be " prefetched" in the background during detected sequential scans.
11
+ * pages can be prefetched and decoded on background threads during detected sequential scans.
12
12
* These schemes are each a bit complex but at least they're separate: when asked to read a page,
13
13
* we first wait for any outstanding writes to finish, and vice-versa.
14
14
*/
@@ -120,7 +120,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
120
120
121
121
int FileSize (sqlite3_int64 *pSize) override {
122
122
try {
123
- FinishUpserts ();
123
+ UpsertBarrier ();
124
124
*pSize = DetectPageSize () * DetectPageCount ();
125
125
return SQLITE_OK;
126
126
} catch (std::exception &exn) {
@@ -144,7 +144,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
144
144
// 5. Little thread synchronization overhead is affordable, as Zstandard decompression of a
145
145
// SQLite page (4-64 KiB) takes only 1-10 microseconds -- about 10x time to memcpy.
146
146
// Subclasses may in turn subclass this to add their own logic to SeekCursor and DecodePage.
147
- struct PageFetchJob {
147
+ struct FetchJob {
148
148
enum State { NEW, QUEUE, WIP, DONE };
149
149
// NEW = idle, QUEUE = ready & waiting, WIP = work in progress, DONE = complete or error
150
150
// QUEUE=>WIP is the only contentious transition; background threads and the main thread
@@ -176,15 +176,15 @@ class InnerDatabaseFile : public SQLiteVFS::File {
176
176
std::chrono::nanoseconds t_seek = std::chrono::nanoseconds::zero(),
177
177
t_decode = std::chrono::nanoseconds::zero();
178
178
179
- PageFetchJob (InnerDatabaseFile &that)
179
+ FetchJob (InnerDatabaseFile &that)
180
180
: cursor(*(that.outer_db_), "SELECT pageno, data, meta1, meta2 FROM " +
181
181
that.inner_db_pages_table_ +
182
182
" WHERE pageno >= ? ORDER BY pageno"),
183
183
page_size(that.page_size_) {
184
184
PutState (State::NEW);
185
185
}
186
186
187
- virtual ~PageFetchJob () {}
187
+ virtual ~FetchJob () {}
188
188
189
189
inline void *EffectiveDest () noexcept {
190
190
return dest ? dest : (decodebuf.resize (page_size), decodebuf.data ());
@@ -286,12 +286,12 @@ class InnerDatabaseFile : public SQLiteVFS::File {
286
286
};
287
287
288
288
// Override me!
289
- virtual std::unique_ptr<PageFetchJob> NewPageFetchJob () {
290
- return std::unique_ptr<PageFetchJob >(new PageFetchJob (*this ));
289
+ virtual std::unique_ptr<FetchJob> NewFetchJob () {
290
+ return std::unique_ptr<FetchJob >(new FetchJob (*this ));
291
291
}
292
292
293
293
const size_t MAX_FETCH_CURSORS = 4 ;
294
- std::vector<std::unique_ptr<PageFetchJob >> fetch_jobs_;
294
+ std::vector<std::unique_ptr<FetchJob >> fetch_jobs_;
295
295
ThreadPool fetch_thread_pool_;
296
296
std::mutex seek_lock_; // serializes outer db interactions among fetch background threads
297
297
std::atomic<bool > seek_interrupt_; // broadcast that main thread wants seek_lock_
@@ -300,15 +300,15 @@ class InnerDatabaseFile : public SQLiteVFS::File {
300
300
sqlite3_int64 longest_read_ = 0 ;
301
301
302
302
void *BackgroundFetchJob (void *ctx) noexcept {
303
- PageFetchJob *job = (PageFetchJob *)ctx;
303
+ FetchJob *job = (FetchJob *)ctx;
304
304
std::unique_lock<std::mutex> seek_lock (seek_lock_);
305
305
while (seek_interrupt_.load (std::memory_order_relaxed)) {
306
306
// yield to main thread
307
307
seek_lock.unlock ();
308
308
std::this_thread::yield ();
309
309
seek_lock.lock ();
310
310
}
311
- if (!job->TransitionState (PageFetchJob ::State::QUEUE, PageFetchJob ::State::WIP)) {
311
+ if (!job->TransitionState (FetchJob ::State::QUEUE, FetchJob ::State::WIP)) {
312
312
return nullptr ; // they took our job!!!!
313
313
}
314
314
job->Execute (&seek_lock);
@@ -324,16 +324,15 @@ class InnerDatabaseFile : public SQLiteVFS::File {
324
324
bool can_prefetch = fetch_thread_pool_.MaxThreads () > 1 ;
325
325
326
326
// Is there already a background job to prefetch the desired page?
327
- PageFetchJob *job = nullptr ;
327
+ FetchJob *job = nullptr ;
328
328
bool foreground = false ;
329
329
if (can_prefetch) {
330
330
for (auto job_i = fetch_jobs_.begin (); job_i != fetch_jobs_.end (); job_i++) {
331
331
if ((*job_i)->pageno == pageno) {
332
- assert ((*job_i)->GetState () > PageFetchJob ::State::NEW);
332
+ assert ((*job_i)->GetState () > FetchJob ::State::NEW);
333
333
job = job_i->get ();
334
334
// If yes & it hasn't yet started, race to run it here in the foreground
335
- foreground =
336
- job->TransitionState (PageFetchJob::State::QUEUE, PageFetchJob::State::WIP);
335
+ foreground = job->TransitionState (FetchJob::State::QUEUE, FetchJob::State::WIP);
337
336
break ;
338
337
}
339
338
}
@@ -343,7 +342,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
343
342
if (!job) {
344
343
for (auto job_i = fetch_jobs_.begin (); job_i != fetch_jobs_.end (); job_i++) {
345
344
auto st = (*job_i)->GetState ();
346
- if (st == PageFetchJob ::State::NEW || st == PageFetchJob ::State::DONE) {
345
+ if (st == FetchJob ::State::NEW || st == FetchJob ::State::DONE) {
347
346
if ((*job_i)->cursor_pageno + 1 == pageno) {
348
347
job = job_i->get ();
349
348
break ;
@@ -357,17 +356,17 @@ class InnerDatabaseFile : public SQLiteVFS::File {
357
356
if (!job || (fetch_jobs_.size () < MAX_FETCH_CURSORS &&
358
357
job->cursor_pageno + 1 != pageno && job->cursor_pageno )) {
359
358
assert (fetch_jobs_.size () < MAX_FETCH_CURSORS);
360
- fetch_jobs_.push_back (NewPageFetchJob ());
359
+ fetch_jobs_.push_back (NewFetchJob ());
361
360
job = fetch_jobs_.back ().get ();
362
361
}
363
362
#ifndef NDEBUG
364
- if (job->GetState () == PageFetchJob ::State::DONE && job->cursor_pageno ) {
363
+ if (job->GetState () == FetchJob ::State::DONE && job->cursor_pageno ) {
365
364
++prefetch_wasted_;
366
365
}
367
366
#endif
368
367
job->Renew ();
369
368
job->pageno = pageno;
370
- job->PutState (PageFetchJob ::State::WIP);
369
+ job->PutState (FetchJob ::State::WIP);
371
370
foreground = true ;
372
371
}
373
372
@@ -386,10 +385,10 @@ class InnerDatabaseFile : public SQLiteVFS::File {
386
385
} else {
387
386
job->Execute ();
388
387
}
389
- assert (job->GetState () == PageFetchJob ::State::DONE);
388
+ assert (job->GetState () == FetchJob ::State::DONE);
390
389
} else {
391
390
// Semi-busy-wait for background job to finish
392
- while (job->GetState () < PageFetchJob ::State::DONE) {
391
+ while (job->GetState () < FetchJob ::State::DONE) {
393
392
std::this_thread::yield ();
394
393
}
395
394
#ifndef NDEBUG
@@ -400,6 +399,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
400
399
if (!job->errmsg .empty ()) {
401
400
_DBG << job->errmsg << _EOL;
402
401
job->Renew ();
402
+ PrefetchBarrier ();
403
403
throw SQLite::Exception (job->errmsg , SQLITE_IOERR_READ);
404
404
}
405
405
@@ -422,15 +422,15 @@ class InnerDatabaseFile : public SQLiteVFS::File {
422
422
for (auto job_i = fetch_jobs_.begin (); job_i != fetch_jobs_.end (); job_i++) {
423
423
if (job_i->get () != job) {
424
424
auto st = (*job_i)->GetState ();
425
- if (st == PageFetchJob ::State::QUEUE || st == PageFetchJob ::State::WIP) {
425
+ if (st == FetchJob ::State::QUEUE || st == FetchJob ::State::WIP) {
426
426
++active_jobs;
427
427
}
428
428
}
429
429
}
430
430
// always leave at least one slot free for a non-sequential read to use
431
431
if (active_jobs + 2 <= fetch_thread_pool_.MaxThreads ()) {
432
432
job->pageno = pageno_hint;
433
- job->PutState (PageFetchJob ::State::QUEUE);
433
+ job->PutState (FetchJob ::State::QUEUE);
434
434
fetch_thread_pool_.Enqueue (
435
435
job, [this ](void *job) { return this ->BackgroundFetchJob (job); }, nullptr );
436
436
} else {
@@ -467,7 +467,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
467
467
if (iAmt == 0 ) {
468
468
return SQLITE_OK;
469
469
}
470
- FinishUpserts ();
470
+ UpsertBarrier ();
471
471
if (!DetectPageSize ()) { // file is empty
472
472
memset (zBuf, 0 , iAmt);
473
473
return iAmt > 0 ? SQLITE_IOERR_SHORT_READ : SQLITE_OK;
@@ -532,7 +532,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
532
532
seek_interrupt_.store (false , std::memory_order_relaxed);
533
533
}
534
534
for (auto &job : fetch_jobs_) {
535
- if (job->TransitionState (PageFetchJob ::State::QUEUE, PageFetchJob ::State::NEW)) {
535
+ if (job->TransitionState (FetchJob ::State::QUEUE, FetchJob ::State::NEW)) {
536
536
job->Renew ();
537
537
}
538
538
}
@@ -542,8 +542,8 @@ class InnerDatabaseFile : public SQLiteVFS::File {
542
542
}
543
543
// wipe all prefetch cursors
544
544
for (auto &job : fetch_jobs_) {
545
- assert (job->GetState () == PageFetchJob ::State::NEW ||
546
- job->GetState () == PageFetchJob ::State::DONE);
545
+ assert (job->GetState () == FetchJob ::State::NEW ||
546
+ job->GetState () == FetchJob ::State::DONE);
547
547
job->Renew ();
548
548
job->ResetCursor ();
549
549
}
@@ -614,7 +614,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
614
614
std::vector<std::unique_ptr<EncodeJob>> encode_job_pool_;
615
615
std::mutex encode_job_pool_mutex_;
616
616
617
- ThreadPool thread_pool_ ;
617
+ ThreadPool upsert_thread_pool_ ;
618
618
std::string upsert_errmsg_;
619
619
620
620
// enqueue page encoding+upsert on thread pool
@@ -637,7 +637,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
637
637
assert (job->insert ? pageno == page_count_ + 1 : pageno <= page_count_);
638
638
page_count_ += job->insert ? 1 : 0 ;
639
639
// use ThreadPool to run encoding jobs in parallel and upsert jobs serially
640
- thread_pool_ .Enqueue (
640
+ upsert_thread_pool_ .Enqueue (
641
641
job.release (),
642
642
[](void *job) noexcept {
643
643
((EncodeJob *)job)->Execute ();
@@ -651,8 +651,8 @@ class InnerDatabaseFile : public SQLiteVFS::File {
651
651
throw ;
652
652
}
653
653
654
- if (thread_pool_ .MaxThreads () == 1 ) {
655
- FinishUpserts ();
654
+ if (upsert_thread_pool_ .MaxThreads () == 1 ) {
655
+ UpsertBarrier ();
656
656
}
657
657
}
658
658
@@ -695,8 +695,8 @@ class InnerDatabaseFile : public SQLiteVFS::File {
695
695
}
696
696
697
697
// wait for background upserts to complete + raise any error message
698
- void FinishUpserts (bool ignore_error = false ) {
699
- thread_pool_ .Barrier ();
698
+ void UpsertBarrier (bool ignore_error = false ) {
699
+ upsert_thread_pool_ .Barrier ();
700
700
if (!ignore_error && !upsert_errmsg_.empty ()) {
701
701
throw SQLite::Exception (upsert_errmsg_, SQLITE_IOERR_WRITE);
702
702
}
@@ -747,7 +747,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
747
747
return SQLITE_OK;
748
748
} catch (std::exception &exn) {
749
749
_DBG << exn.what () << _EOL;
750
- FinishUpserts (true );
750
+ UpsertBarrier (true );
751
751
page_count_ = 0 ; // to be redetected
752
752
return SQLITE_IOERR_WRITE;
753
753
}
@@ -757,7 +757,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
757
757
assert (!read_only_);
758
758
try {
759
759
PrefetchBarrier ();
760
- FinishUpserts ();
760
+ UpsertBarrier ();
761
761
if (!DetectPageSize ()) {
762
762
return size == 0 ? SQLITE_OK : SQLITE_IOERR_TRUNCATE;
763
763
}
@@ -790,7 +790,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
790
790
int Sync (int flags) override {
791
791
assert (!read_only_);
792
792
try {
793
- FinishUpserts ();
793
+ UpsertBarrier ();
794
794
} catch (std::exception &exn) {
795
795
_DBG << exn.what () << _EOL;
796
796
page_count_ = 0 ; // to be redetected
@@ -861,7 +861,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
861
861
PrefetchBarrier ();
862
862
if (!read_only_) {
863
863
int rc;
864
- if ((rc = Sync (0 )) != SQLITE_OK) { // includes FinishUpserts
864
+ if ((rc = Sync (0 )) != SQLITE_OK) { // includes UpsertBarrier
865
865
return rc;
866
866
}
867
867
outer_db_->exec (" PRAGMA incremental_vacuum" );
@@ -925,7 +925,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
925
925
// MAX(pageno) instead of COUNT(pageno) because the latter would trigger table scan
926
926
select_page_count_(*outer_db_,
927
927
" SELECT IFNULL(MAX(pageno), 0) FROM " + inner_db_pages_table_),
928
- thread_pool_ (threads, threads * 3 ),
928
+ upsert_thread_pool_ (threads, threads * 3 ),
929
929
fetch_thread_pool_(std::min(noprefetch ? 1 : threads, MAX_FETCH_CURSORS),
930
930
MAX_FETCH_CURSORS),
931
931
seek_interrupt_(false ) {
0 commit comments