Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/output
/test/output
build/
.cache

# Ignore hidden files
.*
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ You can use it to:
* [bthread or not](docs/cn/bthread_or_not.md)
* [thread-local](docs/cn/thread_local.md)
* [Execution Queue](docs/cn/execution_queue.md)
* [Active Task (experimental)](docs/cn/bthread_active_task.md)
* Client
* [Basics](docs/en/client.md)
* [Error code](docs/en/error_code.md)
Expand Down
1 change: 1 addition & 0 deletions README_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [bthread or not](docs/cn/bthread_or_not.md)
* [thread-local](docs/cn/thread_local.md)
* [Execution Queue](docs/cn/execution_queue.md)
* [Active Task(实验性)](docs/cn/bthread_active_task.md)
* [bthread tracer](docs/cn/bthread_tracer.md)
* Client
* [基础功能](docs/cn/client.md)
Expand Down
386 changes: 386 additions & 0 deletions docs/cn/bthread_active_task.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions docs/cn/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ linux一般使用non-blocking IO提高IO并发度。当IO并发度很低时,no

“消息”指从连接读入的有边界的二进制串,可能是来自上游client的request或来自下游server的response。brpc使用一个或多个[EventDispatcher](https://github.com/apache/brpc/blob/master/src/brpc/event_dispatcher.h)(简称为EDISP)等待任一fd发生事件。和常见的“IO线程”不同,EDISP不负责读取。IO线程的问题在于一个线程同时只能读一个fd,当多个繁忙的fd聚集在一个IO线程中时,一些读取就被延迟了。多租户、复杂分流算法,[Streaming RPC](streaming_rpc.md)等功能会加重这个问题。高负载下常见的某次读取卡顿会拖慢一个IO线程中所有fd的读取,对可用性的影响幅度较大。

由于epoll的[一个bug](https://web.archive.org/web/20150423184820/https://patchwork.kernel.org/patch/1970231/)(开发brpc时仍有)及epoll_ctl较大的开销,EDISP使用Edge triggered模式。当收到事件时,EDISP给一个原子变量加1,只有当加1前的值是0时启动一个bthread处理对应fd上的数据。在背后,EDISP把所在的pthread让给了新建的bthread,使其有更好的cache locality,可以尽快地读取fd上的数据。而EDISP所在的bthread会被偷到另外一个pthread继续执行,这个过程即是bthread的work stealing调度。要准确理解那个原子变量的工作方式可以先阅读[atomic instructions](atomic_instructions.md),再看[Socket::StartInputEvent](https://github.com/apache/brpc/blob/master/src/brpc/socket.cpp)。这些方法使得brpc读取同一个fd时产生的竞争是[wait-free](http://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom)的。
由于epoll的[一个bug](https://web.archive.org/web/20150423184820/https://patchwork.kernel.org/patch/1970231/)(开发brpc时仍有)及epoll_ctl较大的开销,EDISP使用Edge triggered模式。当收到事件时,EDISP给一个原子变量加1,只有当加1前的值是0时才触发对应fd的数据处理。默认配置下(`usercode_in_coroutine=false` 且 `EventDispatcherUnsched()` 为 `false`),EDISP通过 `bthread_start_urgent` 拉起处理逻辑,并把当前worker让给新任务,使其有更好的cache locality,可以尽快读取fd上的数据。若 `EventDispatcherUnsched()` 为 `true`,则改为 `bthread_start_background`,EDISP不会主动让出当前调度。若 `usercode_in_coroutine=true`,则直接在当前协程执行处理逻辑,不额外创建bthread。要准确理解那个原子变量的工作方式可以先阅读[atomic instructions](atomic_instructions.md),再看[Socket::StartInputEvent](https://github.com/apache/brpc/blob/master/src/brpc/socket.cpp)。这些方法使得brpc读取同一个fd时产生的竞争是[wait-free](http://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom)的。

在当前实现里,`Transport::ProcessEvent` 会按 `EventDispatcherUnsched()` 选择启动方式:返回 `false` 时走 `bthread_start_urgent`,返回 `true` 时走 `bthread_start_background`。此外,RDMA 在轮询模式与事件模式对 `last_msg` 的处理不同:`rdma_use_polling=false` 时不会在 `RdmaTransport::QueueMessage` 里处理 `last_msg`,轮询模式下会继续处理。并且在 `EventDispatcherUnsched()` 返回 `true` 时,`last_msg` 不会在当前执行流里直接处理,而是在新的 bthread 中执行。用户可以通过 `event_dispatcher_edisp_unsched` 来控制这一行为。

[InputMessenger](https://github.com/apache/brpc/blob/master/src/brpc/input_messenger.h)负责从fd上切割和处理消息,它通过用户回调函数理解不同的格式。Parse一般是把消息从二进制流上切割下来,运行时间较固定;Process则是进一步解析消息(比如反序列化为protobuf)后调用用户回调,时间不确定。若一次从某个fd读取出n个消息(n > 1),InputMessenger会启动n-1个bthread分别处理前n-1个消息,最后一个消息则会在原地被Process。InputMessenger会逐一尝试多种协议,由于一个连接上往往只有一种消息格式,InputMessenger会记录下上次的选择,而避免每次都重复尝试。

s
可以看到,fd间和fd内的消息都会在brpc中获得并发,这使brpc非常擅长大消息的读取,在高负载时仍能及时处理不同来源的消息,减少长尾的存在。

# 发消息
Expand Down
25 changes: 25 additions & 0 deletions docs/cn/threading_overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,28 @@
异步编程中的流程控制对于专家也充满了陷阱。任何挂起操作,如sleep一会儿或等待某事完成,都意味着用户需要显式地保存状态,并在回调函数中恢复状态。异步代码往往得写成状态机的形式。当挂起较少时,这有点麻烦,但还是可把握的。问题在于一旦挂起发生在条件判断、循环、子函数中,写出这样的状态机并能被很多人理解和维护,几乎是不可能的,而这在分布式系统中又很常见,因为一个节点往往要与多个节点同时交互。另外如果唤醒可由多种事件触发(比如fd有数据或超时了),挂起和恢复的过程容易出现race condition,对多线程编码能力要求很高。语法糖(比如lambda)可以让编码不那么“麻烦”,但无法降低难度。

共享指针在异步编程中很普遍,这看似方便,但也使内存的ownership变得难以捉摸,如果内存泄漏了,很难定位哪里没有释放;如果segment fault了,也不知道哪里多释放了一下。大量使用引用计数的用户代码很难控制代码质量,容易长期在内存问题上耗费时间。如果引用计数还需要手动维护,保持质量就更难了,维护者也不会愿意改进。没有上下文会使得[RAII](http://en.wikipedia.org/wiki/Resource_Acquisition_Is_Initialization)无法充分发挥作用, 有时需要在callback之外lock,callback之内unlock,实践中很容易出错。

## butex wait/wake 顺序规则(实用)

直接使用 `butex_wait`/`butex_wake*` 时,务必遵守:

1. 唤醒方先写结果/状态,再调用 `butex_wake*`。
2. 等待方在每次 `butex_wait` 返回后都要重检谓词条件。

`butex_wait` 返回 `0` 只表示“从 butex 等待队列被唤醒”,不代表“业务条件已经满足”。

常见写法:

```cpp
// 唤醒方
state.store(new_value, butil::memory_order_release);
bthread::butex_wake(&state);

// 等待方
while (state.load(butil::memory_order_acquire) == expected_value) {
if (bthread::butex_wait(&state, expected_value, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
// 处理超时/中断/停止等错误
}
}
```
2 changes: 1 addition & 1 deletion docs/en/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Non-blocking IO is usually used for increasing IO concurrency in Linux. When the

A message is a bounded binary data read from a connection, which may be a request from upstream clients or a response from downstream servers. brpc uses one or several [EventDispatcher](https://github.com/apache/brpc/blob/master/src/brpc/event_dispatcher.cpp)(referred to as EDISP) to wait for events from file descriptors. Unlike the common "IO threads", EDISP is not responsible for reading or writing. The problem of IO threads is that one thread can only read one fd at a given time, other reads may be delayed when many fds in one IO thread are busy. Multi-tenancy, complicated load balancing and [Streaming RPC](streaming_rpc.md) worsen the problem. Under high workloads, regular long delays on one fd may slow down all fds in the IO thread, causing more long tails.

Because of a [bug](https://web.archive.org/web/20150423184820/https://patchwork.kernel.org/patch/1970231/) of epoll (at the time of developing brpc) and overhead of epoll_ctl, edge triggered mode is used in EDISP. After receiving an event, an atomic variable associated with the fd is added by one atomically. If the variable is zero before addition, a bthread is started to handle the data from the fd. The pthread worker in which EDISP runs is yielded to the newly created bthread to make it start reading ASAP and have a better cache locality. The bthread in which EDISP runs will be stolen to another pthread and keep running, this mechanism is work stealing used in bthreads. To understand exactly how that atomic variable works, you can read [atomic instructions](atomic_instructions.md) first, then check [Socket::StartInputEvent](https://github.com/apache/brpc/blob/master/src/brpc/socket.cpp). These methods make contentions on dispatching events of one fd [wait-free](http://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom).
Because of a [bug](https://web.archive.org/web/20150423184820/https://patchwork.kernel.org/patch/1970231/) of epoll (at the time of developing brpc) and overhead of epoll_ctl, edge triggered mode is used in EDISP. After receiving an event, an atomic variable associated with the fd is added by one atomically. Data handling is triggered only when the value is zero before addition. In the default path (`usercode_in_coroutine=false` and `EventDispatcherUnsched()` is `false`), EDISP starts processing via `bthread_start_urgent` and yields the current worker to the new task for faster reads and better cache locality. If `EventDispatcherUnsched()` is `true`, it switches to `bthread_start_background`, so EDISP does not actively yield scheduling on this event. If `usercode_in_coroutine=true`, the processing logic runs inline in the current coroutine without creating another bthread. To understand exactly how that atomic variable works, you can read [atomic instructions](atomic_instructions.md) first, then check [Socket::StartInputEvent](https://github.com/apache/brpc/blob/master/src/brpc/socket.cpp). These methods make contentions on dispatching events of one fd [wait-free](http://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom).

In current implementation, `Transport::ProcessEvent` chooses start mode based on `EventDispatcherUnsched()`: `false` uses `bthread_start_urgent`, and `true` uses `bthread_start_background`. In addition, RDMA handles `last_msg` differently between polling and event modes: when `rdma_use_polling=false`, `RdmaTransport::QueueMessage` does not process `last_msg`; in polling mode it continues to process it. And when `EventDispatcherUnsched()` returns `true`, `last_msg` is not processed directly in the current execution flow but in a new bthread. Users can control this behavior through `event_dispatcher_edisp_unsched`.

Expand Down
25 changes: 25 additions & 0 deletions docs/en/threading_overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,28 @@ When an event dispatcher passes a task to a worker thread, the user code probabl
Flow controls in asynchronous programming are even difficult for experts. Any suspending operation such as sleeping for a while or waiting for something to finish, implies that users have to save states explicitly and restore states in callbacks. Asynchronous code is often written as state machines. A few suspensions are troublesome, but still handleable. The problem is that once the suspension occurs inside a condition, loop or sub-function, it's almost impossible to write such a state machine being understood and maintained by many people, although the scenario is quite common in distributed systems where a node often needs to interact with multiple nodes simultaneously. In addition, if the wakeup can be triggered by more than one events (such as either fd has data or timeout is reached), the suspension and resuming are prone to race conditions, which require good multi-threaded programming skills to solve. Syntactic sugars(such as lambda) just make coding less troublesome rather than reducing difficulty.

Shared pointers are common in asynchronous programming, which seems convenient, but also makes ownerships of memory elusive. If the memory is leaked, it's difficult to locate the code that forgot to release; if segment fault happens, where the double-free occurs is also unknown. Code with a lot of referential countings is hard to remain good-quality and may waste a lot of time on debugging memory related issues. If references are even counted manually, keeping quality of the code is harder and the maintainers are less willing to modify the code. [RAII](http://en.wikipedia.org/wiki/Resource_Acquisition_Is_Initialization) cannot be used in many scenarios in asynchronous programming, sometimes resources need to be locked before a callback and unlocked inside the callback, which is very error-prone in practice.

## Butex wait/wake ordering (practical rule)

When using `butex_wait`/`butex_wake*` directly, follow this rule strictly:

1. Waker writes result/state first, then calls `butex_wake*`.
2. Waiter always re-checks predicate after every `butex_wait` return.

`butex_wait` returning `0` only means "woken from butex queue", not "predicate is true".

Typical pattern:

```cpp
// waker
state.store(new_value, butil::memory_order_release);
bthread::butex_wake(&state);

// waiter
while (state.load(butil::memory_order_acquire) == expected_value) {
if (bthread::butex_wait(&state, expected_value, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
// handle timeout/stop/etc.
}
}
```
142 changes: 142 additions & 0 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
// Date: Tue Jul 10 17:40:58 CST 2012

#include <sys/syscall.h>
#include <limits>
#include <string.h>
#include <vector>
#include <gflags/gflags.h>
#include "butil/macros.h" // BAIDU_CASSERT
#include "butil/logging.h"
Expand Down Expand Up @@ -85,6 +88,8 @@ pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
// Notice that we can't declare the variable as atomic<TaskControl*> which
// are not constructed before main().
TaskControl* g_task_control = NULL;
static pthread_mutex_t g_active_task_registry_mutex = PTHREAD_MUTEX_INITIALIZER;
static std::vector<bthread_active_task_type_t> g_active_task_types;

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
Expand All @@ -96,6 +101,89 @@ inline TaskControl* get_task_control() {
return g_task_control;
}

static bool normalize_active_task_type(const bthread_active_task_type_t* in,
bthread_active_task_type_t* out) {
if (in == NULL || out == NULL) {
return false;
}
if (in->struct_size < sizeof(bthread_active_task_type_t)) {
return false;
}
if (in->name == NULL || in->name[0] == '\0') {
return false;
}
if (in->worker_init == NULL && in->worker_destroy == NULL &&
in->harvest == NULL) {
return false;
}
memset(out, 0, sizeof(*out));
memcpy(out, in, sizeof(*out));
out->struct_size = sizeof(*out);
return true;
}

void get_active_task_types_snapshot(std::vector<bthread_active_task_type_t>* out) {
if (out == NULL) {
return;
}
BAIDU_SCOPED_LOCK(g_active_task_registry_mutex);
*out = g_active_task_types;
}

static inline TaskMeta* current_normal_bthread_for_local_pin(int* err) {
TaskGroup* g = tls_task_group;
if (g == NULL || g->is_current_main_task() || g->is_current_pthread_task()) {
if (err) {
*err = EPERM;
}
return NULL;
}
if (err) {
*err = 0;
}
return g->current_task();
}

static inline int enter_local_pin_scope(TaskMeta* m, TaskGroup* g) {
if (m == NULL || g == NULL) {
return EINVAL;
}
if (!m->local_pin_enabled) {
m->local_pin_home_group = g;
m->local_pin_home_control = g->control();
m->local_pin_home_tag = g->tag();
m->local_pin_depth = 1;
m->local_pin_enabled = true;
return 0;
}
if (m->local_pin_home_group != g ||
m->local_pin_home_control != g->control() ||
m->local_pin_home_tag != g->tag()) {
return EPERM;
}
if (m->local_pin_depth == std::numeric_limits<uint16_t>::max()) {
return EINVAL;
}
++m->local_pin_depth;
return 0;
}

static inline int leave_local_pin_scope(TaskMeta* m) {
if (m == NULL) {
return EINVAL;
}
if (!m->local_pin_enabled || m->local_pin_depth == 0) {
return EINVAL;
}
if (--m->local_pin_depth == 0) {
m->local_pin_enabled = false;
m->local_pin_home_group = NULL;
m->local_pin_home_control = NULL;
m->local_pin_home_tag = BTHREAD_TAG_INVALID;
}
return 0;
}

inline TaskControl* get_or_new_task_control() {
butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
TaskControl* c = p->load(butil::memory_order_consume);
Expand Down Expand Up @@ -146,6 +234,11 @@ bthread_t init_for_pthread_stack_trace() {
}

pthread_fake_meta->attr = BTHREAD_ATTR_PTHREAD;
pthread_fake_meta->local_pin_home_group = NULL;
pthread_fake_meta->local_pin_home_control = NULL;
pthread_fake_meta->local_pin_home_tag = BTHREAD_TAG_INVALID;
pthread_fake_meta->local_pin_depth = 0;
pthread_fake_meta->local_pin_enabled = false;
pthread_fake_meta->tid = make_tid(*pthread_fake_meta->version_butex, slot);
// Make TaskTracer use signal trace mode for pthread.
c->_task_tracer.set_running_status(syscall(SYS_gettid), pthread_fake_meta);
Expand Down Expand Up @@ -328,6 +421,55 @@ struct TidJoiner {

extern "C" {

int bthread_register_active_task_type(const bthread_active_task_type_t* type) {
bthread_active_task_type_t normalized;
if (!bthread::normalize_active_task_type(type, &normalized)) {
return EINVAL;
}
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
if (bthread::get_task_control() != NULL) {
return EPERM;
}
BAIDU_SCOPED_LOCK(bthread::g_active_task_registry_mutex);
bthread::g_active_task_types.push_back(normalized);
return 0;
}

int bthread_butex_wake_within(const bthread_active_task_ctx_t* ctx,
void* butex) {
return bthread::TaskGroup::butex_wake_within_active_task(ctx, butex);
}

int bthread_butex_wait_local(void* butex, int expected_value,
const struct timespec* abstime) {
if (butex == NULL) {
errno = EINVAL;
return -1;
}
int err = 0;
bthread::TaskMeta* m = bthread::current_normal_bthread_for_local_pin(&err);
if (m == NULL) {
errno = err;
return -1;
}
bthread::TaskGroup* g = bthread::tls_task_group;
err = bthread::enter_local_pin_scope(m, g);
if (err != 0) {
errno = err;
return -1;
}
const int rc = bthread::butex_wait(butex, expected_value, abstime);
const int saved_errno = errno;
const int leave_err = bthread::leave_local_pin_scope(m);
if (leave_err != 0) {
LOG(ERROR) << "Fail to leave local pin scope after bthread_butex_wait_local";
errno = leave_err;
return -1;
}
errno = saved_errno;
return rc;
}

int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
Expand Down
Loading
Loading