Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add single server rale limit #6756

Open
wants to merge 17 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1053,4 +1053,29 @@ public interface ConfigurationKeys {
* The constant META_PREFIX
*/
String META_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + "metadata.";

/**
* The constant RATE_LIMIT_PREFIX.
*/
String RATE_LIMIT_PREFIX = SERVER_PREFIX + "ratelimit.";

/**
* The constant RATE_LIMIT_BUCKET_TOKEN_SECOND_NUM.
*/
String RATE_LIMIT_BUCKET_TOKEN_SECOND_NUM = RATE_LIMIT_PREFIX + "bucketTokenSecondNum";

/**
* The constant RATE_LIMIT_ENABLE.
*/
String RATE_LIMIT_ENABLE = RATE_LIMIT_PREFIX + "enable";

/**
* The constant RATE_LIMIT_BUCKET_TOKEN_MAX_NUM.
*/
String RATE_LIMIT_BUCKET_TOKEN_MAX_NUM = RATE_LIMIT_PREFIX + "bucketTokenMaxNum";

/**
* The constant RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM.
*/
String RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM = RATE_LIMIT_PREFIX + "bucketTokenInitialNum";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package io.seata.tm.api;

import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
Expand All @@ -28,6 +26,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
* The type Default failure handler.
*/
Expand Down Expand Up @@ -77,6 +77,11 @@ public void onRollbacking(GlobalTransaction tx, Throwable originalException) {
SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}

@Override
public void onBeginRateLimitedFailure(org.apache.seata.tm.api.GlobalTransaction globalTransaction, Throwable cause) {
LOGGER.warn("Failed to begin transaction due to RateLimit. ", cause);
}

protected class CheckTimerTask implements TimerTask {

private final GlobalTransaction tx;
Expand Down
104 changes: 104 additions & 0 deletions core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.event;

public class RateLimitEvent implements Event {

/**
* The Trace id.
*/
private String traceId;

/**
* The Limit type (like GlobalBeginFailed).
*/
private String limitType;

/**
* The Application id.
*/
private String applicationId;

/**
* The Client id.
*/
private String clientId;

/**
* The Server ip address and port.
*/
private String serverIpAddressAndPort;

public String getTraceId() {
return traceId;
}

public void setTraceId(String traceId) {
this.traceId = traceId;
}

public String getLimitType() {
return limitType;
}

public void setLimitType(String limitType) {
this.limitType = limitType;
}

public String getApplicationId() {
return applicationId;
}

public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}

public String getClientId() {
return clientId;
}

public void setClientId(String clientId) {
this.clientId = clientId;
}

public String getServerIpAddressAndPort() {
return serverIpAddressAndPort;
}

public void setServerIpAddressAndPort(String serverIpAddressAndPort) {
this.serverIpAddressAndPort = serverIpAddressAndPort;
}

public RateLimitEvent(String traceId, String limitType, String applicationId, String clientId, String serverIpAddressAndPort) {
this.traceId = traceId;
this.limitType = limitType;
this.applicationId = applicationId;
this.clientId = clientId;
this.serverIpAddressAndPort = serverIpAddressAndPort;
}

@Override
public String toString() {
return "RateLimitEvent{" +
"traceId='" + traceId + '\'' +
", limitType='" + limitType + '\'' +
", applicationId='" + applicationId + '\'' +
", clientId='" + clientId + '\'' +
", serverIpAddressAndPort='" + serverIpAddressAndPort + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public enum TransactionExceptionCode {
* BeginFailed
*/
BeginFailed,

/**
* Lock key conflict transaction exception code.
*/
Expand Down Expand Up @@ -140,7 +139,12 @@ public enum TransactionExceptionCode {
/**
* Broken transaction exception code.
*/
Broken;
Broken,

/**
* BeginFailedRateLimited
*/
BeginFailedRateLimited;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ public enum ResultCode {
* Success result code.
*/
// Success
Success;
Success,

/**
* Rate limited result code.
*/
RateLimited;

/**
* Get result code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,26 @@ class ResultCodeTest {
void getByte() {
Assertions.assertEquals(ResultCode.Failed, ResultCode.get((byte) 0));
Assertions.assertEquals(ResultCode.Success, ResultCode.get((byte) 1));
Assertions.assertEquals(ResultCode.RateLimited, ResultCode.get((byte) 2));
Assertions.assertThrows(IllegalArgumentException.class, () -> {
ResultCode.get((byte) 2);
ResultCode.get((byte) 3);
});
}

@Test
void getInt() {
Assertions.assertEquals(ResultCode.Failed, ResultCode.get(0));
Assertions.assertEquals(ResultCode.Success, ResultCode.get(1));
Assertions.assertEquals(ResultCode.RateLimited, ResultCode.get(2));
Assertions.assertThrows(IllegalArgumentException.class, () -> {
ResultCode.get(2);
ResultCode.get(3);
});
}

@Test
void values() {
Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success}, ResultCode.values());
Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success, ResultCode.RateLimited},
ResultCode.values());
}

@Test
Expand Down
6 changes: 6 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
<sofa.registry.version>6.3.0</sofa.registry.version>
<motan.version>1.0.0</motan.version>
<jcommander.version>1.82</jcommander.version>
<bucket4j.version>8.1.0</bucket4j.version>
<commons-compress.version>1.21</commons-compress.version>
<ant.version>1.10.12</ant.version>
<lz4.version>1.7.1</lz4.version>
Expand Down Expand Up @@ -611,6 +612,11 @@
<version>${jcommander.version}</version>
</dependency>

<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j_jdk8-core</artifactId>
<version>${bucket4j.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@
*/
package org.apache.seata.integration.tx.api.interceptor.handler;

import java.lang.reflect.Method;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.eventbus.Subscribe;
import org.apache.seata.common.exception.ShouldNeverHappenException;
import org.apache.seata.common.thread.NamedThreadFactory;
Expand Down Expand Up @@ -59,6 +52,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.seata.common.DefaultValues.DEFAULT_DISABLE_GLOBAL_TRANSACTION;
import static org.apache.seata.common.DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_TM_DEGRADE_CHECK;
Expand Down Expand Up @@ -248,6 +248,10 @@ public TransactionInfo getTransactionInfo() {
succeed = false;
failureHandler.onBeginFailure(globalTransaction, cause);
throw cause;
case BeginFailedRateLimited:
succeed = false;
failureHandler.onBeginRateLimitedFailure(globalTransaction, cause);
throw cause;
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(globalTransaction, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public interface IdConstants {

String SEATA_EXCEPTION = "seata.exception";

String SEATA_RATE_LIMIT = "seata.rate.limit";

String APP_ID_KEY = "applicationId";

String GROUP_KEY = "group";
Expand Down Expand Up @@ -79,4 +81,9 @@ public interface IdConstants {

String STATUS_VALUE_AFTER_ROLLBACKED_KEY = "AfterRollbacked";

String LIMIT_TYPE_KEY = "limitType";

String CLIENT_ID_KEY = "clientId";

String SERVER_IP_ADDRESS_AND_PORT_KEY = "serverIpAddressAndPort";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
*/
package org.apache.seata.saga.engine.tm;

import java.util.List;

import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.exception.TransactionExceptionCode;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.core.rpc.ShutdownHook;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.RMClient;
Expand All @@ -39,7 +39,6 @@
import org.apache.seata.tm.api.transaction.TransactionHook;
import org.apache.seata.tm.api.transaction.TransactionHookManager;
import org.apache.seata.tm.api.transaction.TransactionInfo;
import org.apache.seata.common.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
Expand All @@ -49,6 +48,8 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;

import java.util.List;

/**
* Template of executing business logic with a global transaction for SAGA mode
*/
Expand Down Expand Up @@ -92,6 +93,10 @@ public GlobalTransaction beginTransaction(TransactionInfo txInfo) throws Transac
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin(tx);
} catch (TransactionException txe) {
if (TransactionExceptionCode.BeginFailedRateLimited.equals(txe.getCode())) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailedRateLimited);
}
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public interface StarterConstants {


String SERVER_PREFIX = SEATA_PREFIX + ".server";
String SERVER_RATELIMIT_PREFIX = SERVER_PREFIX + ".ratelimit";
String SERVER_UNDO_PREFIX = SERVER_PREFIX + ".undo";
String SERVER_RAFT_PREFIX = SERVER_PREFIX + ".raft";
String SERVER_RECOVERY_PREFIX = SERVER_PREFIX + ".recovery";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.seata.spring.boot.autoconfigure;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.seata.spring.boot.autoconfigure.properties.server.ServerRateLimitProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.server.store.StoreProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.server.MetricsProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.server.ServerProperties;
Expand All @@ -38,6 +40,7 @@
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.PROPERTY_BEAN_MAP;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RAFT_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RATELIMIT_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RECOVERY_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_UNDO_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SESSION_PREFIX;
Expand Down Expand Up @@ -81,6 +84,7 @@ public static void init() {
PROPERTY_BEAN_MAP.put(SERVER_RAFT_PREFIX, ServerRaftProperties.class);
PROPERTY_BEAN_MAP.put(SESSION_PREFIX, SessionProperties.class);
PROPERTY_BEAN_MAP.put(STORE_PREFIX, StoreProperties.class);
PROPERTY_BEAN_MAP.put(SERVER_RATELIMIT_PREFIX, ServerRateLimitProperties.class);
}
}

Expand Down
Loading
Loading