Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unexpected task status #1767

Merged
merged 4 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix task-manager become single-node mode due to server-info is missing
Change-Id: Ia42fdbf28ac7428501fb36bc89e5ab4368a0b553
  • Loading branch information
javeme committed Feb 28, 2022
commit 8f53ecc6f118354f4562feeace283a01f717de52
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,19 @@ public String toString() {
@Override
public void run() {
if (this.cancelled()) {
// Scheduled task is running after cancelled
// A task is running after cancelled which scheduled/queued before
return;
}

TaskManager.setContext(this.context());
try {
assert this.status.code() < TaskStatus.RUNNING.code() : this.status;
if (this.checkDependenciesSuccess()) {
/*
* FIXME: worker node may reset status to RUNNING here, and the
* status in DB is CANCELLING that set by master node,
* it will lead to cancel() operation not to take effect.
*/
this.status(TaskStatus.RUNNING);
super.run();
}
Expand All @@ -308,7 +313,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
// Callback for saving status to store
this.callable.cancelled();
} else {
// Maybe the worker is still running then set status SUCCESS
// Maybe worker node is still running then set status SUCCESS
cancelled = false;
}
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,7 @@ public synchronized void initServerInfo(Id server, NodeRole role) {
} while (page != null);
}

HugeServerInfo serverInfo = new HugeServerInfo(server, role);
serverInfo.maxLoad(this.calcMaxLoad());
this.save(serverInfo);

LOG.info("Init server info: {}", serverInfo);
this.saveServerInfo(this.selfServerId, this.selfServerRole);
}

public Id selfServerId() {
Expand All @@ -186,8 +182,9 @@ public boolean onlySingleNode() {

public void heartbeat() {
HugeServerInfo serverInfo = this.selfServerInfo();
if (serverInfo == null) {
return;
if (serverInfo == null && this.selfServerId != null) {
serverInfo = this.saveServerInfo(this.selfServerId,
this.selfServerRole);
}
serverInfo.updateTime(DateUtil.now());
this.save(serverInfo);
Expand Down Expand Up @@ -239,7 +236,11 @@ protected synchronized HugeServerInfo pickWorkerNode(
}
}

this.onlySingleNode = !hasWorkerNode;
boolean singleNode = !hasWorkerNode;
if (singleNode != this.onlySingleNode) {
LOG.info("Switch only_single_node to {}", singleNode);
this.onlySingleNode = singleNode;
}

// Only schedule to master if there is no workers and master is suitable
if (!hasWorkerNode) {
Expand All @@ -260,26 +261,35 @@ private GraphTransaction tx() {
return this.graph.systemTransaction();
}

private Id save(HugeServerInfo server) {
private HugeServerInfo saveServerInfo(Id server, NodeRole role) {
HugeServerInfo serverInfo = new HugeServerInfo(server, role);
serverInfo.maxLoad(this.calcMaxLoad());
this.save(serverInfo);

LOG.info("Init server info: {}", serverInfo);
return serverInfo;
}

private Id save(HugeServerInfo serverInfo) {
return this.call(() -> {
// Construct vertex from server info
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
throw new HugeException("Schema is missing for %s '%s'",
HugeServerInfo.P.SERVER, server);
HugeServerInfo.P.SERVER, serverInfo);
}
HugeVertex vertex = this.tx().constructVertex(false,
server.asArray());
serverInfo.asArray());
// Add or update server info in backend store
vertex = this.tx().addVertex(vertex);
return vertex.id();
});
}

private int save(Collection<HugeServerInfo> servers) {
private int save(Collection<HugeServerInfo> serverInfos) {
return this.call(() -> {
if (servers.isEmpty()) {
return servers.size();
if (serverInfos.isEmpty()) {
return serverInfos.size();
}
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
Expand All @@ -289,7 +299,7 @@ private int save(Collection<HugeServerInfo> servers) {
// Save server info in batch
GraphTransaction tx = this.tx();
int updated = 0;
for (HugeServerInfo server : servers) {
for (HugeServerInfo server : serverInfos) {
if (!server.updated()) {
continue;
}
Expand Down Expand Up @@ -319,7 +329,11 @@ private <V> V call(Callable<V> callable) {
}

private HugeServerInfo selfServerInfo() {
return this.serverInfo(this.selfServerId);
HugeServerInfo selfServerInfo = this.serverInfo(this.selfServerId);
if (selfServerInfo == null) {
LOG.warn("ServerInfo is missing: {}", this.selfServerId);
}
return selfServerInfo;
}

private HugeServerInfo serverInfo(Id server) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class TaskManager {
"server-info-db-worker-%d";
public static final String TASK_SCHEDULER = "task-scheduler-%d";

protected static final int SCHEDULE_PERIOD = 1; // Unit second
protected static final long SCHEDULE_PERIOD = 1000L; // unit ms

private static final int THREADS = 4;
private static final TaskManager MANAGER = new TaskManager(THREADS);
Expand Down Expand Up @@ -79,10 +79,11 @@ private TaskManager(int pool) {
// For schedule task to run, just one thread is ok
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
// Start after 10s waiting for HugeGraphServer startup
// Start after 10x period time waiting for HugeGraphServer startup
this.schedulerExecutor.scheduleWithFixedDelay(this::scheduleOrExecuteJob,
10L, SCHEDULE_PERIOD,
TimeUnit.SECONDS);
10 * SCHEDULE_PERIOD,
SCHEDULE_PERIOD,
TimeUnit.MILLISECONDS);
}

public void addScheduler(HugeGraphParams graph) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;

public class BaseApiTest {
Expand Down Expand Up @@ -543,17 +544,30 @@ protected static void clearSchema() {
}

protected static void waitTaskSuccess(int task) {
waitTaskStatus(task, "success");
waitTaskStatus(task, ImmutableSet.of("success"));
}

protected static void waitTaskStatus(int task, String expectedStatus) {
protected static void waitTaskCompleted(int task) {
Set<String> completed = ImmutableSet.of("success",
"cancelled",
"failed");
waitTaskStatus(task, completed);
}

protected static void waitTaskStatus(int task, Set<String> expectedStatus) {
String status;
int times = 0;
int maxTimes = 100000;
do {
Response r = client.get("/graphs/hugegraph/tasks/",
String.valueOf(task));
String content = assertResponseStatus(200, r);
status = assertJsonContains(content, "task_status");
} while (!status.equals(expectedStatus));
if (times++ > maxTimes) {
Assert.fail(String.format("Failed to wait for task %s " +
"due to timeout", task));
}
} while (!expectedStatus.contains(status));
}

protected static String parseId(String content) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ public void testCancel() {
String status = assertJsonContains(content, "task_status");
Assert.assertTrue(status, status.equals("cancelling") ||
status.equals("cancelled"));
waitTaskStatus(taskId, "cancelled");
/*
* NOTE: should be waitTaskStatus(taskId, "cancelled"), but worker
* node may ignore the CANCELLING status due to now we can't atomic
* update task status, and then the task is running to SUCCESS.
*/
waitTaskCompleted(taskId);
} else {
assert r.getStatus() == 400;
String error = String.format(
Expand Down