Skip to content

Adding more unit tests for ChannelHolder class #8668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 338 additions & 0 deletions paddle/fluid/framework/channel_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,341 @@ TEST(ChannelHolder, ChannelHolderUnBufferedSendReceiveTest) {
ChannelHolderSendReceive(ch);
delete ch;
}

TEST(ChannelHolder, ChannelUninitializedTest) {
ChannelHolder *ch = new ChannelHolder();
EXPECT_EQ(ch->IsInitialized(), false);
int i = 10;
EXPECT_EQ(ch->Send(&i), false);
EXPECT_EQ(ch->Receive(&i), false);
bool is_exception = false;
try {
ch->Type();
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
EXPECT_EQ(is_exception, true);
delete ch;
}

TEST(ChannelHolder, ChannelInitializedTest) {
ChannelHolder *ch = new ChannelHolder();
ch->Reset<int>(2);
EXPECT_EQ(ch->IsInitialized(), true);
// Channel should remain intialized even after close
ch->close();
EXPECT_EQ(ch->IsInitialized(), true);
delete ch;
}

TEST(ChannelHolder, TypeMismatchSendTest) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking type matching or not isn't the function of Channel, is it necessary to add the unit test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Channel enforces the datatype in send (https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/framework/channel.h#L74) and receive (https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/framework/channel.h#L81)

The channel makes sure that the variable being sent or received is same as the type of the channel. The unit test is to make sure the paddle enforce is executed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks!

// Test with unbuffered channel
ChannelHolder *ch = new ChannelHolder();
ch->Reset<int>(0);
bool is_exception = false;
bool boolean_data = true;
try {
ch->Send(&boolean_data);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
EXPECT_EQ(is_exception, true);
delete ch;

// Test with Buffered Channel
ch = new ChannelHolder();
ch->Reset<float>(10);
is_exception = false;
int int_data = 23;
try {
ch->Send(&int_data);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
EXPECT_EQ(is_exception, true);
delete ch;
}

TEST(ChannelHolder, TypeMismatchReceiveTest) {
// Test with unbuffered channel
ChannelHolder *ch = new ChannelHolder();
ch->Reset<int>(0);
bool is_exception = false;
bool float_data;
try {
ch->Receive(&float_data);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
EXPECT_EQ(is_exception, true);
delete ch;

// Test with Buffered Channel
ch = new ChannelHolder();
ch->Reset<float>(10);
is_exception = false;
int int_data = 23;
try {
ch->Receive(&int_data);
} catch (paddle::platform::EnforceNotMet e) {
is_exception = true;
}
EXPECT_EQ(is_exception, true);
delete ch;
}

void ChannelHolderCloseUnblocksReceiversTest(ChannelHolder *ch) {
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];

// Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data;
EXPECT_EQ(ch->Receive(&data), false);
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec

// Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}

// Explicitly close the channel
// This should unblock all receivers
ch->close();

std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec

// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}

for (size_t i = 0; i < num_threads; i++) t[i].join();
}

void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) {
using paddle::framework::details::Buffered;
using paddle::framework::details::UnBuffered;

size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
bool send_success[num_threads];

// Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
send_success[i] = false;
t[i] = std::thread(
[&](bool *ended, bool *success) {
int data = 10;
*success = ch->Send(&data);
*ended = true;
},
&thread_ended[i], &send_success[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait

if (isBuffered) {
// If ch is Buffered, atleast 4 threads must be blocked.
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (!thread_ended[i]) ct++;
}
EXPECT_GE(ct, 4);
} else {
// If ch is UnBuffered, all the threads should be blocked.
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
}
// Explicitly close the thread
// This should unblock all senders
ch->close();

std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait

// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}

if (isBuffered) {
// Verify that only 1 send was successful
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (send_success[i]) ct++;
}
// Only 1 send must be successful
EXPECT_EQ(ct, 1);
}

for (size_t i = 0; i < num_threads; i++) t[i].join();
}

// This tests that closing a channelholder unblocks
// any receivers waiting on the channel
TEST(ChannelHolder, ChannelHolderCloseUnblocksReceiversTest) {
// Check for buffered channel
ChannelHolder *ch = new ChannelHolder();
ch->Reset<int>(1);
ChannelHolderCloseUnblocksReceiversTest(ch);
delete ch;

// Check for unbuffered channel
ch = new ChannelHolder();
ch->Reset<int>(0);
ChannelHolderCloseUnblocksReceiversTest(ch);
delete ch;
}

// This tests that closing a channelholder unblocks
// any senders waiting for channel to have write space
TEST(Channel, ChannelHolderCloseUnblocksSendersTest) {
// Check for buffered channel
ChannelHolder *ch = new ChannelHolder();
ch->Reset<int>(1);
ChannelHolderCloseUnblocksSendersTest(ch, true);
delete ch;

// Check for unbuffered channel
ch = new ChannelHolder();
ch->Reset<int>(0);
ChannelHolderCloseUnblocksSendersTest(ch, false);
delete ch;
}

// This tests that destroying a channelholder unblocks
// any senders waiting for channel
void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) {
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
bool send_success[num_threads];

// Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
send_success[i] = false;
t[i] = std::thread(
[&](bool *ended, bool *success) {
int data = 10;
*success = ch->Send(&data);
*ended = true;
},
&thread_ended[i], &send_success[i]);
}

std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
if (isBuffered) {
// If channel is buffered, verify that atleast 4 threads are blocked
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (thread_ended[i] == false) ct++;
}
// Atleast 4 threads must be blocked
EXPECT_GE(ct, 4);
} else {
// Verify that all the threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
}
// Explicitly destroy the channel
delete ch;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait

// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}

// Count number of successfuld sends
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (send_success[i]) ct++;
}

if (isBuffered) {
// Only 1 send must be successful
EXPECT_EQ(ct, 1);
} else {
// In unbuffered channel, no send should be successful
EXPECT_EQ(ct, 0);
}

// Join all threads
for (size_t i = 0; i < num_threads; i++) t[i].join();
}

// This tests that destroying a channelholder also unblocks
// any receivers waiting on the channel
void ChannelHolderDestroyUnblockReceivers(ChannelHolder *ch) {
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];

// Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data;
// All reads should return false
EXPECT_EQ(ch->Receive(&data), false);
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait

// Verify that all threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}
// delete the channel
delete ch;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}

for (size_t i = 0; i < num_threads; i++) t[i].join();
}

TEST(ChannelHolder, ChannelHolderDestroyUnblocksReceiversTest) {
// Check for Buffered Channel
ChannelHolder *ch = new ChannelHolder();
ch->Reset<int>(1);
ChannelHolderDestroyUnblockReceivers(ch);
// ch is already deleted already deleted in
// ChannelHolderDestroyUnblockReceivers

// Check for Unbuffered channel
ch = new ChannelHolder();
ch->Reset<int>(0);
ChannelHolderDestroyUnblockReceivers(ch);
}

TEST(ChannelHolder, ChannelHolderDestroyUnblocksSendersTest) {
// Check for Buffered Channel
ChannelHolder *ch = new ChannelHolder();
ch->Reset<int>(1);
ChannelHolderDestroyUnblockSenders(ch, true);
// ch is already deleted already deleted in
// ChannelHolderDestroyUnblockReceivers

// Check for Unbuffered channel
ch = new ChannelHolder();
ch->Reset<int>(0);
ChannelHolderDestroyUnblockSenders(ch, false);
}