Skip to content

Commit

Permalink
Merge pull request alibaba#4684 from KomachiSion/develop-fix-jraft-wr…
Browse files Browse the repository at this point in the history
…ite-request-problem

Cherry-pick jraft write request problem from nacos2.0.0
  • Loading branch information
KeRan213539 authored Jan 13, 2021
2 parents 9989473 + 591350e commit 220d70a
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public Class<? extends Event> subscribeType() {
NotifyCenter.registerToPublisher(ConfigDumpEvent.class, NotifyCenter.ringBufferSize);
NotifyCenter.registerSubscriber(new DumpConfigHandler());

this.protocol.addLogProcessors(Collections.singletonList(this));
this.protocol.addRequestProcessors(Collections.singletonList(this));
LogUtil.DEFAULT_LOG.info("use DistributedTransactionServicesImpl");
}

Expand Down Expand Up @@ -390,7 +390,7 @@ public CompletableFuture<RestResult<String>> dataImport(File file) {
if (submit) {
List<ModifyRequest> requests = batchUpdate.stream().map(ModifyRequest::new)
.collect(Collectors.toList());
CompletableFuture<Response> future = protocol.submitAsync(WriteRequest.newBuilder().setGroup(group())
CompletableFuture<Response> future = protocol.writeAsync(WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(requests)))
.putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build());
futures.add(future);
Expand Down Expand Up @@ -432,14 +432,14 @@ public Boolean update(List<ModifyRequest> sqlContext, BiConsumer<Boolean, Throwa
.putAllExtendInfo(EmbeddedStorageContextUtils.getCurrentExtendInfo())
.setType(sqlContext.getClass().getCanonicalName()).build();
if (Objects.isNull(consumer)) {
Response response = this.protocol.submit(request);
Response response = this.protocol.write(request);
if (response.getSuccess()) {
return true;
}
LogUtil.DEFAULT_LOG.error("execute sql modify operation failed : {}", response.getErrMsg());
return false;
} else {
this.protocol.submitAsync(request).whenComplete((BiConsumer<Response, Throwable>) (response, ex) -> {
this.protocol.writeAsync(request).whenComplete((BiConsumer<Response, Throwable>) (response, ex) -> {
String errMsg = Objects.isNull(ex) ? response.getErrMsg() : ExceptionUtil.getCause(ex).getMessage();
consumer.accept(response.getSuccess(),
StringUtils.isBlank(errMsg) ? null : new NJdbcException(errMsg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public interface ConsistencyProtocol<T extends Config, P extends RequestProcesso
void init(T config);

/**
* Add a log handler.
* Add a request handler.
*
* @param processors {@link RequestProcessor}
*/
void addLogProcessors(Collection<P> processors);
void addRequestProcessors(Collection<P> processors);

/**
* Copy of metadata information for this consensus protocol.
Expand Down Expand Up @@ -87,7 +87,7 @@ public interface ConsistencyProtocol<T extends Config, P extends RequestProcesso
* @return submit operation result {@link Response}
* @throws Exception {@link Exception}
*/
Response submit(WriteRequest request) throws Exception;
Response write(WriteRequest request) throws Exception;

/**
* Data submission operation, returning submission results asynchronously.
Expand All @@ -97,7 +97,7 @@ public interface ConsistencyProtocol<T extends Config, P extends RequestProcesso
* @return {@link CompletableFuture} submit result
* @throws Exception when submit throw Exception
*/
CompletableFuture<Response> submitAsync(WriteRequest request);
CompletableFuture<Response> writeAsync(WriteRequest request);

/**
* New member list .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Class<? extends Event> subscribeType() {
}

@Override
public void addLogProcessors(Collection<RequestProcessor4CP> processors) {
public void addRequestProcessors(Collection<RequestProcessor4CP> processors) {
raftServer.createMultiRaftGroup(processors);
}

Expand All @@ -175,14 +175,14 @@ public CompletableFuture<Response> aGetData(ReadRequest request) {
}

@Override
public Response submit(WriteRequest request) throws Exception {
CompletableFuture<Response> future = submitAsync(request);
public Response write(WriteRequest request) throws Exception {
CompletableFuture<Response> future = writeAsync(request);
// Here you wait for 10 seconds, as long as possible, for the request to complete
return future.get(10_000L, TimeUnit.MILLISECONDS);
}

@Override
public CompletableFuture<Response> submitAsync(WriteRequest request) {
public CompletableFuture<Response> writeAsync(WriteRequest request) {
return raftServer.commit(request.getGroup(), request, new CompletableFuture<>());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@Deprecated
public class NacosGetRequestProcessor extends AbstractProcessor implements RpcProcessor<GetRequest> {

private static final String INTEREST_NAME = GetRequest.class.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@Deprecated
public class NacosLogProcessor extends AbstractProcessor implements RpcProcessor<Log> {

private static final String INTEREST_NAME = Log.class.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.core.distributed.raft.JRaftServer;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;

Expand All @@ -28,17 +29,22 @@
*/
public class NacosReadRequestProcessor extends AbstractProcessor implements RpcProcessor<ReadRequest> {

public NacosReadRequestProcessor(Serializer serializer) {
private static final String INTEREST_NAME = ReadRequest.class.getName();

private final JRaftServer server;

public NacosReadRequestProcessor(JRaftServer server, Serializer serializer) {
super(serializer);
this.server = server;
}

@Override
public void handleRequest(RpcContext rpcCtx, ReadRequest request) {

handleRequest(server, request.getGroup(), rpcCtx, request);
}

@Override
public String interest() {
return null;
return INTEREST_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.core.distributed.raft.JRaftServer;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;

Expand All @@ -28,17 +29,22 @@
*/
public class NacosWriteRequestProcessor extends AbstractProcessor implements RpcProcessor<WriteRequest> {

public NacosWriteRequestProcessor(Serializer serializer) {
private static final String INTEREST_NAME = WriteRequest.class.getName();

private final JRaftServer server;

public NacosWriteRequestProcessor(JRaftServer server, Serializer serializer) {
super(serializer);
this.server = server;
}

@Override
public void handleRequest(RpcContext rpcCtx, WriteRequest request) {

handleRequest(server, request.getGroup(), rpcCtx, request);
}

@Override
public String interest() {
return null;
return INTEREST_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,14 @@ public static RpcServer initRpcServer(JRaftServer server, PeerId peerId) {
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer, RaftExecutor.getRaftCoreExecutor(),
RaftExecutor.getRaftCliServiceExecutor());

// Deprecated
rpcServer.registerProcessor(new NacosLogProcessor(server, SerializeFactory.getDefault()));
// Deprecated
rpcServer.registerProcessor(new NacosGetRequestProcessor(server, SerializeFactory.getDefault()));

rpcServer.registerProcessor(new NacosWriteRequestProcessor(SerializeFactory.getDefault()));
rpcServer.registerProcessor(new NacosReadRequestProcessor(SerializeFactory.getDefault()));

rpcServer.registerProcessor(new NacosWriteRequestProcessor(server, SerializeFactory.getDefault()));
rpcServer.registerProcessor(new NacosReadRequestProcessor(server, SerializeFactory.getDefault()));

return rpcServer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public PersistentServiceProcessor(ProtocolManager protocolManager, ClusterVersio
@Override
public void afterConstruct() {
super.afterConstruct();
this.protocol.addLogProcessors(Collections.singletonList(this));
this.protocol.addRequestProcessors(Collections.singletonList(this));
this.protocol.protocolMetaData()
.subscribe(Constants.NAMING_PERSISTENT_SERVICE_GROUP, MetadataKey.LEADER_META_DATA,
(o, arg) -> hasLeader = StringUtils.isNotBlank(String.valueOf(arg)));
Expand Down Expand Up @@ -95,7 +95,7 @@ public void put(String key, Record value) throws NacosException {
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
try {
protocol.submit(request);
protocol.write(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
Expand All @@ -108,7 +108,7 @@ public void remove(String key) throws NacosException {
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build();
try {
protocol.submit(request);
protocol.write(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
Expand Down

0 comments on commit 220d70a

Please sign in to comment.