forked from apache/doris
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[improvement](thirdparty) introduce brpc stream patch to make stream …
…write to socket in background bthread (apache#30458)
- Loading branch information
Showing
1 changed file
with
170 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp | ||
index b6c8e750..c52e9451 100644 | ||
--- a/src/brpc/controller.cpp | ||
+++ b/src/brpc/controller.cpp | ||
@@ -1169,6 +1169,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { | ||
wopt.pipelined_count = _pipelined_count; | ||
wopt.auth_flags = _auth_flags; | ||
wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED); | ||
+ wopt.write_in_background = write_to_socket_in_background(); | ||
int rc; | ||
size_t packet_size = 0; | ||
if (user_packet_guard) { | ||
diff --git a/src/brpc/controller.h b/src/brpc/controller.h | ||
index 658cc695..9221f583 100644 | ||
--- a/src/brpc/controller.h | ||
+++ b/src/brpc/controller.h | ||
@@ -144,6 +144,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); | ||
static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19); | ||
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20); | ||
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21); | ||
+ static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22); | ||
|
||
public: | ||
struct Inheritable { | ||
@@ -350,6 +351,17 @@ public: | ||
bool is_done_allowed_to_run_in_place() const | ||
{ return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); } | ||
|
||
+ // Create a background KEEPWRITE bthread to write to socket when issuing | ||
+ // RPCs, instead of trying to write to socket once in calling thread (see | ||
+ // `Socket::StartWrite` in socket.cpp). | ||
+ // The socket write could take some time (several microseconds maybe), if | ||
+ // you cares about it and don't want the calling thread to be blocked, you | ||
+ // can set this flag. | ||
+ // Should provides better batch effect in situations like when you are | ||
+ // continually issuing lots of async RPC calls in only one thread. | ||
+ void set_write_to_socket_in_background(bool f) { set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); } | ||
+ bool write_to_socket_in_background() const { return has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); } | ||
+ | ||
// ------------------------------------------------------------------------ | ||
// Server-side methods. | ||
// These calls shall be made from the server side only. Their results are | ||
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp | ||
index e3878c19..27748434 100644 | ||
--- a/src/brpc/socket.cpp | ||
+++ b/src/brpc/socket.cpp | ||
@@ -1620,7 +1620,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) { | ||
// in some protocols(namely RTMP). | ||
req->Setup(this); | ||
|
||
- if (ssl_state() != SSL_OFF) { | ||
+ if (opt.write_in_background || ssl_state() != SSL_OFF) { | ||
// Writing into SSL may block the current bthread, always write | ||
// in the background. | ||
goto KEEPWRITE_IN_BACKGROUND; | ||
diff --git a/src/brpc/socket.h b/src/brpc/socket.h | ||
index 6f710ee2..28a7ada6 100644 | ||
--- a/src/brpc/socket.h | ||
+++ b/src/brpc/socket.h | ||
@@ -269,10 +269,20 @@ public: | ||
// Default: false | ||
bool ignore_eovercrowded; | ||
|
||
+ // The calling thread directly creates KeepWrite thread to write into | ||
+ // this socket, skipping writing once. | ||
+ // In situations like when you are continually issuing lots of | ||
+ // StreamWrite or async RPC calls in only one thread, directly creating | ||
+ // KeepWrite thread at first provides batch write effect and better | ||
+ // performance. Otherwise, each write only writes one `msg` into socket | ||
+ // and no KeepWrite thread can be created, which brings poor | ||
+ // performance. | ||
+ bool write_in_background; | ||
+ | ||
WriteOptions() | ||
: id_wait(INVALID_BTHREAD_ID), abstime(NULL) | ||
, pipelined_count(0), auth_flags(0) | ||
- , ignore_eovercrowded(false) {} | ||
+ , ignore_eovercrowded(false), write_in_background(false) {} | ||
}; | ||
int Write(butil::IOBuf *msg, const WriteOptions* options = NULL); | ||
|
||
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp | ||
index d8466d2a..2d565759 100644 | ||
--- a/src/brpc/stream.cpp | ||
+++ b/src/brpc/stream.cpp | ||
@@ -271,7 +271,8 @@ void Stream::TriggerOnConnectIfNeed() { | ||
bthread_mutex_unlock(&_connect_mutex); | ||
} | ||
|
||
-int Stream::AppendIfNotFull(const butil::IOBuf &data) { | ||
+int Stream::AppendIfNotFull(const butil::IOBuf &data, | ||
+ const StreamWriteOptions* options) { | ||
if (_cur_buf_size > 0) { | ||
std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex); | ||
if (_produced >= _remote_consumed + _cur_buf_size) { | ||
@@ -290,7 +291,9 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) { | ||
|
||
size_t data_length = data.length(); | ||
butil::IOBuf copied_data(data); | ||
- const int rc = _fake_socket_weak_ref->Write(&copied_data); | ||
+ Socket::WriteOptions wopt; | ||
+ wopt.write_in_background = options != NULL && options->write_in_background; | ||
+ const int rc = _fake_socket_weak_ref->Write(&copied_data, &wopt); | ||
if (rc != 0) { | ||
// Stream may be closed by peer before | ||
LOG(WARNING) << "Fail to write to _fake_socket, " << berror(); | ||
@@ -679,13 +682,14 @@ void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) { | ||
policy::ProcessRpcResponse(msg); | ||
} | ||
|
||
-int StreamWrite(StreamId stream_id, const butil::IOBuf &message) { | ||
+int StreamWrite(StreamId stream_id, const butil::IOBuf &message, | ||
+ const StreamWriteOptions* options) { | ||
SocketUniquePtr ptr; | ||
if (Socket::Address(stream_id, &ptr) != 0) { | ||
return EINVAL; | ||
} | ||
Stream* s = (Stream*)ptr->conn(); | ||
- const int rc = s->AppendIfNotFull(message); | ||
+ const int rc = s->AppendIfNotFull(message, options); | ||
if (rc == 0) { | ||
return 0; | ||
} | ||
diff --git a/src/brpc/stream.h b/src/brpc/stream.h | ||
index fbf2d51d..410a5a09 100644 | ||
--- a/src/brpc/stream.h | ||
+++ b/src/brpc/stream.h | ||
@@ -82,6 +82,18 @@ struct StreamOptions { | ||
StreamInputHandler* handler; | ||
}; | ||
|
||
+struct StreamWriteOptions | ||
+{ | ||
+ StreamWriteOptions() : write_in_background(false) {} | ||
+ | ||
+ // Write message to socket in background thread. | ||
+ // Provides batch write effect and better performance in situations when | ||
+ // you are continually issuing lots of StreamWrite or async RPC calls in | ||
+ // only one thread. Otherwise, each StreamWrite directly writes message into | ||
+ // socket and brings poor performance. | ||
+ bool write_in_background; | ||
+}; | ||
+ | ||
// [Called at the client side] | ||
// Create a stream at client-side along with the |cntl|, which will be connected | ||
// when receiving the response with a stream from server-side. If |options| is | ||
@@ -104,7 +116,8 @@ int StreamAccept(StreamId* response_stream, Controller &cntl, | ||
// - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size | ||
// which the remote side hasn't consumed yet excceeds the number. | ||
// - EINVAL: |stream_id| is invalied or has been closed | ||
-int StreamWrite(StreamId stream_id, const butil::IOBuf &message); | ||
+int StreamWrite(StreamId stream_id, const butil::IOBuf &message, | ||
+ const StreamWriteOptions* options = NULL); | ||
|
||
// Write util the pending buffer size is less than |max_buf_size| or orrur | ||
// occurs | ||
diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h | ||
index 259f0b77..f24b75a3 100644 | ||
--- a/src/brpc/stream_impl.h | ||
+++ b/src/brpc/stream_impl.h | ||
@@ -42,7 +42,8 @@ public: | ||
|
||
// --------------------- SocketConnection -------------- | ||
|
||
- int AppendIfNotFull(const butil::IOBuf& msg); | ||
+ int AppendIfNotFull(const butil::IOBuf& msg, | ||
+ const StreamWriteOptions* options = NULL); | ||
static int Create(const StreamOptions& options, | ||
const StreamSettings *remote_settings, | ||
StreamId *id); |