Skip to content

Commit

Permalink
Add support for sending and receiving D-Bus signals.
Browse files Browse the repository at this point in the history
ObjectProxy is used to receive signals from the remote object.
ExportedObject is used to send signals from the exported object.

Note that signals are asynchronos so we don't have a test in
end_to_end_sync_unittest.cc

BUG=90036
TEST=run unit tests


Review URL: http://codereview.chromium.org/7655033

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@97831 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
satorux@chromium.org committed Aug 23, 2011
1 parent 852754a commit 3beaaa4
Show file tree
Hide file tree
Showing 12 changed files with 599 additions and 18 deletions.
91 changes: 88 additions & 3 deletions dbus/bus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
//
// TODO(satorux):
// - Handle "disconnected" signal.
// - Add support for signal sending
// - Add support for signal monitoring
// - Collect metrics (ex. # of method calls, method call time, etc.)

#include "dbus/bus.h"
Expand Down Expand Up @@ -203,6 +201,9 @@ Bus::Bus(const Options& options)
Bus::~Bus() {
DCHECK(!connection_);
DCHECK(owned_service_names_.empty());
DCHECK(match_rules_added_.empty());
DCHECK(filter_functions_added_.empty());
DCHECK(registered_object_paths_.empty());
DCHECK_EQ(0, num_pending_watches_);
DCHECK_EQ(0, num_pending_timeouts_);
}
Expand Down Expand Up @@ -276,6 +277,11 @@ void Bus::ShutdownAndBlock() {
<< owned_service_names_.size();
}

// Detach from the remote objects.
for (size_t i = 0; i < object_proxies_.size(); ++i) {
object_proxies_[i]->Detach();
}

// Private connection should be closed.
if (connection_ && connection_type_ == PRIVATE) {
dbus_connection_close(connection_);
Expand Down Expand Up @@ -404,29 +410,108 @@ void Bus::SendWithReply(DBusMessage* request,
CHECK(success) << "Unable to allocate memory";
}

void Bus::Send(DBusMessage* request, uint32* serial) {
DCHECK(connection_);
AssertOnDBusThread();

const bool success = dbus_connection_send(connection_, request, serial);
CHECK(success) << "Unable to allocate memory";
}

void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
void* user_data) {
DCHECK(connection_);
AssertOnDBusThread();

if (filter_functions_added_.find(filter_function) !=
filter_functions_added_.end()) {
LOG(ERROR) << "Filter function already exists: " << filter_function;
return;
}

const bool success = dbus_connection_add_filter(
connection_, filter_function, user_data, NULL);
CHECK(success) << "Unable to allocate memory";
filter_functions_added_.insert(filter_function);
}

void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
void* user_data) {
DCHECK(connection_);
AssertOnDBusThread();

if (filter_functions_added_.find(filter_function) ==
filter_functions_added_.end()) {
LOG(ERROR) << "Requested to remove an unknown filter function: "
<< filter_function;
return;
}

dbus_connection_remove_filter(connection_, filter_function, user_data);
filter_functions_added_.erase(filter_function);
}

void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
DCHECK(connection_);
AssertOnDBusThread();

if (match_rules_added_.find(match_rule) != match_rules_added_.end()) {
LOG(ERROR) << "Match rule already exists: " << match_rule;
return;
}

dbus_bus_add_match(connection_, match_rule.c_str(), error);
match_rules_added_.insert(match_rule);
}

void Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
DCHECK(connection_);
AssertOnDBusThread();

if (match_rules_added_.find(match_rule) == match_rules_added_.end()) {
LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
return;
}

dbus_bus_remove_match(connection_, match_rule.c_str(), error);
match_rules_added_.erase(match_rule);
}

bool Bus::TryRegisterObjectPath(const std::string& object_path,
const DBusObjectPathVTable* vtable,
void* user_data,
DBusError* error) {
DCHECK(connection_);
AssertOnDBusThread();

return dbus_connection_try_register_object_path(
DCHECK(registered_object_paths_.find(object_path) ==
registered_object_paths_.end())
<< "Object path already registered: " << object_path;

const bool success = dbus_connection_try_register_object_path(
connection_,
object_path.c_str(),
vtable,
user_data,
error);
if (success)
registered_object_paths_.insert(object_path);
return success;
}

void Bus::UnregisterObjectPath(const std::string& object_path) {
DCHECK(connection_);
AssertOnDBusThread();

DCHECK(registered_object_paths_.find(object_path) !=
registered_object_paths_.end())
<< "Requested to unregister an unknown object path: " << object_path;

const bool success = dbus_connection_unregister_object_path(
connection_,
object_path.c_str());
CHECK(success) << "Unable to allocate memory";
registered_object_paths_.erase(object_path);
}

void Bus::ShutdownInternal(OnShutdownCallback callback) {
Expand Down
75 changes: 71 additions & 4 deletions dbus/bus.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ class Bus : public base::RefCountedThreadSafe<Bus> {
// The caller must not delete the returned object. The bus will own the
// object. Never returns NULL.
//
// The object proxy is used to call remote methods.
// The object proxy is used to call methods of remote objects, and
// receive signals from them.
//
// |service_name| looks like "org.freedesktop.NetworkManager", and
// |object_path| looks like "/org/freedesktop/NetworkManager/Devices/0".
Expand All @@ -180,7 +181,8 @@ class Bus : public base::RefCountedThreadSafe<Bus> {
// path. The caller must not delete the returned object. The bus will
// own the object. Never returns NULL.
//
// The exported object is used to export objects to other D-Bus clients.
// The exported object is used to export methods of local objects, and
// send signal from them.
//
// Must be called in the origin thread.
virtual ExportedObject* GetExportedObject(const std::string& service_name,
Expand Down Expand Up @@ -240,14 +242,73 @@ class Bus : public base::RefCountedThreadSafe<Bus> {
int timeout_ms,
DBusError* error);

// Requests to send a message to the bus.
// Requests to send a message to the bus. The reply is handled with
// |pending_call| at a later time.
//
// BLOCKING CALL.
virtual void SendWithReply(DBusMessage* request,
DBusPendingCall** pending_call,
int timeout_ms);

// Tries to register the object path.
// Requests to send a message to the bus. The message serial number will
// be stored in |serial|.
//
// BLOCKING CALL.
virtual void Send(DBusMessage* request, uint32* serial);

// Adds the message filter function. |filter_function| will be called
// when incoming messages are received.
//
// When a new incoming message arrives, filter functions are called in
// the order that they were added until the the incoming message is
// handled by a filter function.
//
// The same filter function must not be added more than once.
//
// BLOCKING CALL.
virtual void AddFilterFunction(DBusHandleMessageFunction filter_function,
void* user_data);

// Removes the message filter previously added by AddFilterFunction().
//
// BLOCKING CALL.
virtual void RemoveFilterFunction(DBusHandleMessageFunction filter_function,
void* user_data);

// Adds the match rule. Messages that match the rule will be processed
// by the filter functions added by AddFilterFunction().
//
// You cannot specify which filter function to use for a match rule.
// Instead, you should check if an incoming message is what you are
// interested in, in the filter functions.
//
// The same match rule must not be added more than once.
//
// The match rule looks like:
// "type='signal', interface='org.chromium.SomeInterface'".
//
// See "Message Bus Message Routing" section in the D-Bus specification
// for details about match rules:
// http://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-routing
//
// BLOCKING CALL.
virtual void AddMatch(const std::string& match_rule, DBusError* error);

// Removes the match rule previously added by AddMatch().
//
// BLOCKING CALL.
virtual void RemoveMatch(const std::string& match_rule, DBusError* error);

// Tries to register the object path. Returns true on success.
// Returns false if the object path is already registered.
//
// |message_function| in |vtable| will be called every time when a new
// |message sent to the object path arrives.
//
// The same object path must not be added more than once.
//
// See also documentation of |dbus_connection_try_register_object_path| at
// http://dbus.freedesktop.org/doc/api/html/group__DBusConnection.html
//
// BLOCKING CALL.
virtual bool TryRegisterObjectPath(const std::string& object_path,
Expand Down Expand Up @@ -349,6 +410,12 @@ class Bus : public base::RefCountedThreadSafe<Bus> {
base::PlatformThreadId dbus_thread_id_;

std::set<std::string> owned_service_names_;
// The following sets are used to check if rules/object_paths/filters
// are properly cleaned up before destruction of the bus object.
std::set<std::string> match_rules_added_;
std::set<std::string> registered_object_paths_;
std::set<DBusHandleMessageFunction> filter_functions_added_;

std::vector<scoped_refptr<dbus::ObjectProxy> > object_proxies_;
std::vector<scoped_refptr<dbus::ExportedObject> > exported_objects_;

Expand Down
45 changes: 45 additions & 0 deletions dbus/end_to_end_async_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ class EndToEndAsyncTest : public testing::Test {
object_proxy_ = bus_->GetObjectProxy("org.chromium.TestService",
"/org/chromium/TestObject");
ASSERT_TRUE(bus_->HasDBusThread());

// Connect to the "Test" signal from the remote object.
object_proxy_->ConnectToSignal(
"org.chromium.TestInterface",
"Test",
base::Bind(&EndToEndAsyncTest::OnTestSignal,
base::Unretained(this)),
base::Bind(&EndToEndAsyncTest::OnConnected,
base::Unretained(this)));
// Wait until the object proxy is connected to the signal.
message_loop_.Run();
}

void TearDown() {
Expand Down Expand Up @@ -111,12 +122,36 @@ class EndToEndAsyncTest : public testing::Test {
message_loop_.Quit();
}

// Called when the "Test" signal is received, in the main thread.
// Copy the string payload to |test_signal_string_|.
void OnTestSignal(dbus::Signal* signal) {
dbus::MessageReader reader(signal);
ASSERT_TRUE(reader.PopString(&test_signal_string_));
message_loop_.Quit();
}

// Called when connected to the signal.
void OnConnected(const std::string& interface_name,
const std::string& signal_name,
bool success) {
ASSERT_TRUE(success);
message_loop_.Quit();
}

// Wait for the hey signal to be received.
void WaitForTestSignal() {
// OnTestSignal() will quit the message loop.
message_loop_.Run();
}

MessageLoop message_loop_;
std::vector<std::string> response_strings_;
scoped_ptr<base::Thread> dbus_thread_;
scoped_refptr<dbus::Bus> bus_;
dbus::ObjectProxy* object_proxy_;
scoped_ptr<dbus::TestService> test_service_;
// Text message from "Test" signal.
std::string test_signal_string_;
};

TEST_F(EndToEndAsyncTest, Echo) {
Expand Down Expand Up @@ -198,3 +233,13 @@ TEST_F(EndToEndAsyncTest, BrokenMethod) {
// Should fail because the method is broken.
ASSERT_EQ("", response_strings_[0]);
}

TEST_F(EndToEndAsyncTest, TestSignal) {
const char kMessage[] = "hello, world";
// Send the test signal from the exported object.
test_service_->SendTestSignal(kMessage);
// Receive the signal with the object proxy. The signal is handeled in
// EndToEndAsyncTest::OnTestSignal() in the main thread.
WaitForTestSignal();
ASSERT_EQ(kMessage, test_signal_string_);
}
42 changes: 36 additions & 6 deletions dbus/exported_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ bool ExportedObject::ExportMethodAndBlock(
MethodCallCallback method_call_callback) {
bus_->AssertOnDBusThread();

// Check if the method is already exported.
const std::string absolute_method_name =
GetAbsoluteMethodName(interface_name, method_name);
if (method_table_.find(absolute_method_name) != method_table_.end()) {
LOG(ERROR) << absolute_method_name << " is already exported";
return false;
}

if (!bus_->Connect())
return false;
if (!bus_->SetUpAsyncOperations())
Expand All @@ -60,12 +68,7 @@ bool ExportedObject::ExportMethodAndBlock(
if (!Register())
return false;

const std::string absolute_method_name =
GetAbsoluteMethodName(interface_name, method_name);
if (method_table_.find(absolute_method_name) != method_table_.end()) {
LOG(ERROR) << absolute_method_name << " is already exported";
return false;
}
// Add the method callback to the method table.
method_table_[absolute_method_name] = method_call_callback;

return true;
Expand All @@ -86,6 +89,25 @@ void ExportedObject::ExportMethod(const std::string& interface_name,
bus_->PostTaskToDBusThread(FROM_HERE, task);
}

void ExportedObject::SendSignal(Signal* signal) {
// For signals, the object path should be set to the path to the sender
// object, which is this exported object here.
signal->SetPath(object_path_);

// Increment the reference count so we can safely reference the
// underlying signal message until the signal sending is complete. This
// will be unref'ed in SendSignalInternal().
DBusMessage* signal_message = signal->raw_message();
dbus_message_ref(signal_message);

// Bind() won't compile if we pass signal_message. See the comment at
// ObjectProxy::CallMethod() for details.
bus_->PostTaskToDBusThread(FROM_HERE,
base::Bind(&ExportedObject::SendSignalInternal,
this,
static_cast<void*>(signal_message)));
}

void ExportedObject::Unregister() {
bus_->AssertOnDBusThread();

Expand Down Expand Up @@ -124,6 +146,14 @@ void ExportedObject::OnExported(OnExportedCallback on_exported_callback,
on_exported_callback.Run(interface_name, method_name, success);
}

void ExportedObject::SendSignalInternal(void* in_signal_message) {
DBusMessage* signal_message =
static_cast<DBusMessage*>(in_signal_message);
uint32 serial = 0;
bus_->Send(signal_message, &serial);
dbus_message_unref(signal_message);
}

bool ExportedObject::Register() {
bus_->AssertOnDBusThread();

Expand Down
Loading

0 comments on commit 3beaaa4

Please sign in to comment.