Skip to content

Commit

Permalink
waiting replication finish before select master
Browse files Browse the repository at this point in the history
  • Loading branch information
happyfish100 committed Apr 7, 2024
1 parent 46aa0a9 commit 0a11da8
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/common/fdir_global.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
#include "fdir_global.h"

FDIRGlobalVars g_fdir_global_vars = {
{5, 3, 0}
{5, 3, 1}
};
50 changes: 50 additions & 0 deletions src/server/binlog/binlog_local_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,56 @@ int binlog_local_consumer_init()
return binlog_write_init();
}

static int get_busy_replication_count()
{
FDIRSlaveReplication *replication;
FDIRSlaveReplication *end;
int count;

count = 0;
end = slave_replication_array.replications +
slave_replication_array.count;
for (replication=slave_replication_array.replications;
replication<end; replication++)
{
if (FC_ATOMIC_GET(replication->stage) !=
FDIR_REPLICATION_STAGE_NONE)
{
++count;
}
}
return count;
}

void binlog_local_consumer_waiting_replication_finish()
{
int64_t start_time_us;
char time_used[32];
int count;

count = get_busy_replication_count();
if (count == 0) {
return;
}

start_time_us = get_current_time_us();
logInfo("file: "__FILE__", line: %d, "
"waiting %d replications finish ...",
__LINE__, count);

do {
fc_sleep_ms(1000);
} while (SF_G_CONTINUE_FLAG && get_busy_replication_count() > 0);

if (SF_G_CONTINUE_FLAG) {
long_to_comma_str((get_current_time_us() -
start_time_us) / 1000, time_used);
logInfo("file: "__FILE__", line: %d, "
"waiting %d replications finish done, time used: %s ms",
__LINE__, count, time_used);
}
}

int binlog_local_consumer_replication_start()
{
int result;
Expand Down
1 change: 1 addition & 0 deletions src/server/binlog/binlog_local_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ int binlog_local_consumer_init();
void binlog_local_consumer_destroy();
void binlog_local_consumer_terminate();

void binlog_local_consumer_waiting_replication_finish();
int binlog_local_consumer_replication_start();
int binlog_local_consumer_push_to_queues(ServerBinlogRecordBuffer *rbuffer);

Expand Down
2 changes: 1 addition & 1 deletion src/server/binlog/binlog_producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ void binlog_producer_destroy()
pthread_mutex_destroy(&proceduer_ctx.queue.lock);
fast_mblock_destroy(&proceduer_ctx.rb_allocator);

//TODO notify task in entries
//TODO notify task in entries
free(proceduer_ctx.ring.entries);
proceduer_ctx.ring.entries = NULL;

Expand Down
2 changes: 2 additions & 0 deletions src/server/cluster_relationship.c
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ void cluster_relationship_trigger_reselect_master()
__sync_bool_compare_and_swap(&((FDIRServerContext *)thread_data->
arg)->cluster.clean_replications, 0, 1);
}

binlog_producer_destroy();
}

Expand Down Expand Up @@ -1422,6 +1423,7 @@ static void *cluster_thread_entrance(void* arg)
while (SF_G_CONTINUE_FLAG) {
master = CLUSTER_MASTER_ATOM_PTR;
if (master == NULL) {
binlog_local_consumer_waiting_replication_finish();
if (cluster_select_master() != 0) {
sleep_seconds = 1 + (int)((double)rand()
* (double)MAX_SLEEP_SECONDS / RAND_MAX);
Expand Down

0 comments on commit 0a11da8

Please sign in to comment.