Skip to content

Commit

Permalink
Implement WebSocketSpdyStreamAdapter.
Browse files Browse the repository at this point in the history
WebSocketSpdyStreamAdapter wraps a SpdyStream so that it can be used by
the future WebSocketHttp2HandshakeStream class to do the  WebSocket
handshake, then handed off to WebSocketBasicStream.

Bug: 801564
Change-Id: Icac1d4e19d12b390bd61562de9150698be81f5f7
Reviewed-on: https://chromium-review.googlesource.com/912009
Reviewed-by: Ryan Hamilton <rch@chromium.org>
Reviewed-by: Adam Rice <ricea@chromium.org>
Commit-Queue: Bence Béky <bnc@chromium.org>
Cr-Commit-Position: refs/heads/master@{#537135}
  • Loading branch information
Bence Béky authored and Commit Bot committed Feb 16, 2018
1 parent e663d59 commit 29f6616
Show file tree
Hide file tree
Showing 5 changed files with 987 additions and 6 deletions.
1 change: 1 addition & 0 deletions net/spdy/chromium/spdy_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ class NET_EXPORT_PRIVATE SpdyStream {
// MORE_DATA_TO_SEND for bidirectional streams; for request/response
// streams, it must be MORE_DATA_TO_SEND if there is more data to
// upload, or NO_MORE_DATA_TO_SEND if not.
// Must not be called until Delegate::OnHeadersSent() is called.
void SendData(IOBuffer* data, int length, SpdySendStatus send_status);

// Fills SSL info in |ssl_info| and returns true when SSL is in use.
Expand Down
2 changes: 1 addition & 1 deletion net/websockets/websocket_basic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#include <string>
#include <vector>

#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "net/base/completion_callback.h"
#include "net/base/net_export.h"
#include "net/traffic_annotation/network_traffic_annotation.h"
#include "net/websockets/websocket_frame_parser.h"
Expand Down
163 changes: 163 additions & 0 deletions net/websockets/websocket_basic_stream_adapters.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
#include <cstring>
#include <utility>

#include "base/bind.h"
#include "base/location.h"
#include "base/single_thread_task_runner.h"
#include "base/threading/thread_task_runner_handle.h"
#include "net/base/io_buffer.h"
#include "net/socket/client_socket_handle.h"
#include "net/socket/socket.h"
#include "net/spdy/chromium/spdy_buffer.h"

namespace net {

Expand Down Expand Up @@ -44,4 +49,162 @@ bool WebSocketClientSocketHandleAdapter::is_initialized() const {
return connection_->is_initialized();
}

WebSocketSpdyStreamAdapter::WebSocketSpdyStreamAdapter(
base::WeakPtr<SpdyStream> stream,
Delegate* delegate,
NetLogWithSource net_log)
: headers_sent_(false),
stream_(stream),
stream_error_(ERR_CONNECTION_CLOSED),
delegate_(delegate),
write_length_(0),
net_log_(net_log),
weak_factory_(this) {
stream_->SetDelegate(this);
}

WebSocketSpdyStreamAdapter::~WebSocketSpdyStreamAdapter() {
if (stream_) {
// DetachDelegate() also cancels the stream.
stream_->DetachDelegate();
}
}

void WebSocketSpdyStreamAdapter::DetachDelegate() {
delegate_ = nullptr;
}

int WebSocketSpdyStreamAdapter::Read(IOBuffer* buf,
int buf_len,
const CompletionCallback& callback) {
DCHECK(!read_callback_);
DCHECK_LT(0, buf_len);

read_buffer_ = buf;
// |read_length_| is size_t and |buf_len| is a non-negative int, therefore
// conversion is always valid.
read_length_ = buf_len;

if (!read_data_.IsEmpty())
return CopySavedReadDataIntoBuffer();

if (!stream_)
return stream_error_;

read_callback_ = callback;
return ERR_IO_PENDING;
}

int WebSocketSpdyStreamAdapter::Write(
IOBuffer* buf,
int buf_len,
const CompletionCallback& callback,
const NetworkTrafficAnnotationTag& traffic_annotation) {
CHECK(headers_sent_);
DCHECK(!write_callback_);
DCHECK(callback);
DCHECK_LT(0, buf_len);

if (!stream_)
return stream_error_;

stream_->SendData(buf, buf_len, MORE_DATA_TO_SEND);
write_callback_ = callback;
write_length_ = buf_len;
return ERR_IO_PENDING;
}

void WebSocketSpdyStreamAdapter::Disconnect() {
if (stream_) {
stream_->DetachDelegate();
stream_ = nullptr;
}
}

bool WebSocketSpdyStreamAdapter::is_initialized() const {
return true;
}

// SpdyStream::Delegate methods.
void WebSocketSpdyStreamAdapter::OnHeadersSent() {
headers_sent_ = true;
if (delegate_)
delegate_->OnHeadersSent();
}

void WebSocketSpdyStreamAdapter::OnHeadersReceived(
const SpdyHeaderBlock& response_headers) {
if (delegate_)
delegate_->OnHeadersReceived(response_headers);
}

void WebSocketSpdyStreamAdapter::OnDataReceived(
std::unique_ptr<SpdyBuffer> buffer) {
read_data_.Enqueue(std::move(buffer));
if (read_callback_)
base::ResetAndReturn(&read_callback_).Run(CopySavedReadDataIntoBuffer());
}

void WebSocketSpdyStreamAdapter::OnDataSent() {
DCHECK(write_callback_);

base::ResetAndReturn(&write_callback_).Run(write_length_);
}

void WebSocketSpdyStreamAdapter::OnTrailers(const SpdyHeaderBlock& trailers) {}

void WebSocketSpdyStreamAdapter::OnClose(int status) {
DCHECK_GT(ERR_IO_PENDING, status);

stream_error_ = status;
stream_ = nullptr;

auto self = weak_factory_.GetWeakPtr();

if (read_callback_) {
DCHECK(read_data_.IsEmpty());
// Might destroy |this|.
base::ResetAndReturn(&read_callback_).Run(status);
if (!self)
return;
}
if (write_callback_) {
// Might destroy |this|.
base::ResetAndReturn(&write_callback_).Run(status);
if (!self)
return;
}

// Delay calling delegate_->OnClose() until all buffered data are read.
if (read_data_.IsEmpty() && delegate_) {
// Might destroy |this|.
delegate_->OnClose(status);
}
}

NetLogSource WebSocketSpdyStreamAdapter::source_dependency() const {
return net_log_.source();
}

int WebSocketSpdyStreamAdapter::CopySavedReadDataIntoBuffer() {
int rv = read_data_.Dequeue(read_buffer_->data(), read_length_);

// Stream has been destroyed earlier but delegate_->OnClose() call was
// delayed until all buffered data are read. PostTask so that Read() can
// return beforehand.
if (!stream_ && delegate_ && read_data_.IsEmpty()) {
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE,
base::BindOnce(&WebSocketSpdyStreamAdapter::CallDelegateOnClose,
weak_factory_.GetWeakPtr()));
}

return rv;
}

void WebSocketSpdyStreamAdapter::CallDelegateOnClose() {
if (delegate_)
delegate_->OnClose(stream_error_);
}

} // namespace net
108 changes: 107 additions & 1 deletion net/websockets/websocket_basic_stream_adapters.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,25 @@

#include <memory>

#include "base/memory/weak_ptr.h"
#include "net/base/completion_callback.h"
#include "net/base/net_export.h"
#include "net/spdy/chromium/spdy_read_queue.h"
#include "net/spdy/chromium/spdy_stream.h"
#include "net/traffic_annotation/network_traffic_annotation.h"
#include "net/websockets/websocket_basic_stream.h"

namespace net {

class ClientSocketHandle;
class IOBuffer;
class SpdyBuffer;

// Trivial adapter to make WebSocketBasicStream use an HTTP/1.1 connection.
// Trivial adapter to make WebSocketBasicStream use a TCP/IP or TLS socket.
class NET_EXPORT_PRIVATE WebSocketClientSocketHandleAdapter
: public WebSocketBasicStream::Adapter {
public:
WebSocketClientSocketHandleAdapter() = delete;
explicit WebSocketClientSocketHandleAdapter(
std::unique_ptr<ClientSocketHandle> connection);
~WebSocketClientSocketHandleAdapter() override;
Expand All @@ -38,6 +44,106 @@ class NET_EXPORT_PRIVATE WebSocketClientSocketHandleAdapter
std::unique_ptr<ClientSocketHandle> connection_;
};

// Adapter to make WebSocketBasicStream use an HTTP/2 stream.
// Sets itself as a delegate of the SpdyStream, and forwards headers-related
// methods to WebSocketHttp2HandshakeStream, which implements
// WebSocketSpdyStreamAdapter::Delegate. After the handshake, ownership of this
// object can be passed to WebSocketBasicStream, which can read and write using
// a ClientSocketHandle-like interface.
class NET_EXPORT_PRIVATE WebSocketSpdyStreamAdapter
: public WebSocketBasicStream::Adapter,
public SpdyStream::Delegate {
public:
// Interface for forwarding SpdyStream::Delegate methods necessary for the
// handshake.
class Delegate {
public:
virtual ~Delegate() = default;
virtual void OnHeadersSent() = 0;
virtual void OnHeadersReceived(const SpdyHeaderBlock& response_headers) = 0;
// Might destroy |this|.
virtual void OnClose(int status) = 0;
};

// |delegate| must be valid until DetachDelegate() is called.
WebSocketSpdyStreamAdapter(base::WeakPtr<SpdyStream> stream,
Delegate* delegate,
NetLogWithSource net_log);
~WebSocketSpdyStreamAdapter() override;

// Called by WebSocketSpdyStreamAdapter::Delegate before it is destroyed.
void DetachDelegate();

// WebSocketBasicStream::Adapter methods.

int Read(IOBuffer* buf,
int buf_len,
const CompletionCallback& callback) override;

// Write() must not be called before Delegate::OnHeadersSent() is called.
// Write() always returns asynchronously.
int Write(IOBuffer* buf,
int buf_len,
const CompletionCallback& callback,
const NetworkTrafficAnnotationTag& traffic_annotation) override;

void Disconnect() override;
bool is_initialized() const override;

// SpdyStream::Delegate methods.

void OnHeadersSent() override;
void OnHeadersReceived(const SpdyHeaderBlock& response_headers) override;
void OnDataReceived(std::unique_ptr<SpdyBuffer> buffer) override;
void OnDataSent() override;
void OnTrailers(const SpdyHeaderBlock& trailers) override;
void OnClose(int status) override;
NetLogSource source_dependency() const override;

private:
// Copy data from read_data_ to read_buffer_.
int CopySavedReadDataIntoBuffer();

// Call WebSocketSpdyStreamAdapter::Delegate::OnClose().
void CallDelegateOnClose();

// True if SpdyStream::Delegate::OnHeadersSent() has been called.
// SpdyStream::SendData() must not be called before that.
bool headers_sent_;

// The underlying SpdyStream.
base::WeakPtr<SpdyStream> stream_;

// The error code with which SpdyStream was closed.
int stream_error_;

Delegate* delegate_;

// Buffer data pushed by SpdyStream until read through Read().
SpdyReadQueue read_data_;

// Read buffer and length used for both synchronous and asynchronous
// read operations.
IOBuffer* read_buffer_;
size_t read_length_;

// Read callback saved for asynchronous reads.
// Whenever |read_data_| is not empty, |read_callback_| must be null.
CompletionCallback read_callback_;

// Write length saved to be passed to |write_callback_|. This is necessary
// because SpdyStream::Delegate::OnDataSent() does not pass number of bytes
// written.
int write_length_;

// Write callback saved for asynchronous writes (all writes are asynchronous).
CompletionCallback write_callback_;

NetLogWithSource net_log_;

base::WeakPtrFactory<WebSocketSpdyStreamAdapter> weak_factory_;
};

} // namespace net

#endif // NET_WEBSOCKETS_WEBSOCKET_BASIC_STREAM_ADAPTERS_H_
Loading

0 comments on commit 29f6616

Please sign in to comment.