Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/replica/duplication/duplication_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void load_mutation::run()
decree last_decree = _duplicator->progress().last_decree;
_start_decree = last_decree + 1;

_duplicator->set_loading_private_log_state(true);
// Load the mutations from plog that have been committed recently, if any.
const auto max_plog_committed_decree =
std::min(_replica->private_log()->max_decree_on_disk(), _replica->last_applied_decree());
Expand Down
1 change: 1 addition & 0 deletions src/replica/duplication/load_from_private_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ void load_from_private_log::replay_log_block()
// case2: !err.is_ok(err.code() == ERR_HANDLE_EOF) and no next file, need commit the last
// mutations()
step_down_next_stage(_mutation_batch.last_decree(), _mutation_batch.move_all_mutations());
_duplicator->set_loading_private_log_state(false);
}

load_from_private_log::load_from_private_log(replica *r, replica_duplicator *dup)
Expand Down
1 change: 1 addition & 0 deletions src/replica/duplication/replica_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ replica_duplicator::replica_duplicator(const duplication_entry &ent, replica *r)
_replica->private_log()->max_commit_on_disk());

_status = ent.status;
_is_loading_private_log.store(false);

const auto it = ent.progress.find(get_gpid().get_partition_index());
CHECK_PREFIX_MSG(it != ent.progress.end(),
Expand Down
10 changes: 10 additions & 0 deletions src/replica/duplication/replica_duplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ class replica_duplicator : public replica_base, public pipeline::base

writer.EndObject();
}
void set_loading_private_log_state(bool is_loading)
{
_is_loading_private_log.store(is_loading, std::memory_order_release);
}
bool get_loading_private_log_state()
{
return _is_loading_private_log.load(std::memory_order_acquire);
}

private:
friend class duplication_test_base;
Expand Down Expand Up @@ -205,6 +213,8 @@ class replica_duplicator : public replica_base, public pipeline::base
// TODO(wutao1): calculate the counters independently for each remote cluster
// if we need to duplicate to multiple clusters someday.
METRIC_VAR_DECLARE_counter(dup_confirmed_mutations);

std::atomic_bool _is_loading_private_log;
};

typedef std::unique_ptr<replica_duplicator> replica_duplicator_u_ptr;
Expand Down
15 changes: 12 additions & 3 deletions src/replica/duplication/replica_duplicator_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <stdint.h>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -95,15 +96,23 @@ class replica_duplicator_manager : public replica_base
}
writer.EndArray();
}
bool check_still_have_dup_pipeline_loading()
{
for (auto &kv : _duplications) {
if (kv.second->get_loading_private_log_state()) {
return true;
}
}
return false;
}
// called by close replica also
void remove_all_duplications();

private:
void sync_duplication(const duplication_entry &ent);

void remove_non_existed_duplications(const std::map<dupid_t, duplication_entry> &);

void remove_all_duplications();

private:
friend class duplication_sync_timer_test;
friend class duplication_test_base;
friend class replica_duplicator_manager_test;
Expand Down
8 changes: 8 additions & 0 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -732,5 +732,13 @@ void replica::METRIC_FUNC_NAME_SET(dup_pending_mutations)()
METRIC_SET(*_duplication_mgr, dup_pending_mutations);
}

bool replica::having_dup_loading()
{
if (_duplication_mgr == nullptr) {
return false;
}
return _duplication_mgr->check_still_have_dup_pipeline_loading();
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

void update_app_duplication_status(bool duplicating);

bool having_dup_loading();

//
// Backup
//
Expand Down
12 changes: 12 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "nfs_types.h"
#include "ranger/access_type.h"
#include "replica.h"
#include "replica/duplication/replica_duplicator_manager.h"
#include "replica/duplication/replica_follower.h"
#include "replica/kms_key_provider.h"
#include "replica/replica_context.h"
Expand Down Expand Up @@ -1720,6 +1721,15 @@ void replica_stub::on_node_query_reply_scatter2(replica_stub_ptr this_, gpid id)
replica_ptr replica = get_replica(id);
if (replica != nullptr && replica->status() != partition_status::PS_POTENTIAL_SECONDARY &&
replica->status() != partition_status::PS_PARTITION_SPLIT) {

// deal with unexpected close when duplication and balance function running at the same time
if (replica->status() == partition_status::PS_INACTIVE && replica->having_dup_loading()) {
LOG_DEBUG(
"%s: replica not exists on meta server,and still have dup on it. wait to close",
replica->name());
return;
}

if (replica->status() == partition_status::PS_INACTIVE &&
dsn_now_ms() - replica->create_time_milliseconds() <
FLAGS_gc_memory_replica_interval_ms) {
Expand Down Expand Up @@ -2435,6 +2445,8 @@ void replica_stub::close_replica(replica_ptr r)
gpid id = r->get_gpid();
std::string name = r->name();

// deal with duplication conflict with balance
r->get_duplication_manager()->remove_all_duplications();
r->close();

{
Expand Down
Loading