Skip to content

Commit

Permalink
Update cluster related command APIs and some enhancement and fix for …
Browse files Browse the repository at this point in the history
…cluster module

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
  • Loading branch information
sczyh30 committed Dec 14, 2018
1 parent a5819e0 commit 2735954
Show file tree
Hide file tree
Showing 33 changed files with 865 additions and 98 deletions.
3 changes: 2 additions & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ignore:
- "sentinel-demo/.*"
- "sentinel-dashboard/.*"
- "sentinel-benchmark/.*"
- "sentinel-benchmark/.*"
- "sentinel-transport/.*"
7 changes: 7 additions & 0 deletions sentinel-cluster/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Sentinel Cluster Flow Control

This is the default implementation of Sentinel cluster flow control.

- `sentinel-cluster-common-default`: common module for cluster transport and functions
- `sentinel-cluster-client-default`: default cluster client module using Netty as underlying transport library
- `sentinel-cluster-server-default`: default cluster server module
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public void onRemoteServerChange(ClusterClientConfig clusterClientConfig) {
changeServer(clusterClientConfig);
}
});
// TODO: check here, who should start the client?
initNewConnection();
}

Expand Down Expand Up @@ -90,7 +89,6 @@ private void changeServer(/*@Valid*/ ClusterClientConfig config) {
return;
}
try {
// TODO: what if the client is pending init?
if (transportClient != null) {
transportClient.stop();
}
Expand All @@ -108,12 +106,14 @@ private void startClientIfScheduled() throws Exception {
if (shouldStart.get()) {
if (transportClient != null) {
transportClient.start();
} else {
RecordLog.warn("[DefaultClusterTokenClient] Cannot start transport client: client not created");
}
}
}

private void stopClientIfStarted() throws Exception {
if (shouldStart.get()) {
if (shouldStart.compareAndSet(true, false)) {
if (transportClient != null) {
transportClient.stop();
}
Expand All @@ -129,9 +129,7 @@ public void start() throws Exception {

@Override
public void stop() throws Exception {
if (shouldStart.compareAndSet(true, false)) {
stopClientIfStarted();
}
stopClientIfStarted();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private synchronized void applyConfig(ClusterClientConfig config) {
public static boolean isValidConfig(ClusterClientConfig config) {
return config != null && StringUtil.isNotBlank(config.getServerHost())
&& config.getServerPort() > 0
&& config.getServerPort() <= 65535
&& config.getRequestTimeout() > 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,13 @@ public TokenClientHandler(AtomicInteger currentState, Runnable disconnectCallbac

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
currentState.compareAndSet(ClientConstants.CLIENT_STATUS_PENDING, ClientConstants.CLIENT_STATUS_STARTED);
currentState.set(ClientConstants.CLIENT_STATUS_STARTED);
fireClientPing(ctx);
RecordLog.info("[TokenClientHandler] Client handler active, remote address: " + ctx.channel().remoteAddress());
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(String.format("[%s] Client message recv: %s", System.currentTimeMillis(), msg)); // TODO: remove here
if (msg instanceof ClusterResponse) {
ClusterResponse<?> response = (ClusterResponse) msg;

Expand Down Expand Up @@ -96,7 +95,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + ctx.channel().remoteAddress());
currentState.compareAndSet(ClientConstants.CLIENT_STATUS_STARTED, ClientConstants.CLIENT_STATUS_OFF);
currentState.set(ClientConstants.CLIENT_STATUS_OFF);
disconnectCallback.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public CommandResponse<String> handle(CommandRequest request) {
ClusterClientConfig clusterClientConfig = JSON.parseObject(data, ClusterClientConfig.class);
ClusterClientConfigManager.applyNewConfig(clusterClientConfig);

return CommandResponse.ofSuccess("ok");
return CommandResponse.ofSuccess("success");
} catch (Exception e) {
RecordLog.warn("[ModifyClusterClientConfigHandler] Decode client cluster config error", e);
return CommandResponse.ofFailure(e, "decode client cluster config error");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.csp.sentinel.cluster.flow.rule;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -163,6 +164,11 @@ private static void registerPropertyInternal(/*@NonNull*/ String namespace, /*@V
}
}

/**
* Remove cluster flow rule property for a specific namespace.
*
* @param namespace valid namespace
*/
public static void removeProperty(String namespace) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
synchronized (UPDATE_LOCK) {
Expand All @@ -189,17 +195,70 @@ private static void restorePropertyListeners() {
}
}

/**
* Get flow rule by rule ID.
*
* @param id rule ID
* @return flow rule
*/
public static FlowRule getFlowRuleById(Long id) {
if (!ClusterRuleUtil.validId(id)) {
return null;
}
return FLOW_RULES.get(id);
}

public static List<FlowRule> getAllFlowRules() {
return new ArrayList<>(FLOW_RULES.values());
}

/**
* Get all cluster flow rules within a specific namespace.
*
* @param namespace valid namespace
* @return cluster flow rules within the provided namespace
*/
public static List<FlowRule> getFlowRules(String namespace) {
if (StringUtil.isEmpty(namespace)) {
return new ArrayList<>();
}
List<FlowRule> rules = new ArrayList<>();
Set<Long> flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
if (flowIdSet == null || flowIdSet.isEmpty()) {
return rules;
}
for (Long flowId : flowIdSet) {
FlowRule rule = FLOW_RULES.get(flowId);
if (rule != null) {
rules.add(rule);
}
}
return rules;
}

/**
* Load flow rules for a specific namespace. The former rules of the namespace will be replaced.
*
* @param namespace a valid namespace
* @param rules rule list
*/
public static void loadRules(String namespace, List<FlowRule> rules) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
NamespaceFlowProperty<FlowRule> property = PROPERTY_MAP.get(namespace);
if (property != null) {
property.getProperty().updateValue(rules);
}
}

private static void resetNamespaceFlowIdMapFor(/*@Valid*/ String namespace) {
NAMESPACE_FLOW_ID_MAP.put(namespace, new HashSet<Long>());
}

/**
* Clear all rules of the provided namespace and reset map.
*
* @param namespace valid namespace
*/
private static void clearAndResetRulesFor(/*@Valid*/ String namespace) {
Set<Long> flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
if (flowIdSet != null && !flowIdSet.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.csp.sentinel.cluster.flow.rule;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -217,6 +218,54 @@ public static ParamFlowRule getParamRuleById(Long id) {
return PARAM_RULES.get(id);
}

public static List<ParamFlowRule> getAllParamRules() {
return new ArrayList<>(PARAM_RULES.values());
}

/**
* Get all cluster parameter flow rules within a specific namespace.
*
* @param namespace a valid namespace
* @return cluster parameter flow rules within the provided namespace
*/
public static List<ParamFlowRule> getParamRules(String namespace) {
if (StringUtil.isEmpty(namespace)) {
return new ArrayList<>();
}
List<ParamFlowRule> rules = new ArrayList<>();
Set<Long> flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace);
if (flowIdSet == null || flowIdSet.isEmpty()) {
return rules;
}
for (Long flowId : flowIdSet) {
ParamFlowRule rule = PARAM_RULES.get(flowId);
if (rule != null) {
rules.add(rule);
}
}
return rules;
}

/**
* Load parameter flow rules for a specific namespace. The former rules of the namespace will be replaced.
*
* @param namespace a valid namespace
* @param rules rule list
*/
public static void loadRules(String namespace, List<ParamFlowRule> rules) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
NamespaceFlowProperty<ParamFlowRule> property = PROPERTY_MAP.get(namespace);
if (property != null) {
property.getProperty().updateValue(rules);
}
}

/**
* Get connected count for associated namespace of given {@code flowId}.
*
* @param flowId existing rule ID
* @return connected count
*/
public static int getConnectedCount(long flowId) {
if (flowId <= 0) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private synchronized void changeServerConfig(ServerTransportConfig config) {
}
try {
if (server != null) {
stopServerIfStarted();
stopServer();
}
this.server = new NettyTransportServer(newPort);
this.port = newPort;
Expand All @@ -101,13 +101,11 @@ private void startServerIfScheduled() throws Exception {
}
}

private void stopServerIfStarted() throws Exception {
if (shouldStart.get()) {
if (server != null) {
server.stop();
if (embedded) {
handleEmbeddedStop();
}
private void stopServer() throws Exception {
if (server != null) {
server.stop();
if (embedded) {
handleEmbeddedStop();
}
}
}
Expand Down Expand Up @@ -136,7 +134,7 @@ public void start() throws Exception {
@Override
public void stop() throws Exception {
if (shouldStart.compareAndSet(true, false)) {
stopServerIfStarted();
stopServer();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.command.handler;

import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.command.CommandHandler;
import com.alibaba.csp.sentinel.command.CommandRequest;
import com.alibaba.csp.sentinel.command.CommandResponse;
import com.alibaba.csp.sentinel.command.annotation.CommandMapping;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.fastjson.JSON;

/**
* @author Eric Zhao
* @since 1.4.0
*/
@CommandMapping(name = "cluster/server/flowRules")
public class FetchClusterFlowRulesCommandHandler implements CommandHandler<String> {

@Override
public CommandResponse<String> handle(CommandRequest request) {
String namespace = request.getParam("namespace");
if (StringUtil.isEmpty(namespace)) {
return CommandResponse.ofSuccess(JSON.toJSONString(ClusterFlowRuleManager.getAllFlowRules()));
} else {
return CommandResponse.ofSuccess(JSON.toJSONString(ClusterFlowRuleManager.getFlowRules(namespace)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.command.handler;

import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager;
import com.alibaba.csp.sentinel.command.CommandHandler;
import com.alibaba.csp.sentinel.command.CommandRequest;
import com.alibaba.csp.sentinel.command.CommandResponse;
import com.alibaba.csp.sentinel.command.annotation.CommandMapping;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.fastjson.JSON;

/**
* @author Eric Zhao
* @since 1.4.0
*/
@CommandMapping(name = "cluster/server/paramRules")
public class FetchClusterParamFlowRulesCommandHandler implements CommandHandler<String> {

@Override
public CommandResponse<String> handle(CommandRequest request) {
String namespace = request.getParam("namespace");
if (StringUtil.isEmpty(namespace)) {
return CommandResponse.ofSuccess(JSON.toJSONString(ClusterParamFlowRuleManager.getAllParamRules()));
} else {
return CommandResponse.ofSuccess(JSON.toJSONString(ClusterParamFlowRuleManager.getParamRules(namespace)));
}
}
}
Loading

0 comments on commit 2735954

Please sign in to comment.