Skip to content

Commit 6b49401

Browse files
author
chengduo
authored
Merge pull request #8222 from chengduoZH/feature/fix_buffer_unit_test
Refine Channel's unit test
2 parents b5ffe5b + ac5b1bc commit 6b49401

File tree

3 files changed

+62
-96
lines changed

3 files changed

+62
-96
lines changed

paddle/framework/channel_test.cc

+47-96
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
176176
sum += i;
177177
}
178178
});
179-
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.5 sec
179+
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
180180
EXPECT_EQ(sum, 45U);
181181

182182
CloseChannel(ch);
@@ -194,10 +194,7 @@ TEST(Channel, RecevingOrderEqualToSendingOrderWithBufferedChannel) {
194194
RecevingOrderEqualToSendingOrder(ch);
195195
}
196196

197-
// This tests that closing a buffered channel also unblocks
198-
// any receivers waiting on the channel
199-
TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
200-
auto ch = MakeChannel<int>(1);
197+
void ChannelCloseUnblocksReceiversTest(Channel<int> *ch) {
201198
size_t num_threads = 5;
202199
std::thread t[num_threads];
203200
bool thread_ended[num_threads];
@@ -208,15 +205,14 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
208205
t[i] = std::thread(
209206
[&](bool *p) {
210207
int data;
211-
// All reads should return false
212208
EXPECT_EQ(ch->Receive(&data), false);
213209
*p = true;
214210
},
215211
&thread_ended[i]);
216212
}
217-
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
213+
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
218214

219-
// Verify that all threads are blocked
215+
// Verify that all the threads are blocked
220216
for (size_t i = 0; i < num_threads; i++) {
221217
EXPECT_EQ(thread_ended[i], false);
222218
}
@@ -225,21 +221,20 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
225221
// This should unblock all receivers
226222
CloseChannel(ch);
227223

228-
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
224+
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
229225

230226
// Verify that all threads got unblocked
231227
for (size_t i = 0; i < num_threads; i++) {
232228
EXPECT_EQ(thread_ended[i], true);
233229
}
234230

235231
for (size_t i = 0; i < num_threads; i++) t[i].join();
236-
delete ch;
237232
}
238233

239-
// This tests that closing a buffered channel also unblocks
240-
// any senders waiting for channel to have write space
241-
TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
242-
auto ch = MakeChannel<int>(1);
234+
void ChannelCloseUnblocksSendersTest(Channel<int> *ch) {
235+
using paddle::framework::details::Buffered;
236+
using paddle::framework::details::UnBuffered;
237+
243238
size_t num_threads = 5;
244239
std::thread t[num_threads];
245240
bool thread_ended[num_threads];
@@ -259,116 +254,72 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
259254
}
260255
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
261256

262-
// Verify that atleast 4 threads are blocked
263-
int ct = 0;
264-
for (size_t i = 0; i < num_threads; i++) {
265-
if (thread_ended[i] == false) ct++;
257+
if (dynamic_cast<Buffered<int> *>(ch)) {
258+
// If ch is Buffered, atleast 4 threads must be blocked.
259+
int ct = 0;
260+
for (size_t i = 0; i < num_threads; i++) {
261+
if (!thread_ended[i]) ct++;
262+
}
263+
EXPECT_GE(ct, 4);
264+
} else {
265+
// If ch is UnBuffered, all the threads should be blocked.
266+
for (size_t i = 0; i < num_threads; i++) {
267+
EXPECT_EQ(thread_ended[i], false);
268+
}
266269
}
267-
// Atleast 4 threads must be blocked
268-
EXPECT_GE(ct, 4);
269-
270270
// Explicitly close the thread
271271
// This should unblock all senders
272272
CloseChannel(ch);
273273

274-
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
274+
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
275275

276276
// Verify that all threads got unblocked
277277
for (size_t i = 0; i < num_threads; i++) {
278278
EXPECT_EQ(thread_ended[i], true);
279279
}
280280

281-
// Verify that only 1 send was successful
282-
ct = 0;
283-
for (size_t i = 0; i < num_threads; i++) {
284-
if (send_success[i]) ct++;
281+
if (dynamic_cast<Buffered<int> *>(ch)) {
282+
// Verify that only 1 send was successful
283+
int ct = 0;
284+
for (size_t i = 0; i < num_threads; i++) {
285+
if (send_success[i]) ct++;
286+
}
287+
// Only 1 send must be successful
288+
EXPECT_EQ(ct, 1);
285289
}
286-
// Only 1 send must be successful
287-
EXPECT_EQ(ct, 1);
288290

289291
for (size_t i = 0; i < num_threads; i++) t[i].join();
292+
}
293+
294+
// This tests that closing a buffered channel also unblocks
295+
// any receivers waiting on the channel
296+
TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
297+
auto ch = MakeChannel<int>(1);
298+
ChannelCloseUnblocksReceiversTest(ch);
299+
delete ch;
300+
}
301+
302+
// This tests that closing a buffered channel also unblocks
303+
// any senders waiting for channel to have write space
304+
TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
305+
auto ch = MakeChannel<int>(1);
306+
ChannelCloseUnblocksSendersTest(ch);
290307
delete ch;
291308
}
292309

293310
// This tests that closing an unbuffered channel also unblocks
294311
// unblocks any receivers waiting for senders
295312
TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
296313
auto ch = MakeChannel<int>(0);
297-
size_t num_threads = 5;
298-
std::thread t[num_threads];
299-
bool thread_ended[num_threads];
300-
301-
// Launches threads that try to read and are blocked becausew of no writers
302-
for (size_t i = 0; i < num_threads; i++) {
303-
thread_ended[i] = false;
304-
t[i] = std::thread(
305-
[&](bool *p) {
306-
int data;
307-
EXPECT_EQ(ch->Receive(&data), false);
308-
*p = true;
309-
},
310-
&thread_ended[i]);
311-
}
312-
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
313-
314-
// Verify that all the threads are blocked
315-
for (size_t i = 0; i < num_threads; i++) {
316-
EXPECT_EQ(thread_ended[i], false);
317-
}
318-
319-
// Explicitly close the thread
320-
// This should unblock all receivers
321-
CloseChannel(ch);
322-
323-
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
324-
325-
// Verify that all threads got unblocked
326-
for (size_t i = 0; i < num_threads; i++) {
327-
EXPECT_EQ(thread_ended[i], true);
328-
}
329-
330-
for (size_t i = 0; i < num_threads; i++) t[i].join();
314+
ChannelCloseUnblocksReceiversTest(ch);
331315
delete ch;
332316
}
333317

334318
// This tests that closing an unbuffered channel also unblocks
335319
// unblocks any senders waiting for senders
336320
TEST(Channel, UnbufferedChannelCloseUnblocksSendersTest) {
337321
auto ch = MakeChannel<int>(0);
338-
size_t num_threads = 5;
339-
std::thread t[num_threads];
340-
bool thread_ended[num_threads];
341-
342-
// Launches threads that try to read and are blocked becausew of no writers
343-
for (size_t i = 0; i < num_threads; i++) {
344-
thread_ended[i] = false;
345-
t[i] = std::thread(
346-
[&](bool *p) {
347-
int data = 10;
348-
EXPECT_EQ(ch->Send(&data), false);
349-
*p = true;
350-
},
351-
&thread_ended[i]);
352-
}
353-
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
354-
355-
// Verify that all the threads are blocked
356-
for (size_t i = 0; i < num_threads; i++) {
357-
EXPECT_EQ(thread_ended[i], false);
358-
}
359-
360-
// Explicitly close the thread
361-
// This should unblock all receivers
362-
CloseChannel(ch);
363-
364-
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
365-
366-
// Verify that all threads got unblocked
367-
for (size_t i = 0; i < num_threads; i++) {
368-
EXPECT_EQ(thread_ended[i], true);
369-
}
370-
371-
for (size_t i = 0; i < num_threads; i++) t[i].join();
322+
ChannelCloseUnblocksReceiversTest(ch);
372323
delete ch;
373324
}
374325

paddle/framework/details/buffered_channel.h

+8
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ namespace paddle {
2525
namespace framework {
2626
namespace details {
2727

28+
// Four of the properties of Buffered Channel:
29+
// - A send to a full channel blocks temporarily until a receive from the
30+
// channel or the channel is closed.
31+
// - A receive from an empty channel blocks temporarily until a send to the
32+
// channel or the channel is closed.
33+
// - A send to a closed channel returns false immediately.
34+
// - A receive from a closed channel returns false immediately.
35+
2836
template <typename T>
2937
class Buffered : public paddle::framework::Channel<T> {
3038
friend Channel<T>* paddle::framework::MakeChannel<T>(size_t);

paddle/framework/details/unbuffered_channel.h

+7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ namespace paddle {
2323
namespace framework {
2424
namespace details {
2525

26+
// Four of the properties of UnBuffered Channel:
27+
// - A send to a channel blocks temporarily until a receive from the
28+
// channel or the channel is closed.
29+
// - A receive from a channel blocks temporarily until a send to the
30+
// channel or the channel is closed.
31+
// - A send to a closed channel returns false immediately.
32+
// - A receive from a closed channel returns false immediately.
2633
template <typename T>
2734
class UnBuffered : public paddle::framework::Channel<T> {
2835
friend Channel<T>* paddle::framework::MakeChannel<T>(size_t);

0 commit comments

Comments
 (0)