Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback, simple retry and metric for naming push #4533

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ public class NamingResponseCode extends ResponseCode {
*/
public static final int RESOURCE_NOT_FOUND = 20404;

/**
* Stop or no need to retry.
*/
public static final int NO_NEED_RETRY = 21600;

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,10 @@ public void onEvent(InstancesChangeEvent event) {
for (final EventListener listener : eventListeners) {
final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(new Runnable() {
@Override
public void run() {
listener.onEvent(namingEvent);
}
});
continue;
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
} else {
listener.onEvent(namingEvent);
}
listener.onEvent(namingEvent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,7 @@ public void notifySubscriber(final Subscriber subscriber, final Event event) {

LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

final Runnable job = new Runnable() {
@Override
public void run() {
subscriber.onEvent(event);
}
};

final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();

if (executor != null) {
Expand All @@ -208,7 +202,7 @@ public void run() {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception : {}", e);
LOGGER.error("Event callback exception: ", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,16 @@ public void request(Payload grpcRequest, StreamObserver<Payload> responseObserve
Response response = requestHandler.handleRequest(request, parseObj.getMetadata());
responseObserver.onNext(GrpcUtils.convert(response));
responseObserver.onCompleted();
return;
} catch (Throwable e) {

Loggers.REMOTE_DIGEST.error(String
.format("[%s] fail to handle request ,error message :%s", "grpc", e.getMessage(), e));
Loggers.REMOTE_DIGEST.error("[{}] fail to handle request ,error message :{}", "grpc", e.getMessage(), e);
responseObserver.onNext(GrpcUtils.convert(buildFailResponse("Error")));
responseObserver.onCompleted();
return;
}
} else {
Loggers.REMOTE_DIGEST.debug(String.format("[%s] no handler for request type : %s :", "grpc", type));
responseObserver.onNext(GrpcUtils.convert(buildFailResponse("RequestHandler Not Found")));
responseObserver.onCompleted();
return;
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
import com.alibaba.nacos.naming.misc.SwitchEntry;
import com.alibaba.nacos.naming.misc.SwitchManager;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.push.AckEntry;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.push.PushService;
import com.alibaba.nacos.naming.remote.udp.AckEntry;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
Expand All @@ -62,8 +63,6 @@
@RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator", UtilsAndCommons.NACOS_NAMING_CONTEXT + "/ops"})
public class OperatorController {

private final PushService pushService;

private final SwitchManager switchManager;

private final ServerListManager serverListManager;
Expand All @@ -80,10 +79,9 @@ public class OperatorController {

private final RaftCore raftCore;

public OperatorController(PushService pushService, SwitchManager switchManager, ServerListManager serverListManager,
public OperatorController(SwitchManager switchManager, ServerListManager serverListManager,
ServiceManager serviceManager, ServerMemberManager memberManager, ServerStatusManager serverStatusManager,
SwitchDomain switchDomain, DistroMapper distroMapper, RaftCore raftCore) {
this.pushService = pushService;
this.switchManager = switchManager;
this.serverListManager = serverListManager;
this.serviceManager = serviceManager;
Expand All @@ -108,12 +106,13 @@ public ObjectNode pushState(@RequestParam(required = false) boolean detail,
ObjectNode result = JacksonUtils.createEmptyJsonNode();

List<AckEntry> failedPushes = PushService.getFailedPushes();
int failedPushCount = pushService.getFailedPushCount();
result.put("succeed", pushService.getTotalPush() - failedPushCount);
result.put("total", pushService.getTotalPush());
int failedPushCount = MetricsMonitor.getFailedPushMonitor().get();
int totalPushCount = MetricsMonitor.getTotalPushMonitor().get();
result.put("succeed", totalPushCount - failedPushCount);
result.put("total", totalPushCount);

if (pushService.getTotalPush() > 0) {
result.put("ratio", ((float) pushService.getTotalPush() - failedPushCount) / pushService.getTotalPush());
if (totalPushCount > 0) {
result.put("ratio", ((float) totalPushCount - failedPushCount) / totalPushCount);
} else {
result.put("ratio", 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.pojo.Subscribers;
import com.alibaba.nacos.naming.push.NamingSubscriberServiceV2Impl;
import com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public ServiceChangedEvent(Service service) {

public ServiceChangedEvent(Service service, boolean incrementRevision) {
super(service);
service.renewUpdateTime();
if (incrementRevision) {
service.incrementRevision();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ private void updateInstanceMetadata(MetadataOperation<InstanceMetadata> op) {
private void deleteInstanceMetadata(MetadataOperation<InstanceMetadata> op) {
Service service = Service.newService(op.getNamespace(), op.getGroup(), op.getServiceName());
namingMetadataManager.removeInstanceMetadata(service, op.getTag());
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ public long getLastUpdatedTime() {
return lastUpdatedTime;
}

public void renewUpdateTime() {
lastUpdatedTime = System.currentTimeMillis();
}

public void incrementRevision() {
revision.incrementAndGet();
lastUpdatedTime = System.currentTimeMillis();
}

public String getGroupedServiceName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ public class GlobalExecutor {
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.remote-connection-manager"));

private static final ExecutorService PUSH_CALLBACK_EXECUTOR = ExecutorFactory.Managed
.newSingleExecutorService("Push", new NameThreadFactory("com.alibaba.nacos.naming.push.callback"));

/**
* Register raft leader election executor.
*
Expand Down Expand Up @@ -293,4 +296,8 @@ public static void schedulePerformanceLogger(Runnable runnable, long initialDela
public static void scheduleExpiredClientCleaner(Runnable runnable, long initialDelay, long delay, TimeUnit unit) {
EXPIRED_CLIENT_CLEANER_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
}

public static ExecutorService getCallbackExecutor() {
return PUSH_CALLBACK_EXECUTOR;
}
}
Loading