Skip to content

Commit f58751a

Browse files
committed
add limiter
1 parent 5f4682b commit f58751a

File tree

6 files changed

+211
-81
lines changed

6 files changed

+211
-81
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
package cn.delei.distributed.limiter;
22

3-
import cn.delei.util.PrintUtil;
4-
53
import java.util.concurrent.atomic.AtomicInteger;
64

75
/**
86
* 固定窗口限流简单实现
97
*
108
* @author deleiguo
119
*/
12-
public class FixedWindowsRateLimiter {
10+
public class FixedWindowsRateLimiter implements IRateLimiter {
1311
/**
1412
* 初始时间
1513
*/
@@ -40,7 +38,8 @@ public FixedWindowsRateLimiter(int windowSize, int threshold) {
4038
this.counter = ZERO;
4139
}
4240

43-
public boolean tryAcquire() {
41+
@Override
42+
public synchronized boolean tryAcquire() {
4443
long now = System.currentTimeMillis();
4544
int newCount = this.counter.addAndGet(1);
4645
// 判断是否在时间窗口内
@@ -57,34 +56,5 @@ public boolean tryAcquire() {
5756
return true;
5857
}
5958
}
60-
61-
public static void main(String[] args) throws Exception {
62-
int win = 3000;
63-
int threshold = 20;
64-
// 时间窗口为1000毫秒,阀值为20
65-
FixedWindowsRateLimiter counterLimiter = new FixedWindowsRateLimiter(win, threshold);
66-
67-
int count = 0;
68-
int size = 50;
69-
PrintUtil.printTitle("01-模拟" + size + "次请求");
70-
// 模拟请求,看多少能通过
71-
for (int i = 0; i < size; i++) {
72-
if (counterLimiter.tryAcquire()) {
73-
count++;
74-
}
75-
}
76-
System.out.printf("模拟 %s 次请求,通过 %s ,限流 %s \n", size, count, (size - count));
77-
78-
//模拟时间窗口
79-
Thread.sleep(win + 200);
80-
81-
PrintUtil.printTitle("02-时间窗口过后,模拟" + size + "次请求");
82-
count = 0;
83-
for (int i = 0; i < size; i++) {
84-
if (counterLimiter.tryAcquire()) {
85-
count++;
86-
}
87-
}
88-
System.out.printf("模拟 %s 次请求,通过 %s ,限流 %s \n", size, count, (size - count));
89-
}
59+
9060
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package cn.delei.distributed.limiter;
22

3-
public abstract class CustomRateLimiter {
3+
/**
4+
* 限流接口
5+
*
6+
* @author deleiguo
7+
*/
8+
public interface IRateLimiter {
49
/**
510
* 尝试获取流量
611
*
712
* @return true 表示当前流量可以放行,否则表示拒绝
813
*/
9-
protected abstract boolean tryAcquire();
14+
public boolean tryAcquire();
1015
}
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,48 @@
1-
package cn.delei.distributed.limiter;public class LeakyBucketRateLimiter {
1+
package cn.delei.distributed.limiter;
2+
3+
/**
4+
* 漏桶限流
5+
*
6+
* @author deleiguo
7+
*/
8+
public class LeakyBucketRateLimiter implements IRateLimiter {
9+
/**
10+
* 漏桶的容量
11+
*/
12+
private final int capacity;
13+
/**
14+
* 漏出速率(每秒固定的通过数量)
15+
*/
16+
private final int rate;
17+
/**
18+
* 剩余水量
19+
*/
20+
private long leftWater;
21+
/**
22+
* 上次注入时间
23+
*/
24+
private long timeStamp = System.currentTimeMillis();
25+
26+
public LeakyBucketRateLimiter(int rate, int capacity) {
27+
this.capacity = capacity;
28+
this.rate = rate;
29+
}
30+
31+
@Override
32+
public synchronized boolean tryAcquire() {
33+
// 计算剩余水量
34+
long now = System.currentTimeMillis();
35+
// 计算时间差
36+
long timeGap = (now - timeStamp) / 1000;
37+
// 计算新的剩余水量
38+
leftWater = Math.max(0, leftWater - timeGap * rate);
39+
timeStamp = now;
40+
41+
// 如果未满,则放行;否则限流
42+
if (leftWater < capacity) {
43+
leftWater += 1;
44+
return true;
45+
}
46+
return false;
47+
}
248
}
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,136 @@
11
package cn.delei.distributed.limiter;
22

3-
public class Test {
4-
5-
public void fixedTest() {
3+
import cn.delei.util.PrintUtil;
64

5+
import java.util.LinkedList;
6+
import java.util.Queue;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.ScheduledExecutorService;
10+
import java.util.concurrent.TimeUnit;
11+
12+
public class RateLimiteTest {
13+
14+
public static void main(String[] args) throws Exception {
15+
// fixedTest();
16+
// slideTest();
17+
leakyBucketTest();
718
}
19+
20+
/**
21+
* 固定窗口限流
22+
*/
23+
static void fixedTest() throws Exception {
24+
int win = 3000;
25+
int threshold = 20;
26+
// 时间窗口为1000毫秒,阀值为20
27+
FixedWindowsRateLimiter counterLimiter = new FixedWindowsRateLimiter(win, threshold);
28+
29+
// 统计通过数量
30+
int count = 0;
31+
32+
// 模拟请求
33+
int size = 50;
34+
PrintUtil.printTitle("01-模拟" + size + "次请求");
35+
for (int i = 0; i < size; i++) {
36+
if (counterLimiter.tryAcquire()) {
37+
count++;
38+
}
39+
}
40+
System.out.printf("模拟 %s 次请求,通过 %s ,限流 %s \n", size, count, (size - count));
41+
42+
//模拟时间窗口
43+
Thread.sleep(win + 200);
44+
45+
PrintUtil.printTitle("02-时间窗口过后,模拟" + size + "次请求");
46+
count = 0;
47+
for (int i = 0; i < size; i++) {
48+
if (counterLimiter.tryAcquire()) {
49+
count++;
50+
}
51+
}
52+
System.out.printf("模拟 %s 次请求,通过 %s ,限流 %s \n", size, count, (size - count));
53+
}
54+
55+
/**
56+
* 滑动窗口限流
57+
*/
58+
static void slideTest() throws Exception {
59+
int limit = 20;
60+
SildeWindowRateLimiter limiter = new SildeWindowRateLimiter(1000, limit, 10);
61+
62+
Thread.sleep(3000);
63+
64+
int count = 0;
65+
int failCount = 0;
66+
int size = 50;
67+
int group = 100;
68+
69+
PrintUtil.printTitle("模拟100组,每次间隔150ms,50次请求");
70+
for (int j = 0; j < group; j++) {
71+
count = 0;
72+
for (int i = 0; i < size; i++) {
73+
if (limiter.tryAcquire()) {
74+
count++;
75+
}
76+
}
77+
Thread.sleep(150);
78+
79+
// 模拟请求,看多少能通过
80+
for (int i = 0; i < size; i++) {
81+
if (limiter.tryAcquire()) {
82+
count++;
83+
}
84+
}
85+
if (count > limit) {
86+
System.out.println("时间窗口内放过的请求超过阈值,放过的请求数" + count + ",限流:" + limit);
87+
failCount++;
88+
}
89+
Thread.sleep((int) (Math.random() * 100));
90+
}
91+
System.out.printf("模拟 %s 次请求,限流失败组数 %s \n", size, failCount);
92+
}
93+
94+
95+
private static void leakyBucketTest() throws Exception {
96+
LeakyBucketRateLimiter rateLimiter = new LeakyBucketRateLimiter(20, 20);
97+
98+
ExecutorService singleThread = Executors.newSingleThreadExecutor();
99+
// 存储流量的队列
100+
// 当 tryAcquire 返回 true 时,将请求入队,然后再以固定频率从队列中取出请求进行处理
101+
Queue<Integer> queue = new LinkedList<>();
102+
// 模拟请求 不确定速率注水
103+
singleThread.execute(() -> {
104+
int count = 0;
105+
while (true) {
106+
count++;
107+
boolean flag = rateLimiter.tryAcquire();
108+
if (flag) {
109+
queue.offer(count);
110+
System.out.println(count + "--------流量被放行--------");
111+
} else {
112+
System.out.println(count + "流量被限制");
113+
}
114+
try {
115+
Thread.sleep((long) (Math.random() * 1000));
116+
} catch (InterruptedException e) {
117+
e.printStackTrace();
118+
}
119+
}
120+
});
121+
122+
// 模拟处理请求 固定速率漏水
123+
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
124+
scheduledExecutorService.scheduleAtFixedRate(() -> {
125+
if (!queue.isEmpty()) {
126+
System.out.println(queue.poll() + "被处理");
127+
}
128+
}, 0, 100, TimeUnit.MILLISECONDS);
129+
130+
// 保证主线程不会退出
131+
// while (true) {
132+
// Thread.sleep(10000);
133+
// }
134+
}
135+
8136
}
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package cn.delei.distributed.limiter;
22

3-
import cn.delei.util.PrintUtil;
4-
53
import java.util.concurrent.atomic.AtomicInteger;
64

75

@@ -10,7 +8,7 @@
108
*
119
* @author deleiguo
1210
*/
13-
public class SildeWindowRateLimiter {
11+
public class SildeWindowRateLimiter implements IRateLimiter {
1412
/**
1513
* 窗口大小,单位为毫秒
1614
*/
@@ -48,7 +46,8 @@ public SildeWindowRateLimiter(int windowSize, int limit, int splitNum) {
4846
startTime = System.currentTimeMillis();
4947
}
5048

51-
public boolean tryAcquire() {
49+
@Override
50+
public synchronized boolean tryAcquire() {
5251
long now = System.currentTimeMillis();
5352

5453
// 计算滑动小窗口的数量
@@ -57,9 +56,12 @@ public boolean tryAcquire() {
5756
slideWindow(windowsNum);
5857

5958
int count = 0;
59+
// 所有小窗口的计数总和
6060
for (int i = 0; i < splitNum; i++) {
6161
count += counters[i].get();
6262
}
63+
64+
// 如果计数总和超过阀值不可放行
6365
if (count >= limit) {
6466
return false;
6567
} else {
@@ -74,8 +76,9 @@ public boolean tryAcquire() {
7476
* @param windowsNum
7577
*/
7678
private void slideWindow(long windowsNum) {
77-
if (windowsNum == 0)
79+
if (windowsNum == 0) {
7880
return;
81+
}
7982
long slideNum = Math.min(windowsNum, splitNum);
8083
for (int i = 0; i < slideNum; i++) {
8184
index = (index + 1) % splitNum;
@@ -85,40 +88,4 @@ private void slideWindow(long windowsNum) {
8588
startTime = startTime + windowsNum * (windowSize / splitNum);
8689
}
8790

88-
public static void main(String[] args) throws Exception {
89-
int limit = 20;
90-
SildeWindowRateLimiter limiter = new SildeWindowRateLimiter(1000, limit, 10);
91-
92-
Thread.sleep(3000);
93-
94-
int count = 0;
95-
int failCount = 0;
96-
int size = 50;
97-
int group = 100;
98-
99-
PrintUtil.printTitle("模拟100组,每次间隔150ms,50次请求");
100-
for (int j = 0; j < group; j++) {
101-
count = 0;
102-
for (int i = 0; i < size; i++) {
103-
if (limiter.tryAcquire()) {
104-
count++;
105-
}
106-
}
107-
Thread.sleep(150);
108-
109-
// 模拟请求,看多少能通过
110-
for (int i = 0; i < size; i++) {
111-
if (limiter.tryAcquire()) {
112-
count++;
113-
}
114-
}
115-
if (count > limit) {
116-
System.out.println("时间窗口内放过的请求超过阈值,放过的请求数" + count + ",限流:" + limit);
117-
failCount++;
118-
}
119-
Thread.sleep((int) (Math.random() * 100));
120-
}
121-
System.out.printf("模拟 %s 次请求,限流失败组数 %s \n", size, failCount);
122-
}
123-
12491
}

source/java/interview/delei-interview-google-guava/src/main/java/cn/delei/guava/JoinerTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,19 @@ public void joinTest() {
1313
List<String> dataList = Arrays.asList("aa", "bb", "cc", null, "dd", "ee", null, "ff");
1414
System.out.println(Joiner.on("-").skipNulls().join(dataList));
1515
System.out.println(Joiner.on("-").useForNull("NVL").join(dataList));
16+
17+
System.out.println("if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
18+
"return nil;" +
19+
"end; " +
20+
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
21+
"if (counter > 0) then " +
22+
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
23+
"return 0; " +
24+
"else " +
25+
"redis.call('del', KEYS[1]); " +
26+
"redis.call('publish', KEYS[2], ARGV[1]); " +
27+
"return 1; " +
28+
"end; " +
29+
"return nil;");
1630
}
1731
}

0 commit comments

Comments
 (0)