Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions buildlib/tools/coverity.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ run_coverity() {
cov-build --dir $cov_build $MAKEP all
if [ "${ucx_build_type}" == "devel" ]; then
cov-manage-emit --dir $cov_build --tu-pattern "file('.*/test/gtest/common/googletest/*')" delete || :
# Needed to suppress false positives in std::function
COV_OPT="$COV_OPT --checker-option UNINIT_CTOR:ctor_func:swap"
fi
cov-analyze --jobs $parallel_jobs $COV_OPT --disable PARSE_ERROR --security --concurrency --dir $cov_build
nerrors=$(cov-format-errors --dir $cov_build | awk '/Processing [0-9]+ errors?/ { print $2 }')
Expand Down
189 changes: 171 additions & 18 deletions test/gtest/ucp/test_ucp_proto_mock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ extern "C" {
#include <uct/base/uct_iface.h>
#include <ucp/proto/proto_debug.h>
#include <ucp/proto/proto_select.inl>

#if HAVE_IB
#include <uct/ib/base/ib_md.h>
#endif
}

class mock_iface {
public:
/* Can't use std::function due to coverity errors */
using iface_attr_func_t = void (*)(uct_iface_attr&);
using perf_attr_func_t = void (*)(uct_perf_attr_t&);
using iface_attr_func_t = std::function<void(uct_iface_attr&)>;
using perf_attr_func_t = std::function<void(uct_perf_attr_t&)>;

mock_iface() : m_tl(nullptr)
{
Expand All @@ -37,13 +40,12 @@ class mock_iface {
m_mock.cleanup();
}

void add_mock_iface(
const std::string &dev_name = "mock",
iface_attr_func_t cb = [](uct_iface_attr_t &iface_attr) {},
perf_attr_func_t perf_cb = cx7_perf_mock)
void add_mock_iface(const std::string &dev_name = "mock",
iface_attr_func_t cb = [](uct_iface_attr_t &iface_attr) {},
perf_attr_func_t perf_cb = default_perf_mock)
{
m_iface_attrs_funcs[dev_name] = cb;
m_perf_attrs_funcs[dev_name] = perf_cb;
m_iface_attrs_funcs[dev_name] = std::move(cb);
m_perf_attrs_funcs[dev_name] = std::move(perf_cb);
}

void mock_transport(const std::string &tl_name)
Expand All @@ -68,6 +70,17 @@ class mock_iface {
FAIL() << "Transport " << tl_name << " not found";
}

#if HAVE_IB
void ib_event(enum ibv_event_type event_type, uint8_t port_num)
{
uct_ib_async_event_t event = {};
event.event_type = event_type;
event.port_num = port_num;
uct_ib_md_t *md = reinterpret_cast<uct_ib_md_t *>(m_self->m_real_md);
uct_ib_handle_async_event(&md->dev, &event);
}
#endif

private:
static ucs_status_t
query_devices_mock(uct_md_h md, uct_tl_device_resource_t **tl_devices_p,
Expand All @@ -83,6 +96,7 @@ class mock_iface {
const char *first_dev_name = (*tl_devices_p)[0].name;
if (m_self->m_real_dev_name.empty()) {
m_self->m_real_dev_name = first_dev_name;
m_self->m_real_md = md;
} else if (m_self->m_real_dev_name != first_dev_name) {
*num_tl_devices_p = 0;
return UCS_OK;
Expand Down Expand Up @@ -162,10 +176,9 @@ class mock_iface {
return UCS_OK;
}

static void cx7_perf_mock(uct_perf_attr_t& perf_attr)
static void default_perf_mock(uct_perf_attr_t& perf_attr)
{
perf_attr.path_bandwidth.shared = 0.95 * perf_attr.bandwidth.shared;
perf_attr.path_bandwidth.dedicated = 0;
perf_attr.path_bandwidth = perf_attr.bandwidth;
}

/* We have to use singleton to mock C functions */
Expand All @@ -177,6 +190,7 @@ class mock_iface {
std::map<std::string, iface_attr_func_t> m_iface_attrs_funcs;
std::map<std::string, perf_attr_func_t> m_perf_attrs_funcs;
std::string m_real_dev_name;
uct_md_h m_real_md;
};

mock_iface *mock_iface::m_self = nullptr;
Expand Down Expand Up @@ -553,8 +567,9 @@ class test_ucp_proto_mock : public ucp_test, public mock_iface {
return {rkey, ucp_rkey_destroy};
}

void send_recv_rma_put(size_t size,
ucs_memory_type_t mem_type = UCS_MEMORY_TYPE_HOST)
void send_recv_rma(size_t size, ucp_operation_id_t op_id,
unsigned rkey_cfg_index = 1,
ucs_memory_type_t mem_type = UCS_MEMORY_TYPE_HOST)
{
mem_buffer recv_buf(size, mem_type);
recv_buf.pattern_fill(1);
Expand All @@ -567,10 +582,27 @@ class test_ucp_proto_mock : public ucp_test, public mock_iface {

ucp_request_param_t req_param;
req_param.op_attr_mask = 0;
auto sptr = ucp_put_nbx(sender().ep(), send_buf.ptr(), size,
(uint64_t)recv_buf.ptr(), rkey, &req_param);
ucs_status_ptr_t sptr;
if (op_id == UCP_OP_ID_PUT) {
sptr = ucp_put_nbx(sender().ep(), send_buf.ptr(), size,
(uint64_t)recv_buf.ptr(), rkey, &req_param);
} else if (op_id == UCP_OP_ID_GET) {
sptr = ucp_get_nbx(sender().ep(), send_buf.ptr(), size,
(uint64_t)recv_buf.ptr(), rkey, &req_param);
} else {
FAIL() << "Invalid operation ID: " << op_id;
sptr = nullptr;
}

EXPECT_EQ(UCS_OK, request_wait(sptr));
recv_buf.pattern_check(2);

if (op_id == UCP_OP_ID_PUT) {
recv_buf.pattern_check(2);
} else if (op_id == UCP_OP_ID_GET) {
send_buf.pattern_check(1);
}

EXPECT_EQ(rkey->cfg_index, rkey_cfg_index);
}
};

Expand Down Expand Up @@ -702,7 +734,7 @@ UCS_TEST_P(test_ucp_proto_mock_rcx, rndv_4_paths,
UCS_TEST_P(test_ucp_proto_mock_rcx, rma_put_2_lanes,
"IB_NUM_PATHS?=1", "MAX_RMA_RAILS=2")
{
send_recv_rma_put(64 * UCS_KBYTE);
send_recv_rma(64 * UCS_KBYTE, UCP_OP_ID_PUT);

ucp_proto_select_key_t key = any_key();
key.param.op_id_flags = UCP_OP_ID_PUT;
Expand Down Expand Up @@ -1141,3 +1173,124 @@ UCS_TEST_P(test_ucp_proto_mock_rcx_twins_get, use_single_net_device_rank_2,
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_rcx_twins_get, rcx, "rc_x")


#if HAVE_DECL_IBV_EVENT_PORT_SPEED_CHANGE

class test_ucp_proto_mock_rcx_speed_change : public test_ucp_proto_mock {
public:
test_ucp_proto_mock_rcx_speed_change()
{
mock_transport("rc_mlx5");
}

virtual void init() override
{
m_port_speed["mock_0:1"] = 28e9;
add_mock_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) {
iface_attr.cap.get.min_zcopy = 0;
iface_attr.bandwidth.shared = 28e9;
iface_attr.latency.c = 600e-9;
iface_attr.latency.m = 1e-9;
}, [this](uct_perf_attr_t &perf_attr) {
perf_attr.bandwidth.shared = this->m_port_speed["mock_0:1"];
});

m_port_speed["mock_1:1"] = 24e9;
add_mock_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) {
iface_attr.cap.get.min_zcopy = 0;
iface_attr.bandwidth.shared = 24e9;
iface_attr.latency.c = 500e-9;
iface_attr.latency.m = 1e-9;
}, [this](uct_perf_attr_t &perf_attr) {
perf_attr.bandwidth.shared = this->m_port_speed["mock_1:1"];
});
test_ucp_proto_mock::init();
}

void set_port_speed(const std::string &iface_name, double port_speed)
{
m_port_speed[iface_name] = port_speed;

ib_event(IBV_EVENT_PORT_SPEED_CHANGE, 1);
while (progress());
}

void test_port_speed(std::function<void(unsigned)> send,
ucp_operation_id_t op_id)
{
// One EP & rkey config created during connection establishment
ucp_worker_h worker = sender().worker();
EXPECT_EQ(worker->rkey_config_count, 1);
EXPECT_EQ(worker->ep_config.length, 1);

// New rkey config created during first operation
send(1);
EXPECT_EQ(worker->rkey_config_count, 2);
EXPECT_EQ(worker->ep_config.length, 1);

// Existing rkey config is used during second operation
send(1);
EXPECT_EQ(worker->rkey_config_count, 2);
EXPECT_EQ(worker->ep_config.length, 1);

ucp_proto_select_key_t key = any_key();
key.param.op_id_flags = op_id;
key.param.op_attr = 0;

check_rkey_config(sender(), {
{0, INF, "zero-copy", "47% on rc_mlx5/mock_1:1 and 53% on rc_mlx5/mock_0:1"},
}, key, 1);

// Reduce port_speed of mock_0:1 by 50%, new EP & rkey configs are created
set_port_speed("mock_0:1", 14e9);
send(2);
EXPECT_EQ(worker->rkey_config_count, 3);
EXPECT_EQ(worker->ep_config.length, 2);

check_rkey_config(sender(), {
{0, INF, "zero-copy", "64% on rc_mlx5/mock_1:1 and 36% on rc_mlx5/mock_0:1"},
}, key, 2);

// Reduce port_speed of mock_1:1 to be equal with mock_0:1,
// new EP & rkey configs are created
set_port_speed("mock_1:1", 14e9);
send(3);
EXPECT_EQ(worker->rkey_config_count, 4);
EXPECT_EQ(worker->ep_config.length, 3);

check_rkey_config(sender(), {
{0, INF, "zero-copy", "50% on rc_mlx5/mock_1:1 and 50% on rc_mlx5/mock_0:1"},
}, key, 3);

// Reset port_speeds to initial values, should switch to initial configs
set_port_speed("mock_0:1", 28e9);
set_port_speed("mock_1:1", 24e9);
send(1);
EXPECT_EQ(worker->rkey_config_count, 4);
EXPECT_EQ(worker->ep_config.length, 3);
}

private:
std::map<std::string, double> m_port_speed;
};

UCS_TEST_P(test_ucp_proto_mock_rcx_speed_change, rma_put,
"IB_NUM_PATHS?=1", "MAX_RMA_RAILS=2", "ZCOPY_THRESH=0")
{
test_port_speed([this](unsigned rkey_cfg_index) {
send_recv_rma(64 * UCS_KBYTE, UCP_OP_ID_PUT, rkey_cfg_index);
}, UCP_OP_ID_PUT);
}

UCS_TEST_P(test_ucp_proto_mock_rcx_speed_change, rma_get,
"IB_NUM_PATHS?=1", "MAX_RMA_RAILS=2", "ZCOPY_THRESH=0")
{
test_port_speed([this](unsigned rkey_cfg_index) {
send_recv_rma(64 * UCS_KBYTE, UCP_OP_ID_GET, rkey_cfg_index);
}, UCP_OP_ID_GET);
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_rcx_speed_change, rcx, "rc_x")

#endif // HAVE_DECL_IBV_EVENT_PORT_SPEED_CHANGE
Loading