diff --git a/components/cast_channel/BUILD.gn b/components/cast_channel/BUILD.gn index d2d49086327c0d..55b42ba169a9c6 100644 --- a/components/cast_channel/BUILD.gn +++ b/components/cast_channel/BUILD.gn @@ -26,6 +26,8 @@ static_library("cast_channel") { "enum_table.h", "keep_alive_delegate.cc", "keep_alive_delegate.h", + "keep_alive_handler.cc", + "keep_alive_handler.h", "logger.cc", "logger.h", "mojo_data_pump.cc", diff --git a/components/cast_channel/keep_alive_delegate.cc b/components/cast_channel/keep_alive_delegate.cc index 7b253a5da8d740..9708313ef815f8 100644 --- a/components/cast_channel/keep_alive_delegate.cc +++ b/components/cast_channel/keep_alive_delegate.cc @@ -17,150 +17,49 @@ namespace cast_channel { -using ::cast_channel::ChannelError; - KeepAliveDelegate::KeepAliveDelegate( CastSocket* socket, scoped_refptr logger, std::unique_ptr inner_delegate, base::TimeDelta ping_interval, base::TimeDelta liveness_timeout) - : started_(false), - socket_(socket), - logger_(logger), - inner_delegate_(std::move(inner_delegate)), - liveness_timeout_(liveness_timeout), - ping_interval_(ping_interval), - ping_message_(CreateKeepAlivePingMessage()), - pong_message_(CreateKeepAlivePongMessage()) { - DCHECK(ping_interval_ < liveness_timeout_); + : inner_delegate_(std::move(inner_delegate)), + handler_(socket, + std::move(logger), + ping_interval, + liveness_timeout, + base::BindRepeating(&KeepAliveDelegate::OnError, + base::Unretained(this))) { DCHECK(inner_delegate_); - DCHECK(socket_); } -KeepAliveDelegate::~KeepAliveDelegate() { - DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); -} +KeepAliveDelegate::~KeepAliveDelegate() = default; void KeepAliveDelegate::SetTimersForTest( std::unique_ptr injected_ping_timer, std::unique_ptr injected_liveness_timer) { - ping_timer_ = std::move(injected_ping_timer); - liveness_timer_ = std::move(injected_liveness_timer); + handler_.SetTimersForTest(std::move(injected_ping_timer), + std::move(injected_liveness_timer)); } +// CastTransport::Delegate interface. void KeepAliveDelegate::Start() { - DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); - DCHECK(!started_); - - DVLOG(1) << "Starting keep-alive timers."; - DVLOG(1) << "Ping timeout: " << ping_interval_; - DVLOG(1) << "Liveness timeout: " << liveness_timeout_; - - // Use injected mock timers, if provided. - if (!ping_timer_) { - ping_timer_.reset(new base::RetainingOneShotTimer()); - } - if (!liveness_timer_) { - liveness_timer_.reset(new base::RetainingOneShotTimer()); - } - - ping_timer_->Start( - FROM_HERE, ping_interval_, - base::BindRepeating(&KeepAliveDelegate::SendKeepAliveMessage, - base::Unretained(this), ping_message_, - CastMessageType::kPing)); - liveness_timer_->Start( - FROM_HERE, liveness_timeout_, - base::BindRepeating(&KeepAliveDelegate::LivenessTimeout, - base::Unretained(this))); - - started_ = true; + handler_.Start(); inner_delegate_->Start(); } -void KeepAliveDelegate::ResetTimers() { - DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); - DCHECK(started_); - ping_timer_->Reset(); - liveness_timer_->Reset(); -} - -void KeepAliveDelegate::SendKeepAliveMessage(const CastMessage& message, - CastMessageType message_type) { - DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); - DVLOG(2) << "Sending " << ToString(message_type); - - socket_->transport()->SendMessage( - message, base::BindOnce(&KeepAliveDelegate::SendKeepAliveMessageComplete, - weak_factory_.GetWeakPtr(), message_type)); -} - -void KeepAliveDelegate::SendKeepAliveMessageComplete( - CastMessageType message_type, - int rv) { - DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); - DVLOG(2) << "Sending " << ToString(message_type) << " complete, rv=" << rv; - if (rv != net::OK) { - // An error occurred while sending the ping response. - DVLOG(1) << "Error sending " << ToString(message_type); - logger_->LogSocketEventWithRv(socket_->id(), ChannelEvent::PING_WRITE_ERROR, - rv); - OnError(ChannelError::CAST_SOCKET_ERROR); - return; - } - - if (liveness_timer_) - liveness_timer_->Reset(); -} - -void KeepAliveDelegate::LivenessTimeout() { - DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); - OnError(ChannelError::PING_TIMEOUT); - Stop(); -} - -// CastTransport::Delegate interface. void KeepAliveDelegate::OnError(ChannelError error_state) { - DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); DVLOG(1) << "KeepAlive::OnError: " << ::cast_channel::ChannelErrorToString(error_state); inner_delegate_->OnError(error_state); - Stop(); + handler_.Stop(); } void KeepAliveDelegate::OnMessage(const CastMessage& message) { - DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); DVLOG(2) << "KeepAlive::OnMessage : " << message.payload_utf8(); - - if (started_) - ResetTimers(); - - // Keep-alive messages are intercepted and handled by KeepAliveDelegate - // here. All other messages are passed through to |inner_delegate_|. - // Keep-alive messages are assumed to be in the form { "type": "PING|PONG" }. - if (message.namespace_() == kHeartbeatNamespace) { - const char* ping_message_type = ToString(CastMessageType::kPing); - if (message.payload_utf8().find(ping_message_type) != std::string::npos) { - DVLOG(2) << "Received PING."; - if (started_) - SendKeepAliveMessage(pong_message_, CastMessageType::kPong); - } else { - DCHECK_NE(std::string::npos, - message.payload_utf8().find(ToString(CastMessageType::kPong))); - DVLOG(2) << "Received PONG."; - } - } else { + if (!handler_.HandleMessage(message)) { inner_delegate_->OnMessage(message); } } -void KeepAliveDelegate::Stop() { - if (started_) { - started_ = false; - ping_timer_->Stop(); - liveness_timer_->Stop(); - } -} - } // namespace cast_channel diff --git a/components/cast_channel/keep_alive_delegate.h b/components/cast_channel/keep_alive_delegate.h index 5c70ab816a4b7e..0ad91ae6d870e6 100644 --- a/components/cast_channel/keep_alive_delegate.h +++ b/components/cast_channel/keep_alive_delegate.h @@ -6,12 +6,10 @@ #define COMPONENTS_CAST_CHANNEL_KEEP_ALIVE_DELEGATE_H_ #include "base/macros.h" -#include "base/memory/weak_ptr.h" -#include "base/threading/thread_checker.h" #include "base/timer/timer.h" -#include "components/cast_channel/cast_message_util.h" +#include "components/cast_channel/cast_channel_enum.h" #include "components/cast_channel/cast_transport.h" -#include "third_party/openscreen/src/cast/common/channel/proto/cast_channel.pb.h" +#include "components/cast_channel/keep_alive_handler.h" namespace cast_channel { @@ -53,58 +51,10 @@ class KeepAliveDelegate : public CastTransport::Delegate { void OnMessage(const CastMessage& message) override; private: - // Restarts the ping/liveness timeout timers. Called when a message - // is received from the remote end. - void ResetTimers(); - - // Sends a formatted PING or PONG message to the remote side. - void SendKeepAliveMessage(const CastMessage& message, - CastMessageType message_type); - - // Callback for SendKeepAliveMessage. - void SendKeepAliveMessageComplete(CastMessageType message_type, int rv); - - // Called when the liveness timer expires, indicating that the remote - // end has not responded within the |liveness_timeout_| interval. - void LivenessTimeout(); - - // Stops the ping and liveness timers if they are started. - // To be called after an error. - void Stop(); - - // Indicates that Start() was called. - bool started_; - - // Socket that is managed by the keep-alive object. - CastSocket* socket_; - - // Logging object. - scoped_refptr logger_; - // Delegate object which receives all non-keep alive messages. std::unique_ptr inner_delegate_; - // Amount of idle time to wait before disconnecting. - base::TimeDelta liveness_timeout_; - - // Amount of idle time to wait before pinging the receiver. - base::TimeDelta ping_interval_; - - // Fired when |ping_interval_| is exceeded or when triggered by test code. - std::unique_ptr ping_timer_; - - // Fired when |liveness_timer_| is exceeded. - std::unique_ptr liveness_timer_; - - // The PING message to send over the wire. - const CastMessage ping_message_; - - // The PONG message to send over the wire. - const CastMessage pong_message_; - - THREAD_CHECKER(thread_checker_); - - base::WeakPtrFactory weak_factory_{this}; + KeepAliveHandler handler_; DISALLOW_COPY_AND_ASSIGN(KeepAliveDelegate); }; diff --git a/components/cast_channel/keep_alive_handler.cc b/components/cast_channel/keep_alive_handler.cc new file mode 100644 index 00000000000000..6b460a96d66ad7 --- /dev/null +++ b/components/cast_channel/keep_alive_handler.cc @@ -0,0 +1,152 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/cast_channel/keep_alive_handler.h" + +#include + +#include "base/bind.h" +#include "components/cast_channel/cast_channel_enum.h" +#include "components/cast_channel/cast_message_util.h" +#include "components/cast_channel/cast_socket.h" +#include "components/cast_channel/logger.h" +#include "net/base/net_errors.h" +#include "net/traffic_annotation/network_traffic_annotation.h" +#include "third_party/openscreen/src/cast/common/channel/proto/cast_channel.pb.h" + +namespace cast_channel { + +KeepAliveHandler::KeepAliveHandler(CastSocket* socket, + scoped_refptr logger, + base::TimeDelta ping_interval, + base::TimeDelta liveness_timeout, + OnErrorCallback on_error_cb) + : started_(false), + socket_(socket), + logger_(logger), + liveness_timeout_(liveness_timeout), + ping_interval_(ping_interval), + ping_message_(CreateKeepAlivePingMessage()), + pong_message_(CreateKeepAlivePongMessage()), + on_error_cb_(std::move(on_error_cb)) { + DCHECK(ping_interval_ < liveness_timeout_); + DCHECK(socket_); + DCHECK(!on_error_cb_.is_null()); +} + +KeepAliveHandler::~KeepAliveHandler() { + DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); +} + +void KeepAliveHandler::SetTimersForTest( + std::unique_ptr injected_ping_timer, + std::unique_ptr injected_liveness_timer) { + ping_timer_ = std::move(injected_ping_timer); + liveness_timer_ = std::move(injected_liveness_timer); +} + +void KeepAliveHandler::Start() { + DCHECK(!started_); + + DVLOG(1) << "Starting keep-alive timers."; + DVLOG(1) << "Ping interval: " << ping_interval_; + DVLOG(1) << "Liveness timeout: " << liveness_timeout_; + + // Use injected mock timers, if provided. + if (!ping_timer_) { + ping_timer_.reset(new base::RetainingOneShotTimer()); + } + if (!liveness_timer_) { + liveness_timer_.reset(new base::RetainingOneShotTimer()); + } + + ping_timer_->Start( + FROM_HERE, ping_interval_, + base::BindRepeating(&KeepAliveHandler::SendKeepAliveMessage, + base::Unretained(this), ping_message_, + CastMessageType::kPing)); + liveness_timer_->Start(FROM_HERE, liveness_timeout_, + base::BindRepeating(&KeepAliveHandler::LivenessTimeout, + base::Unretained(this))); + + started_ = true; +} + +bool KeepAliveHandler::HandleMessage(const CastMessage& message) { + DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); + DVLOG(2) << "KeepAlive::OnMessage : " << message.payload_utf8(); + + if (started_) { + ResetTimers(); + } + + // Keep-alive messages are intercepted and handled by KeepAliveHandler + // here. All other messages are passed through to |inner_delegate_|. + // Keep-alive messages are assumed to be in the form { "type": "PING|PONG" }. + if (message.namespace_() == kHeartbeatNamespace) { + static const char* ping_message_type = ToString(CastMessageType::kPing); + if (message.payload_utf8().find(ping_message_type) != std::string::npos) { + DVLOG(2) << "Received PING."; + if (started_) + SendKeepAliveMessage(pong_message_, CastMessageType::kPong); + } else { + DCHECK_NE(std::string::npos, + message.payload_utf8().find(ToString(CastMessageType::kPong))); + DVLOG(2) << "Received PONG."; + } + return true; + } + return false; +} + +void KeepAliveHandler::ResetTimers() { + DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); + DCHECK(started_); + ping_timer_->Reset(); + liveness_timer_->Reset(); +} + +void KeepAliveHandler::SendKeepAliveMessage(const CastMessage& message, + CastMessageType message_type) { + DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); + DVLOG(2) << "Sending " << ToString(message_type); + + socket_->transport()->SendMessage( + message, base::BindOnce(&KeepAliveHandler::SendKeepAliveMessageComplete, + weak_factory_.GetWeakPtr(), message_type)); +} + +void KeepAliveHandler::SendKeepAliveMessageComplete( + CastMessageType message_type, + int rv) { + DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); + DVLOG(2) << "Sending " << ToString(message_type) << " complete, rv=" << rv; + if (rv != net::OK) { + // An error occurred while sending the ping response. + DVLOG(1) << "Error sending " << ToString(message_type); + logger_->LogSocketEventWithRv(socket_->id(), ChannelEvent::PING_WRITE_ERROR, + rv); + on_error_cb_.Run(ChannelError::CAST_SOCKET_ERROR); + return; + } + + if (liveness_timer_) + liveness_timer_->Reset(); +} + +void KeepAliveHandler::LivenessTimeout() { + DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); + on_error_cb_.Run(ChannelError::PING_TIMEOUT); + Stop(); +} + +void KeepAliveHandler::Stop() { + if (started_) { + started_ = false; + ping_timer_->Stop(); + liveness_timer_->Stop(); + } +} + +} // namespace cast_channel diff --git a/components/cast_channel/keep_alive_handler.h b/components/cast_channel/keep_alive_handler.h new file mode 100644 index 00000000000000..41a6de3b309905 --- /dev/null +++ b/components/cast_channel/keep_alive_handler.h @@ -0,0 +1,105 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef COMPONENTS_CAST_CHANNEL_KEEP_ALIVE_HANDLER_H_ +#define COMPONENTS_CAST_CHANNEL_KEEP_ALIVE_HANDLER_H_ + +#include "base/macros.h" +#include "base/memory/weak_ptr.h" +#include "base/threading/thread_checker.h" +#include "base/timer/timer.h" +#include "components/cast_channel/cast_channel_enum.h" +#include "components/cast_channel/cast_message_util.h" + +namespace cast_channel { + +class CastSocket; +class Logger; + +using ::cast::channel::CastMessage; + +class KeepAliveHandler { + public: + using OnErrorCallback = base::RepeatingCallback; + // |socket|: The socket to be kept alive. + // |logger|: The logging object which collects protocol events and error + // details. + // |ping_interval|: The amount of idle time to wait before sending a PING to + // the remote end. + // |liveness_timeout|: The amount of idle time to wait before terminating the + // connection. + KeepAliveHandler(CastSocket* socket, + scoped_refptr logger, + base::TimeDelta ping_interval, + base::TimeDelta liveness_timeout, + OnErrorCallback on_error_cb); + ~KeepAliveHandler(); + + // Restarts the ping/liveness timeout timers. Called when a message + // is received from the remote end. + void ResetTimers(); + + void SetTimersForTest( + std::unique_ptr injected_ping_timer, + std::unique_ptr injected_liveness_timer); + + void Start(); + + // Stops the ping and liveness timers if they are started. + // To be called after an error. + void Stop(); + + bool HandleMessage(const CastMessage& message); + + private: + // Sends a formatted PING or PONG message to the remote side. + void SendKeepAliveMessage(const CastMessage& message, + CastMessageType message_type); + + // Callback for SendKeepAliveMessage. + void SendKeepAliveMessageComplete(CastMessageType message_type, int rv); + + // Called when the liveness timer expires, indicating that the remote + // end has not responded within the |liveness_timeout_| interval. + void LivenessTimeout(); + + // Indicates that Start() was called. + bool started_; + + // Socket that is managed by the keep-alive object. + CastSocket* socket_; + + // Logging object. + scoped_refptr logger_; + + // Amount of idle time to wait before disconnecting. + base::TimeDelta liveness_timeout_; + + // Amount of idle time to wait before pinging the receiver. + base::TimeDelta ping_interval_; + + // Fired when |ping_interval_| is exceeded or when triggered by test code. + std::unique_ptr ping_timer_; + + // Fired when |liveness_timer_| is exceeded. + std::unique_ptr liveness_timer_; + + // The PING message to send over the wire. + const CastMessage ping_message_; + + // The PONG message to send over the wire. + const CastMessage pong_message_; + + OnErrorCallback on_error_cb_; + + THREAD_CHECKER(thread_checker_); + + base::WeakPtrFactory weak_factory_{this}; + + DISALLOW_COPY_AND_ASSIGN(KeepAliveHandler); +}; + +} // namespace cast_channel + +#endif // COMPONENTS_CAST_CHANNEL_KEEP_ALIVE_HANDLER_H_