Skip to content

Commit

Permalink
cherrypick Add consistency-check-urgent-mode to tester process class (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kakaiu authored Jul 12, 2024
1 parent 6fef3a9 commit e2a5bbd
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 11 deletions.
3 changes: 2 additions & 1 deletion fdbserver/SimulatedCluster.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConne
whitelistBinPaths,
"",
{},
configDBType));
configDBType,
false));
}
if (runBackupAgents != AgentNone) {
futures.push_back(runBackup(connRecord));
Expand Down
3 changes: 2 additions & 1 deletion fdbserver/TesterInterface.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ struct TesterInterface {
ACTOR Future<Void> testerServerCore(TesterInterface interf,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<struct ServerDBInfo> const> serverDBInfo,
LocalityData locality);
LocalityData locality,
bool consistencyCheckUrgentWorkLoadOnly = false);

// Location selector for running a test. NOTE(review): judging by the enumerator names this
// presumably distinguishes the local process, server processes, and dedicated tester
// processes — confirm against the callers before relying on it.
enum test_location_t { TEST_HERE, TEST_ON_SERVERS, TEST_ON_TESTERS };
enum test_type_t {
Expand Down
3 changes: 2 additions & 1 deletion fdbserver/WorkerInterface.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> ccr,
std::string whitelistBinPaths,
std::string configPath,
std::map<std::string, std::string> manualKnobOverrides,
ConfigDBType configDBType);
ConfigDBType configDBType,
bool consistencyCheckUrgentMode);

ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,
Expand Down
10 changes: 8 additions & 2 deletions fdbserver/fdbserver.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ enum {
OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_BUILD_FLAGS, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR,
OPT_TRACECLOCK, OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_UNITTESTPARAM, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE,
OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE,
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_CONFIG_PATH, OPT_USE_TEST_CONFIG_DB, OPT_FAULT_INJECTION, OPT_PROFILER, OPT_PRINT_SIMTIME, OPT_FLOW_PROCESS_NAME, OPT_FLOW_PROCESS_ENDPOINT
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_CONFIG_PATH, OPT_USE_TEST_CONFIG_DB, OPT_FAULT_INJECTION, OPT_PROFILER, OPT_PRINT_SIMTIME, OPT_FLOW_PROCESS_NAME, OPT_FLOW_PROCESS_ENDPOINT, OPT_CONSISTENCY_CHECK_URGENT_MODE
};

CSimpleOpt::SOption g_rgOptions[] = {
Expand Down Expand Up @@ -199,6 +199,7 @@ CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_PRINT_SIMTIME, "--print-sim-time", SO_NONE },
{ OPT_FLOW_PROCESS_NAME, "--process-name", SO_REQ_SEP },
{ OPT_FLOW_PROCESS_ENDPOINT, "--process-endpoint", SO_REQ_SEP },
{ OPT_CONSISTENCY_CHECK_URGENT_MODE, "--consistency-check-urgent-mode", SO_NONE },

#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
Expand Down Expand Up @@ -1036,6 +1037,7 @@ struct CLIOptions {
LocalityData localities;
int minTesterCount = 1;
bool testOnServers = false;
bool consistencyCheckUrgentMode = false;

TLSConfig tlsConfig = TLSConfig(TLSEndpointType::SERVER);
double fileIoTimeout = 0.0;
Expand Down Expand Up @@ -1518,6 +1520,9 @@ struct CLIOptions {
case OPT_TEST_ON_SERVERS:
testOnServers = true;
break;
case OPT_CONSISTENCY_CHECK_URGENT_MODE:
consistencyCheckUrgentMode = true;
break;
case OPT_METRICSCONNFILE:
metricsConnFile = args.OptionArg();
break;
Expand Down Expand Up @@ -2154,7 +2159,8 @@ int main(int argc, char* argv[]) {
opts.whitelistBinPaths,
opts.configPath,
opts.manualKnobOverrides,
opts.configDBType));
opts.configDBType,
opts.consistencyCheckUrgentMode));
actors.push_back(histogramReport());
// actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement

Expand Down
13 changes: 11 additions & 2 deletions fdbserver/tester.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,15 +707,17 @@ ACTOR Future<Void> testerServerWorkload(WorkloadRequest work,
ACTOR Future<Void> testerServerCore(TesterInterface interf,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<struct ServerDBInfo> const> dbInfo,
LocalityData locality) {
LocalityData locality,
bool consistencyCheckUrgentWorkLoadOnly) {
state PromiseStream<Future<Void>> addWorkload;
state Future<Void> workerFatalError = actorCollection(addWorkload.getFuture());

// Dedicated to consistencyCheckerUrgent
// At any time, we allow at most one consistency checker workload on a server
state std::pair<int64_t, Future<Void>> consistencyCheckerUrgentTester = std::make_pair(0, Future<Void>());

TraceEvent(SevInfo, "StartingTesterServerCore", interf.id());
TraceEvent(SevInfo, "StartingTesterServerCore", interf.id())
.detail("ConsistencyCheckUrgentMode", consistencyCheckUrgentWorkLoadOnly);
loop choose {
when(wait(workerFatalError)) {}
when(wait(consistencyCheckerUrgentTester.second.isValid() ? consistencyCheckerUrgentTester.second : Never())) {
Expand Down Expand Up @@ -748,6 +750,13 @@ ACTOR Future<Void> testerServerCore(TesterInterface interf,
.detail("ConsistencyCheckerId", consistencyCheckerUrgentTester.first)
.detail("ClientId", work.clientId)
.detail("ClientCount", work.clientCount);
} else if (consistencyCheckUrgentWorkLoadOnly) {
TraceEvent(SevError, "StartingTesterServerCoreUnexpectedWorkload", interf.id())
.detail("SharedRandomNumber", work.sharedRandomNumber)
.detail("ClientId", work.clientId)
.detail("ClientCount", work.clientCount)
.detail("WorkLoad", work.title);
// Drop the workload
} else {
addWorkload.send(testerServerWorkload(work, ccr, dbInfo, locality, /*isConsistencyCheckUrgent=*/false));
}
Expand Down
13 changes: 9 additions & 4 deletions fdbserver/worker.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1790,7 +1790,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
ConfigBroadcastInterface configBroadcastInterface,
Reference<ConfigNode> configNode,
Reference<LocalConfiguration> localConfig) {
Reference<LocalConfiguration> localConfig,
bool consistencyCheckUrgentMode) {
state PromiseStream<ErrorInfo> errors;
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
new AsyncVar<Optional<DataDistributorInterface>>());
Expand Down Expand Up @@ -1866,7 +1867,9 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
errorForwarders.add(loadedPonger(interf.debugPing.getFuture()));
errorForwarders.add(waitFailureServer(interf.waitFailure.getFuture()));
errorForwarders.add(monitorTraceLogIssues(issues));
errorForwarders.add(testerServerCore(interf.testerInterface, connRecord, dbInfo, locality));
// If consistencyCheckUrgentMode is set, the tester server drops any workload other than consistencyCheckUrgent
errorForwarders.add(
testerServerCore(interf.testerInterface, connRecord, dbInfo, locality, consistencyCheckUrgentMode));
errorForwarders.add(monitorHighMemory(memoryProfileThreshold));

filesClosed.add(stopping.getFuture());
Expand Down Expand Up @@ -3126,7 +3129,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
std::string whitelistBinPaths,
std::string configPath,
std::map<std::string, std::string> manualKnobOverrides,
ConfigDBType configDBType) {
ConfigDBType configDBType,
bool consistencyCheckUrgentMode) {
state std::vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles;
state Reference<ConfigNode> configNode;
Expand Down Expand Up @@ -3224,7 +3228,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
dbInfo,
configBroadcastInterface,
configNode,
localConfig),
localConfig,
consistencyCheckUrgentMode),
"WorkerServer",
UID(),
&normalWorkerErrors()));
Expand Down

0 comments on commit e2a5bbd

Please sign in to comment.