Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ignore flow control in h2 when sending first request #849

Merged
merged 6 commits into from
Jul 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions src/brpc/policy/http2_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,20 @@ inline H2Context::FrameHandler FindFrameHandler(H2FrameType type) {

H2Context::H2Context(Socket* socket, const Server* server)
: _socket(socket)
, _remote_window_left(H2Settings::DEFAULT_INITIAL_WINDOW_SIZE)
// Maximize the window size to make sending big request possible before
// receving the remote settings.
, _remote_window_left(H2Settings::MAX_WINDOW_SIZE)
, _conn_state(H2_CONNECTION_UNINITIALIZED)
, _last_received_stream_id(-1)
, _last_sent_stream_id(1)
, _goaway_stream_id(-1)
, _remote_settings_received(false)
, _deferred_window_update(0) {
// Stop printing the field which is useless for remote settings.
_remote_settings.connection_window_size = 0;
// Maximize the window size to make sending big request possible before
// receving the remote settings.
_remote_settings.stream_window_size = H2Settings::MAX_WINDOW_SIZE;
if (server) {
_unack_local_settings = server->options().h2_settings;
} else {
Expand Down Expand Up @@ -860,9 +866,28 @@ H2ParseResult H2Context::OnSettings(
return MakeH2Message(NULL);
}
const int64_t old_stream_window_size = _remote_settings.stream_window_size;
if (!ParseH2Settings(&_remote_settings, it, frame_head.payload_size)) {
LOG(ERROR) << "Fail to parse from SETTINGS";
return MakeH2Error(H2_PROTOCOL_ERROR);
if (!_remote_settings_received) {
// To solve the problem that sender can't send large request before receving
// remote setting, the initial window size of stream/connection is set to
// MAX_WINDOW_SIZE(see constructor of H2Context).
// As a result, in the view of remote side, window size is 65535 by default so
// it may not send its stream size to sender, making stream size still be
// MAX_WINDOW_SIZE. In this case we need to revert this value to default.
H2Settings tmp_settings;
if (!ParseH2Settings(&tmp_settings, it, frame_head.payload_size)) {
LOG(ERROR) << "Fail to parse from SETTINGS";
return MakeH2Error(H2_PROTOCOL_ERROR);
}
_remote_settings = tmp_settings;
_remote_window_left.fetch_sub(
H2Settings::MAX_WINDOW_SIZE - H2Settings::DEFAULT_INITIAL_WINDOW_SIZE,
butil::memory_order_relaxed);
_remote_settings_received = true;
} else {
if (!ParseH2Settings(&_remote_settings, it, frame_head.payload_size)) {
LOG(ERROR) << "Fail to parse from SETTINGS";
return MakeH2Error(H2_PROTOCOL_ERROR);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

首先这段代码可以如下简化:

if (!_remote_settings_received) {        
        _remote_settings.stream_window_size = H2Settings::DEFAULT_INITIAL_WINDOW_SIZE;
        _remote_window_left.fetch_sub(
                H2Settings::MAX_WINDOW_SIZE - H2Settings::DEFAULT_INITIAL_WINDOW_SIZE,
                butil::memory_order_relaxed);
        _remote_settings_received = true;
}
if (!ParseH2Settings(&_remote_settings, it, frame_head.payload_size)) {
        LOG(ERROR) << "Fail to parse from SETTINGS";
        return MakeH2Error(H2_PROTOCOL_ERROR);
}

其次目前的改法在WINDOWS_UPDATE过来前,还是有段时间_remote_window_left已经归位了,理论上做不到“只要server端connection-level window size设得很大就总能发送成功”,不过窗口比较小

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1.这样简化是不是有个race:当client端第一个setting包收到后,将stream_window_size和_remote_window_left复原了,此时一个req的AppendAndDestroy正在调用,发现window太小,就发失败了。现在的pr代码可以避免”复原到初始“这个中间态。

2.从server的视角来说,client的default connection window size一定是65536,所以一定是server端来负责在合适的时候发WINDOWS_UPDATE来保证client可以一直发。如果接收到setting后,并且WINDOWS_UPDATE一直没发过来,这时候发送失败是个预期的行为。如果server的connection size很大,正确的实现是发完setting后立刻发WINDOWS_UPDATE(让client立刻感知),现在brpc server的代码就是这么实现的

const int64_t window_diff =
static_cast<int64_t>(_remote_settings.stream_window_size)
Expand Down Expand Up @@ -1025,6 +1050,7 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const {
<< sep << "remote_conn_window_left="
<< _remote_window_left.load(butil::memory_order_relaxed)
<< sep << "remote_settings=" << _remote_settings
<< sep << "remote_settings_received=" << _remote_settings_received
<< sep << "local_settings=" << _local_settings
<< sep << "hpacker={";
IndentingOStream os2(os, 2);
Expand Down Expand Up @@ -1527,7 +1553,7 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
}

_sctx->Init(ctx, id);
// flow control
// check flow control restriction
if (!_cntl->request_attachment().empty()) {
const int64_t data_size = _cntl->request_attachment().size();
if (!_sctx->ConsumeWindowSize(data_size)) {
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/http2_rpc_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ friend void InitFrameHandlers();
uint32_t _last_sent_stream_id;
int _goaway_stream_id;
H2Settings _remote_settings;
bool _remote_settings_received;
H2Settings _local_settings;
H2Settings _unack_local_settings;
HPacker _hpacker;
Expand Down
30 changes: 24 additions & 6 deletions test/brpc_http_rpc_protocol_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ class MyEchoService : public ::test::EchoService {
if (sleep_ms_str) {
bthread_usleep(strtol(sleep_ms_str->data(), NULL, 10) * 1000);
}

EXPECT_EQ(EXP_REQUEST, req->message());
res->set_message(EXP_RESPONSE);
}
};
Expand Down Expand Up @@ -996,12 +994,24 @@ TEST_F(HttpTest, http2_sanity) {
options.protocol = "h2";
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));

// 1) complete flow and
// 2) socket replacement when streamId runs out, the initial streamId is a special
// value set in ctor of H2Context
// Check that the first request with size larger than the default window can
// be sent out, when remote settings are not received.
brpc::Controller cntl;
test::EchoRequest big_req;
test::EchoResponse res;
std::string message(2 * 1024 * 1024 /* 2M */, 'x');
big_req.set_message(message);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo";
channel.CallMethod(NULL, &cntl, &big_req, &res, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());

// socket replacement when streamId runs out, the initial streamId is a special
// value set in ctor of H2Context so that the number 15000 is enough to run out
// of stream.
test::EchoRequest req;
req.set_message(EXP_REQUEST);
test::EchoResponse res;
for (int i = 0; i < 15000; ++i) {
brpc::Controller cntl;
cntl.http_request().set_content_type("application/json");
Expand Down Expand Up @@ -1113,6 +1123,14 @@ TEST_F(HttpTest, http2_window_used_up) {
cntl.http_request().set_content_type("application/proto");
brpc::policy::SerializeHttpRequest(&request_buf, &cntl, &req);

char settingsbuf[brpc::policy::FRAME_HEAD_SIZE + 36];
brpc::H2Settings h2_settings;
const size_t nb = brpc::policy::SerializeH2Settings(h2_settings, settingsbuf + brpc::policy::FRAME_HEAD_SIZE);
brpc::policy::SerializeFrameHead(settingsbuf, nb, brpc::policy::H2_FRAME_SETTINGS, 0, 0);
butil::IOBuf buf;
buf.append(settingsbuf, brpc::policy::FRAME_HEAD_SIZE + nb);
brpc::policy::ParseH2Message(&buf, _h2_client_sock.get(), false, NULL);

int nsuc = brpc::H2Settings::DEFAULT_INITIAL_WINDOW_SIZE / cntl.request_attachment().size();
for (int i = 0; i <= nsuc; i++) {
brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(&cntl);
Expand Down