Skip to content

Commit

Permalink
Separate reusable pieces from KeepAliveDelegate
Browse files Browse the repository at this point in the history
This change pulls the heartbeat message handling and timers out into a
new class that is separate from the CastTransport::Delegate
responsibilities of KeepAliveDelegate.  This will allow reuse with
libcast CastSocket.

Bug: 1050913
Change-Id: I463a5b7279a1120759506a776706ac5e85967282
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2108283
Commit-Queue: Brandon Tolsch <btolsch@chromium.org>
Reviewed-by: mark a. foltz <mfoltz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#753035}
  • Loading branch information
btolsch authored and Commit Bot committed Mar 24, 2020
1 parent 8d8ed5e commit 5b0145a
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 168 deletions.
2 changes: 2 additions & 0 deletions components/cast_channel/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ static_library("cast_channel") {
"enum_table.h",
"keep_alive_delegate.cc",
"keep_alive_delegate.h",
"keep_alive_handler.cc",
"keep_alive_handler.h",
"logger.cc",
"logger.h",
"mojo_data_pump.cc",
Expand Down
129 changes: 14 additions & 115 deletions components/cast_channel/keep_alive_delegate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,150 +17,49 @@

namespace cast_channel {

using ::cast_channel::ChannelError;

KeepAliveDelegate::KeepAliveDelegate(
CastSocket* socket,
scoped_refptr<Logger> logger,
std::unique_ptr<CastTransport::Delegate> inner_delegate,
base::TimeDelta ping_interval,
base::TimeDelta liveness_timeout)
: started_(false),
socket_(socket),
logger_(logger),
inner_delegate_(std::move(inner_delegate)),
liveness_timeout_(liveness_timeout),
ping_interval_(ping_interval),
ping_message_(CreateKeepAlivePingMessage()),
pong_message_(CreateKeepAlivePongMessage()) {
DCHECK(ping_interval_ < liveness_timeout_);
: inner_delegate_(std::move(inner_delegate)),
handler_(socket,
std::move(logger),
ping_interval,
liveness_timeout,
base::BindRepeating(&KeepAliveDelegate::OnError,
base::Unretained(this))) {
DCHECK(inner_delegate_);
DCHECK(socket_);
}

KeepAliveDelegate::~KeepAliveDelegate() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
}
KeepAliveDelegate::~KeepAliveDelegate() = default;

void KeepAliveDelegate::SetTimersForTest(
std::unique_ptr<base::RetainingOneShotTimer> injected_ping_timer,
std::unique_ptr<base::RetainingOneShotTimer> injected_liveness_timer) {
ping_timer_ = std::move(injected_ping_timer);
liveness_timer_ = std::move(injected_liveness_timer);
handler_.SetTimersForTest(std::move(injected_ping_timer),
std::move(injected_liveness_timer));
}

// CastTransport::Delegate interface.
void KeepAliveDelegate::Start() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(!started_);

DVLOG(1) << "Starting keep-alive timers.";
DVLOG(1) << "Ping timeout: " << ping_interval_;
DVLOG(1) << "Liveness timeout: " << liveness_timeout_;

// Use injected mock timers, if provided.
if (!ping_timer_) {
ping_timer_.reset(new base::RetainingOneShotTimer());
}
if (!liveness_timer_) {
liveness_timer_.reset(new base::RetainingOneShotTimer());
}

ping_timer_->Start(
FROM_HERE, ping_interval_,
base::BindRepeating(&KeepAliveDelegate::SendKeepAliveMessage,
base::Unretained(this), ping_message_,
CastMessageType::kPing));
liveness_timer_->Start(
FROM_HERE, liveness_timeout_,
base::BindRepeating(&KeepAliveDelegate::LivenessTimeout,
base::Unretained(this)));

started_ = true;
handler_.Start();
inner_delegate_->Start();
}

void KeepAliveDelegate::ResetTimers() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(started_);
ping_timer_->Reset();
liveness_timer_->Reset();
}

void KeepAliveDelegate::SendKeepAliveMessage(const CastMessage& message,
CastMessageType message_type) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DVLOG(2) << "Sending " << ToString(message_type);

socket_->transport()->SendMessage(
message, base::BindOnce(&KeepAliveDelegate::SendKeepAliveMessageComplete,
weak_factory_.GetWeakPtr(), message_type));
}

void KeepAliveDelegate::SendKeepAliveMessageComplete(
CastMessageType message_type,
int rv) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DVLOG(2) << "Sending " << ToString(message_type) << " complete, rv=" << rv;
if (rv != net::OK) {
// An error occurred while sending the ping response.
DVLOG(1) << "Error sending " << ToString(message_type);
logger_->LogSocketEventWithRv(socket_->id(), ChannelEvent::PING_WRITE_ERROR,
rv);
OnError(ChannelError::CAST_SOCKET_ERROR);
return;
}

if (liveness_timer_)
liveness_timer_->Reset();
}

void KeepAliveDelegate::LivenessTimeout() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
OnError(ChannelError::PING_TIMEOUT);
Stop();
}

// CastTransport::Delegate interface.
void KeepAliveDelegate::OnError(ChannelError error_state) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DVLOG(1) << "KeepAlive::OnError: "
<< ::cast_channel::ChannelErrorToString(error_state);
inner_delegate_->OnError(error_state);
Stop();
handler_.Stop();
}

void KeepAliveDelegate::OnMessage(const CastMessage& message) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DVLOG(2) << "KeepAlive::OnMessage : " << message.payload_utf8();

if (started_)
ResetTimers();

// Keep-alive messages are intercepted and handled by KeepAliveDelegate
// here. All other messages are passed through to |inner_delegate_|.
// Keep-alive messages are assumed to be in the form { "type": "PING|PONG" }.
if (message.namespace_() == kHeartbeatNamespace) {
const char* ping_message_type = ToString(CastMessageType::kPing);
if (message.payload_utf8().find(ping_message_type) != std::string::npos) {
DVLOG(2) << "Received PING.";
if (started_)
SendKeepAliveMessage(pong_message_, CastMessageType::kPong);
} else {
DCHECK_NE(std::string::npos,
message.payload_utf8().find(ToString(CastMessageType::kPong)));
DVLOG(2) << "Received PONG.";
}
} else {
if (!handler_.HandleMessage(message)) {
inner_delegate_->OnMessage(message);
}
}

void KeepAliveDelegate::Stop() {
if (started_) {
started_ = false;
ping_timer_->Stop();
liveness_timer_->Stop();
}
}

} // namespace cast_channel
56 changes: 3 additions & 53 deletions components/cast_channel/keep_alive_delegate.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
#define COMPONENTS_CAST_CHANNEL_KEEP_ALIVE_DELEGATE_H_

#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/threading/thread_checker.h"
#include "base/timer/timer.h"
#include "components/cast_channel/cast_message_util.h"
#include "components/cast_channel/cast_channel_enum.h"
#include "components/cast_channel/cast_transport.h"
#include "third_party/openscreen/src/cast/common/channel/proto/cast_channel.pb.h"
#include "components/cast_channel/keep_alive_handler.h"

namespace cast_channel {

Expand Down Expand Up @@ -53,58 +51,10 @@ class KeepAliveDelegate : public CastTransport::Delegate {
void OnMessage(const CastMessage& message) override;

private:
// Restarts the ping/liveness timeout timers. Called when a message
// is received from the remote end.
void ResetTimers();

// Sends a formatted PING or PONG message to the remote side.
void SendKeepAliveMessage(const CastMessage& message,
CastMessageType message_type);

// Callback for SendKeepAliveMessage.
void SendKeepAliveMessageComplete(CastMessageType message_type, int rv);

// Called when the liveness timer expires, indicating that the remote
// end has not responded within the |liveness_timeout_| interval.
void LivenessTimeout();

// Stops the ping and liveness timers if they are started.
// To be called after an error.
void Stop();

// Indicates that Start() was called.
bool started_;

// Socket that is managed by the keep-alive object.
CastSocket* socket_;

// Logging object.
scoped_refptr<Logger> logger_;

// Delegate object which receives all non-keep alive messages.
std::unique_ptr<CastTransport::Delegate> inner_delegate_;

// Amount of idle time to wait before disconnecting.
base::TimeDelta liveness_timeout_;

// Amount of idle time to wait before pinging the receiver.
base::TimeDelta ping_interval_;

// Fired when |ping_interval_| is exceeded or when triggered by test code.
std::unique_ptr<base::RetainingOneShotTimer> ping_timer_;

// Fired when |liveness_timer_| is exceeded.
std::unique_ptr<base::RetainingOneShotTimer> liveness_timer_;

// The PING message to send over the wire.
const CastMessage ping_message_;

// The PONG message to send over the wire.
const CastMessage pong_message_;

THREAD_CHECKER(thread_checker_);

base::WeakPtrFactory<KeepAliveDelegate> weak_factory_{this};
KeepAliveHandler handler_;

DISALLOW_COPY_AND_ASSIGN(KeepAliveDelegate);
};
Expand Down
Loading

0 comments on commit 5b0145a

Please sign in to comment.