From 2eac36797044ce8c61eb54a91f053af9c485d22b Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Sat, 5 Dec 2020 01:57:17 +0800 Subject: [PATCH 1/2] fix: fix Jraft WriteRequest type problem. --- .../embedded/DistributedDatabaseOperateImpl.java | 8 ++++---- .../nacos/consistency/ConsistencyProtocol.java | 8 ++++---- .../nacos/core/distributed/raft/JRaftProtocol.java | 8 ++++---- .../raft/processor/NacosGetRequestProcessor.java | 1 + .../raft/processor/NacosLogProcessor.java | 1 + .../raft/processor/NacosReadRequestProcessor.java | 13 ++++++++++--- .../raft/processor/NacosWriteRequestProcessor.java | 12 +++++++++--- .../core/distributed/raft/utils/JRaftUtils.java | 9 ++++++--- .../persistent/impl/PersistentServiceProcessor.java | 6 +++--- 9 files changed, 42 insertions(+), 24 deletions(-) diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/DistributedDatabaseOperateImpl.java b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/DistributedDatabaseOperateImpl.java index 7eca60e85f2..e31f1e400ab 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/DistributedDatabaseOperateImpl.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/DistributedDatabaseOperateImpl.java @@ -205,7 +205,7 @@ public Class subscribeType() { NotifyCenter.registerToPublisher(ConfigDumpEvent.class, NotifyCenter.ringBufferSize); NotifyCenter.registerSubscriber(new DumpConfigHandler()); - this.protocol.addLogProcessors(Collections.singletonList(this)); + this.protocol.addRequestProcessors(Collections.singletonList(this)); LogUtil.DEFAULT_LOG.info("use DistributedTransactionServicesImpl"); } @@ -390,7 +390,7 @@ public CompletableFuture> dataImport(File file) { if (submit) { List requests = batchUpdate.stream().map(ModifyRequest::new) .collect(Collectors.toList()); - CompletableFuture future = protocol.submitAsync(WriteRequest.newBuilder().setGroup(group()) + CompletableFuture future = protocol.writeAsync(WriteRequest.newBuilder().setGroup(group()) .setData(ByteString.copyFrom(serializer.serialize(requests))) .putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build()); futures.add(future); @@ -432,14 +432,14 @@ public Boolean update(List sqlContext, BiConsumer) (response, ex) -> { + this.protocol.writeAsync(request).whenComplete((BiConsumer) (response, ex) -> { String errMsg = Objects.isNull(ex) ? response.getErrMsg() : ExceptionUtil.getCause(ex).getMessage(); consumer.accept(response.getSuccess(), StringUtils.isBlank(errMsg) ? null : new NJdbcException(errMsg)); diff --git a/consistency/src/main/java/com/alibaba/nacos/consistency/ConsistencyProtocol.java b/consistency/src/main/java/com/alibaba/nacos/consistency/ConsistencyProtocol.java index 534423809fe..a5a98fcb1bb 100644 --- a/consistency/src/main/java/com/alibaba/nacos/consistency/ConsistencyProtocol.java +++ b/consistency/src/main/java/com/alibaba/nacos/consistency/ConsistencyProtocol.java @@ -48,11 +48,11 @@ public interface ConsistencyProtocol processors); + void addRequestProcessors(Collection

processors); /** * Copy of metadata information for this consensus protocol. @@ -87,7 +87,7 @@ public interface ConsistencyProtocol submitAsync(WriteRequest request); + CompletableFuture writeAsync(WriteRequest request); /** * New member list . diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java index f53ad71d714..f6353852e23 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/JRaftProtocol.java @@ -159,7 +159,7 @@ public Class subscribeType() { } @Override - public void addLogProcessors(Collection processors) { + public void addRequestProcessors(Collection processors) { raftServer.createMultiRaftGroup(processors); } @@ -175,14 +175,14 @@ public CompletableFuture aGetData(ReadRequest request) { } @Override - public Response submit(WriteRequest request) throws Exception { - CompletableFuture future = submitAsync(request); + public Response write(WriteRequest request) throws Exception { + CompletableFuture future = writeAsync(request); // Here you wait for 10 seconds, as long as possible, for the request to complete return future.get(10_000L, TimeUnit.MILLISECONDS); } @Override - public CompletableFuture submitAsync(WriteRequest request) { + public CompletableFuture writeAsync(WriteRequest request) { return raftServer.commit(request.getGroup(), request, new CompletableFuture<>()); } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosGetRequestProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosGetRequestProcessor.java index f87578083bc..59d82682325 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosGetRequestProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosGetRequestProcessor.java @@ -28,6 +28,7 @@ * * @author liaochuntao */ +@Deprecated public class NacosGetRequestProcessor extends AbstractProcessor implements RpcProcessor { private static final String INTEREST_NAME = GetRequest.class.getName(); diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosLogProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosLogProcessor.java index 0511db1e5b4..a57c6ab0515 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosLogProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosLogProcessor.java @@ -28,6 +28,7 @@ * * @author liaochuntao */ +@Deprecated public class NacosLogProcessor extends AbstractProcessor implements RpcProcessor { private static final String INTEREST_NAME = Log.class.getName(); diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java index 05f2d32d3c1..acaea5cd146 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java @@ -16,8 +16,10 @@ package com.alibaba.nacos.core.distributed.raft.processor; +import com.alibaba.nacos.consistency.ProtoMessageUtil; import com.alibaba.nacos.consistency.Serializer; import com.alibaba.nacos.consistency.entity.ReadRequest; +import com.alibaba.nacos.core.distributed.raft.JRaftServer; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; @@ -28,17 +30,22 @@ */ public class NacosReadRequestProcessor extends AbstractProcessor implements RpcProcessor { - public NacosReadRequestProcessor(Serializer serializer) { + private static final String INTEREST_NAME = ReadRequest.class.getName(); + + private final JRaftServer server; + + public NacosReadRequestProcessor(JRaftServer server, Serializer serializer) { super(serializer); + this.server = server; } @Override public void handleRequest(RpcContext rpcCtx, ReadRequest request) { - + handleRequest(server, request.getGroup(), rpcCtx, request); } @Override public String interest() { - return null; + return INTEREST_NAME; } } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosWriteRequestProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosWriteRequestProcessor.java index e684ef0a694..6562db2d446 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosWriteRequestProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosWriteRequestProcessor.java @@ -18,6 +18,7 @@ import com.alibaba.nacos.consistency.Serializer; import com.alibaba.nacos.consistency.entity.WriteRequest; +import com.alibaba.nacos.core.distributed.raft.JRaftServer; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; @@ -28,17 +29,22 @@ */ public class NacosWriteRequestProcessor extends AbstractProcessor implements RpcProcessor { - public NacosWriteRequestProcessor(Serializer serializer) { + private static final String INTEREST_NAME = WriteRequest.class.getName(); + + private final JRaftServer server; + + public NacosWriteRequestProcessor(JRaftServer server, Serializer serializer) { super(serializer); + this.server = server; } @Override public void handleRequest(RpcContext rpcCtx, WriteRequest request) { - + handleRequest(server, request.getGroup(), rpcCtx, request); } @Override public String interest() { - return null; + return INTEREST_NAME; } } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftUtils.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftUtils.java index cdf949bf43d..b955c39d2a6 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftUtils.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/JRaftUtils.java @@ -80,11 +80,14 @@ public static RpcServer initRpcServer(JRaftServer server, PeerId peerId) { RaftRpcServerFactory.addRaftRequestProcessors(rpcServer, RaftExecutor.getRaftCoreExecutor(), RaftExecutor.getRaftCliServiceExecutor()); + // Deprecated rpcServer.registerProcessor(new NacosLogProcessor(server, SerializeFactory.getDefault())); + // Deprecated rpcServer.registerProcessor(new NacosGetRequestProcessor(server, SerializeFactory.getDefault())); - - rpcServer.registerProcessor(new NacosWriteRequestProcessor(SerializeFactory.getDefault())); - rpcServer.registerProcessor(new NacosReadRequestProcessor(SerializeFactory.getDefault())); + + rpcServer.registerProcessor(new NacosWriteRequestProcessor(server, SerializeFactory.getDefault())); + rpcServer.registerProcessor(new NacosReadRequestProcessor(server, SerializeFactory.getDefault())); + return rpcServer; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java index 4b2cc3fa6fa..46a90c6dca4 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/impl/PersistentServiceProcessor.java @@ -65,7 +65,7 @@ public PersistentServiceProcessor(ProtocolManager protocolManager, ClusterVersio @Override public void afterConstruct() { super.afterConstruct(); - this.protocol.addLogProcessors(Collections.singletonList(this)); + this.protocol.addRequestProcessors(Collections.singletonList(this)); this.protocol.protocolMetaData() .subscribe(Constants.NAMING_PERSISTENT_SERVICE_GROUP, MetadataKey.LEADER_META_DATA, (o, arg) -> hasLeader = StringUtils.isNotBlank(String.valueOf(arg))); @@ -95,7 +95,7 @@ public void put(String key, Record value) throws NacosException { final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build(); try { - protocol.submit(request); + protocol.write(request); } catch (Exception e) { throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); } @@ -108,7 +108,7 @@ public void remove(String key) throws NacosException { final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))) .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build(); try { - protocol.submit(request); + protocol.write(request); } catch (Exception e) { throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage()); } From 591350e5f62d85d44d23e94c9f733615e4b6fbb5 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Wed, 13 Jan 2021 14:46:03 +0800 Subject: [PATCH 2/2] For checkstyle --- .../distributed/raft/processor/NacosReadRequestProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java index acaea5cd146..7dc81750a67 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/processor/NacosReadRequestProcessor.java @@ -16,7 +16,6 @@ package com.alibaba.nacos.core.distributed.raft.processor; -import com.alibaba.nacos.consistency.ProtoMessageUtil; import com.alibaba.nacos.consistency.Serializer; import com.alibaba.nacos.consistency.entity.ReadRequest; import com.alibaba.nacos.core.distributed.raft.JRaftServer;