Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support persistent service and instance register/deregister #4608

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static void shutdownThreadPool(ExecutorService executor, Logger logger) {
while (retry > 0) {
retry--;
try {
if (executor.awaitTermination(1, TimeUnit.SECONDS)) {
if (executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
return;
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.metadata.InstanceMetadata;
Expand All @@ -34,6 +35,7 @@
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.service.ClientOperationService;
import com.alibaba.nacos.naming.core.v2.service.ClientOperationServiceProxy;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatProcessorV2;
Expand All @@ -54,7 +56,7 @@
@org.springframework.stereotype.Service
public class InstanceOperatorClientImpl implements InstanceOperator {

private final ClientManagerDelegate clientManager;
private final ClientManager clientManager;

private final ClientOperationService clientOperationService;

Expand All @@ -67,7 +69,7 @@ public class InstanceOperatorClientImpl implements InstanceOperator {
private final SwitchDomain switchDomain;

public InstanceOperatorClientImpl(ClientManagerDelegate clientManager,
ClientOperationService clientOperationService, ServiceStorage serviceStorage,
ClientOperationServiceProxy clientOperationService, ServiceStorage serviceStorage,
NamingMetadataOperateService metadataOperateService, NamingMetadataManager metadataManager,
SwitchDomain switchDomain) {
this.clientManager = clientManager;
Expand All @@ -83,20 +85,22 @@ public InstanceOperatorClientImpl(ClientManagerDelegate clientManager,
*/
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
String clientId = instance.toInetAddr();
createIpPortClientIfAbsent(clientId, instance.isEphemeral());
Service service = getService(namespaceId, serviceName, instance.isEphemeral());
boolean ephemeral = instance.isEphemeral();
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
createIpPortClientIfAbsent(clientId, ephemeral);
Service service = getService(namespaceId, serviceName, ephemeral);
clientOperationService.registerInstance(service, instance, clientId);
}

@Override
public void removeInstance(String namespaceId, String serviceName, Instance instance) {
String clientId = instance.toInetAddr();
if (!clientManager.allClientId().contains(clientId)) {
boolean ephemeral = instance.isEphemeral();
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
if (!clientManager.contains(clientId)) {
Loggers.SRV_LOG.warn("remove instance from non-exist client: {}", clientId);
return;
}
Service service = getService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName, ephemeral);
clientOperationService.deregisterInstance(service, instance, clientId);
}

Expand Down Expand Up @@ -246,7 +250,7 @@ public List<? extends Instance> listAllInstances(String namespaceId, String serv
}

private void createIpPortClientIfAbsent(String clientId, boolean ephemeral) {
if (!clientManager.allClientId().contains(clientId)) {
if (!clientManager.contains(clientId)) {
clientManager.clientConnected(new IpPortBasedClient(clientId, ephemeral));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,4 @@ protected final void loadSubscribers(ConcurrentMap<Service, Subscriber> subscrib
this.subscribers.clear();
this.subscribers.putAll(subscribers);
}

/**
* Whether the current client has expired.
*
* @param currentTime current time
* @return is expire
*/
public abstract boolean isExpire(final long currentTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,12 @@ public interface Client {
* @return sync data
*/
ClientSyncData generateSyncData();

/**
* Whether current client is expired.
*
* @param currentTime unified current timestamp
* @return true if client has expired, otherwise false
*/
boolean isExpire(long currentTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import com.alibaba.nacos.naming.core.v2.pojo.HeartBeatInstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatCheckTaskV2;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
import com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatCheckTaskV2;

import java.util.Collection;

Expand All @@ -36,23 +36,31 @@
*/
public class IpPortBasedClient extends AbstractClient {

private static final String ID_DELIMITER = "#";

private final String clientId;

private final boolean ephemeral;

private final ClientBeatCheckTaskV2 beatCheckTask;
private ClientBeatCheckTaskV2 beatCheckTask;

public IpPortBasedClient(String clientId, boolean ephemeral) {
this.ephemeral = ephemeral;
this.clientId = clientId;
beatCheckTask = new ClientBeatCheckTaskV2(this);
scheduleCheckTask();
if (ephemeral) {
beatCheckTask = new ClientBeatCheckTaskV2(this);
scheduleCheckTask();
}
}

private void scheduleCheckTask() {
HealthCheckReactor.scheduleCheck(beatCheckTask);
}

public static String getClientId(String address, boolean ephemeral) {
return address + ID_DELIMITER + ephemeral;
}

@Override
public String getClientId() {
return clientId;
Expand All @@ -78,8 +86,13 @@ public Collection<InstancePublishInfo> getAllInstancePublishInfo() {
return publishers.values();
}

/**
* Destroy current client.
*/
public void destroy() {
HealthCheckReactor.cancelCheck(beatCheckTask);
if (ephemeral) {
HealthCheckReactor.cancelCheck(beatCheckTask);
}
}

private InstancePublishInfo parseToHeartBeatInstance(InstancePublishInfo instancePublishInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.alibaba.nacos.naming.core.v2.client.Client;

import java.util.Collection;
import java.util.function.BiConsumer;

/**
* The manager of {@code Client} Nacos naming client.
Expand Down Expand Up @@ -75,15 +74,6 @@ public interface ClientManager {
*/
Collection<String> allClientId();

/**
* Traverse processing.
*
* @param consumer {@link BiConsumer}
*/
default void forEach(BiConsumer<String, Client> consumer) {

}

/**
* Whether the client is responsible by current server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.alibaba.nacos.naming.core.v2.client.manager;

import com.alibaba.nacos.common.utils.IPUtil;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.IpPortBasedClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.EphemeralIpPortClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.PersistentIpPortClientManager;
import org.springframework.stereotype.Component;

import java.util.Collection;
Expand All @@ -34,12 +36,16 @@ public class ClientManagerDelegate implements ClientManager {

private final ConnectionBasedClientManager connectionBasedClientManager;

private final IpPortBasedClientManager ipPortBasedClientManager;
private final EphemeralIpPortClientManager ephemeralIpPortClientManager;

private final PersistentIpPortClientManager persistentIpPortClientManager;

public ClientManagerDelegate(ConnectionBasedClientManager connectionBasedClientManager,
IpPortBasedClientManager ipPortBasedClientManager) {
EphemeralIpPortClientManager ephemeralIpPortClientManager,
PersistentIpPortClientManager persistentIpPortClientManager) {
this.connectionBasedClientManager = connectionBasedClientManager;
this.ipPortBasedClientManager = ipPortBasedClientManager;
this.ephemeralIpPortClientManager = ephemeralIpPortClientManager;
this.persistentIpPortClientManager = persistentIpPortClientManager;
}

@Override
Expand All @@ -64,14 +70,16 @@ public Client getClient(String clientId) {

@Override
public boolean contains(String clientId) {
return connectionBasedClientManager.contains(clientId) || ipPortBasedClientManager.contains(clientId);
return connectionBasedClientManager.contains(clientId) || ephemeralIpPortClientManager.contains(clientId)
|| persistentIpPortClientManager.contains(clientId);
}

@Override
public Collection<String> allClientId() {
Collection<String> result = new HashSet<>();
result.addAll(connectionBasedClientManager.allClientId());
result.addAll(ipPortBasedClientManager.allClientId());
result.addAll(ephemeralIpPortClientManager.allClientId());
result.addAll(persistentIpPortClientManager.allClientId());
return result;
}

Expand All @@ -86,15 +94,13 @@ public boolean verifyClient(String clientId) {
}

private ClientManager getClientManagerById(String clientId) {
return clientId.contains(":") ? ipPortBasedClientManager : connectionBasedClientManager;
}

public ConnectionBasedClientManager getConnectionBasedClientManager() {
return connectionBasedClientManager;
if (isConnectionBasedClient(clientId)) {
return connectionBasedClientManager;
}
return clientId.endsWith("false") ? persistentIpPortClientManager : ephemeralIpPortClientManager;
}

public IpPortBasedClientManager getIpPortBasedClientManager() {
return ipPortBasedClientManager;
private boolean isConnectionBasedClient(String clientId) {
return !clientId.contains(IPUtil.IP_PORT_SPLITER);
}

}
}

This file was deleted.

Loading