Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions streaming/src/event_service.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <chrono>
#include <unordered_set>

#include "event_service.h"
Expand All @@ -6,30 +7,30 @@ namespace ray {
namespace streaming {

EventQueue::~EventQueue() {
is_freezed_ = false;
is_active_ = false;
no_full_cv_.notify_all();
no_empty_cv_.notify_all();
};

void EventQueue::Unfreeze() { is_freezed_ = true; }
void EventQueue::Unfreeze() { is_active_ = true; }

void EventQueue::Freeze() {
is_freezed_ = false;
is_active_ = false;
no_empty_cv_.notify_all();
no_full_cv_.notify_all();
}

void EventQueue::Push(const Event &t) {
std::unique_lock<std::mutex> lock(ring_buffer_mutex_);
while (Size() >= capacity_ && is_freezed_) {
while (Size() >= capacity_ && is_active_) {
STREAMING_LOG(WARNING) << " EventQueue is full, its size:" << Size()
<< " capacity:" << capacity_
<< " buffer size:" << buffer_.size()
<< " urgent_buffer size:" << urgent_buffer_.size();
no_full_cv_.wait(lock);
STREAMING_LOG(WARNING) << "Event server is full_sleep be notified";
}
if (!is_freezed_) {
if (!is_active_) {
return;
}
if (t.urgent) {
Expand All @@ -56,12 +57,22 @@ void EventQueue::Pop() {
no_full_cv_.notify_all();
}

void EventQueue::WaitFor(std::unique_lock<std::mutex> &lock) {
// To avoid deadlock when EventQueue is empty but is_active is changed in other
// thread, Event queue should awaken this condtion variable and check it again.
while (is_active_ && Empty()) {
if (!no_empty_cv_.wait_for(lock, std::chrono::milliseconds(kConditionTimeoutMs),
[this]() { return !is_active_ || !Empty(); })) {
STREAMING_LOG(DEBUG) << "No empty condition variable wait timeout."
<< " Empty => " << Empty() << ", is active " << is_active_;
}
}
}

bool EventQueue::Get(Event &evt) {
std::unique_lock<std::mutex> lock(ring_buffer_mutex_);
while (Empty() && is_freezed_) {
no_empty_cv_.wait(lock);
}
if (!is_freezed_) {
WaitFor(lock);
if (!is_active_) {
return false;
}
if (!urgent_buffer_.empty()) {
Expand All @@ -76,11 +87,9 @@ bool EventQueue::Get(Event &evt) {

Event EventQueue::PopAndGet() {
std::unique_lock<std::mutex> lock(ring_buffer_mutex_);
while (Empty() && is_freezed_) {
no_empty_cv_.wait(lock);
}
if (!is_freezed_) {
// Return error event if queue is freezed.
WaitFor(lock);
if (!is_active_) {
// Return error event if queue is active.
return Event({nullptr, EventType::ErrorEvent, false});
}
if (!urgent_buffer_.empty()) {
Expand Down
14 changes: 10 additions & 4 deletions streaming/src/event_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ enum class EventType : uint8_t {
FullChannel = 3,
// Recovery at the beginning.
Reload = 4,
// Error event if event queue is freezed.
// Error event if event queue is not active.
ErrorEvent = 5
};

Expand All @@ -50,14 +50,14 @@ struct Event {
/// processing functions ordered by its priority.
class EventQueue {
public:
EventQueue(size_t size) : urgent_(false), capacity_(size), is_freezed_(true) {}
EventQueue(size_t size) : urgent_(false), capacity_(size), is_active_(true) {}

virtual ~EventQueue();

/// Resume event queue to normal model.
void Unfreeze();

/// Push is prohibited when event queue is freezed.
/// Push is prohibited when event queue is not active.
void Freeze();

void Push(const Event &t);
Expand All @@ -84,6 +84,9 @@ class EventQueue {

inline bool Full() const { return buffer_.size() + urgent_buffer_.size() == capacity_; }

/// Wait for queue util it's timeout or any stuff in.
void WaitFor(std::unique_lock<std::mutex> &lock);

private:
std::mutex ring_buffer_mutex_;
std::condition_variable no_empty_cv_;
Expand All @@ -95,7 +98,10 @@ class EventQueue {
// Urgent event will be poped out first if urgent_ flag is true.
bool urgent_;
size_t capacity_;
bool is_freezed_;
// Event service active flag.
bool is_active_;
// Pop/Get timeout ms for condition variables wait.
static constexpr int kConditionTimeoutMs = 200;
};

class EventService {
Expand Down
17 changes: 7 additions & 10 deletions streaming/src/test/event_service_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ bool SendEmptyToChannel(ProducerChannelInfo *info) { return true; }
/// Mock function for write all messages to channel.
bool WriteAllToChannel(ProducerChannelInfo *info) { return true; }

bool stop = false;
TEST(EventServiceTest, Test1) {
std::shared_ptr<EventService> server = std::make_shared<EventService>();

Expand All @@ -19,30 +18,28 @@ TEST(EventServiceTest, Test1) {
server->Register(EventType::UserEvent, WriteAllToChannel);
server->Register(EventType::FlowEvent, WriteAllToChannel);

std::thread thread_empty([server, &mock_channel_info] {
bool stop = false;
std::thread thread_empty([server, &mock_channel_info, &stop] {
std::chrono::milliseconds MockTimer(20);
while (true) {
if (stop) break;
while (!stop) {
Event event{&mock_channel_info, EventType::EmptyEvent, true};
server->Push(event);
std::this_thread::sleep_for(MockTimer);
}
});

std::thread thread_flow([server, &mock_channel_info] {
std::thread thread_flow([server, &mock_channel_info, &stop] {
std::chrono::milliseconds MockTimer(2);
while (true) {
if (stop) break;
while (!stop) {
Event event{&mock_channel_info, EventType::FlowEvent, true};
server->Push(event);
std::this_thread::sleep_for(MockTimer);
}
});

std::thread thread_user([server, &mock_channel_info] {
std::thread thread_user([server, &mock_channel_info, &stop] {
std::chrono::milliseconds MockTimer(2);
while (true) {
if (stop) break;
while (!stop) {
Event event{&mock_channel_info, EventType::UserEvent, true};
server->Push(event);
std::this_thread::sleep_for(MockTimer);
Expand Down