Skip to content

Commit

Permalink
[Runtime] Support independent inter thread pool for each session in SessionGroup. (DeepRec-AI#424)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanshanpt authored Sep 8, 2022
1 parent 9794bad commit ebcc7ee
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 46 deletions.
11 changes: 7 additions & 4 deletions tensorflow/core/common_runtime/device_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ limitations under the License.
#include <vector>

#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/logging.h"
Expand Down Expand Up @@ -119,13 +120,15 @@ Status DeviceFactory::ListAllPhysicalDevices(std::vector<string>* devices) {
Status DeviceFactory::AddDevices(
const SessionOptions& options, const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices) {
return AddDevices(options, name_prefix, devices, nullptr);
return AddDevices(options, name_prefix, devices, nullptr,
DeviceGlobalThreadPoolOptions());
}

Status DeviceFactory::AddDevices(
const SessionOptions& options, const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices,
const DeviceResourceMgrMap* dev_rmgr_map) {
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt) {
// CPU first. A CPU device is required.
auto cpu_factory = GetFactory("CPU");
if (!cpu_factory) {
Expand All @@ -134,7 +137,7 @@ Status DeviceFactory::AddDevices(
}
size_t init_size = devices->size();
TF_RETURN_IF_ERROR(cpu_factory->CreateDevices(
options, name_prefix, devices, dev_rmgr_map));
options, name_prefix, devices, dev_rmgr_map, opt));
if (devices->size() == init_size) {
return errors::NotFound("No CPU devices are available in this process");
}
Expand All @@ -147,7 +150,7 @@ Status DeviceFactory::AddDevices(
// Now only support "CPU" and "GPU" device
if (p.first == "GPU") {
TF_RETURN_IF_ERROR(factory->CreateDevices(
options, name_prefix, devices, dev_rmgr_map));
options, name_prefix, devices, dev_rmgr_map, opt));
} else {
TF_RETURN_IF_ERROR(factory->CreateDevices(
options, name_prefix, devices));
Expand Down
7 changes: 5 additions & 2 deletions tensorflow/core/common_runtime/device_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace tensorflow {
class Device;
struct SessionOptions;
struct DeviceResourceMgrMap;
struct DeviceGlobalThreadPoolOptions;

class DeviceFactory {
public:
Expand All @@ -46,7 +47,8 @@ class DeviceFactory {
static Status AddDevices(const SessionOptions& options,
const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices,
const DeviceResourceMgrMap* dev_rmgr_map);
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt);

// Helper for tests. Create a single device of type "type". The
// returned device is always numbered zero, so if creating multiple
Expand All @@ -72,7 +74,8 @@ class DeviceFactory {
virtual Status CreateDevices(
const SessionOptions& options, const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices,
const DeviceResourceMgrMap* dev_rmgr_map) {
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt) {
return CreateDevices(options, name_prefix, devices);
}

Expand Down
10 changes: 8 additions & 2 deletions tensorflow/core/common_runtime/direct_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ limitations under the License.
#include "tensorflow/core/common_runtime/executor_factory.h"
#include "tensorflow/core/common_runtime/function.h"
#include "tensorflow/core/common_runtime/graph_optimizer.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/common_runtime/memory_types.h"
#include "tensorflow/core/common_runtime/memory_planner.h"
#include "tensorflow/core/common_runtime/gpu_memory_planner.h"
Expand Down Expand Up @@ -319,10 +320,13 @@ class DirectSessionFactory : public SessionFactory {
}
#endif // GOOGLE_CUDA

DeviceGlobalThreadPoolOptions dev_global_tp_opt;
dev_global_tp_opt.global_threadpool_num = session_num;
dev_global_tp_opt.device_threadpool_index = 0;
std::vector<std::unique_ptr<Device>> devices;
TF_RETURN_IF_ERROR(DeviceFactory::AddDevices(
options, "/job:localhost/replica:0/task:0",
&devices, &dev_rmgr_map));
&devices, &dev_rmgr_map, dev_global_tp_opt));

#if GOOGLE_CUDA
if (use_multi_stream) {
Expand Down Expand Up @@ -351,9 +355,11 @@ class DirectSessionFactory : public SessionFactory {
#endif // TENSORFLOW_USE_NUMA
session_group->CreateLeaderSession(leader_session);
for (int i = 1; i < session_num; ++i) {
dev_global_tp_opt.device_threadpool_index = i;
std::vector<std::unique_ptr<Device>> dev;
TF_RETURN_IF_ERROR(DeviceFactory::AddDevices(
options, "/job:localhost/replica:0/task:0", &dev, &dev_rmgr_map));
options, "/job:localhost/replica:0/task:0", &dev,
&dev_rmgr_map, dev_global_tp_opt));
DeviceMgr* dev_mgr = nullptr;
#if GOOGLE_CUDA
if (use_multi_stream) {
Expand Down
22 changes: 14 additions & 8 deletions tensorflow/core/common_runtime/gpu/gpu_device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,13 @@ BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
const string& physical_device_desc,
Allocator* gpu_allocator, Allocator* cpu_allocator,
bool sync_every_op, int32 max_streams,
const DeviceResourceMgrMap* dev_rmgr_map)
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt)
: LocalDevice(options, Device::BuildDeviceAttributes(name, physical_name,
DEVICE_GPU,
memory_limit, locality,
physical_device_desc),
dev_rmgr_map),
dev_rmgr_map, opt),
gpu_allocator_(gpu_allocator),
cpu_allocator_(cpu_allocator),
scoped_allocator_mgr_(new ScopedAllocatorMgr(name)),
Expand Down Expand Up @@ -1037,13 +1038,15 @@ Status BaseGPUDeviceFactory::ListPhysicalDevices(std::vector<string>* devices) {
Status BaseGPUDeviceFactory::CreateDevices(
const SessionOptions& options, const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices) {
return CreateDevices(options, name_prefix, devices, nullptr);
return CreateDevices(options, name_prefix, devices,
nullptr, DeviceGlobalThreadPoolOptions());
}

Status BaseGPUDeviceFactory::CreateDevices(
const SessionOptions& options, const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices,
const DeviceResourceMgrMap* dev_rmgr_map) {
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt) {
TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
se::Platform* gpu_manager = GPUMachineManager();
if (gpu_manager == nullptr) {
Expand Down Expand Up @@ -1267,7 +1270,8 @@ Status BaseGPUDeviceFactory::CreateDevices(
tf_gpu_id.value());
}
TF_RETURN_IF_ERROR(CreateGPUDevice(options, name_prefix, tf_gpu_id,
bytes, it->second, devices, dev_rmgr_map));
bytes, it->second, devices,
dev_rmgr_map, opt));
}
return Status::OK();
}
Expand Down Expand Up @@ -1299,15 +1303,17 @@ Status BaseGPUDeviceFactory::CreateGPUDevice(
int64 memory_limit, const DeviceLocality& dev_locality,
std::vector<std::unique_ptr<Device>>* devices) {
return CreateGPUDevice(options, name_prefix, tf_gpu_id,
memory_limit, dev_locality, devices, nullptr);
memory_limit, dev_locality, devices,
nullptr, DeviceGlobalThreadPoolOptions());
}

Status BaseGPUDeviceFactory::CreateGPUDevice(
const SessionOptions& options, const string& name_prefix,
TfGpuId tf_gpu_id, int64 memory_limit,
const DeviceLocality& dev_locality,
std::vector<std::unique_ptr<Device>>* devices,
const DeviceResourceMgrMap* dev_rmgr_map) {
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt) {
CHECK_GE(tf_gpu_id.value(), 0);
const string device_name =
strings::StrCat(name_prefix, "/device:GPU:", tf_gpu_id.value());
Expand Down Expand Up @@ -1353,7 +1359,7 @@ Status BaseGPUDeviceFactory::CreateGPUDevice(
options, device_name, physical_name, static_cast<Bytes>(bytes_limit),
dev_locality, tf_gpu_id, GetShortDeviceDescription(platform_gpu_id, *desc),
gpu_allocator, ProcessState::singleton()->GetCPUAllocator(numa_node),
dev_rmgr_map);
dev_rmgr_map, opt);
gpu_device = std::move(tmp);
} else {
std::unique_ptr<BaseGPUDevice> tmp = CreateGPUDevice(
Expand Down
12 changes: 8 additions & 4 deletions tensorflow/core/common_runtime/gpu/gpu_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class BaseGPUDevice : public LocalDevice {
const string& physical_device_desc,
Allocator* gpu_allocator, Allocator* cpu_allocator,
bool sync_every_op, int32 max_streams,
const DeviceResourceMgrMap* dev_rmgr_map);
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt);

~BaseGPUDevice() override;

Expand Down Expand Up @@ -330,7 +331,8 @@ class BaseGPUDeviceFactory : public DeviceFactory {
std::vector<std::unique_ptr<Device>>* devices) override;
Status CreateDevices(const SessionOptions& options, const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices,
const DeviceResourceMgrMap* dev_rmgr_map) override;
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt) override;

struct InterconnectMap {
// Name of interconnect technology, if known.
Expand Down Expand Up @@ -377,7 +379,8 @@ class BaseGPUDeviceFactory : public DeviceFactory {
const string& name_prefix, TfGpuId tf_gpu_id,
int64 memory_limit, const DeviceLocality& dev_locality,
std::vector<std::unique_ptr<Device>>* devices,
const DeviceResourceMgrMap* dev_rmgr_map);
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt);

virtual std::unique_ptr<BaseGPUDevice> CreateGPUDevice(
const SessionOptions& options, const string& name, Bytes memory_limit,
Expand All @@ -389,7 +392,8 @@ class BaseGPUDeviceFactory : public DeviceFactory {
const SessionOptions& options, const string& name, const string& physical_name,
Bytes memory_limit, const DeviceLocality& locality, TfGpuId tf_gpu_id,
const string& physical_device_desc, Allocator* gpu_allocator,
Allocator* cpu_allocator, const DeviceResourceMgrMap* dev_rmgr_map) = 0;
Allocator* cpu_allocator, const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt) = 0;

Status EnablePeerAccess(const std::vector<PlatformGpuId>& visible_gpu_order);

Expand Down
23 changes: 14 additions & 9 deletions tensorflow/core/common_runtime/gpu/gpu_device_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ class GPUDevice : public BaseGPUDevice {
const string& physical_name, Bytes memory_limit,
const DeviceLocality& locality, TfGpuId tf_gpu_id,
const string& physical_device_desc, Allocator* gpu_allocator,
Allocator* cpu_allocator, const DeviceResourceMgrMap* dev_rmgr_map)
Allocator* cpu_allocator, const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt)
: BaseGPUDevice(options, name, physical_name, memory_limit, locality,
tf_gpu_id, physical_device_desc, gpu_allocator, cpu_allocator,
false /* sync every op */, 1 /* max_streams */, dev_rmgr_map) {
false /* sync every op */, 1 /* max_streams */, dev_rmgr_map, opt) {
if (options.config.has_gpu_options()) {
force_gpu_compatible_ =
options.config.gpu_options().force_gpu_compatible();
Expand Down Expand Up @@ -91,10 +92,11 @@ class GPUDeviceFactory : public BaseGPUDeviceFactory {
const SessionOptions& options, const string& name, const string& physical_name,
Bytes memory_limit, const DeviceLocality& locality, TfGpuId tf_gpu_id,
const string& physical_device_desc, Allocator* gpu_allocator,
Allocator* cpu_allocator, const DeviceResourceMgrMap* dev_rmgr_map) override {
Allocator* cpu_allocator, const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt) override {
return absl::make_unique<GPUDevice>(options, name, physical_name, memory_limit,
locality, tf_gpu_id, physical_device_desc,
gpu_allocator, cpu_allocator, dev_rmgr_map);
gpu_allocator, cpu_allocator, dev_rmgr_map, opt);
}
};

Expand All @@ -119,9 +121,10 @@ class GPUCompatibleCPUDevice : public ThreadPoolDevice {
GPUCompatibleCPUDevice(const SessionOptions& options, const string& name,
Bytes memory_limit, const DeviceLocality& locality,
Allocator* allocator,
const DeviceResourceMgrMap* dev_rmgr_map)
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt)
: ThreadPoolDevice(options, name, memory_limit,
locality, allocator, dev_rmgr_map),
locality, allocator, dev_rmgr_map, opt),
numa_node_(locality.numa_node()) {
if (options.config.has_gpu_options()) {
force_gpu_compatible_ =
Expand Down Expand Up @@ -157,12 +160,14 @@ class GPUCompatibleCPUDeviceFactory : public DeviceFactory {

Status CreateDevices(const SessionOptions& options, const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices) override {
return CreateDevices(options, name_prefix, devices, nullptr);
return CreateDevices(options, name_prefix, devices, nullptr,
DeviceGlobalThreadPoolOptions());
}

Status CreateDevices(const SessionOptions& options, const string& name_prefix,
std::vector<std::unique_ptr<Device>>* devices,
const DeviceResourceMgrMap* dev_rmgr_map) override {
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt) override {
int n = 1;
auto iter = options.config.device_count().find("CPU");
if (iter != options.config.device_count().end()) {
Expand All @@ -179,7 +184,7 @@ class GPUCompatibleCPUDeviceFactory : public DeviceFactory {
devices->push_back(absl::make_unique<GPUCompatibleCPUDevice>(
options, name, Bytes(256 << 20), DeviceLocality(),
ProcessState::singleton()->GetCPUAllocator(numa_node),
dev_rmgr_map));
dev_rmgr_map, opt));
}

return Status::OK();
Expand Down
30 changes: 22 additions & 8 deletions tensorflow/core/common_runtime/local_device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,20 @@ struct LocalDevice::EigenThreadPoolInfo {
// Basic constructor: no shared ResourceMgr map and default threadpool
// options (a single process-global Eigen threadpool shared by all devices).
// Delegates all real setup to Init().
LocalDevice::LocalDevice(const SessionOptions& options,
                         const DeviceAttributes& attributes)
    : Device(options.env, attributes), owned_tp_info_(nullptr) {
  // Fix: the block contained both the stale pre-change call
  // `Init(options, attributes);` and the updated call below; the stale
  // two-argument call no longer matches Init's three-parameter signature
  // and would run initialization twice. Keep only the updated call.
  Init(options, attributes, DeviceGlobalThreadPoolOptions());
}

// Constructor used when devices share a ResourceMgr (dev_rmgr_map) and when
// the caller selects which global threadpool this device should use via
// `opt` — e.g. SessionGroup gives each session its own inter threadpool
// (presumably the inter-op pool; confirm against LocalDevice::Init).
LocalDevice::LocalDevice(const SessionOptions& options,
                         const DeviceAttributes& attributes,
                         const DeviceResourceMgrMap* dev_rmgr_map,
                         const DeviceGlobalThreadPoolOptions& opt)
    : Device(options.env, attributes, dev_rmgr_map), owned_tp_info_(nullptr) {
  // Fix: the fused diff left the stale pre-change parameter line
  // `const DeviceResourceMgrMap* dev_rmgr_map)` (making the signature
  // unparsable) and a duplicate stale `Init(options, attributes);` call.
  // Keep only the clean four-parameter signature and the single updated
  // Init call.
  Init(options, attributes, opt);
}

void LocalDevice::Init(const SessionOptions& options,
const DeviceAttributes& attributes) {
const DeviceAttributes& attributes,
const DeviceGlobalThreadPoolOptions& opt) {
// Log info messages if TensorFlow is not compiled with instructions that
// could speed up performance and are available on the current CPU.
port::InfoAboutUnusedCPUFeatures();
Expand All @@ -152,11 +154,23 @@ void LocalDevice::Init(const SessionOptions& options,
}
tp_info = global_tp_info_[numa_node];
} else {
if (global_tp_info_.empty()) {
global_tp_info_.push_back(new LocalDevice::EigenThreadPoolInfo(
options, port::kNUMANoAffinity, nullptr));
if (opt.global_threadpool_num > 1) {
for (int i = 0; i < opt.global_threadpool_num; ++i) {
global_tp_info_.push_back(nullptr);
}
if (!global_tp_info_[opt.device_threadpool_index]) {
global_tp_info_[opt.device_threadpool_index] =
new LocalDevice::EigenThreadPoolInfo(
options, port::kNUMANoAffinity, nullptr);
}
tp_info = global_tp_info_[opt.device_threadpool_index];
} else {
if (global_tp_info_.empty()) {
global_tp_info_.push_back(new LocalDevice::EigenThreadPoolInfo(
options, port::kNUMANoAffinity, nullptr));
}
tp_info = global_tp_info_[0];
}
tp_info = global_tp_info_[0];
}
} else {
// Each LocalDevice owns a separate ThreadPoolDevice for numerical
Expand Down
13 changes: 11 additions & 2 deletions tensorflow/core/common_runtime/local_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ class Benchmark;
struct SessionOptions;
struct DeviceResourceMgrMap;

// Options controlling how many process-global device threadpools exist and
// which one a given device binds to. DirectSessionFactory sets
// global_threadpool_num to the number of sessions in a SessionGroup and
// assigns each session's devices a distinct device_threadpool_index, so
// every session gets an independent threadpool.
struct DeviceGlobalThreadPoolOptions {
  // Default we create one global threadpool. When > 1, LocalDevice::Init
  // lazily creates one threadpool per slot instead of a single shared one.
  int global_threadpool_num = 1;
  // Default all devices use global_threadpool[0]. Must be in
  // [0, global_threadpool_num).
  int device_threadpool_index = 0;
};

// This class is shared by ThreadPoolDevice and GPUDevice and
// initializes a shared Eigen compute device used by both. This
// should eventually be removed once we refactor ThreadPoolDevice and
Expand All @@ -38,12 +45,14 @@ class LocalDevice : public Device {
const DeviceAttributes& attributes);
LocalDevice(const SessionOptions& options,
const DeviceAttributes& attributes,
const DeviceResourceMgrMap* dev_rmgr_map);
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt);
~LocalDevice() override;

private:
void Init(const SessionOptions& options,
const DeviceAttributes& attributes);
const DeviceAttributes& attributes,
const DeviceGlobalThreadPoolOptions& opt);

static bool use_global_threadpool_;

Expand Down
5 changes: 3 additions & 2 deletions tensorflow/core/common_runtime/threadpool_device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ ThreadPoolDevice::ThreadPoolDevice(const SessionOptions& options,
const string& name, Bytes memory_limit,
const DeviceLocality& locality,
Allocator* allocator,
const DeviceResourceMgrMap* dev_rmgr_map)
const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt)
: LocalDevice(options, Device::BuildDeviceAttributes(
name, DEVICE_CPU, memory_limit, locality),
dev_rmgr_map),
dev_rmgr_map, opt),
allocator_(allocator),
scoped_allocator_mgr_(new ScopedAllocatorMgr(name)) {
Init();
Expand Down
3 changes: 2 additions & 1 deletion tensorflow/core/common_runtime/threadpool_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class ThreadPoolDevice : public LocalDevice {
Allocator* allocator);
ThreadPoolDevice(const SessionOptions& options, const string& name,
Bytes memory_limit, const DeviceLocality& locality,
Allocator* allocator, const DeviceResourceMgrMap* dev_rmgr_map);
Allocator* allocator, const DeviceResourceMgrMap* dev_rmgr_map,
const DeviceGlobalThreadPoolOptions& opt);
~ThreadPoolDevice() override;

Allocator* GetAllocator(AllocatorAttributes attr) override;
Expand Down
Loading

0 comments on commit ebcc7ee

Please sign in to comment.