Skip to content

Commit

Permalink
[Embedding] Add sleep in BatchEvict thread to avoid CPU consumption & refine code. (DeepRec-AI#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
candyzone authored Sep 1, 2022
1 parent cda6c0d commit 8b58b75
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 40 deletions.
3 changes: 2 additions & 1 deletion tensorflow/core/framework/embedding/embedding_var.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ class EmbeddingVar : public ResourceBase {
return s;
}

void BatchCommit(std::vector<K> keys, std::vector<ValuePtr<V>*> value_ptrs) {
void BatchCommit(const std::vector<K>& keys,
const std::vector<ValuePtr<V>*>& value_ptrs) {
TF_CHECK_OK(storage_manager_->BatchCommit(keys, value_ptrs));
}

Expand Down
11 changes: 7 additions & 4 deletions tensorflow/core/framework/embedding/kv_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,25 @@ class KVInterface {
virtual Status Remove(K key) = 0;

// KV Batch Lookup
virtual Status BatchLookup(std::vector<K> keys, std::vector<ValuePtr<V>**> value_ptrs) {
virtual Status BatchLookup(const std::vector<K>& keys,
std::vector<ValuePtr<V>**>* value_ptrs) {
return Status(error::Code::UNIMPLEMENTED,
"Unimplemented for BatchLookup in KVInterface.");
}
// KV Batch Insert
virtual Status BatchInsert(std::vector<K> keys, std::vector<const ValuePtr<V>*> value_ptrs) {
virtual Status BatchInsert(const std::vector<K>& keys,
const std::vector<const ValuePtr<V>*>& value_ptrs) {
return Status(error::Code::UNIMPLEMENTED,
"Unimplemented for BatchInsert in KVInterface.");
}
// KV Batch Remove
virtual Status BatchRemove(std::vector<K> keys) {
virtual Status BatchRemove(const std::vector<K>& keys) {
return Status(error::Code::UNIMPLEMENTED,
"Unimplemented for BatchRemove in KVInterface.");
}

virtual Status BatchCommit(std::vector<K> keys, std::vector<ValuePtr<V>*> value_ptrs) {return Status::OK();}
virtual Status BatchCommit(const std::vector<K>& keys,
const std::vector<ValuePtr<V>*>& value_ptrs) = 0;

// KV Size
virtual int64 Size() const = 0;
Expand Down
6 changes: 4 additions & 2 deletions tensorflow/core/framework/embedding/leveldb_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,13 @@ class LevelDBKV : public KVInterface<K, V> {
return Status::OK();
}

Status BatchInsert(std::vector<K> keys, std::vector<ValuePtr<V>*> value_ptrs) {
Status BatchInsert(const std::vector<K>& keys,
const std::vector<ValuePtr<V>*>& value_ptrs) {
return BatchCommit(keys, value_ptrs);
}

Status BatchCommit(std::vector<K> keys, std::vector<ValuePtr<V>*> value_ptrs) {
Status BatchCommit(const std::vector<K>& keys,
const std::vector<ValuePtr<V>*>& value_ptrs) {
WriteBatch batch;
for (int i = 0; i < keys.size(); i++) {
std::string value_res((char*)value_ptrs[i]->GetPtr(), sizeof(FixedLengthHeader) + total_dims_ * sizeof(V));
Expand Down
5 changes: 5 additions & 0 deletions tensorflow/core/framework/embedding/lockless_hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class LocklessHashMap : public KVInterface<K, V> {
}
}

Status BatchCommit(const std::vector<K>& keys,
const std::vector<ValuePtr<V>*>& value_ptrs) {
return Status::OK();
}

Status GetSnapshot(std::vector<K>* key_list, std::vector<ValuePtr<V>* >* value_ptr_list) {
std::pair<const K, ValuePtr<V>*> *hash_map_dump;
int64 bucket_count;
Expand Down
13 changes: 2 additions & 11 deletions tensorflow/core/framework/embedding/lockless_hash_map_cpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class LocklessHashMapCPU : public KVInterface<K, V> {
return Status::OK();
}

Status BatchCommit(std::vector<K> keys, std::vector<ValuePtr<V>*> value_ptrs) {
Status BatchCommit(const std::vector<K>& keys,
const std::vector<ValuePtr<V>*>& value_ptrs) {
int batch_size = keys.size();
V** value_address = (V**)malloc(sizeof(V * ) * batch_size);
V** dev_value_address;
Expand Down Expand Up @@ -134,16 +135,6 @@ class LocklessHashMapCPU : public KVInterface<K, V> {
return Status::OK();
}

/*
Status BatchCommit(std::vector<K> keys, std::vector<ValuePtr<V>*> value_ptrs) {
int batch_size = keys.size();
for (int i = 0; i < batch_size; i++) {
Commit(keys[i],value_ptrs[i]);
}
return Status::OK();
}
*/

Status GetSnapshot(std::vector<K>* key_list, std::vector<ValuePtr<V>* >* value_ptr_list) {
std::pair<const K, ValuePtr<V>*> *hash_map_dump;
int64 bucket_count;
Expand Down
54 changes: 36 additions & 18 deletions tensorflow/core/framework/embedding/multilevel_embedding.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,20 @@ class StorageManager {
Status Init(Allocator* alloc_ = nullptr) {
switch (sc_.layout_type) {
case LayoutType::NORMAL:
new_value_ptr_fn_ = [] (Allocator* alloc, size_t size) { return new NormalValuePtr<V>(alloc, size); };
new_value_ptr_fn_ = [] (Allocator* alloc, size_t size) {
return new NormalValuePtr<V>(alloc, size); };
break;
case LayoutType::LIGHT:
new_value_ptr_fn_ = [] (Allocator* alloc, size_t size) { return new LightValuePtr<V>(alloc, size); };
new_value_ptr_fn_ = [] (Allocator* alloc, size_t size) {
return new LightValuePtr<V>(alloc, size); };
break;
case LayoutType::NORMAL_CONTIGUOUS:
new_value_ptr_fn_ = [] (Allocator* alloc, size_t size) { return new NormalContiguousValuePtr<V>(alloc, size); };
new_value_ptr_fn_ = [] (Allocator* alloc, size_t size) {
return new NormalContiguousValuePtr<V>(alloc, size); };
break;
default:
new_value_ptr_fn_ = [] (Allocator* alloc, size_t size) { return new NormalValuePtr<V>(alloc, size); };
new_value_ptr_fn_ = [] (Allocator* alloc, size_t size) {
return new NormalValuePtr<V>(alloc, size); };
break;
}
switch (sc_.type) {
Expand Down Expand Up @@ -133,7 +137,8 @@ class StorageManager {
case StorageType::HBM_DRAM:
#if GOOGLE_CUDA
#if !TENSORFLOW_USE_GPU_EV
new_value_ptr_fn_ = [] (Allocator* allocator, size_t size) { return new NormalGPUValuePtr<V>(allocator, size); };
new_value_ptr_fn_ = [] (Allocator* allocator, size_t size) {
return new NormalGPUValuePtr<V>(allocator, size); };
LOG(INFO) << "StorageManager::HBM_DRAM: " << name_;
kvs_.push_back(std::make_pair(new LocklessHashMap<K, V>(), alloc_));
kvs_.push_back(std::make_pair(new LocklessHashMapCPU<K, V>(), cpu_allocator()));
Expand Down Expand Up @@ -169,7 +174,7 @@ class StorageManager {
void SetAllocLen(int64 value_len, int slot_num){
while (flag_.test_and_set(std::memory_order_acquire));
//The start address of every slot should be aligned to 16 bytes, otherwise a coredump will happen in the ApplyOp.
alloc_len_ = (value_len * sizeof(V) % 16 == 0) ? value_len : value_len + (16 - (sizeof(V) * value_len) % 16) / sizeof(V);
alloc_len_ = (value_len * sizeof(V) % 16 == 0) ?value_len : value_len + (16 - (sizeof(V) * value_len) % 16) / sizeof(V);
int64 temp = alloc_len_ * slot_num;
if (temp > total_dims_) {
total_dims_ = temp;
Expand Down Expand Up @@ -316,18 +321,21 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb

#if GOOGLE_CUDA
#if !TENSORFLOW_USE_GPU_EV
void CopyBackToGPU(int total, K* keys, int64 size, bool* copyback_flags, V** memcpy_address, size_t value_len, int *copyback_cursor, ValuePtr<V> **gpu_value_ptrs, V* memcpy_buffer_gpu){
void CopyBackToGPU(int total, K* keys, int64 size, bool* copyback_flags,
V** memcpy_address, size_t value_len, int *copyback_cursor,
ValuePtr<V> **gpu_value_ptrs, V* memcpy_buffer_gpu){
V *memcpy_buffer_cpu;
memcpy_buffer_cpu = (V*)malloc(total * value_len * sizeof(V));
int j = 0;
for (int i = 0; i < size;i++) {
if (copyback_flags[i]) {
ValuePtr<V>* gpu_value_ptr = new_value_ptr_fn_(kvs_[0].second, size);
//Copy Header Info
memcpy((char *)gpu_value_ptr->GetPtr(), (char *)memcpy_address[i] - sizeof(FixedLengthHeader), sizeof(FixedLengthHeader));
memcpy((char *)gpu_value_ptr->GetPtr(),
(char *)memcpy_address[i] - sizeof(FixedLengthHeader),
sizeof(FixedLengthHeader));
V* cpu_data_address = memcpy_address[i];
V* gpu_data_address = gpu_value_ptr->GetValue(0, 0);
//cudaMemcpy(gpu_data_address, cpu_data_address, value_len * sizeof(V), cudaMemcpyHostToDevice);
memcpy(memcpy_buffer_cpu + j * value_len, cpu_data_address, value_len * sizeof(V));
copyback_cursor[j] = i;
gpu_value_ptrs[j] = gpu_value_ptr;
Expand All @@ -336,7 +344,8 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb
}
}

cudaMemcpy(memcpy_buffer_gpu, memcpy_buffer_cpu, total * value_len * sizeof(V), cudaMemcpyHostToDevice);
cudaMemcpy(memcpy_buffer_gpu, memcpy_buffer_cpu,
total * value_len * sizeof(V), cudaMemcpyHostToDevice);
}
#endif // TENSORFLOW_USE_GPU_EV
#endif // GOOGLE_CUDA
Expand Down Expand Up @@ -370,7 +379,8 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb

int64 GetSnapshot(std::vector<K>* key_list, std::vector<V* >* value_list,
std::vector<int64>* version_list, std::vector<int64>* freq_list,
const EmbeddingConfig& emb_config, EmbeddingFilter<K, V, EmbeddingVar<K, V>>* filter,
const EmbeddingConfig& emb_config,
EmbeddingFilter<K, V, EmbeddingVar<K, V>>* filter,
embedding::Iterator** it) {
for (auto kv : kvs_) {
std::vector<ValuePtr<V>* > value_ptr_list;
Expand All @@ -381,8 +391,10 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb
continue;
}
for (int64 i = 0; i < key_list_tmp.size(); ++i) {
V* val = value_ptr_list[i]->GetValue(emb_config.emb_index, GetOffset(emb_config.emb_index));
V* primary_val = value_ptr_list[i]->GetValue(emb_config.primary_emb_index, GetOffset(emb_config.primary_emb_index));
V* val = value_ptr_list[i]->GetValue(emb_config.emb_index,
GetOffset(emb_config.emb_index));
V* primary_val = value_ptr_list[i]->GetValue(emb_config.primary_emb_index,
GetOffset(emb_config.primary_emb_index));
key_list->push_back(key_list_tmp[i]);
if (emb_config.filter_freq != 0 || is_multi_level_
|| emb_config.record_freq) {
Expand Down Expand Up @@ -416,7 +428,8 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb
TF_CHECK_OK(kv.first->GetSnapshot(&key_list, &value_ptr_list));
std::vector<std::pair<K, ValuePtr<V>* > > to_deleted;
for (int64 i = 0; i < key_list.size(); ++i) {
V* val = value_ptr_list[i]->GetValue(emb_config.primary_emb_index, GetOffset(emb_config.primary_emb_index));;
V* val = value_ptr_list[i]->GetValue(emb_config.primary_emb_index,
GetOffset(emb_config.primary_emb_index));
if (val != nullptr) {
V l2_weight = 0.0;
for (int64 j = 0; j < value_len; j++) {
Expand Down Expand Up @@ -468,6 +481,7 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb
Status Destroy() {
if (eviction_thread_) {
mutex_lock l(mu_);
shutdown_cv_.notify_all();
shutdown_ = true;
}
delete eviction_thread_;
Expand All @@ -482,7 +496,8 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb
return Status::OK();
}

Status BatchCommit(std::vector<K> keys, std::vector<ValuePtr<V>*> value_ptrs) {
Status BatchCommit(const std::vector<K>& keys,
const std::vector<ValuePtr<V>*>& value_ptrs) {
for (auto kv : kvs_) {
TF_CHECK_OK(kv.first->BatchCommit(keys, value_ptrs));
}
Expand All @@ -506,8 +521,6 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb

mutex* get_mutex() { return &mu_; }



private:
void BatchEviction() {
Env* env = Env::Default();
Expand All @@ -527,6 +540,8 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb
break;
}
// add WaitForMilliseconds() for sleep if necessary
const int kTimeoutMilliseconds = 1;
WaitForMilliseconds(&l, &shutdown_cv_, kTimeoutMilliseconds);
for (int i = 0; i < value_ptr_out_of_date_.size(); i++) {
value_ptr_out_of_date_[i]->Destroy(kvs_[0].second);
delete value_ptr_out_of_date_[i];
Expand Down Expand Up @@ -561,7 +576,9 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb
// delete value_ptrs[i];
}
clock_gettime(CLOCK_MONOTONIC, &end);
LOG(INFO) << "Total Evict Time: " << ((double)(end.tv_sec - start.tv_sec) * 1000000000 + end.tv_nsec - start.tv_nsec) / 1000000 << "ms";
LOG(INFO) << "Total Evict Time: "
<< (double)(end.tv_sec - start.tv_sec) * EnvTime::kSecondsToMillis +
(end.tv_nsec - start.tv_nsec) / EnvTime::kMillisToNanos<< "ms";
} else {
for (int64 i = 0; i < true_size; ++i) {
if (kvs_[0].first->Lookup(evic_ids[i], &value_ptr).ok()) {
Expand Down Expand Up @@ -594,6 +611,7 @@ Status GetOrCreate(K key, ValuePtr<V>** value_ptr, size_t size, bool &need_copyb
BatchCache<K>* cache_;
int64 cache_capacity_;
mutex mu_;
condition_variable shutdown_cv_;
volatile bool shutdown_ GUARDED_BY(mu_) = false;

volatile bool done_ = false;
Expand Down
8 changes: 4 additions & 4 deletions tensorflow/core/framework/embedding/ssd_hashkv.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,13 @@ class SSDHashKV : public KVInterface<K, V> {

Status Insert(K key, const ValuePtr<V>* value_ptr) { return Status::OK(); }

Status BatchInsert(std::vector<K>& keys,
std::vector<ValuePtr<V>*>& value_ptrs) {
Status BatchInsert(const std::vector<K>& keys,
const std::vector<ValuePtr<V>*>& value_ptrs) {
return BatchCommit(keys, value_ptrs);
}

Status BatchCommit(std::vector<K>& keys,
std::vector<ValuePtr<V>*>& value_ptrs) {
Status BatchCommit(const std::vector<K>& keys,
const std::vector<ValuePtr<V>*>& value_ptrs) {
compaction_fn();
__sync_fetch_and_add(&total_app_count, keys.size());
for (int i = 0; i < keys.size(); i++) {
Expand Down

0 comments on commit 8b58b75

Please sign in to comment.