Skip to content

Commit

Permalink
Add support for dynamic channels in WebrtcTransport.
Browse files Browse the repository at this point in the history
Previously WebrtcTransport clients had to use incoming_channel_factory() to
accept incoming data channels. That worked only if the receiver knows in
advance names of all channels it can receive. Now the transport calls
EventHandler for incoming data channels, which allows the receiver to decide
dynamically if it wants to accept that channel. Also channels now can be
closed dynamically and the transport doesn't terminate connection when one
of the channels is closed.

BUG=621691

Review-Url: https://codereview.chromium.org/2146213002
Cr-Commit-Position: refs/heads/master@{#406639}
  • Loading branch information
SergeyUlanov authored and Commit bot committed Jul 20, 2016
1 parent dec7ebf commit d059c46
Show file tree
Hide file tree
Showing 23 changed files with 323 additions and 131 deletions.
18 changes: 16 additions & 2 deletions remoting/protocol/channel_dispatcher_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,29 @@ void ChannelDispatcherBase::Init(MessageChannelFactory* channel_factory,
&ChannelDispatcherBase::OnChannelReady, base::Unretained(this)));
}

void ChannelDispatcherBase::Init(std::unique_ptr<MessagePipe> message_pipe,
EventHandler* event_handler) {
event_handler_ = event_handler;
OnChannelReady(std::move(message_pipe));
}

void ChannelDispatcherBase::OnChannelReady(
std::unique_ptr<MessagePipe> message_pipe) {
channel_factory_ = nullptr;
message_pipe_ = std::move(message_pipe);
message_pipe_->StartReceiving(base::Bind(
&ChannelDispatcherBase::OnIncomingMessage, base::Unretained(this)));
message_pipe_->Start(this);

event_handler_->OnChannelInitialized(this);
}

void ChannelDispatcherBase::OnMessageReceived(
std::unique_ptr<CompoundBuffer> message) {
OnIncomingMessage(std::move(message));
}

void ChannelDispatcherBase::OnMessagePipeClosed() {
event_handler_->OnChannelClosed(this);
}

} // namespace protocol
} // namespace remoting
22 changes: 16 additions & 6 deletions remoting/protocol/channel_dispatcher_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "base/callback.h"
#include "base/macros.h"
#include "remoting/protocol/errors.h"
#include "remoting/protocol/message_pipe.h"

namespace remoting {

Expand All @@ -19,34 +20,40 @@ class CompoundBuffer;
namespace protocol {

class MessageChannelFactory;
class MessagePipe;

// Base class for channel message dispatchers. It's responsible for
// creating the named channel. Derived dispatchers then dispatch
// incoming messages on this channel as well as send outgoing
// messages.
class ChannelDispatcherBase {
class ChannelDispatcherBase : public MessagePipe::EventHandler {
public:
class EventHandler {
public:
EventHandler() {}
virtual ~EventHandler() {}

// Called after the channel is initialized.
virtual void OnChannelInitialized(
ChannelDispatcherBase* channel_dispatcher) = 0;

// Called after the channel is closed.
virtual void OnChannelClosed(ChannelDispatcherBase* channel_dispatcher) = 0;
};

// The callback is called when initialization is finished. The
// parameter is set to true on success.
typedef base::Callback<void(bool)> InitializedCallback;

virtual ~ChannelDispatcherBase();
~ChannelDispatcherBase() override;

// Creates and connects the channel in the specified
// |session|. Caller retains ownership of the Session.
// Creates and connects the channel using |channel_factory|.
void Init(MessageChannelFactory* channel_factory,
EventHandler* event_handler);

// Initializes the channel using |message_pipe| that's already connected.
void Init(std::unique_ptr<MessagePipe> message_pipe,
EventHandler* event_handler);

const std::string& channel_name() { return channel_name_; }

// Returns true if the channel is currently connected.
Expand All @@ -62,7 +69,10 @@ class ChannelDispatcherBase {

private:
void OnChannelReady(std::unique_ptr<MessagePipe> message_pipe);
void OnPipeError(int error);

// MessagePipe::EventHandler interface.
void OnMessageReceived(std::unique_ptr<CompoundBuffer> message) override;
void OnMessagePipeClosed() override;

std::string channel_name_;
MessageChannelFactory* channel_factory_ = nullptr;
Expand Down
7 changes: 7 additions & 0 deletions remoting/protocol/client_video_dispatcher_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class ClientVideoDispatcherTest : public testing::Test,

// ChannelDispatcherBase::EventHandler interface.
void OnChannelInitialized(ChannelDispatcherBase* channel_dispatcher) override;
void OnChannelClosed(ChannelDispatcherBase* channel_dispatcher) override;

protected:
void OnChannelError(int error);
Expand Down Expand Up @@ -96,6 +97,12 @@ void ClientVideoDispatcherTest::OnChannelInitialized(
initialized_ = true;
}

void ClientVideoDispatcherTest::OnChannelClosed(
ChannelDispatcherBase* channel_dispatcher) {
// Don't expect channels to be closed.
FAIL();
}

void ClientVideoDispatcherTest::OnChannelError(int error) {
// Don't expect channel creation to fail.
FAIL();
Expand Down
8 changes: 6 additions & 2 deletions remoting/protocol/connection_tester.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ MessagePipeConnectionTester::MessagePipeConnectionTester(
MessagePipeConnectionTester::~MessagePipeConnectionTester() {}

void MessagePipeConnectionTester::RunAndCheckResults() {
host_pipe_->StartReceiving(base::Bind(
&MessagePipeConnectionTester::OnMessageReceived, base::Unretained(this)));
host_pipe_->Start(this);

for (int i = 0; i < message_count_; ++i) {
std::unique_ptr<VideoPacket> message(new VideoPacket());
Expand Down Expand Up @@ -301,5 +300,10 @@ void MessagePipeConnectionTester::OnMessageReceived(
}
}

void MessagePipeConnectionTester::OnMessagePipeClosed() {
run_loop_.Quit();
FAIL();
}

} // namespace protocol
} // namespace remoting
10 changes: 6 additions & 4 deletions remoting/protocol/connection_tester.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "base/memory/ref_counted.h"
#include "base/run_loop.h"
#include "base/single_thread_task_runner.h"
#include "remoting/protocol/message_pipe.h"

namespace net {
class DrainableIOBuffer;
Expand All @@ -26,7 +27,6 @@ class VideoPacket;

namespace protocol {

class MessagePipe;
class P2PDatagramSocket;
class P2PStreamSocket;

Expand Down Expand Up @@ -109,18 +109,20 @@ class DatagramConnectionTester {
int bad_packets_received_;
};

class MessagePipeConnectionTester {
class MessagePipeConnectionTester : public MessagePipe::EventHandler {
public:
MessagePipeConnectionTester(MessagePipe* client_pipe,
MessagePipe* host_pipe,
int message_size,
int message_count);
~MessagePipeConnectionTester();
~MessagePipeConnectionTester() override;

void RunAndCheckResults();

protected:
void OnMessageReceived(std::unique_ptr<CompoundBuffer> message);
// MessagePipe::EventHandler interface.
void OnMessageReceived(std::unique_ptr<CompoundBuffer> message) override;
void OnMessagePipeClosed() override;

private:
base::RunLoop run_loop_;
Expand Down
7 changes: 6 additions & 1 deletion remoting/protocol/connection_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ class ConnectionTest : public testing::Test,
public:
ConnectionTest() {}

void DestroyHost() {
host_connection_.reset();
run_loop_->Quit();
}

protected:
bool is_using_webrtc() { return GetParam(); }

Expand Down Expand Up @@ -311,7 +316,7 @@ TEST_P(ConnectionTest, Events) {
EXPECT_CALL(host_input_stub_, InjectKeyEvent(EqualsKeyEvent(event)))
.WillOnce(QuitRunLoop(&run_loop));

// Send capabilities from the client.
// Send key event from the client.
client_connection_->input_stub()->InjectKeyEvent(event);

run_loop.Run();
Expand Down
6 changes: 6 additions & 0 deletions remoting/protocol/ice_connection_to_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ void IceConnectionToClient::OnChannelInitialized(
NotifyIfChannelsReady();
}

void IceConnectionToClient::OnChannelClosed(
ChannelDispatcherBase* channel_dispatcher) {
// ICE transport doesn't close channels dynamically.
NOTREACHED();
}

void IceConnectionToClient::NotifyIfChannelsReady() {
DCHECK(thread_checker_.CalledOnValidThread());

Expand Down
3 changes: 2 additions & 1 deletion remoting/protocol/ice_connection_to_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class IceConnectionToClient : public ConnectionToClient,
void set_host_stub(HostStub* host_stub) override;
void set_input_stub(InputStub* input_stub) override;

private:
// Session::EventHandler interface.
void OnSessionStateChange(Session::State state) override;

Expand All @@ -65,8 +66,8 @@ class IceConnectionToClient : public ConnectionToClient,

// ChannelDispatcherBase::EventHandler interface.
void OnChannelInitialized(ChannelDispatcherBase* channel_dispatcher) override;
void OnChannelClosed(ChannelDispatcherBase* channel_dispatcher) override;

private:
void NotifyIfChannelsReady();

void CloseChannels();
Expand Down
6 changes: 6 additions & 0 deletions remoting/protocol/ice_connection_to_host.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ void IceConnectionToHost::OnChannelInitialized(
NotifyIfChannelsReady();
}

void IceConnectionToHost::OnChannelClosed(
ChannelDispatcherBase* channel_dispatcher) {
// ICE transport doesn't close channels dynamically.
NOTREACHED();
}

void IceConnectionToHost::OnVideoChannelStatus(bool active) {
event_callback_->OnConnectionReady(active);
}
Expand Down
1 change: 1 addition & 0 deletions remoting/protocol/ice_connection_to_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class IceConnectionToHost : public ConnectionToHost,

// ChannelDispatcherBase::EventHandler interface.
void OnChannelInitialized(ChannelDispatcherBase* channel_dispatcher) override;
void OnChannelClosed(ChannelDispatcherBase* channel_dispatcher) override;

// MonitoredVideoStub::EventHandler interface.
virtual void OnVideoChannelStatus(bool active);
Expand Down
18 changes: 14 additions & 4 deletions remoting/protocol/message_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,23 @@ namespace protocol {
// Represents a bi-directional pipe that allows to send and receive messages.
class MessagePipe {
public:
typedef base::Callback<void(std::unique_ptr<CompoundBuffer> message)>
MessageReceivedCallback;
class EventHandler {
public:
// Called when a message is received.
virtual void OnMessageReceived(std::unique_ptr<CompoundBuffer> message) = 0;

// Called when the channel is closed.
virtual void OnMessagePipeClosed() = 0;

protected:
virtual ~EventHandler() {}
};

virtual ~MessagePipe() {}

// Starts receiving incoming messages and calls |callback| for each message.
virtual void StartReceiving(const MessageReceivedCallback& callback) = 0;
// Starts the channel. |event_handler| will be called to notify when a message
// is received or the pipe is closed.
virtual void Start(EventHandler* event_handler) = 0;

// Sends a message. |done| is called when the message has been sent to the
// client, but it doesn't mean that the client has received it. |done| is
Expand Down
3 changes: 2 additions & 1 deletion remoting/protocol/message_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) {
}

void MessageReader::RunCallback(std::unique_ptr<CompoundBuffer> message) {
if (!message_received_callback_.is_null())
if (!message_received_callback_.is_null()) {
message_received_callback_.Run(std::move(message));
}
}

} // namespace protocol
Expand Down
7 changes: 4 additions & 3 deletions remoting/protocol/stream_message_pipe_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ StreamMessagePipeAdapter::StreamMessagePipeAdapter(

StreamMessagePipeAdapter::~StreamMessagePipeAdapter() {}

void StreamMessagePipeAdapter::StartReceiving(
const MessageReceivedCallback& callback) {
reader_.StartReading(socket_.get(), callback,
void StreamMessagePipeAdapter::Start(EventHandler* event_handler) {
reader_.StartReading(socket_.get(),
base::Bind(&EventHandler::OnMessageReceived,
base::Unretained(event_handler)),
base::Bind(&StreamMessagePipeAdapter::CloseOnError,
base::Unretained(this)));
}
Expand Down
2 changes: 1 addition & 1 deletion remoting/protocol/stream_message_pipe_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class StreamMessagePipeAdapter : public MessagePipe {
~StreamMessagePipeAdapter() override;

// MessagePipe interface.
void StartReceiving(const MessageReceivedCallback& callback) override;
void Start(EventHandler* event_handler) override;
void Send(google::protobuf::MessageLite* message,
const base::Closure& done) override;

Expand Down
30 changes: 25 additions & 5 deletions remoting/protocol/webrtc_connection_to_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "remoting/protocol/host_event_dispatcher.h"
#include "remoting/protocol/host_stub.h"
#include "remoting/protocol/input_stub.h"
#include "remoting/protocol/message_pipe.h"
#include "remoting/protocol/transport_context.h"
#include "remoting/protocol/webrtc_transport.h"
#include "remoting/protocol/webrtc_video_stream.h"
Expand Down Expand Up @@ -144,21 +145,20 @@ void WebrtcConnectionToClient::OnSessionStateChange(Session::State state) {

case Session::CLOSED:
case Session::FAILED:
transport_->Close(state == Session::CLOSED ? OK : session_->error());
control_dispatcher_.reset();
event_dispatcher_.reset();
transport_->Close(state == Session::CLOSED ? OK : session_->error());
event_handler_->OnConnectionClosed(
this, state == Session::CLOSED ? OK : session_->error());
break;
}
}

void WebrtcConnectionToClient::OnWebrtcTransportConnecting() {
// Create outgoing control channel by initializing |control_dispatcher_|.
// |event_dispatcher_| is initialized later because event channel is expected
// to be created by the client.
control_dispatcher_->Init(transport_->outgoing_channel_factory(), this);

event_dispatcher_->Init(transport_->incoming_channel_factory(), this);
event_dispatcher_->set_on_input_event_callback(base::Bind(
&ConnectionToClient::OnInputEventReceived, base::Unretained(this)));
}

void WebrtcConnectionToClient::OnWebrtcTransportConnected() {
Expand All @@ -170,6 +170,17 @@ void WebrtcConnectionToClient::OnWebrtcTransportError(ErrorCode error) {
Disconnect(error);
}

void WebrtcConnectionToClient::OnWebrtcTransportIncomingDataChannel(
const std::string& name,
std::unique_ptr<MessagePipe> pipe) {
if (name == event_dispatcher_->channel_name() &&
!event_dispatcher_->is_connected()) {
event_dispatcher_->set_on_input_event_callback(base::Bind(
&ConnectionToClient::OnInputEventReceived, base::Unretained(this)));
event_dispatcher_->Init(std::move(pipe), this);
}
}

void WebrtcConnectionToClient::OnWebrtcTransportMediaStreamAdded(
scoped_refptr<webrtc::MediaStreamInterface> stream) {
LOG(WARNING) << "The client created an unexpected media stream.";
Expand All @@ -188,5 +199,14 @@ void WebrtcConnectionToClient::OnChannelInitialized(
}
}

void WebrtcConnectionToClient::OnChannelClosed(
ChannelDispatcherBase* channel_dispatcher) {
DCHECK(thread_checker_.CalledOnValidThread());

LOG(ERROR) << "Channel " << channel_dispatcher->channel_name()
<< " was closed unexpectedly.";
Disconnect(INCOMPATIBLE_PROTOCOL);
}

} // namespace protocol
} // namespace remoting
4 changes: 4 additions & 0 deletions remoting/protocol/webrtc_connection_to_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ class WebrtcConnectionToClient : public ConnectionToClient,
void OnWebrtcTransportConnecting() override;
void OnWebrtcTransportConnected() override;
void OnWebrtcTransportError(ErrorCode error) override;
void OnWebrtcTransportIncomingDataChannel(
const std::string& name,
std::unique_ptr<MessagePipe> pipe) override;
void OnWebrtcTransportMediaStreamAdded(
scoped_refptr<webrtc::MediaStreamInterface> stream) override;
void OnWebrtcTransportMediaStreamRemoved(
scoped_refptr<webrtc::MediaStreamInterface> stream) override;

// ChannelDispatcherBase::EventHandler interface.
void OnChannelInitialized(ChannelDispatcherBase* channel_dispatcher) override;
void OnChannelClosed(ChannelDispatcherBase* channel_dispatcher) override;

private:
base::ThreadChecker thread_checker_;
Expand Down
Loading

0 comments on commit d059c46

Please sign in to comment.