Skip to content

Commit

Permalink
Support register persistent instance and tcp health check (#4636)
Browse files Browse the repository at this point in the history
* Fix client beat error in cluster mode

* Add health check for persistent ip port client

* Add health check for persistent ip port client

* Fix client beat error in cluster mode

* Migrate default TCP health check v1.x to v2.x

* Fix unit test error

* Fix dump persistent client snapshot error problem

* Fix load persistent client snapshot error problem

* Fix subscribe error for client id
  • Loading branch information
KomachiSion authored Jan 6, 2021
1 parent 98a821e commit ac73e73
Show file tree
Hide file tree
Showing 31 changed files with 1,149 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public void onEvent(Event event) {
}
ClientEvent clientEvent = (ClientEvent) event;
Client client = clientEvent.getClient();
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (!client.isEphemeral()) {
return;
}
if (!clientManager.isResponsibleClient(client)) {
return;
}
Expand Down Expand Up @@ -189,7 +193,7 @@ public DistroData getDatumSnapshot() {
List<ClientSyncData> datum = new LinkedList<>();
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client) {
if (null == client || !client.isEphemeral()) {
continue;
}
datum.add(client.generateSyncData());
Expand All @@ -205,7 +209,7 @@ public DistroData getVerifyData() {
List<String> verifyData = new LinkedList<>();
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client) {
if (null == client || !client.isEphemeral()) {
continue;
}
if (clientManager.isResponsibleClient(client)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.IPUtil;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
Expand Down Expand Up @@ -160,8 +161,9 @@ public ServiceInfo listInstance(String namespaceId, String serviceName, Subscrib
boolean healthOnly) {
Service service = getService(namespaceId, serviceName, true);
if (null != subscriber) {
createIpPortClientIfAbsent(subscriber.getAddrStr(), true);
clientOperationService.subscribeService(service, subscriber, subscriber.getAddrStr());
String clientId = IpPortBasedClient.getClientId(subscriber.getAddrStr(), true);
createIpPortClientIfAbsent(clientId, true);
clientOperationService.subscribeService(service, subscriber, clientId);
}
ServiceInfo serviceInfo = serviceStorage.getData(service);
ServiceInfo result = ServiceUtil.selectInstances(serviceInfo, cluster, healthOnly, true);
Expand Down Expand Up @@ -191,7 +193,7 @@ public Instance getInstance(String namespaceId, String serviceName, String clust
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
RsInfo clientBeat) throws NacosException {
Service service = getService(namespaceId, serviceName, true);
String clientId = ip + ":" + port;
String clientId = IpPortBasedClient.getClientId(ip + IPUtil.IP_PORT_SPLITER + port, true);
IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);
if (null == client || !client.getAllPublishedService().contains(service)) {
if (null == clientBeat) {
Expand Down Expand Up @@ -234,7 +236,7 @@ public long getHeartBeatInterval(String namespaceId, String serviceName, String
.containsKey(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
return ConvertUtils.toLong(metadata.get().getExtendData().get(PreservedMetadataKeys.HEART_BEAT_INTERVAL));
}
String clientId = ip + ":" + port;
String clientId = IpPortBasedClient.getClientId(ip + IPUtil.IP_PORT_SPLITER + port, true);
Client client = clientManager.getClient(clientId);
InstancePublishInfo instance = null != client ? client.getInstancePublishInfo(service) : null;
if (null != instance && instance.getExtendDatum().containsKey(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Abstract implementation of {@code Client}.
Expand Down Expand Up @@ -116,14 +115,4 @@ public ClientSyncData generateSyncData() {
}
return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

protected final void loadPublishers(ConcurrentMap<Service, InstancePublishInfo> publishers) {
this.publishers.clear();
this.publishers.putAll(publishers);
}

protected final void loadSubscribers(ConcurrentMap<Service, Subscriber> subscribers) {
this.subscribers.clear();
this.subscribers.putAll(subscribers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@

import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;

import java.io.Serializable;
import java.util.List;

/**
* Client sync data.
*
* @author xiweng.yy
*/
public class ClientSyncData {
public class ClientSyncData implements Serializable {

private static final long serialVersionUID = -5141768777704539562L;

private String clientId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package com.alibaba.nacos.naming.core.v2.client.impl;

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.AbstractClient;
import com.alibaba.nacos.naming.core.v2.pojo.HeartBeatInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.client.ClientSyncData;
import com.alibaba.nacos.naming.core.v2.pojo.HealthCheckInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
import com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatCheckTaskV2;
import com.alibaba.nacos.naming.healthcheck.v2.HealthCheckTaskV2;

import java.util.Collection;
import java.util.List;

/**
* Nacos naming client based ip and port.
Expand All @@ -42,19 +46,28 @@ public class IpPortBasedClient extends AbstractClient {

private final boolean ephemeral;

private final String responsibleId;

private ClientBeatCheckTaskV2 beatCheckTask;

private HealthCheckTaskV2 healthCheckTaskV2;

public IpPortBasedClient(String clientId, boolean ephemeral) {
this.ephemeral = ephemeral;
this.clientId = clientId;
this.responsibleId = getResponsibleTagFromId();
if (ephemeral) {
beatCheckTask = new ClientBeatCheckTaskV2(this);
scheduleCheckTask();
HealthCheckReactor.scheduleCheck(beatCheckTask);
} else {
healthCheckTaskV2 = new HealthCheckTaskV2(this);
HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
}
}

private void scheduleCheckTask() {
HealthCheckReactor.scheduleCheck(beatCheckTask);
private String getResponsibleTagFromId() {
int index = clientId.indexOf(IpPortBasedClient.ID_DELIMITER);
return clientId.substring(0, index);
}

public static String getClientId(String address, boolean ephemeral) {
Expand All @@ -71,9 +84,13 @@ public boolean isEphemeral() {
return ephemeral;
}

public String getResponsibleId() {
return responsibleId;
}

@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
return super.addServiceInstance(service, parseToHeartBeatInstance(instancePublishInfo));
return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
}

@Override
Expand All @@ -92,39 +109,42 @@ public Collection<InstancePublishInfo> getAllInstancePublishInfo() {
public void destroy() {
if (ephemeral) {
HealthCheckReactor.cancelCheck(beatCheckTask);
} else {
healthCheckTaskV2.setCancelled(true);
}
}

private InstancePublishInfo parseToHeartBeatInstance(InstancePublishInfo instancePublishInfo) {
if (instancePublishInfo instanceof HeartBeatInstancePublishInfo) {
return instancePublishInfo;
private HealthCheckInstancePublishInfo parseToHealthCheckInstance(InstancePublishInfo instancePublishInfo) {
if (instancePublishInfo instanceof HealthCheckInstancePublishInfo) {
return (HealthCheckInstancePublishInfo) instancePublishInfo;
}
HeartBeatInstancePublishInfo result = new HeartBeatInstancePublishInfo();
HealthCheckInstancePublishInfo result = new HealthCheckInstancePublishInfo();
result.setIp(instancePublishInfo.getIp());
result.setPort(instancePublishInfo.getPort());
result.setHealthy(instancePublishInfo.isHealthy());
result.setExtendDatum(instancePublishInfo.getExtendDatum());
if (!ephemeral) {
result.initHealthCheck();
}
return result;
}

public void clearAllSubscribers() {
subscribers.clear();
}

public void load(final IpPortBasedClient client) {
loadPublishers(client.publishers);
loadSubscribers(client.subscribers);
}

/**
* clone new {@link IpPortBasedClient}.
* Load {@code ClientSyncData} and update current client.
*
* @return
* @param client client sync data
*/
public IpPortBasedClient clone() {
final IpPortBasedClient clone = new IpPortBasedClient(clientId, ephemeral);
clone.subscribers.putAll(subscribers);
clone.publishers.putAll(publishers);
return clone;
public void loadClientSyncData(ClientSyncData client) {
List<String> namespaces = client.getNamespaces();
List<String> groupNames = client.getGroupNames();
List<String> serviceNames = client.getServiceNames();
List<InstancePublishInfo> instances = client.getInstancePublishInfos();
for (int i = 0; i < namespaces.size(); i++) {
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i), ephemeral);
Service singleton = ServiceManager.getInstance().getSingleton(service);
HealthCheckInstancePublishInfo instance = parseToHealthCheckInstance(instances.get(i));
instance.initHealthCheck();
publishers.put(singleton, instance);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ public Collection<String> allClientId() {

@Override
public boolean isResponsibleClient(Client client) {
return distroMapper.responsible(client.getClientId());
if (client instanceof IpPortBasedClient) {
return distroMapper.responsible(((IpPortBasedClient) client).getResponsibleId());
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.core.v2.pojo;

import com.alibaba.nacos.naming.healthcheck.HealthCheckStatus;
import com.fasterxml.jackson.annotation.JsonIgnore;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Instance publish info with health check for v1.x.
*
* @author xiweng.yy
*/
public class HealthCheckInstancePublishInfo extends InstancePublishInfo {

private long lastHeartBeatTime = System.currentTimeMillis();

private HealthCheckStatus healthCheckStatus;

public HealthCheckInstancePublishInfo() {
}

public HealthCheckInstancePublishInfo(String ip, int port) {
super(ip, port);
}

public long getLastHeartBeatTime() {
return lastHeartBeatTime;
}

public void setLastHeartBeatTime(long lastHeartBeatTime) {
this.lastHeartBeatTime = lastHeartBeatTime;
}

public void initHealthCheck() {
healthCheckStatus = new HealthCheckStatus();
}

public boolean tryStartCheck() {
return healthCheckStatus.isBeingChecked.compareAndSet(false, true);
}

public void finishCheck() {
healthCheckStatus.isBeingChecked.set(false);
}

public void resetOkCount() {
healthCheckStatus.checkOkCount.set(0);
}

public void resetFailCount() {
healthCheckStatus.checkFailCount.set(0);
}

@JsonIgnore
public AtomicInteger getOkCount() {
return healthCheckStatus.checkOkCount;
}

@JsonIgnore
public AtomicInteger getFailCount() {
return healthCheckStatus.checkFailCount;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.nacos.common.utils.IPUtil;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -27,7 +28,9 @@
*
* @author xiweng.yy
*/
public class InstancePublishInfo {
public class InstancePublishInfo implements Serializable {

private static final long serialVersionUID = -74988890439616025L;

private String ip;

Expand Down
Loading

0 comments on commit ac73e73

Please sign in to comment.