Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions src/io/default_compact_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ namespace io {
DefaultCompactStrategy::DefaultCompactStrategy(const TableSchema& schema)
: m_schema(schema),
m_raw_key_operator(GetRawKeyOperatorFromSchema(m_schema)),
m_last_type(leveldb::TKT_FORSEEK), m_cur_type(leveldb::TKT_FORSEEK),
m_last_ts(-1), m_del_row_ts(-1), m_del_col_ts(-1), m_del_qual_ts(-1), m_cur_ts(-1),
m_del_row_seq(0), m_del_col_seq(0), m_del_qual_seq(0), m_version_num(0),
m_snapshot(leveldb::kMaxSequenceNumber) {
m_snapshot(leveldb::kMaxSequenceNumber),
m_has_put(false), m_lock_ts(kMaxTimeStamp) {
// build index
for (int32_t i = 0; i < m_schema.column_families_size(); ++i) {
const std::string name = m_schema.column_families(i).name();
m_cf_indexs[name] = i;
}
m_has_put = false;
VLOG(11) << "DefaultCompactStrategy construct";
}

Expand All @@ -47,6 +48,10 @@ bool DefaultCompactStrategy::Drop(const Slice& tera_key, uint64_t n,
return true;
}

if (type == leveldb::TKT_LOCK) {
return !m_schema.enable_txn();
}

m_cur_type = type;
m_cur_ts = ts;
int32_t cf_id = -1;
Expand Down Expand Up @@ -254,16 +259,22 @@ bool DefaultCompactStrategy::ScanDrop(const Slice& tera_key, uint64_t n) {
return true;
}

if (type == leveldb::TKT_LOCK && !m_schema.enable_txn()) {
return true;
}

m_cur_type = type;
m_last_ts = m_cur_ts;
m_cur_ts = ts;
int32_t cf_id = -1;
if (type != leveldb::TKT_DEL && DropIllegalColumnFamily(col.ToString(), &cf_id)) {
if (type != leveldb::TKT_DEL && type != leveldb::TKT_LOCK &&
DropIllegalColumnFamily(col.ToString(), &cf_id)) {
// drop illegal column family
return true;
}

if (type >= leveldb::TKT_VALUE && DropByLifeTime(cf_id, ts)) {
if (type >= leveldb::TKT_VALUE && type != leveldb::TKT_LOCK &&
DropByLifeTime(cf_id, ts)) {
// drop out-of-life-time record
return true;
}
Expand All @@ -277,6 +288,12 @@ bool DefaultCompactStrategy::ScanDrop(const Slice& tera_key, uint64_t n) {
m_version_num = 0;
m_del_row_ts = m_del_col_ts = m_del_qual_ts = -1;
m_has_put = false;
m_lock_ts = kMaxTimeStamp;

if (type == leveldb::TKT_LOCK) {
m_lock_ts = n;
return true;
}

// no break in switch: need to set multiple variables
switch (type) {
Expand All @@ -288,6 +305,12 @@ bool DefaultCompactStrategy::ScanDrop(const Slice& tera_key, uint64_t n) {
m_del_qual_ts = ts;
default:;
}
} else if (type == leveldb::TKT_LOCK) {
m_lock_ts = n;
return true;
} else if (ts >= m_lock_ts) {
VLOG(15) << "tera.DefaultCompactStrategy: drop locked data, lock ts: " << m_lock_ts;
return true;
} else if (m_del_row_ts >= ts) {
// skip deleted row and the same row_del mark
return true;
Expand Down
3 changes: 2 additions & 1 deletion src/io/default_compact_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class DefaultCompactStrategy : public leveldb::CompactStrategy {
std::string m_last_key;
std::string m_last_col;
std::string m_last_qual;
int64_t m_last_ts;
leveldb::TeraKeyType m_last_type;
leveldb::TeraKeyType m_cur_type;
int64_t m_last_ts;
int64_t m_del_row_ts;
int64_t m_del_col_ts;
int64_t m_del_qual_ts;
Expand All @@ -73,6 +73,7 @@ class DefaultCompactStrategy : public leveldb::CompactStrategy {
uint32_t m_version_num;
uint64_t m_snapshot;
bool m_has_put;
int64_t m_lock_ts;
};

class DefaultCompactStrategyFactory : public leveldb::CompactStrategyFactory {
Expand Down
126 changes: 126 additions & 0 deletions src/io/lock.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) 2016, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "io/lock.h"

#include "io/io_utils.h"

namespace tera {
namespace io {

LockManager::LockManager() {

}

LockManager::~LockManager() {

}

bool LockManager::Lock(const std::string& key, uint64_t id,
const std::string& annotation, StatusCode* status) {
MutexLock l(&mutex_);

LockIterator it = lock_map_.find(key);
if (it == lock_map_.end()) {
LockNode& lock_node = lock_map_[key];
lock_node.id = id;
lock_node.annotation = annotation;
VLOG(10) << "lock key: " << key << " id: " << id
<< " annotation: " << annotation;
return true;
}

LockNode& ln = it->second;
if (ln.id == id) {
SetStatusCode(kLockDoubleLock, status);
} else {
SetStatusCode(kLockNotOwn, status);
}
VLOG(10) << "fail to lock key: " << key << " id: " << id
<< " annotation: " << annotation
<< " status: " << StatusCodeToString(*status);
return false;
}

bool LockManager::Unlock(const std::string& key, uint64_t id,
std::string* annotation, StatusCode* status) {
MutexLock l(&mutex_);

LockIterator it;
if (!GetLockNode(key, id, &it, status)) {
VLOG(10) << "fail to unlock key: " << key << " id: " << id
<< " status: " << StatusCodeToString(*status);
return false;
}

if (annotation != NULL) {
annotation->assign(it->second.annotation);
}
lock_map_.erase(it);
VLOG(10) << "unlock key: " << key << " id: " << id
<< " annotation: " << it->second.annotation;
return true;
}

bool LockManager::IsLocked(const std::string& key, uint64_t id,
std::string* annotation, StatusCode* status) {
MutexLock l(&mutex_);

LockIterator it;
if (!GetLockNode(key, id, &it, status)) {
return false;
}

LockNode& ln = it->second;
if (annotation != NULL) {
annotation->assign(ln.annotation);
}
return true;
}

bool LockManager::IsLocked(const std::string& key, uint64_t* id,
std::string* annotation, StatusCode* status) {
MutexLock l(&mutex_);

LockIterator it = lock_map_.find(key);
if (it == lock_map_.end()) {
SetStatusCode(kLockNotExist, status);
return false;
}

LockNode& ln = it->second;
if (id != NULL) {
*id = ln.id;
}
if (annotation != NULL) {
annotation->assign(ln.annotation);
}
return true;
}

bool LockManager::GetLockNode(const std::string& key, uint64_t id,
LockIterator* ret_it, StatusCode* status) {
mutex_.AssertHeld();

LockIterator it = lock_map_.find(key);
if (it == lock_map_.end()) {
VLOG(10) << "lock not exist, key: " << key << " id: " << id;
SetStatusCode(kLockNotExist, status);
return false;
}

LockNode& ln = it->second;
if (ln.id != id) {
VLOG(10) << "lock not own, key: " << key << " id: " << id
<< ", real id: " << ln.id << ", annotation: " << ln.annotation;
SetStatusCode(kLockNotOwn, status);
return false;
}

*ret_it = it;
return true;
}

} // namespace tabletnode
} // namespace tera
54 changes: 54 additions & 0 deletions src/io/lock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2016, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef TERA_IO_LOCK_MANAGER_H_
#define TERA_IO_LOCK_MANAGER_H_

#include "common/mutex.h"

#include "proto/status_code.pb.h"

namespace tera {
namespace io {

class LockManager {
struct LockNode {
uint64_t id;
std::string annotation;
};
public:
LockManager();
~LockManager();

// key is a unique name of the item you want to lock
// id is used to authenticate
// annotation is a hint for others to do cleanup work after the lock owner dead

bool Lock(const std::string& key, uint64_t id,
const std::string& annotation = "", StatusCode* status = NULL);

bool Unlock(const std::string& key, uint64_t id,
std::string* annotation = NULL, StatusCode* status = NULL);

bool IsLocked(const std::string& key, uint64_t id,
std::string* annotation = NULL, StatusCode* status = NULL);

bool IsLocked(const std::string& key, uint64_t* id = NULL,
std::string* annotation = NULL, StatusCode* status = NULL);

private:
typedef std::map<std::string, LockNode> LockMap;
typedef LockMap::iterator LockIterator;
bool GetLockNode(const std::string& key, uint64_t id, LockIterator* ret_it,
StatusCode* status = NULL);

private:
Mutex mutex_;
LockMap lock_map_;
};

} // namespace io
} // namespace tera

#endif // TERA_IO_LOCK_MANAGER_H_
Loading