Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,6 @@ int flb_input_chunk_down(struct flb_input_chunk *ic);
int flb_input_chunk_is_up(struct flb_input_chunk *ic);
void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
size_t chunk_size);
size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config);

#endif
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_input_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_pipe.h>
#include <fluent-bit/flb_thread_pool.h>
#include <mpack/mpack.h>
#include <signal.h>

#define BUFFER_SIZE 65535

Expand Down Expand Up @@ -89,6 +90,13 @@ struct flb_input_thread_instance {
int input_coro_id;
struct mk_list input_coro_list;
struct mk_list input_coro_list_destroy;

/*
* Pause state flag for shutdown synchronization.
* Set to 1 when thread completes pause processing.
* Checked by main thread to ensure safe shutdown.
*/
volatile sig_atomic_t is_paused;
};

int flb_input_thread_instance_init(struct flb_config *config,
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_ring_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ int flb_ring_buffer_add_event_loop(struct flb_ring_buffer *rb, void *evl, uint8_

int flb_ring_buffer_write(struct flb_ring_buffer *rb, void *ptr, size_t size);
int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size);
size_t flb_ring_buffer_get_used(struct flb_ring_buffer *rb);

#endif
49 changes: 48 additions & 1 deletion src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
#include <fluent-bit/flb_pipe.h>
#include <fluent-bit/flb_custom.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_input_thread.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_input_chunk.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_event.h>
Expand Down Expand Up @@ -800,12 +802,42 @@ int sb_segregate_chunks(struct flb_config *config)
}
#endif

/* Check if all threaded inputs have completed pause */
static int all_threaded_inputs_paused(struct flb_config *config)
{
struct mk_list *head;
struct flb_input_instance *in;

mk_list_foreach(head, &config->inputs) {
in = mk_list_entry(head, struct flb_input_instance, _head);

if (in->is_threaded && in->thi) {
/*
* Skip inputs that cannot acknowledge pause:
* - No pause/resume callbacks defined
* - No context (plugin initialization failed)
*/
if (in->p->cb_pause == NULL || in->p->cb_resume == NULL ||
in->context == NULL) {
continue;
}

if (in->thi->is_paused == FLB_FALSE) {
return FLB_FALSE;
}
}
}

return FLB_TRUE;
}

int flb_engine_start(struct flb_config *config)
{
int ret;
int tasks = 0;
int fs_chunks = 0;
int mem_chunks = 0;
size_t rb_size = 0;
uint64_t ts;
char tmp[16];
int rb_flush_flag;
Expand Down Expand Up @@ -1164,6 +1196,7 @@ int flb_engine_start(struct flb_config *config)
fs_chunks = 0;
tasks = flb_task_running_count(config);
flb_storage_chunk_count(config, &mem_chunks, &fs_chunks);
rb_size = flb_input_chunk_get_total_ring_buffer_size(config);

if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",
Expand All @@ -1174,7 +1207,21 @@ int flb_engine_start(struct flb_config *config)
flb_task_running_print(config);
}

ret = tasks + mem_chunks + fs_chunks;
ret = tasks + mem_chunks + fs_chunks + (rb_size > 0);

if (rb_size > 0) {
flb_info("[engine] ring buffer pending: %zu bytes", rb_size);
flb_input_chunk_ring_buffer_collector(config, NULL);
}

/* Check thread pause only when all other work is done */
if (ret == 0) {
if (!all_threaded_inputs_paused(config)) {
ret++;
flb_debug("[engine] waiting for threaded inputs to complete pause");
}
}

if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) {
if (config->grace_count == 1) {
/*
Expand Down
55 changes: 48 additions & 7 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -2413,6 +2413,7 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE &&
in->config->is_running == FLB_TRUE &&
in->config->is_ingestion_active == FLB_TRUE &&
in->config->is_shutting_down == FLB_FALSE &&
in->mem_buf_status == FLB_INPUT_PAUSED) {
in->mem_buf_status = FLB_INPUT_RUNNING;
if (in->p->cb_resume) {
Expand All @@ -2426,6 +2427,7 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE &&
in->config->is_running == FLB_TRUE &&
in->config->is_ingestion_active == FLB_TRUE &&
in->config->is_shutting_down == FLB_FALSE &&
in->storage_buf_status == FLB_INPUT_PAUSED) {
in->storage_buf_status = FLB_INPUT_RUNNING;
if (in->p->cb_resume) {
Expand Down Expand Up @@ -2667,11 +2669,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
}
}

/* Check if the input plugin has been paused */
if (flb_input_buf_paused(in) == FLB_TRUE) {
flb_debug("[input chunk] %s is paused, cannot append records",
flb_input_name(in));
return -1;
/*
* Check if the input plugin has been paused.
* During shutdown (is_shutting_down=TRUE), we must accept data to flush
* remaining ring buffer contents, so we skip the pause check.
*/
if (in->config->is_shutting_down == FLB_FALSE) {
if (flb_input_buf_paused(in) == FLB_TRUE) {
flb_debug("[input chunk] %s is paused, cannot append records",
flb_input_name(in));
return -1;
}
}

if (buf_size == 0) {
Expand Down Expand Up @@ -3063,8 +3071,21 @@ void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data)
cr = NULL;

while (1) {
if (flb_input_buf_paused(ins) == FLB_TRUE) {
break;
/*
* During normal operation we respect the pause state to maintain
* backpressure: if the input is paused we stop consuming from
* the ring buffer.
*
* During shutdown (is_shutting_down == FLB_TRUE) we intentionally
* skip this pause check so the ring buffer can be fully drained,
* even when backpressure would normally prevent further reads.
* This is critical to flush all enqueued records and avoid data
* loss during graceful shutdown.
*/
if (ctx->is_shutting_down == FLB_FALSE) {
if (flb_input_buf_paused(ins) == FLB_TRUE) {
break;
}
}

ret = flb_ring_buffer_read(ins->rb,
Expand Down Expand Up @@ -3286,3 +3307,23 @@ void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
}
}
}

/*
* Calculate total size of all ring buffers across all threaded input instances.
* Returns 0 if no data is pending in ring buffers.
*/
size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config)
{
size_t total_size = 0;
struct mk_list *head;
struct flb_input_instance *ins;

mk_list_foreach(head, &config->inputs) {
ins = mk_list_entry(head, struct flb_input_instance, _head);
if (flb_input_is_threaded(ins) && ins->rb) {
total_size += flb_ring_buffer_get_used(ins->rb);
}
}

return total_size;
}
12 changes: 11 additions & 1 deletion src/flb_input_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,21 @@ static inline int handle_input_event(flb_pipefd_t fd, struct flb_input_instance
if (operation == FLB_INPUT_THREAD_PAUSE) {
if (ins->p->cb_pause && ins->context) {
ins->p->cb_pause(ins->context, ins->config);

/* Mark thread as paused for shutdown synchronization */
if (ins->is_threaded && ins->thi) {
ins->thi->is_paused = FLB_TRUE;
}
}
}
else if (operation == FLB_INPUT_THREAD_RESUME) {
if (ins->p->cb_resume) {
if (ins->p->cb_resume && ins->context) {
ins->p->cb_resume(ins->context, ins->config);

/* Clear paused flag on resume */
if (ins->is_threaded && ins->thi) {
ins->thi->is_paused = FLB_FALSE;
}
}
}
else if (operation == FLB_INPUT_THREAD_EXIT) {
Expand Down
4 changes: 4 additions & 0 deletions src/flb_ring_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,8 @@ int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size)
return 0;
}

size_t flb_ring_buffer_get_used(struct flb_ring_buffer *rb)
{
return lwrb_get_full(rb->ctx);
}

Loading