Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client distro sync logic #3809

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ public class CommonParams {
public static final String GROUP_NAME = "groupName";

public static final String LIGHT_BEAT_ENABLED = "lightBeatEnabled";

public static final String NAMING_REQUEST_TIMEOUT = "namingRequestTimeout";
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class ServiceQueryRequest extends AbstractNamingRequest {
public ServiceQueryRequest() {
}

public ServiceQueryRequest(String namespace, String serviceName) {
super(namespace, serviceName, null);
public ServiceQueryRequest(String namespace, String serviceName, String groupName) {
super(namespace, serviceName, groupName);
}

public String getCluster() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfo
this.serverListManager = new ServerListManager(properties);
this.serviceInfoHolder = serviceInfoHolder;
this.httpClientProxy = new NamingHttpClientProxy(namespace, serverListManager, properties, serviceInfoHolder);
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, serverListManager, serviceInfoHolder);
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, serverListManager, properties, serviceInfoHolder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.nacos.client.naming.remote.gprc;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.Service;
Expand Down Expand Up @@ -46,7 +47,9 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;

Expand All @@ -59,17 +62,23 @@ public class NamingGrpcClientProxy implements NamingClientProxy {

private final String namespaceId;

private final String uuid;

private final Long requestTimeout;

private final RpcClient rpcClient;

private final NamingGrpcConnectionEventListener namingGrpcConnectionEventListener;

public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory,
public NamingGrpcClientProxy(String namespaceId, ServerListFactory serverListFactory, Properties properties,
ServiceInfoHolder serviceInfoHolder) throws NacosException {
this.namespaceId = namespaceId;
this.uuid = UUID.randomUUID().toString();
this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
this.rpcClient = RpcClientFactory.createClient("naming", ConnectionType.GRPC, labels);
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this);
start(serverListFactory, serviceInfoHolder);
}
Expand Down Expand Up @@ -110,8 +119,7 @@ public void updateInstance(String serviceName, String groupName, Instance instan
@Override
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
boolean healthyOnly) throws NacosException {
ServiceQueryRequest request = new ServiceQueryRequest(namespaceId,
NamingUtils.getGroupedName(serviceName, groupName));
ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
request.setCluster(clusters);
request.setHealthyOnly(healthyOnly);
request.setUdpPort(udpPort);
Expand Down Expand Up @@ -185,7 +193,7 @@ public boolean serverHealthy() {

private <T extends Response> T requestToServer(Request request, Class<T> responseClass) throws NacosException {
try {
Response response = rpcClient.request(request);
Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
throw new NacosException(response.getErrorCode(), response.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public void receiveEvent(Event event) {

// Get for Map, the algorithm is O(1).
Set<Subscriber> subscribers = subMappings.get(slowEventType);
if (null == subscribers) {
LOGGER.warn("[NotifyCenter] No subscribers for event " + slowEventType.getSimpleName());
}

// Notification single event subscriber
for (Subscriber subscriber : subscribers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,9 @@ public enum DataOperation {
/**
* Data verify.
*/
VERIFY;
VERIFY,
/**
* Data Snapshot.
*/
SNAPSHOT;
}
5 changes: 0 additions & 5 deletions console/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ server.servlet.contextPath=/nacos
nacos.naming.empty-service.auto-clean=true
nacos.naming.empty-service.clean.initial-delay-ms=50000
nacos.naming.empty-service.clean.period-time-ms=30000
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://10.101.167.27:3306/acm?characterEncoding=utf8&connectTimeout=1000&socketTimeout=10000&autoReconnect=true
db.user=root
db.password=root


#*************** CMDB Module Related Configurations ***************#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public class DistroException extends RuntimeException {

private static final long serialVersionUID = 1711141952413139786L;

public DistroException(String message) {
super(message);
}

public DistroException(String message, Throwable cause) {
super(message, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ public void merge(AbstractDelayTask task) {
if (!(task instanceof DistroDelayTask)) {
return;
}
DistroDelayTask newTask = (DistroDelayTask) task;
if (!action.equals(newTask.getAction()) && createTime < newTask.getCreateTime()) {
action = newTask.getAction();
createTime = newTask.getCreateTime();
DistroDelayTask oldTask = (DistroDelayTask) task;
if (!action.equals(oldTask.getAction()) && createTime < oldTask.getCreateTime()) {
action = oldTask.getAction();
createTime = oldTask.getCreateTime();
}
setLastProcessTime(newTask.getLastProcessTime());
setLastProcessTime(oldTask.getLastProcessTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.core.distributed.distro.task.execute.DistroSyncChangeTask;
import com.alibaba.nacos.core.distributed.distro.task.execute.DistroSyncDeleteTask;

/**
* Distro delay task processor.
Expand Down Expand Up @@ -52,6 +53,10 @@ public boolean process(AbstractDelayTask task) {
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask);
return true;
} else if (DataOperation.DELETE.equals(distroDelayTask.getAction())) {
DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncDeleteTask);
return true;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
package com.alibaba.nacos.core.distributed.distro.task.execute;

import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.component.DistroFailedTaskHandler;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.utils.Loggers;

/**
* Abstract distro execute task.
Expand All @@ -28,11 +32,53 @@ public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask impl

private final DistroKey distroKey;

protected AbstractDistroExecuteTask(DistroKey distroKey) {
private final DistroComponentHolder distroComponentHolder;

protected AbstractDistroExecuteTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
this.distroKey = distroKey;
this.distroComponentHolder = distroComponentHolder;
}

protected DistroKey getDistroKey() {
return distroKey;
}

protected DistroComponentHolder getDistroComponentHolder() {
return distroComponentHolder;
}

@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
boolean result = doExecute();
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
handleFailedTask();
}
}

/**
* Do execute for different sub class.
*
* @return result of execute
*/
protected abstract boolean doExecute();

/**
* Handle failed task.
*/
protected void handleFailedTask() {
String type = getDistroKey().getResourceType();
DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);
if (null == failedTaskHandler) {
Loggers.DISTRO.warn("[DISTRO] Can't find failed task for type {}, so discarded", type);
return;
}
failedTaskHandler.retry(getDistroKey(), DataOperation.CHANGE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.component.DistroFailedTaskHandler;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.utils.Loggers;
Expand All @@ -30,39 +29,21 @@
*/
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {

private final DistroComponentHolder distroComponentHolder;

public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
super(distroKey);
this.distroComponentHolder = distroComponentHolder;
super(distroKey, distroComponentHolder);
}

@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
handleFailedTask();
}
}

private void handleFailedTask() {
protected boolean doExecute() {
String type = getDistroKey().getResourceType();
DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);
if (null == failedTaskHandler) {
Loggers.DISTRO.warn("[DISTRO] Can't find failed task for type {}, so discarded", type);
return;
DistroData distroData = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
if (null != distroData) {
distroData.setType(DataOperation.CHANGE);
return getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer());
}
failedTaskHandler.retry(getDistroKey(), DataOperation.CHANGE);
Loggers.DISTRO.warn("[DISTRO-END] {} with null data to sync, skip", toString());
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.core.distributed.distro.task.execute;

import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;

/**
* Distro sync delete task.
*
* @author xiweng.yy
*/
public class DistroSyncDeleteTask extends AbstractDistroExecuteTask {

public DistroSyncDeleteTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
super(distroKey, distroComponentHolder);
}

@Override
protected boolean doExecute() {
String type = getDistroKey().getResourceType();
DistroData distroData = new DistroData();
distroData.setDistroKey(getDistroKey());
distroData.setType(DataOperation.DELETE);
return getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer());
}

@Override
public String toString() {
return "DistroSyncDeleteTask for " + getDistroKey().toString();
}
}
Loading