forked from Pissandshittium/pissandshittium
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbidirectional_stream.h
242 lines (194 loc) · 8.37 KB
/
bidirectional_stream.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_GRPC_SUPPORT_BIDIRECTIONAL_STREAM_H_
#define COMPONENTS_GRPC_SUPPORT_BIDIRECTIONAL_STREAM_H_
#include <memory>
#include <vector>
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/synchronization/lock.h"
#include "net/http/bidirectional_stream.h"
#include "net/url_request/url_request_context_getter.h"
namespace base {
class Location;
} // namespace base
namespace net {
class HttpRequestHeaders;
class WrappedIOBuffer;
} // namespace net
namespace grpc_support {
// An adapter to net::BidirectionalStream.
// Created and configured from any thread. Start, ReadData, WriteData and
// Destroy can be called on any thread (including network thread), and post
// calls to corresponding {Start|ReadData|WriteData|Destroy}OnNetworkThread to
// the network thread. The object is always deleted on network thread. All
// callbacks into the Delegate are done on the network thread.
// The app is expected to initiate the next step like ReadData or Destroy.
// Public methods can be called on any thread.
class BidirectionalStream : public net::BidirectionalStream::Delegate {
public:
class Delegate {
public:
virtual void OnStreamReady() = 0;
virtual void OnHeadersReceived(
const spdy::SpdyHeaderBlock& response_headers,
const char* negotiated_protocol) = 0;
virtual void OnDataRead(char* data, int size) = 0;
virtual void OnDataSent(const char* data) = 0;
virtual void OnTrailersReceived(const spdy::SpdyHeaderBlock& trailers) = 0;
virtual void OnSucceeded() = 0;
virtual void OnFailed(int error) = 0;
virtual void OnCanceled() = 0;
};
BidirectionalStream(net::URLRequestContextGetter* request_context_getter,
Delegate* delegate);
~BidirectionalStream() override;
// Disables automatic flushing of each buffer passed to WriteData().
void disable_auto_flush(bool disable_auto_flush) {
disable_auto_flush_ = disable_auto_flush;
}
// Delays sending request headers until first call to Flush().
void delay_headers_until_flush(bool delay_headers_until_flush) {
delay_headers_until_flush_ = delay_headers_until_flush;
}
// Validates method and headers, initializes and starts the request. If
// |end_of_stream| is true, then stream is half-closed after sending header
// frame and no data is expected to be written.
// Returns 0 if request is valid and started successfully,
// Returns -1 if |method| is not valid HTTP method name.
// Returns position of invalid header value in |headers| if header name is
// not valid.
int Start(const char* url,
int priority,
const char* method,
const net::HttpRequestHeaders& headers,
bool end_of_stream);
// Reads more data into |buffer| up to |capacity| bytes.
bool ReadData(char* buffer, int capacity);
// Writes |count| bytes of data from |buffer|. The |end_of_stream| is
// passed to remote to indicate end of stream.
bool WriteData(const char* buffer, int count, bool end_of_stream);
// Sends buffers passed to WriteData().
void Flush();
// Cancels the request. The OnCanceled callback is invoked when request is
// caneceled, and not other callbacks are invoked afterwards..
void Cancel();
// Releases all resources for the request and deletes the object itself.
void Destroy();
private:
// States of BidirectionalStream are tracked in |read_state_| and
// |write_state_|.
// The write state is separated as it changes independently of the read state.
// There is one initial state: NOT_STARTED. There is one normal final state:
// SUCCESS, reached after READING_DONE and WRITING_DONE. There are two
// exceptional final states: CANCELED and ERROR, which can be reached from
// any other non-final state.
enum State {
// Initial state, stream not started.
NOT_STARTED,
// Stream started, request headers are being sent.
STARTED,
// Waiting for ReadData() to be called.
WAITING_FOR_READ,
// Reading from the remote, OnDataRead callback will be invoked when done.
READING,
// There is no more data to read and stream is half-closed by the remote
// side.
READING_DONE,
// Stream is canceled.
CANCELED,
// Error has occured, stream is closed.
ERR,
// Reading and writing are done, and the stream is closed successfully.
SUCCESS,
// Waiting for Flush() to be called.
WAITING_FOR_FLUSH,
// Writing to the remote, callback will be invoked when done.
WRITING,
// There is no more data to write and stream is half-closed by the local
// side.
WRITING_DONE,
};
// Container to hold buffers and sizes of the pending data to be written.
class WriteBuffers {
public:
WriteBuffers();
~WriteBuffers();
// Clears Write Buffers list.
void Clear();
// Appends |buffer| of |buffer_size| length to the end of buffer list.
void AppendBuffer(const scoped_refptr<net::IOBuffer>& buffer,
int buffer_size);
void MoveTo(WriteBuffers* target);
// Returns true of Write Buffers list is empty.
bool Empty() const;
const std::vector<scoped_refptr<net::IOBuffer>>& buffers() const {
return write_buffer_list;
}
const std::vector<int>& lengths() const { return write_buffer_len_list; }
private:
// Every IOBuffer in |write_buffer_list| points to the memory owned by the
// application.
std::vector<scoped_refptr<net::IOBuffer>> write_buffer_list;
// A list of the length of each IOBuffer in |write_buffer_list|.
std::vector<int> write_buffer_len_list;
DISALLOW_COPY_AND_ASSIGN(WriteBuffers);
};
// net::BidirectionalStream::Delegate implementations:
void OnStreamReady(bool request_headers_sent) override;
void OnHeadersReceived(
const spdy::SpdyHeaderBlock& response_headers) override;
void OnDataRead(int bytes_read) override;
void OnDataSent() override;
void OnTrailersReceived(const spdy::SpdyHeaderBlock& trailers) override;
void OnFailed(int error) override;
// Helper method to derive OnSucceeded.
void MaybeOnSucceded();
void StartOnNetworkThread(
std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info);
void ReadDataOnNetworkThread(scoped_refptr<net::WrappedIOBuffer> read_buffer,
int buffer_size);
void WriteDataOnNetworkThread(scoped_refptr<net::WrappedIOBuffer> read_buffer,
int buffer_size,
bool end_of_stream);
void FlushOnNetworkThread();
void SendFlushingWriteData();
void CancelOnNetworkThread();
void DestroyOnNetworkThread();
bool IsOnNetworkThread();
void PostToNetworkThread(const base::Location& from_here,
base::OnceClosure task);
// Read state is tracking reading flow. Only accessed on network thread.
// | <--- READING <--- |
// | |
// | |
// NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
State read_state_;
// Write state is tracking writing flow. Only accessed on network thread.
// | <--- WRITING <--- |
// | |
// | |
// NOT_STARTED -> STARTED --> WAITING_FOR_FLUSH -> WRITING_DONE -> SUCCESS
State write_state_;
bool write_end_of_stream_;
bool request_headers_sent_;
bool disable_auto_flush_;
bool delay_headers_until_flush_;
net::URLRequestContextGetter* const request_context_getter_;
scoped_refptr<net::WrappedIOBuffer> read_buffer_;
// Write data that is pending the flush.
std::unique_ptr<WriteBuffers> pending_write_data_;
// Write data that is flushed, but not sending yet.
std::unique_ptr<WriteBuffers> flushing_write_data_;
// Write data that is sending.
std::unique_ptr<WriteBuffers> sending_write_data_;
std::unique_ptr<net::BidirectionalStream> bidi_stream_;
Delegate* delegate_;
base::WeakPtr<BidirectionalStream> weak_this_;
base::WeakPtrFactory<BidirectionalStream> weak_factory_{this};
DISALLOW_COPY_AND_ASSIGN(BidirectionalStream);
};
} // namespace grpc_support
#endif // COMPONENTS_GRPC_SUPPORT_BIDIRECTIONAL_STREAM_H_