Skip to content

Add logic to stream weights in EmbeddingKVDB #4058

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class SSDTableBatchedEmbeddingBags(nn.Module):
weights_placements: Tensor
weights_offsets: Tensor
_local_instance_index: int = -1
table_names: List[str]
table_offsets: List[int]
table_sizes: List[int]
_ps_server_port: Optional[str] = None

def __init__(
self,
Expand Down Expand Up @@ -152,6 +156,9 @@ def __init__(
# number of rows will be decided by bulk_init_chunk_size / size_of_each_row
bulk_init_chunk_size: int = 0,
lazy_bulk_init_enabled: bool = False,
enable_raw_embedding_streaming: bool = False, # whether enable raw embedding streaming
table_names: Optional[List[str]] = None,
table_offsets: Optional[List[int]] = None,
) -> None:
super(SSDTableBatchedEmbeddingBags, self).__init__()

Expand All @@ -164,6 +171,17 @@ def __init__(
# pyre-fixme[8]: Attribute has type `device`; used as `int`.
self.current_device: torch.device = torch.cuda.current_device()

self.enable_raw_embedding_streaming = enable_raw_embedding_streaming
# initialize the raw embedding streaming related variables
self.table_names = table_names if table_names is not None else []
self.table_offsets = table_offsets if table_offsets is not None else []
self.table_sizes = [0] + list(itertools.accumulate(rows))
if self.enable_raw_embedding_streaming:
self._ps_server_port = os.getenv("LOCAL_PS_PORT")
logging.info(
f"get env {self._ps_server_port=}, at rank {dist.get_rank()}, with {self.table_names=}, {self.table_offsets=}, {self.table_sizes=}"
)

self.feature_table_map: List[int] = (
feature_table_map if feature_table_map is not None else list(range(T_))
)
Expand Down Expand Up @@ -452,7 +470,7 @@ def __init__(
f"write_buffer_size_per_tbe={ssd_rocksdb_write_buffer_size},max_write_buffer_num_per_db_shard={ssd_max_write_buffer_num},"
f"uniform_init_lower={ssd_uniform_init_lower},uniform_init_upper={ssd_uniform_init_upper},"
f"row_storage_bitwidth={weights_precision.bit_rate()},block_cache_size_per_tbe={ssd_block_cache_size_per_tbe},"
f"use_passed_in_path:{use_passed_in_path}, real_path will be printed in EmbeddingRocksDB"
f"use_passed_in_path:{use_passed_in_path}, real_path will be printed in EmbeddingRocksDB, enable_raw_embedding_streaming:{self.enable_raw_embedding_streaming}"
)
# pyre-fixme[4]: Attribute must be annotated.
# pyre-ignore[16]
Expand All @@ -477,6 +495,11 @@ def __init__(
tbe_unique_id,
l2_cache_size,
enable_async_update,
self.enable_raw_embedding_streaming,
int(self._ps_server_port) if self._ps_server_port else 0,
self.table_names,
self.table_offsets,
self.table_sizes,
)
if self.bulk_init_chunk_size > 0:
self.ssd_uniform_init_lower: float = ssd_uniform_init_lower
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ class EmbeddingRocksDBWrapper : public torch::jit::CustomClassHolder {
bool use_passed_in_path = false,
int64_t tbe_unique_id = 0,
int64_t l2_cache_size_gb = 0,
bool enable_async_update = false)
bool enable_async_update = false,
bool enable_raw_embedding_streaming = false,
int64_t ps_server_port = 0,
std::vector<std::string> table_names = {},
std::vector<int64_t> table_offsets = {},
const std::vector<int64_t>& table_sizes = {})
: impl_(std::make_shared<ssd::EmbeddingRocksDB>(
path,
num_shards,
Expand All @@ -56,7 +61,12 @@ class EmbeddingRocksDBWrapper : public torch::jit::CustomClassHolder {
use_passed_in_path,
tbe_unique_id,
l2_cache_size_gb,
enable_async_update)) {}
enable_async_update,
enable_raw_embedding_streaming,
ps_server_port,
std::move(table_names),
std::move(table_offsets),
table_sizes)) {}

void set_cuda(
at::Tensor indices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@
#include "kv_db_table_batched_embeddings.h"
#include <folly/coro/BlockingWait.h>
#include <folly/coro/Collect.h>
#include <folly/stop_watch.h>
#include <algorithm>
#include "caffe2/torch/fb/distributed/wireSerializer/WireSerializer.h"
#include "common/time/Time.h"
#include "kv_db_cuda_utils.h"
#include "servicerouter/client/cpp2/ClientParams.h"
#include "servicerouter/client/cpp2/ServiceRouter.h"
#include "torch/csrc/autograd/record_function_ops.h"
#include "torch/types.h"

namespace kv_db {

Expand Down Expand Up @@ -73,12 +78,21 @@ EmbeddingKVDB::EmbeddingKVDB(
int64_t cache_size_gb,
int64_t unique_id,
int64_t ele_size_bytes,
bool enable_async_update)
bool enable_async_update,
bool enable_raw_embedding_streaming,
int64_t ps_server_port,
std::vector<std::string> table_names,
std::vector<int64_t> table_offsets,
const std::vector<int64_t>& table_sizes)
: unique_id_(unique_id),
num_shards_(num_shards),
max_D_(max_D),
executor_tp_(std::make_unique<folly::CPUThreadPoolExecutor>(num_shards)),
enable_async_update_(enable_async_update) {
enable_async_update_(enable_async_update),
enable_raw_embedding_streaming_(enable_raw_embedding_streaming),
table_names_(std::move(table_names)),
table_offsets_(std::move(table_offsets)),
table_sizes_(at::tensor(table_sizes)) {
CHECK(num_shards > 0);
if (cache_size_gb > 0) {
l2_cache::CacheLibCache::CacheConfig cache_config;
Expand All @@ -93,7 +107,9 @@ EmbeddingKVDB::EmbeddingKVDB(
}
XLOG(INFO) << "[TBE_ID" << unique_id_ << "] L2 created with " << num_shards_
<< " shards, dimension:" << max_D_
<< ", enable_async_update_:" << enable_async_update_;
<< ", enable_async_update_:" << enable_async_update_
<< ", enable_raw_embedding_streaming_:"
<< enable_raw_embedding_streaming_;

if (enable_async_update_) {
cache_filling_thread_ = std::make_unique<std::thread>([=] {
Expand All @@ -119,13 +135,142 @@ EmbeddingKVDB::EmbeddingKVDB(
}
});
}
if (enable_raw_embedding_streaming_) {
XLOG(INFO) << "[TBE_ID" << unique_id_
<< "] Raw embedding streaming enabled with ps_server_port at"
<< ps_server_port;
auto& factory = facebook::servicerouter::cpp2::getClientFactory();
auto params = facebook::servicerouter::ClientParams().setSingleHost(
"::", ps_server_port);
ps_client_ = factory.getSRClientUnique<
apache::thrift::Client<aiplatform::gmpp::experimental::training_ps::
TrainingParameterServerService>>(
"realtime.delta.publish.esr", params);

weights_stream_thread_ = std::make_unique<std::thread>([=, this] {
while (!stop_) {
auto stream_item_ptr = weights_to_stream_queue_.try_peek();
if (!stream_item_ptr) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (stop_) {
return;
}
auto& indices = stream_item_ptr->indices;
auto& weights = stream_item_ptr->weights;
folly::stop_watch<std::chrono::milliseconds> stop_watch;
folly::coro::blockingWait(tensor_stream(indices, weights));

weights_to_stream_queue_.dequeue();
XLOG_EVERY_MS(INFO, 60000)
<< "[TBE_ID" << unique_id_
<< "] end stream queue size: " << weights_to_stream_queue_.size()
<< " stream takes " << stop_watch.elapsed().count() << "ms";
}
});
}
}

EmbeddingKVDB::~EmbeddingKVDB() {
stop_ = true;
if (enable_async_update_) {
cache_filling_thread_->join();
}
if (enable_raw_embedding_streaming_) {
weights_stream_thread_->join();
}
}

folly::coro::Task<void> EmbeddingKVDB::tensor_stream(
const at::Tensor& indices,
const at::Tensor& weights) {
using namespace ::aiplatform::gmpp::experimental::training_ps;
if (indices.size(0) != weights.size(0)) {
XLOG(ERR) << "[TBE_ID" << unique_id_
<< "] Indices and weights size mismatched " << indices.size(0)
<< " " << weights.size(0);
co_return;
}
folly::stop_watch<std::chrono::milliseconds> stop_watch;
XLOG_EVERY_MS(INFO, 60000)
<< "[TBE_ID" << unique_id_
<< "] send streaming request: indices = " << indices.size(0)
<< ", weights = " << weights.size(0);

auto biggest_idx = table_sizes_.index({table_sizes_.size(0) - 1});
auto mask =
at::logical_and(indices >= 0, indices < biggest_idx).nonzero().squeeze();
auto filtered_indices = indices.index_select(0, mask);
auto filtered_weights = weights.index_select(0, mask);
auto num_invalid_indices = indices.size(0) - filtered_indices.size(0);
if (num_invalid_indices > 0) {
XLOG(INFO) << "[TBE_ID" << unique_id_
<< "] number of invalid indices: " << num_invalid_indices;
}
// 1. Transform local row indices to embedding table global row indices
at::Tensor table_indices =
(at::searchsorted(table_sizes_, filtered_indices, false, true) - 1)
.to(torch::kInt8);
auto tb_ac = table_indices.accessor<int8_t, 1>();
auto indices_ac = filtered_indices.accessor<int64_t, 1>();
auto tb_sizes_ac = table_sizes_.accessor<int64_t, 1>();
std::vector<int64_t> global_indices(tb_ac.size(0), 0);
std::vector<int16_t> shard_indices(tb_ac.size(0), 0);
auto num_ps_shard = 3;

for (int i = 0; i < tb_ac.size(0); ++i) {
int tb_idx = tb_ac[i];
global_indices[i] =
indices_ac[i] - tb_sizes_ac[tb_idx] + table_offsets_[tb_idx];
// hash to shard
// if we do row range sharding, also shard here.
auto fqn = table_names_[tb_idx];
auto hash_key = folly::to<std::string>(fqn, global_indices[i]);
auto shard_id = furcHash(hash_key.data(), hash_key.size(), num_ps_shard);
shard_indices[i] = shard_id;
}
auto global_indices_tensor = at::tensor(global_indices);
auto shard_indices_tensor = at::tensor(shard_indices);
auto total_rows = global_indices_tensor.size(0);
XLOG_EVERY_MS(INFO, 60000)
<< "[TBE_ID" << unique_id_ << "] hash and gloablize rows " << total_rows
<< " in: " << stop_watch.elapsed().count() << "ms";
stop_watch.reset();

// 2. Split by shards
for (int i = 0; i < num_ps_shard; ++i) {
auto shrad_mask = shard_indices_tensor.eq(i).nonzero().squeeze();
auto table_indices_masked = table_indices.index_select(0, shrad_mask);
auto rows_in_shard = table_indices_masked.numel();
if (rows_in_shard == 0) {
continue;
}
auto global_indices_masked =
global_indices_tensor.index_select(0, shrad_mask);
auto weights_masked = filtered_weights.index_select(0, shrad_mask);

if (weights_masked.size(0) != rows_in_shard ||
global_indices_masked.numel() != rows_in_shard) {
XLOG(ERR)
<< "[TBE_ID" << unique_id_
<< "] don't send the request for size mismatched tensors table: "
<< rows_in_shard << " weights: " << weights_masked.size(0)
<< " global_indices: " << global_indices_masked.numel();
continue;
}
SetEmbeddingsRequest req;
req.shardId() = i;
req.fqns() = table_names_;

req.tableIndices() =
torch::distributed::wireDumpTensor(table_indices_masked);
req.rowIndices() =
torch::distributed::wireDumpTensor(global_indices_masked);
req.weights() = torch::distributed::wireDumpTensor(weights_masked);
co_await ps_client_->co_setEmbeddings(req);
}
co_return;
}

void EmbeddingKVDB::update_cache_and_storage(
Expand Down Expand Up @@ -284,9 +429,9 @@ void EmbeddingKVDB::set(
return;
}

// defer the L2 cache/rocksdb update to the background thread as it could be
// parallelized with other cuda kernels, as long as all updates are finished
// before the next L2 cache lookup
// defer the L2 cache/rocksdb update to the background thread as it could
// be parallelized with other cuda kernels, as long as all updates are
// finished before the next L2 cache lookup
kv_db::RocksdbWriteMode write_mode = is_bwd
? kv_db::RocksdbWriteMode::BWD_L1_CNFLCT_MISS_WRITE_BACK
: kv_db::RocksdbWriteMode::FWD_L1_EVICTION;
Expand Down Expand Up @@ -320,7 +465,8 @@ void EmbeddingKVDB::get(
get_cache_lock_wait_duration_ +=
facebook::WallClockUtil::NowInUsecFast() - start_ts;

// this is for unittest to repro synchronization situation deterministically
// this is for unittest to repro synchronization situation
// deterministically
if (sleep_ms > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
XLOG(INFO) << "get sleep end";
Expand All @@ -343,9 +489,9 @@ void EmbeddingKVDB::get(
get_weights_fillup_total_duration_ +=
facebook::WallClockUtil::NowInUsecFast() - weight_fillup_start_ts;

// defer the L2 cache/rocksdb update to the background thread as it could
// be parallelized with other cuda kernels, as long as all updates are
// finished before the next L2 cache lookup
// defer the L2 cache/rocksdb update to the background thread as it
// could be parallelized with other cuda kernels, as long as all
// updates are finished before the next L2 cache lookup
if (enable_async_update_) {
auto tensor_copy_start_ts = facebook::WallClockUtil::NowInUsecFast();
auto new_item = tensor_copy(
Expand Down Expand Up @@ -421,8 +567,8 @@ std::shared_ptr<CacheContext> EmbeddingKVDB::get_cache(
folly::collect(futures).wait();

// the following metrics added here as the current assumption is
// get_cache will only be called in get_cuda path, if assumption no longer
// true, we should wrap this up on the caller side
// get_cache will only be called in get_cuda path, if assumption no
// longer true, we should wrap this up on the caller side
auto dur = facebook::WallClockUtil::NowInUsecFast() - start_ts;
get_cache_lookup_total_duration_ += dur;
auto cache_misses = cache_context->num_misses.load();
Expand Down Expand Up @@ -467,9 +613,10 @@ EmbeddingKVDB::set_cache(
if (l2_cache_ == nullptr) {
return folly::none;
}
// TODO: consider whether need to reconstruct indices/weights/count and free
// the original tensor since most of the tensor elem will be invalid,
// this will trade some perf for peak DRAM util saving
// TODO: consider whether need to reconstruct indices/weights/count and
// free
// the original tensor since most of the tensor elem will be
// invalid, this will trade some perf for peak DRAM util saving

auto cache_update_start_ts = facebook::WallClockUtil::NowInUsecFast();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <cuda_runtime.h>

#include <folly/coro/Task.h>
#include "aiplatform/gmpp/experimental/training_ps/gen-cpp2/TrainingParameterServerService.h"
#include "fbgemm_gpu/split_embeddings_cache/cachelib_cache.h"
#include "fbgemm_gpu/utils/dispatch_macros.h"

Expand Down Expand Up @@ -136,7 +137,12 @@ class EmbeddingKVDB : public std::enable_shared_from_this<EmbeddingKVDB> {
int64_t cache_size_gb = 0,
int64_t unique_id = 0,
int64_t ele_size_bytes = 2 /*assume by default fp16*/,
bool enable_async_update = false);
bool enable_async_update = false,
bool enable_raw_embedding_streamnig = false,
int64_t ps_server_port = 0,
std::vector<std::string> table_names = {},
std::vector<int64_t> table_offsets = {},
const std::vector<int64_t>& table_sizes = {});

virtual ~EmbeddingKVDB();

Expand Down Expand Up @@ -292,6 +298,10 @@ class EmbeddingKVDB : public std::enable_shared_from_this<EmbeddingKVDB> {
return max_D_;
}

folly::coro::Task<void> tensor_stream(
const at::Tensor& indices,
const at::Tensor& weights);

private:
/// Find non-negative embedding indices in <indices> and shard them into
/// #cachelib_pools pieces to be lookedup in parallel
Expand Down Expand Up @@ -418,6 +428,18 @@ class EmbeddingKVDB : public std::enable_shared_from_this<EmbeddingKVDB> {

// -- commone path
std::atomic<int64_t> total_cache_update_duration_{0};

// -- raw embedding streaming
bool enable_raw_embedding_streaming_;
std::vector<std::string> table_names_;
std::vector<int64_t> table_offsets_;
at::Tensor table_sizes_;
std::unique_ptr<
apache::thrift::Client<aiplatform::gmpp::experimental::training_ps::
TrainingParameterServerService>>
ps_client_;
std::unique_ptr<std::thread> weights_stream_thread_;
folly::USPSCQueue<QueueItem, true> weights_to_stream_queue_;
}; // class EmbeddingKVDB

} // namespace kv_db
Loading
Loading