Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix config test case and remove useless code #11521

Merged
merged 12 commits into from
Dec 18, 2023
Next Next commit
修复客户端cmsgc导致服务端推送积压直接内存oom问题
  • Loading branch information
shiyiyue1102 committed Dec 14, 2023
commit 4bc22c55f98fb6917111df509d139ed1cab41121
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public class ConnectionMeta {
*/
String tenant;

long firstPushQueueBlockTime = 0;

long lastPushQueueBlockTime = 0;

protected Map<String, String> labels = new HashMap<>();

public String getLabel(String labelKey) {
Expand Down Expand Up @@ -292,6 +296,23 @@ public void setTenant(String tenant) {
this.tenant = tenant;
}

public void recordPushQueueBlockTimes() {
if (this.firstPushQueueBlockTime == 0) {
firstPushQueueBlockTime = System.currentTimeMillis();
} else {
lastPushQueueBlockTime = System.currentTimeMillis();
}
}

public void clearPushQueueBlockTimes() {
this.firstPushQueueBlockTime = 0;
this.lastPushQueueBlockTime = 0;
}

public boolean pushQueueBlockTimesLastOver(long timeMillsSeconds) {
return this.lastPushQueueBlockTime - this.firstPushQueueBlockTime > timeMillsSeconds;
}

@Override
public String toString() {
return "ConnectionMeta{" + "connectType='" + connectType + '\'' + ", clientIp='" + clientIp + '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public void doEject() {
// remove overload connection
ejectOverLimitConnection();
}

/**
* eject the outdated connection.
*/
private void ejectOutdatedConnection() {
try {

Loggers.CONNECTION.info("Connection check task start");

Map<String, Connection> connections = connectionManager.connections;
int totalCount = connections.size();
MetricsMonitor.getLongConnectionMonitor().set(totalCount);
Expand All @@ -77,6 +77,8 @@ private void ejectOutdatedConnection() {
Connection client = entry.getValue();
if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
outDatedConnections.add(client.getMetaInfo().getConnectionId());
} else if (client.getMetaInfo().pushQueueBlockTimesLastOver(300 * 1000)) {
outDatedConnections.add(client.getMetaInfo().getConnectionId());
}
}

Expand Down Expand Up @@ -147,7 +149,7 @@ public void onException(Throwable e) {
Loggers.CONNECTION.error("Error occurs during connection check... ", e);
}
}

/**
* eject the over limit connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.nacos.core.remote.grpc;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.remote.DefaultRequestFuture;
import com.alibaba.nacos.api.remote.RequestCallBack;
Expand All @@ -25,15 +26,21 @@
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils;
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
import com.alibaba.nacos.common.remote.exception.ConnectionBusyException;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionMeta;
import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.plugin.control.ControlManagerCenter;
import com.alibaba.nacos.plugin.control.tps.TpsControlManager;
import com.alibaba.nacos.plugin.control.tps.request.TpsCheckRequest;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.shaded.io.netty.channel.Channel;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.atomic.AtomicReference;

/**
* grpc connection.
*
Expand All @@ -46,6 +53,8 @@ public class GrpcConnection extends Connection {

private Channel channel;

private static TpsControlManager tpsControlManager;

public GrpcConnection(ConnectionMeta metaInfo, StreamObserver streamObserver, Channel channel) {
super(metaInfo);
this.streamObserver = streamObserver;
Expand All @@ -59,19 +68,70 @@ public GrpcConnection(ConnectionMeta metaInfo, StreamObserver streamObserver, Ch
* @throws NacosException NacosException
*/
public void sendRequestNoAck(Request request) throws NacosException {
try {
sendQueueBlockCheck();
final AtomicReference<NacosRuntimeException> exception = new AtomicReference<>();
final DefaultRequestFuture future = new DefaultRequestFuture(this.getMetaInfo().getConnectionId(), "0");
this.channel.eventLoop().execute(() -> {
//StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak.
synchronized (streamObserver) {

Payload payload = GrpcUtils.convert(request);
traceIfNecessary(payload);
streamObserver.onNext(payload);
try {
Payload payload = GrpcUtils.convert(request);
traceIfNecessary(payload);
streamObserver.onNext(payload);
future.setResponse(new Response() {
@Override
public String getMessage() {
return "";
}
});
} catch (Throwable e) {
if (e instanceof StatusRuntimeException) {
exception.set(new ConnectionAlreadyClosedException(e));
} else if (e instanceof IllegalStateException) {
exception.set(new ConnectionAlreadyClosedException(e));
} else {
exception.set(new NacosRuntimeException(NacosException.SERVER_ERROR, e));
}
future.setFailResult(exception.get());
}

}
});
try {
future.get();
} catch (Exception e) {
if (e instanceof StatusRuntimeException) {
throw new ConnectionAlreadyClosedException(e);
}
if (exception.get() != null) {
throw exception.get();
}

}

private void sendQueueBlockCheck() {
if (streamObserver instanceof ServerCallStreamObserver) {
// if bytes on queue is greater than 32k ,isReady will return false.
// queue type: grpc write queue,flowed controller queue etc.
// this 32k threshold is fixed with static final.
// see io.grpc.internal.AbstractStream.TransportState.DEFAULT_ONREADY_THRESHOLD
boolean ready = ((ServerCallStreamObserver<?>) streamObserver).isReady();
if (!ready) {
if (tpsControlManager == null) {
synchronized (GrpcConnection.class.getClass()) {
if (tpsControlManager == null) {
tpsControlManager = ControlManagerCenter.getInstance().getTpsControlManager();
tpsControlManager.registerTpsPoint("SERVER_PUSH_BLOCK");
}
}
}
TpsCheckRequest tpsCheckRequest = new TpsCheckRequest("SERVER_PUSH_BLOCK",
this.getMetaInfo().getConnectionId(), this.getMetaInfo().getClientIp());
//record block only.
tpsControlManager.check(tpsCheckRequest);
getMetaInfo().recordPushQueueBlockTimes();
throw new ConnectionBusyException("too much bytes on sending queue of this stream.");
} else {
getMetaInfo().clearPushQueueBlockTimes();
}
throw e;
}
}

Expand Down Expand Up @@ -133,8 +193,12 @@ public void close() {
if (isTraced()) {
Loggers.REMOTE_DIGEST.warn("[{}] try to close connection ", connectionId);
}

closeBiStream();

try {
closeBiStream();
} catch (Throwable e) {
Loggers.REMOTE_DIGEST.warn("[{}] connection close bi stream exception : {}", connectionId, e);
}
channel.close();

} catch (Exception e) {
Expand Down