Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/meta/meta_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#pragma once

#include <fmt/core.h>
#include <stdint.h>
#include <algorithm>
#include <atomic>
Expand All @@ -46,6 +47,7 @@
#include "meta/duplication/duplication_info.h"
#include "meta_admin_types.h"
#include "metadata_types.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "runtime/api_layer1.h"
#include "task/task.h"
Expand Down Expand Up @@ -547,7 +549,30 @@ inline int count_partitions(const app_mapper &apps)

void when_update_replicas(config_type::type t, const std::function<void(bool)> &func);

// TODO(yingchun): refactor to deal both rpc_address and host_port
#define MAINTAIN_DROP_NODE(obj, field, type) \
do { \
auto obj_copy = obj; \
if (obj.primary) { \
maintain_drops(obj_copy.field, obj.primary, type); \
} \
if (obj.__isset.hp_primary && obj.hp_primary) { \
maintain_drops(obj_copy.hp_##field, obj.hp_primary, type); \
} \
} while (0)

#define MAINTAIN_DROP_NODES(obj, field, type) \
do { \
auto obj_copy = obj; \
for (const auto &secondary : obj.secondaries) { \
maintain_drops(obj_copy.field, secondary, type); \
} \
if (obj.__isset.hp_secondaries) { \
for (const auto &secondary : obj.hp_secondaries) { \
maintain_drops(obj_copy.hp_##field, secondary, type); \
} \
} \
} while (0)

template <typename T>
void maintain_drops(/*inout*/ std::vector<T> &drops, const T &node, config_type::type t)
{
Expand Down
21 changes: 5 additions & 16 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2096,18 +2096,9 @@ void server_state::drop_partition(std::shared_ptr<app_state> &app, int pidx)
SET_OBJ_IP_AND_HOST_PORT(request, node, pc, primary);

request.config = pc;
for (const auto &secondary : pc.hp_secondaries) {
maintain_drops(request.config.hp_last_drops, secondary, request.type);
}
for (const auto &secondary : pc.secondaries) {
maintain_drops(request.config.last_drops, secondary, request.type);
}
if (pc.hp_primary) {
maintain_drops(request.config.hp_last_drops, pc.hp_primary, request.type);
}
if (pc.primary) {
maintain_drops(request.config.last_drops, pc.primary, request.type);
}
MAINTAIN_DROP_NODES(request.config, last_drops, request.type);
MAINTAIN_DROP_NODE(request.config, last_drops, request.type);

RESET_IP_AND_HOST_PORT(request.config, primary);
CLEAR_IP_AND_HOST_PORT(request.config, secondaries);

Expand Down Expand Up @@ -2168,8 +2159,7 @@ void server_state::downgrade_primary_to_inactive(std::shared_ptr<app_state> &app
SET_OBJ_IP_AND_HOST_PORT(request, node, pc, primary);
request.config.ballot++;
RESET_IP_AND_HOST_PORT(request.config, primary);
maintain_drops(request.config.hp_last_drops, pc.hp_primary, request.type);
maintain_drops(request.config.last_drops, pc.primary, request.type);
MAINTAIN_DROP_NODE(request.config, last_drops, request.type);

cc.stage = config_status::pending_remote_sync;
cc.pending_sync_request = req;
Expand Down Expand Up @@ -2306,8 +2296,7 @@ void server_state::on_update_configuration(
msg->release_ref();
return;
} else {
maintain_drops(cfg_request->config.hp_last_drops, cfg_request->hp_node, cfg_request->type);
maintain_drops(cfg_request->config.last_drops, cfg_request->node, cfg_request->type);
MAINTAIN_DROP_NODES(cfg_request->config, last_drops, cfg_request->type);
}

if (response.err != ERR_IO_PENDING) {
Expand Down
Loading