Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.2.2 patches #239

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Prev Previous commit
Next Next commit
merge errors since this was cherry-picked from master
  • Loading branch information
Matthew V committed Sep 6, 2012
commit e477211e9109a76fc630eeae883f60fcbeb84559
1 change: 1 addition & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Status BuildTable(const std::string& dbname,
Slice key = iter->key();
meta->largest.DecodeFrom(key);
builder->Add(key, iter->value());
++meta->num_entries;
}

// Finish and check for builder errors
Expand Down
266 changes: 165 additions & 101 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

#include "db/db_impl.h"

#include <time.h>
#include <algorithm>
#include <errno.h>
#include <set>
#include <string>
#include <stdint.h>
Expand Down Expand Up @@ -68,14 +70,16 @@ struct DBImpl::CompactionState {
TableBuilder* builder;

uint64_t total_bytes;
uint64_t num_entries;

Output* current_output() { return &outputs[outputs.size()-1]; }

explicit CompactionState(Compaction* c)
: compaction(c),
outfile(NULL),
builder(NULL),
total_bytes(0) {
total_bytes(0),
num_entries(0) {
}
};

Expand Down Expand Up @@ -468,9 +472,10 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
mutex_.Lock();
}

Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
Log(options_.info_log, "Level-0 table #%llu: %llu bytes, %llu keys %s",
(unsigned long long) meta.number,
(unsigned long long) meta.file_size,
(unsigned long long) meta.num_entries,
s.ToString().c_str());
delete iter;
pending_outputs_.erase(meta.number);
Expand All @@ -493,6 +498,13 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
stats_[level].Add(stats);

if (0!=meta.num_entries && s.ok())
{
// 2x since mem to disk, not disk to disk
versions_->SetWriteRate(2*stats.micros/meta.num_entries);
} // if

return s;
}

Expand Down Expand Up @@ -667,90 +679,96 @@ void DBImpl::BackgroundCall() {
bg_cv_.SignalAll();
}

void DBImpl::BackgroundCompaction() {
Status DBImpl::BackgroundCompaction() {
Status status;
bool is_manual = (manual_compaction_ != NULL);
InternalKey manual_end;

mutex_.AssertHeld();

if (imm_ != NULL) {
pthread_rwlock_rdlock(&gThreadLock0);
CompactMemTable();
status=CompactMemTable();
pthread_rwlock_unlock(&gThreadLock0);
return;
}

Compaction* c;
bool is_manual = (manual_compaction_ != NULL);
InternalKey manual_end;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == NULL);
if (c != NULL) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
c = versions_->PickCompaction();
}
if (status.ok())
{
Compaction* c;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == NULL);
if (c != NULL) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
c = versions_->PickCompaction();
}

Status status;
if (c == NULL) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(),
versions_->LevelSummary(&tmp));
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
if (c == NULL) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(),
versions_->LevelSummary(&tmp));
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
}
delete c;
}
delete c;

if (status.ok()) {
// Done
// Done
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}
}

if (is_manual) {
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = NULL;
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = NULL;
}

return status;
}


void DBImpl::CleanupCompaction(CompactionState* compact) {
mutex_.AssertHeld();
if (compact->builder != NULL) {
Expand Down Expand Up @@ -813,6 +831,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
const uint64_t current_bytes = compact->builder->FileSize();
compact->current_output()->file_size = current_bytes;
compact->total_bytes += current_bytes;
compact->num_entries += compact->builder->NumEntries();
delete compact->builder;
compact->builder = NULL;

Expand Down Expand Up @@ -898,37 +917,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;

for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
if (has_imm_.NoBarrier_Load() != NULL) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != NULL) {
if (0 == compact->compaction->level())
pthread_rwlock_unlock(&gThreadLock1);
pthread_rwlock_rdlock(&gThreadLock0);
CompactMemTable();
pthread_rwlock_unlock(&gThreadLock0);
if (0 == compact->compaction->level())
pthread_rwlock_rdlock(&gThreadLock1);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}

// pause to potentially hand off disk to
// memtable threads
pthread_rwlock_wrlock(&gThreadLock0);
pthread_rwlock_unlock(&gThreadLock0);

// Give priorities to level 0 compactions, unless
// this compaction is blocking a level 0 in this database
if (0 != compact->compaction->level() && level0_good)
{
pthread_rwlock_wrlock(&gThreadLock1);
pthread_rwlock_unlock(&gThreadLock1);
} // if
imm_micros+=PrioritizeWork(0==compact->compaction->level());

Slice key = input->key();
if (compact->compaction->ShouldStopBefore(key) &&
Expand Down Expand Up @@ -1041,6 +1033,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
stats_[compact->compaction->level() + 1].Add(stats);

if (status.ok()) {
if (0!=compact->num_entries)
versions_->SetWriteRate(stats.micros/compact->num_entries);
status = InstallCompactionResults(compact);
}
VersionSet::LevelSummaryStorage tmp;
Expand All @@ -1049,6 +1043,68 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
return status;
}


// Cooperative back-off called from the compaction loop: while higher-priority
// work holds the shared rwlocks, poll (with a 5 second timeout per lock) and
// keep servicing any pending immutable memtable (imm_) flushes.
// IsLevel0: true when the caller is itself a level-0 compaction; level-0
//           callers skip the gThreadLock1 yield so they are not blocked by
//           the very priority scheme that favors level 0.
// Returns: microseconds spent inside this routine, so the caller can exclude
//          the wait from its own compaction timing (added to imm_micros).
int64_t
DBImpl::PrioritizeWork(
bool IsLevel0)
{
int64_t start_time;
bool again;
int ret_val;
struct timespec timeout;

start_time=env_->NowMicros();

// loop while on hold due to higher priority stuff,
// but keep polling for need to handle imm_
do
{
again=false;

// fast, unsynchronized peek; re-checked under mutex_ below
if (has_imm_.NoBarrier_Load() != NULL) {
mutex_.Lock();
if (imm_ != NULL) {
// a level-0 caller temporarily releases its read hold on
// gThreadLock1 so the memtable flush is not self-deadlocked
if (IsLevel0)
pthread_rwlock_unlock(&gThreadLock1);
pthread_rwlock_rdlock(&gThreadLock0);
// NOTE(review): CompactMemTable() returns a Status elsewhere in this
// patch (see BackgroundCompaction) but its result is discarded here —
// confirm an error cannot be silently lost on this path.
CompactMemTable();
pthread_rwlock_unlock(&gThreadLock0);
if (IsLevel0)
pthread_rwlock_rdlock(&gThreadLock1);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
} // if
mutex_.Unlock();
} // if

// pause to potentially hand off disk to
// memtable threads
// (acquire-then-release of the write lock is a yield: it only succeeds
// once no memtable thread holds a read lock; timeout forces a re-poll
// of imm_ rather than waiting indefinitely)
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec+=5;
ret_val=pthread_rwlock_timedwrlock(&gThreadLock0, &timeout);
if (0==ret_val)
pthread_rwlock_unlock(&gThreadLock0);
again=(ETIMEDOUT==ret_val);

// Give priorities to level 0 compactions, unless
// this compaction is blocking a level 0 in this database
// (level0_good is set by MakeRoomForWrite when level 0 is below the
// compaction trigger — presumably meaning no level-0 pressure; verify)
if (!IsLevel0 && level0_good)
{
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec+=5;
ret_val=pthread_rwlock_timedwrlock(&gThreadLock1, &timeout);
if (0==ret_val)
pthread_rwlock_unlock(&gThreadLock1);
again=again || (ETIMEDOUT==ret_val);
} // if
} while(again);

// let caller know how long was spent waiting.
return(env_->NowMicros() - start_time);

} // PrioritizeWork



namespace {
struct IterState {
port::Mutex* mu;
Expand Down Expand Up @@ -1303,12 +1359,16 @@ Status DBImpl::MakeRoomForWrite(bool force) {
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
int throttle;

mutex_.Unlock();
/// slowing things down
// shared thread block throttle
env_->WriteThrottle(versions_->NumLevelFiles(0));
mutex_.Lock();
throttle=versions_->WriteThrottleUsec();
if (0!=throttle)
{
mutex_.Unlock();
/// slowing things down
env_->SleepForMicroseconds(throttle);
mutex_.Lock();
} // if

// hint to background compaction.
level0_good=(versions_->NumLevelFiles(0) < config::kL0_CompactionTrigger);
Expand Down Expand Up @@ -1338,11 +1398,15 @@ Status DBImpl::MakeRoomForWrite(bool force) {
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "waiting 2...\n");
MaybeScheduleCompaction();
bg_cv_.Wait();
Log(options_.info_log, "running 2...\n");
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "waiting...\n");
bg_cv_.Wait();
Log(options_.info_log, "running...\n");
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
Expand Down
1 change: 1 addition & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class DBImpl : public DB {
void BackgroundCompaction();
void CleanupCompaction(CompactionState* compact);
Status DoCompactionWork(CompactionState* compact);
int64_t PrioritizeWork(bool IsLevel0);

Status OpenCompactionOutputFile(CompactionState* compact);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Expand Down
Loading