Skip to content

Commit

Permalink
ping
Browse files Browse the repository at this point in the history
  • Loading branch information
zxwing committed Aug 27, 2018
1 parent 4f4126b commit a5af7b1
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.zstack.header.core.ReturnValueCompletion;
import org.zstack.header.core.workflow.*;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.errorcode.OperationFailureException;
import org.zstack.header.errorcode.SysErrors;
import org.zstack.header.exception.CloudRuntimeException;
import org.zstack.header.host.*;
import org.zstack.header.managementnode.ManagementNodeChangeListener;
Expand Down Expand Up @@ -148,9 +150,8 @@ private void passThrough(HostMessage msg) {
}

if (vo == null) {
String err = "Cannot find host: " + msg.getHostUuid() + ", it may have been deleted";
bus.replyErrorByMessageType((Message) msg, err);
return;
ErrorCode err = Platform.err(SysErrors.RESOURCE_NOT_FOUND, "cannot find host[uuid:%s], it may have been deleted", msg.getHostUuid());
throw new OperationFailureException(err);
}

HypervisorFactory factory = this.getHypervisorFactory(HypervisorType.valueOf(vo.getHypervisorType()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.zstack.compute.host;

import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.thread.AsyncTimer;
import org.zstack.header.core.Completion;
import org.zstack.header.core.NoErrorCompletion;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.exception.CloudRuntimeException;
import org.zstack.header.host.HostConstant;
import org.zstack.header.host.ReconnectHostMsg;
import org.zstack.header.message.MessageReply;

import java.util.concurrent.TimeUnit;

@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
public abstract class HostReconnectTask extends AsyncTimer {
protected String uuid;
private NoErrorCompletion completion;

@Autowired
protected CloudBus bus;

protected enum CanDoAnswer {
Ready,
NotReady,
NoReconnect
}

protected abstract CanDoAnswer canDoReconnect();

public HostReconnectTask(String uuid, NoErrorCompletion completion) {
super(TimeUnit.SECONDS, HostGlobalConfig.PING_HOST_INTERVAL.value(Long.class));
this.uuid = uuid;
this.completion = completion;
}

private void reconnectNow(String uuid, Completion completion) {
ReconnectHostMsg msg = new ReconnectHostMsg();
msg.setHostUuid(uuid);
msg.setSkipIfHostConnected(true);
bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, uuid);
bus.send(msg, new CloudBusCallBack(completion) {
@Override
public void run(MessageReply reply) {
if (reply.isSuccess()) {
completion.success();
} else {
completion.fail(reply.getError());
}
}
});
}

@Override
protected void execute() {
CanDoAnswer answer = canDoReconnect();
if (answer == CanDoAnswer.Ready) {
reconnectNow(uuid, new Completion(completion) {
@Override
public void success() {
completion.done();
}

@Override
public void fail(ErrorCode errorCode) {
// still fail to reconnect the host, continue this reconnect task
continueToRunThisTimer();
}
});
} else if (answer == CanDoAnswer.NotReady) {
// still not ready to reconnect the host, continue this reconnect task
continueToRunThisTimer();
} else if (answer == CanDoAnswer.NoReconnect) {
completion.done();
} else {
throw new CloudRuntimeException(String.format("should not be here[%s]", answer));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.zstack.compute.host;

import org.zstack.header.core.NoErrorCompletion;

public interface HostReconnectTaskFactory {
HostReconnectTask createTask(String uuid, NoErrorCompletion completion);

String getHypervisorType();
}
114 changes: 31 additions & 83 deletions compute/src/main/java/org/zstack/compute/host/HostTrackImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.cloudbus.ResourceDestinationMaker;
import org.zstack.core.componentloader.PluginRegistry;
import org.zstack.core.db.DatabaseFacade;
import org.zstack.core.db.Q;
import org.zstack.core.db.SQLBatch;
Expand All @@ -13,6 +14,8 @@
import org.zstack.header.core.Completion;
import org.zstack.header.core.NoErrorCompletion;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.errorcode.SysErrors;
import org.zstack.header.exception.CloudResourceUnmanagedException;
import org.zstack.header.exception.CloudRuntimeException;
import org.zstack.header.host.*;
import org.zstack.header.managementnode.ManagementNodeChangeListener;
Expand Down Expand Up @@ -42,28 +45,10 @@ public class HostTrackImpl implements HostTracker, ManagementNodeChangeListener,
private CloudBus bus;
@Autowired
private ThreadFacade thdf;
@Autowired
private PluginRegistry pluginRgty;

private static Map<String, Class> hostTrackerPreReconnectCheckers = new HashMap<>();

private static HostTrackerPreReconnectChecker newHostTrackerPreReconnectChecker(Class clz) {
try {
return (HostTrackerPreReconnectChecker) clz.getConstructor().newInstance();
} catch (Exception e) {
throw new CloudRuntimeException(e);
}
}

static {
BeanUtils.reflections.getSubTypesOf(HostTrackerPreReconnectChecker.class).forEach(clz -> {
HostTrackerPreReconnectChecker checker = newHostTrackerPreReconnectChecker(clz);
Class old = hostTrackerPreReconnectCheckers.get(checker.getHypervisorType());
if (old != null) {
throw new CloudRuntimeException(String.format("duplicate HostTrackerPreReconnectChecker[%s, %s] with the same hypervisor type[%s]", clz, checker.getHypervisorType(), checker.getHypervisorType()));
}

hostTrackerPreReconnectCheckers.put(checker.getHypervisorType(), clz);
});
}
private static Map<String, HostReconnectTaskFactory> hostReconnectTaskFactories = new HashMap<>();

enum ReconnectDecision {
DoNothing,
Expand All @@ -88,70 +73,10 @@ public void run(MessageReply reply) {
});
}

private class ReconnectTask extends AsyncTimer {
private String uuid;
private String hypervisorType;
private NoErrorCompletion completion;

public ReconnectTask(String uuid, String hypervisorType, NoErrorCompletion completion) {
super(TimeUnit.SECONDS, HostGlobalConfig.PING_HOST_INTERVAL.value(Long.class));
this.uuid = uuid;
this.hypervisorType = hypervisorType;
this.completion = completion;
}

@Override
protected void execute() {
Class clz = hostTrackerPreReconnectCheckers.get(hypervisorType);
if (clz == null) {
reconnectNow(uuid, new Completion(completion) {
@Override
public void success() {
completion.done();
}

@Override
public void fail(ErrorCode errorCode) {
// still fail to reconnect the host, continue this reconnect task
continueToRunThisTimer();
}
});

return;
}

HostTrackerPreReconnectChecker preReconnectChecker = newHostTrackerPreReconnectChecker(clz);
Boolean canDo = preReconnectChecker.canDoReconnect(uuid);
if (canDo == null) {
// the host is deleted
completion.done();
return;
}

if (canDo) {
reconnectNow(uuid, new Completion(completion) {
@Override
public void success() {
completion.done();
}

@Override
public void fail(ErrorCode errorCode) {
// still fail to reconnect the host, continue this reconnect task
continueToRunThisTimer();
}
});
} else {
// still not ready to reconnect the host, continue this reconnect task
continueToRunThisTimer();
}
}
}

private class Tracker extends AsyncTimer {
private String uuid;
private String hypervisorType;
private ReconnectTask reconnectTask;
private HostReconnectTask reconnectTask;

public Tracker(String uuid) {
super(TimeUnit.SECONDS, HostGlobalConfig.PING_HOST_INTERVAL.value(Long.class));
Expand Down Expand Up @@ -257,12 +182,13 @@ private void submitReconnectTask() {
reconnectTask.cancel();
}

reconnectTask = new ReconnectTask(uuid, hypervisorType, new NoErrorCompletion() {
reconnectTask = getHostReconnectTaskFactory(hypervisorType).createTask(uuid, new NoErrorCompletion() {
@Override
public void done() {
continueToRunThisTimer();
}
});

reconnectTask.start();
}

Expand Down Expand Up @@ -338,8 +264,19 @@ public void iJoin(String nodeId) {

}

private HostReconnectTaskFactory getHostReconnectTaskFactory(String hvType) {
HostReconnectTaskFactory f = hostReconnectTaskFactories.get(hvType);
if (f == null) {
throw new CloudRuntimeException(String.format("cannot find HostReconnectTaskFactory with hypervisorType[%s]", hvType));
}

return f;
}

@Override
public boolean start() {
populateExtensions();

HostGlobalConfig.PING_HOST_INTERVAL.installUpdateExtension((oldConfig, newConfig) -> {
logger.debug(String.format("%s change from %s to %s, restart host trackers",
oldConfig.getCanonicalName(), oldConfig.value(), newConfig.value()));
Expand All @@ -350,6 +287,17 @@ public boolean start() {
return true;
}

private void populateExtensions() {
pluginRgty.getExtensionList(HostReconnectTaskFactory.class).forEach(f -> {
HostReconnectTaskFactory old = hostReconnectTaskFactories.get(f.getHypervisorType());
if (old != null) {
throw new CloudRuntimeException(String.format("duplicate HostReconnectTaskFactory[%s, %s] with the same type[%s]", f, old, f.getHypervisorType()));
}

hostReconnectTaskFactories.put(f.getHypervisorType(), f);
});
}

@Override
public boolean stop() {
return true;
Expand Down

This file was deleted.

6 changes: 6 additions & 0 deletions conf/springConfigXml/Kvm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
http://zstack.org/schema/zstack/plugin.xsd"
default-init-method="init" default-destroy-method="destroy">

<bean id="KVMHostReconnectTaskFactory" class="org.zstack.kvm.KVMHostReconnectTaskFactory">
<zstack:plugin>
<zstack:extension interface="org.zstack.compute.host.HostReconnectTaskFactory" />
</zstack:plugin>
</bean>

<bean id="KvmHostReserveExtension" class="org.zstack.kvm.KvmHostReserveExtension">
<zstack:plugin>
<zstack:extension interface="org.zstack.header.Component" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.zstack.kvm;

import org.zstack.compute.host.HostReconnectTask;
import org.zstack.compute.host.HostReconnectTaskFactory;
import org.zstack.core.Platform;
import org.zstack.header.core.NoErrorCompletion;

public class KVMHostReconnectTaskFactory implements HostReconnectTaskFactory {
@Override
public HostReconnectTask createTask(String uuid, NoErrorCompletion completion) {
return Platform.New(() -> new KVMReconnectHostTask(uuid, completion));
}

@Override
public String getHypervisorType() {
return KVMConstant.KVM_HYPERVISOR_TYPE;
}
}

This file was deleted.

29 changes: 29 additions & 0 deletions plugin/kvm/src/main/java/org/zstack/kvm/KVMReconnectHostTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.zstack.kvm;

import org.zstack.compute.host.HostReconnectTask;
import org.zstack.core.db.Q;
import org.zstack.header.core.NoErrorCompletion;
import org.zstack.utils.network.NetworkUtils;

import javax.persistence.Tuple;
import java.util.concurrent.TimeUnit;

public class KVMReconnectHostTask extends HostReconnectTask {
public KVMReconnectHostTask(String uuid, NoErrorCompletion completion) {
super(uuid, completion);
}

@Override
protected CanDoAnswer canDoReconnect() {
Tuple t = Q.New(KVMHostVO.class).select(KVMHostVO_.managementIp, KVMHostVO_.port).eq(KVMHostVO_.uuid, uuid).findTuple();
if (t == null) {
return CanDoAnswer.NoReconnect;
}

String ip = t.get(0, String.class);
int port = t.get(1, Integer.class);

return NetworkUtils.isRemotePortOpen(ip, port, (int) TimeUnit.SECONDS.toMillis(2)) ?
CanDoAnswer.Ready : CanDoAnswer.NotReady;
}
}
Loading

0 comments on commit a5af7b1

Please sign in to comment.