Skip to content

Commit

Permalink
Reland "Mojo: Allow duplicate events in SyncHandleRegistry"
Browse files Browse the repository at this point in the history
This is a reland of 327ed96
Original change's description:
> Mojo: Allow duplicate events in SyncHandleRegistry
> 
> Allows mulitple registrations for the same WaitableEvent in
> SyncHandleRegistry to allow for instances of independent nested waiters
> unwittingly waiting on the same event.
> 
> BUG=754945
> R=yzshen@chromium.org
> 
> Change-Id: Ia166d860bf2a07a2db9dc39ae1350b8758950ff4
> Reviewed-on: https://chromium-review.googlesource.com/641402
> Reviewed-by: Yuzhu Shen <yzshen@chromium.org>
> Commit-Queue: Ken Rockot <rockot@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#498246}

Bug: 754945
Change-Id: Id0296e7aac938a6a13202766da2b8cb14ea30502
Reviewed-on: https://chromium-review.googlesource.com/643487
Reviewed-by: Yuzhu Shen <yzshen@chromium.org>
Commit-Queue: Ken Rockot <rockot@chromium.org>
Cr-Commit-Position: refs/heads/master@{#498665}
  • Loading branch information
krockot authored and Commit Bot committed Aug 30, 2017
1 parent fc048ad commit 267df5a
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 38 deletions.
18 changes: 9 additions & 9 deletions ipc/ipc_sync_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -646,25 +646,25 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
bool dispatch = false;
bool send_done = false;
bool should_pump_messages = false;
bool registered = registry->RegisterEvent(
context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done));
DCHECK(registered);
base::Closure on_send_done_callback = base::Bind(&OnEventReady, &send_done);
registry->RegisterEvent(context->GetSendDoneEvent(), on_send_done_callback);

base::Closure on_pump_messages_callback;
if (pump_messages_event) {
registered = registry->RegisterEvent(
pump_messages_event,
base::Bind(&OnEventReady, &should_pump_messages));
DCHECK(registered);
on_pump_messages_callback =
base::Bind(&OnEventReady, &should_pump_messages);
registry->RegisterEvent(pump_messages_event, on_pump_messages_callback);
}

const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
context->received_sync_msgs()->BlockDispatch(&dispatch);
registry->Wait(stop_flags, 3);
context->received_sync_msgs()->UnblockDispatch();

registry->UnregisterEvent(context->GetSendDoneEvent());
registry->UnregisterEvent(context->GetSendDoneEvent(),
on_send_done_callback);
if (pump_messages_event)
registry->UnregisterEvent(pump_messages_event);
registry->UnregisterEvent(pump_messages_event, on_pump_messages_callback);

if (dispatch) {
// We're waiting for a reply, but we received a blocking synchronous call.
Expand Down
11 changes: 6 additions & 5 deletions ipc/ipc_sync_message_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ bool SyncMessageFilter::Send(Message* message) {
bool shutdown = false;
scoped_refptr<mojo::SyncHandleRegistry> registry =
mojo::SyncHandleRegistry::current();
registry->RegisterEvent(shutdown_event_,
base::Bind(&OnEventReady, &shutdown));
registry->RegisterEvent(&done_event, base::Bind(&OnEventReady, &done));
auto on_shutdown_callback = base::Bind(&OnEventReady, &shutdown);
auto on_done_callback = base::Bind(&OnEventReady, &done);
registry->RegisterEvent(shutdown_event_, on_shutdown_callback);
registry->RegisterEvent(&done_event, on_done_callback);

const bool* stop_flags[] = { &done, &shutdown };
registry->Wait(stop_flags, 2);
Expand All @@ -84,8 +85,8 @@ bool SyncMessageFilter::Send(Message* message) {
"SyncMessageFilter::Send", &done_event);
}

registry->UnregisterEvent(shutdown_event_);
registry->UnregisterEvent(&done_event);
registry->UnregisterEvent(shutdown_event_, on_shutdown_callback);
registry->UnregisterEvent(&done_event, on_done_callback);

{
base::AutoLock auto_lock(lock_);
Expand Down
10 changes: 6 additions & 4 deletions mojo/public/cpp/bindings/lib/sync_event_watcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ SyncEventWatcher::SyncEventWatcher(base::WaitableEvent* event,
SyncEventWatcher::~SyncEventWatcher() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (registered_)
registry_->UnregisterEvent(event_);
registry_->UnregisterEvent(event_, callback_);
destroyed_->data = true;
}

Expand Down Expand Up @@ -51,15 +51,17 @@ bool SyncEventWatcher::SyncWatch(const bool* should_stop) {

void SyncEventWatcher::IncrementRegisterCount() {
register_request_count_++;
if (!registered_)
registered_ = registry_->RegisterEvent(event_, callback_);
if (!registered_) {
registry_->RegisterEvent(event_, callback_);
registered_ = true;
}
}

void SyncEventWatcher::DecrementRegisterCount() {
DCHECK_GT(register_request_count_, 0u);
register_request_count_--;
if (register_request_count_ == 0 && registered_) {
registry_->UnregisterEvent(event_);
registry_->UnregisterEvent(event_, callback_);
registered_ = false;
}
}
Expand Down
99 changes: 85 additions & 14 deletions mojo/public/cpp/bindings/lib/sync_handle_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "mojo/public/cpp/bindings/sync_handle_registry.h"

#include <algorithm>

#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/stl_util.h"
Expand Down Expand Up @@ -62,23 +64,52 @@ void SyncHandleRegistry::UnregisterHandle(const Handle& handle) {
handles_.erase(handle);
}

bool SyncHandleRegistry::RegisterEvent(base::WaitableEvent* event,
void SyncHandleRegistry::RegisterEvent(base::WaitableEvent* event,
const base::Closure& callback) {
auto result = events_.insert({event, callback});
DCHECK(result.second);
MojoResult rv = wait_set_.AddEvent(event);
if (rv == MOJO_RESULT_OK)
return true;
DCHECK_EQ(MOJO_RESULT_ALREADY_EXISTS, rv);
return false;
auto it = events_.find(event);
if (it == events_.end()) {
auto result = events_.emplace(event, EventCallbackList{});
it = result.first;
}

auto& callbacks = it->second.container();
if (callbacks.empty()) {
// AddEvent() must succeed since we only attempt it when there are
// previously no callbacks registered for this event.
MojoResult rv = wait_set_.AddEvent(event);
DCHECK_EQ(MOJO_RESULT_OK, rv);
}

callbacks.push_back(callback);
}

void SyncHandleRegistry::UnregisterEvent(base::WaitableEvent* event) {
void SyncHandleRegistry::UnregisterEvent(base::WaitableEvent* event,
const base::Closure& callback) {
auto it = events_.find(event);
DCHECK(it != events_.end());
events_.erase(it);
MojoResult rv = wait_set_.RemoveEvent(event);
DCHECK_EQ(MOJO_RESULT_OK, rv);
if (it == events_.end())
return;

auto& callbacks = it->second.container();
if (is_dispatching_event_callbacks_) {
// Not safe to remove any elements from |callbacks| here since an outer
// stack frame is currently iterating over it in Wait().
for (auto& cb : callbacks) {
if (cb.Equals(callback))
cb.Reset();
}
remove_invalid_event_callbacks_after_dispatch_ = true;
} else {
callbacks.erase(std::remove_if(callbacks.begin(), callbacks.end(),
[&callback](const base::Closure& cb) {
return cb.Equals(callback);
}),
callbacks.end());
if (callbacks.empty()) {
events_.erase(it);
MojoResult rv = wait_set_.RemoveEvent(event);
DCHECK_EQ(MOJO_RESULT_OK, rv);
}
}
}

bool SyncHandleRegistry::Wait(const bool* should_stop[], size_t count) {
Expand Down Expand Up @@ -109,7 +140,29 @@ bool SyncHandleRegistry::Wait(const bool* should_stop[], size_t count) {
if (ready_event) {
const auto iter = events_.find(ready_event);
DCHECK(iter != events_.end());
iter->second.Run();
bool was_dispatching_event_callbacks = is_dispatching_event_callbacks_;
is_dispatching_event_callbacks_ = true;

// NOTE: It's possible for the container to be extended by any of these
// callbacks if they call RegisterEvent, so we are careful to iterate by
// index. Also note that conversely, elements cannot be *removed* from the
// container, by any of these callbacks, so it is safe to assume the size
// only stays the same or increases, with no elements changing position.
auto& callbacks = iter->second.container();
for (size_t i = 0; i < callbacks.size(); ++i) {
auto& callback = callbacks[i];
if (callback)
callback.Run();
}

is_dispatching_event_callbacks_ = was_dispatching_event_callbacks;
if (!was_dispatching_event_callbacks &&
remove_invalid_event_callbacks_after_dispatch_) {
// If we've had events unregistered within any callback dispatch, now is
// a good time to prune them from the map.
RemoveInvalidEventCallbacks();
remove_invalid_event_callbacks_after_dispatch_ = false;
}
}
};

Expand All @@ -120,4 +173,22 @@ SyncHandleRegistry::SyncHandleRegistry() = default;

SyncHandleRegistry::~SyncHandleRegistry() = default;

void SyncHandleRegistry::RemoveInvalidEventCallbacks() {
for (auto it = events_.begin(); it != events_.end();) {
auto& callbacks = it->second.container();
callbacks.erase(
std::remove_if(callbacks.begin(), callbacks.end(),
[](const base::Closure& callback) { return !callback; }),
callbacks.end());
if (callbacks.empty()) {
MojoResult rv = wait_set_.RemoveEvent(it->first);
DCHECK_EQ(MOJO_RESULT_OK, rv);

events_.erase(it++);
} else {
++it;
}
}
}

} // namespace mojo
32 changes: 26 additions & 6 deletions mojo/public/cpp/bindings/sync_handle_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#define MOJO_PUBLIC_CPP_BINDINGS_SYNC_HANDLE_REGISTRY_H_

#include <map>
#include <unordered_map>

#include "base/callback.h"
#include "base/containers/stack_container.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/sequence_checker.h"
Expand All @@ -30,6 +30,10 @@ class MOJO_CPP_BINDINGS_EXPORT SyncHandleRegistry
static scoped_refptr<SyncHandleRegistry> current();

using HandleCallback = base::Callback<void(MojoResult)>;

// Registers a |Handle| to be watched for |handle_signals|. If any such
// signals are satisfied during a Wait(), the Wait() is woken up and
// |callback| is run.
bool RegisterHandle(const Handle& handle,
MojoHandleSignals handle_signals,
const HandleCallback& callback);
Expand All @@ -38,11 +42,13 @@ class MOJO_CPP_BINDINGS_EXPORT SyncHandleRegistry

// Registers a |base::WaitableEvent| which can be used to wake up
// Wait() before any handle signals. |event| is not owned, and if it signals
// during Wait(), |callback| is invoked. Returns |true| if registered
// successfully or |false| if |event| was already registered.
bool RegisterEvent(base::WaitableEvent* event, const base::Closure& callback);
// during Wait(), |callback| is invoked. Note that |event| may be registered
// multiple times with different callbacks.
void RegisterEvent(base::WaitableEvent* event, const base::Closure& callback);

void UnregisterEvent(base::WaitableEvent* event);
// Unregisters a specific |event|+|callback| pair.
void UnregisterEvent(base::WaitableEvent* event,
const base::Closure& callback);

// Waits on all the registered handles and events and runs callbacks
// synchronously for any that become ready.
Expand All @@ -54,12 +60,26 @@ class MOJO_CPP_BINDINGS_EXPORT SyncHandleRegistry
private:
friend class base::RefCounted<SyncHandleRegistry>;

using EventCallbackList = base::StackVector<base::Closure, 1>;
using EventMap = std::map<base::WaitableEvent*, EventCallbackList>;

SyncHandleRegistry();
~SyncHandleRegistry();

void RemoveInvalidEventCallbacks();

WaitSet wait_set_;
std::map<Handle, HandleCallback> handles_;
std::map<base::WaitableEvent*, base::Closure> events_;
EventMap events_;

// |true| iff this registry is currently dispatching event callbacks in
// Wait(). Used to allow for safe event registration/unregistration from event
// callbacks.
bool is_dispatching_event_callbacks_ = false;

// Indicates if one or more event callbacks was unregistered during the most
// recent event callback dispatch.
bool remove_invalid_event_callbacks_after_dispatch_ = false;

SEQUENCE_CHECKER(sequence_checker_);

Expand Down

0 comments on commit 267df5a

Please sign in to comment.