Skip to content

Commit fdaafd0

Browse files
author
THAVEAU Alexis
committed
feat(rate-limiter): cleanup
1 parent 2ba6ce1 commit fdaafd0

File tree

13 files changed

+376
-186
lines changed

13 files changed

+376
-186
lines changed

lua/dshm.lua

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ local function escape(key)
2828
return table.concat(result, ":")
2929
end
3030

31-
function _M.new(self, opts)
31+
function _M.new(_, opts)
3232
local sock, err = tcp()
3333
if not sock then
3434
return nil, err
@@ -88,7 +88,8 @@ local function read_response_line(self, data)
8888
ngx.log(ngx.DEBUG, "PROTOCOL ERROR")
8989
return nil, _M.PROTOCOL_ERROR
9090
else
91-
local data, err = sock:receive(data)
91+
local err
92+
data, err = sock:receive(data)
9293
if not data then
9394
if err == "timeout" then
9495
sock:close()
@@ -97,7 +98,8 @@ local function read_response_line(self, data)
9798
end
9899
ngx.log(ngx.DEBUG, "RECEIVE : ", data)
99100
-- Discard trailing \r\n
100-
local trail, err = sock:receive()
101+
local trail
102+
trail, err = sock:receive()
101103
if not trail then
102104
if err == "timeout" then
103105
sock:close()
@@ -128,7 +130,7 @@ function _M.get(self, key)
128130
if resp == "LEN" then
129131
resp, data = read_response_line(self, data)
130132
if resp == "DATA" then
131-
local resp, _ = read_response_line(self)
133+
resp = read_response_line(self)
132134
if resp == "DONE" then
133135
return data, nil
134136
else
@@ -203,7 +205,7 @@ function _M.incr(self, key, value, init, init_ttl)
203205
if resp == "LEN" then
204206
resp, data = read_response_line(self, data)
205207
if resp == "DATA" then
206-
local resp, _ = read_response_line(self)
208+
resp = read_response_line(self)
207209
if resp == "DONE" then
208210
return data, nil
209211
else
@@ -247,7 +249,7 @@ function _M.set(self, key, value, exptime)
247249
if resp == "LEN" then
248250
resp, data = read_response_line(self, data)
249251
if resp == "DATA" then
250-
local resp, _ = read_response_line(self)
252+
resp = read_response_line(self)
251253
if resp == "DONE" then
252254
return data, nil
253255
else
@@ -297,6 +299,58 @@ function _M.touch(self, key, exptime)
297299

298300
end
299301

302+
---
303+
--- Allow to manage rate limit on sliding window. Rate limiter will try to 'consume' a token and return the remaining token available.
304+
--- If the quota is exceeded, this method will return nil, "rejected"
305+
--- Otherwhise return the remaining token available in the window
306+
---
307+
---@param self self the dshm instance
308+
---@param self string the key
309+
---@param capacity number the tokens capacity
310+
---@param duration number the sliding window in seconds
311+
---@return number the remaining tokens available or nil if quota is exceeded
312+
---@return string nil or error. Error code is rejected when quota is exceeded
313+
---
314+
function _M.rate_limiter(self, key, capacity, duration)
315+
316+
ngx.log(ngx.DEBUG, "rate_limiter : ", key, ", capacity : ", capacity, ", duration : ", duration)
317+
318+
local sock = self.sock
319+
if not sock then
320+
return nil, "not initialized"
321+
end
322+
323+
local command = "rate_limiter " .. self.escape_key(key) .. " " .. capacity .. " " .. duration .. "\r\n"
324+
local bytes, err = sock:send(command)
325+
if not bytes then
326+
return nil, err
327+
end
328+
329+
local resp, data = read_response_line(self)
330+
if resp == "LEN" then
331+
resp, data = read_response_line(self, data)
332+
if resp == "DATA" then
333+
resp, _ = read_response_line(self)
334+
if resp == "DONE" then
335+
if data == "-1" then
336+
return nil, "rejected"
337+
else
338+
return data, nil
339+
end
340+
else
341+
return nil, _M.PROTOCOL_ERROR
342+
end
343+
else
344+
return nil, _M.PROTOCOL_ERROR
345+
end
346+
elseif resp == "ERROR" then
347+
return nil, data
348+
else
349+
return nil, _M.PROTOCOL_ERROR
350+
end
351+
352+
end
353+
300354
function _M.quit(self)
301355
local sock = self.sock
302356
if not sock then

src/main/java/io/github/grrolland/hcshm/ShmService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.github.grrolland.hcshm.processor.RateLimiterProcessor;
2424
import io.github.grrolland.hcshm.processor.TouchProcessor;
2525
import io.github.grrolland.hcshm.ratelimiter.ConsumptionProbe;
26-
import io.github.grrolland.hcshm.ratelimiter.RateLimiterValue;
26+
import io.github.grrolland.hcshm.ratelimiter.RateLimiterShmValue;
2727

2828
import java.time.Duration;
2929
import java.util.concurrent.TimeUnit;
@@ -158,11 +158,11 @@ public void flushall(String region) {
158158
* Consume a token
159159
* @param key the key
160160
* @param capacity the maximum capacity
161-
* @param duration the duration of a token
161+
* @param duration the duration of a token in seconds
162162
* @return the number of tokens remaining
163163
*/
164164
public String rateLimiter(String key, int capacity, long duration) {
165-
final IMap<String, RateLimiterValue> map = regionLocator.getMap(hazelcast, key);
165+
final IMap<String, RateLimiterShmValue> map = regionLocator.getMap(hazelcast, key);
166166
RateLimiterProcessor rateLimiterProcessor = new RateLimiterProcessor(capacity, Duration.ofMillis(duration));
167167
ConsumptionProbe consumptionProbe = (ConsumptionProbe) map.executeOnKey(key, rateLimiterProcessor);
168168
return String.valueOf(consumptionProbe.getRemainingTokens());

src/main/java/io/github/grrolland/hcshm/commands/RateLimiterCommand.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public String execute(String[] commandTokens) {
4949
assertTokens(commandTokens, 4);
5050
String key = getKey(commandTokens[1]);
5151
int capacity = getIncrValue(commandTokens[2]);
52-
long expire = getExpire(commandTokens[3]);
53-
String value = getService().rateLimiter(key, capacity, expire);
52+
long duration = getExpire(commandTokens[3]);
53+
String value = getService().rateLimiter(key, capacity, duration);
5454
writeLen(response, value);
5555
writeValue(response, value);
5656
writeDone(response);

src/main/java/io/github/grrolland/hcshm/processor/RateLimiterProcessor.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.hazelcast.map.EntryProcessor;
44
import io.github.grrolland.hcshm.ratelimiter.ConsumptionProbe;
5-
import io.github.grrolland.hcshm.ratelimiter.RateLimiterValue;
5+
import io.github.grrolland.hcshm.ratelimiter.RateLimiterShmValue;
66

77
import java.io.Serializable;
88
import java.time.Duration;
@@ -12,7 +12,7 @@
1212
/**
1313
* Processor for the RATE_LIMITER command
1414
*/
15-
public class RateLimiterProcessor implements EntryProcessor<String, RateLimiterValue, Object>, Serializable {
15+
public class RateLimiterProcessor implements EntryProcessor<String, RateLimiterShmValue, Object>, Serializable {
1616
/**
1717
* Capacity
1818
*/
@@ -37,10 +37,12 @@ public RateLimiterProcessor(int capacity, Duration duration) {
3737
}
3838

3939
@Override
40-
public ConsumptionProbe process(final Map.Entry<String, RateLimiterValue> entry) {
41-
RateLimiterValue rateLimiterValue = Optional.ofNullable(getCurrentValue(entry)).orElseGet(this::create);
42-
final ConsumptionProbe consumptionProbe = rateLimiterValue.use();
43-
entry.setValue(rateLimiterValue);
40+
public ConsumptionProbe process(final Map.Entry<String, RateLimiterShmValue> entry) {
41+
RateLimiterShmValue rateLimiterShmValue = Optional.ofNullable(getCurrentValue(entry)).orElseGet(this::create);
42+
rateLimiterShmValue.setDuration(this.duration);
43+
rateLimiterShmValue.setCapacity(this.capacity);
44+
final ConsumptionProbe consumptionProbe = rateLimiterShmValue.take();
45+
entry.setValue(rateLimiterShmValue);
4446
return consumptionProbe;
4547
}
4648

@@ -53,16 +55,16 @@ public ConsumptionProbe process(final Map.Entry<String, RateLimiterValue> entry)
5355
* @throws BadRequestException
5456
* exception
5557
*/
56-
private RateLimiterValue getCurrentValue(final Map.Entry<String, RateLimiterValue> entry) throws BadRequestException {
58+
private RateLimiterShmValue getCurrentValue(final Map.Entry<String, RateLimiterShmValue> entry) throws BadRequestException {
5759
try {
5860
return entry.getValue();
5961
} catch (ClassCastException e) {
6062
throw new BadRequestException(e);
6163
}
6264
}
6365

64-
private RateLimiterValue create() {
65-
return new RateLimiterValue(this.capacity, this.duration);
66+
private RateLimiterShmValue create() {
67+
return new RateLimiterShmValue(this.capacity, this.duration);
6668
}
6769

6870
}
Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,18 @@
11
package io.github.grrolland.hcshm.ratelimiter;
22

33
/**
4-
* Describes token consumed, and tokens remaining
4+
* Describes token consumed, and number of tokens remaining
5+
* <p>
6+
* remainingTokens -1 means no tokens have been consumed
7+
* remainingTokens 0 means there is no more tokens to consume
58
*/
69
public class ConsumptionProbe {
7-
/**
8-
* Token has been consumed
9-
*/
10-
private final boolean consumed;
1110

1211
/**
1312
* Number of remaining tokens
1413
*/
1514
private final int remainingTokens;
1615

17-
/**
18-
* return the flag consumed
19-
*
20-
* @return true if token was consumed
21-
*/
22-
public boolean isConsumed() {
23-
return this.consumed;
24-
}
25-
2616
/**
2717
* Return the remaining records
2818
*
@@ -36,13 +26,10 @@ public int getRemainingTokens() {
3626
/**
3727
* Constructor
3828
*
39-
* @param consumed
40-
* true if token has been consumer
4129
* @param remainingTokens
4230
* the number of remaining token
4331
*/
44-
ConsumptionProbe(boolean consumed, int remainingTokens) {
45-
this.consumed = consumed;
32+
ConsumptionProbe(int remainingTokens) {
4633
this.remainingTokens = remainingTokens;
4734
}
4835
}

src/main/java/io/github/grrolland/hcshm/ratelimiter/RateLimiterValue.java renamed to src/main/java/io/github/grrolland/hcshm/ratelimiter/RateLimiterShmValue.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,50 @@
77
import java.util.List;
88

99
/**
10-
* RateLimiterValue
10+
* RateLimiterShmValue store rate limiter data
1111
*/
12-
public class RateLimiterValue extends AbstractShmValue {
12+
public class RateLimiterShmValue extends AbstractShmValue {
1313

1414
/**
1515
* The current consumption records
1616
*/
17-
private final List<Record> records;
17+
private final List<Token> tokens;
1818
/**
1919
* Sliding Window duration
2020
*/
21-
private final Duration duration;
21+
private Duration duration;
2222
/**
2323
* The capacity
2424
*/
25-
private final int capacity;
25+
private int capacity;
26+
27+
/**
28+
* Set capacity
29+
*
30+
* @param pCapacity
31+
* capacity
32+
*/
33+
public void setCapacity(final int pCapacity) {
34+
this.capacity = pCapacity;
35+
}
36+
37+
/**
38+
* Set duration
39+
*
40+
* @param pDuration
41+
* duration
42+
*/
43+
public void setDuration(final Duration pDuration) {
44+
this.duration = pDuration;
45+
}
2646

2747
/**
2848
* Get the remaining records before capacity is exceeded
2949
*
3050
* @return the remaining records
3151
*/
3252
public int getRemaining() {
33-
return this.capacity - this.records.size();
53+
return Math.max(this.capacity - this.tokens.size(), 0);
3454
}
3555

3656
@Override
@@ -47,28 +67,28 @@ public String getValue() {
4767
* @param duration
4868
* the sliding window duration
4969
*/
50-
public RateLimiterValue(int capacity, Duration duration) {
51-
this.records = new ArrayList<>(capacity);
70+
public RateLimiterShmValue(int capacity, Duration duration) {
71+
this.tokens = new ArrayList<>(capacity);
5272
this.duration = duration;
5373
this.capacity = capacity;
5474
}
5575

5676
/**
57-
* Consume and return the ConsumptionProbe
77+
* Try to take a token and return the ConsumptionProbe
5878
*
59-
* @return ConsumptionProbe
79+
* @return the ConsumptionProbe
6080
*/
61-
public ConsumptionProbe use() {
81+
public ConsumptionProbe take() {
6282
// Clear expired tokens
6383
this.clearExpired();
6484

65-
boolean consumed = false;
6685
int remaining = -1;
67-
if (canConsume()) {
68-
consumed = records.add(new Record());
86+
// Try to consume
87+
if (this.canConsume()) {
88+
tokens.add(new Token());
6989
remaining = this.getRemaining();
7090
}
71-
return new ConsumptionProbe(consumed, remaining);
91+
return new ConsumptionProbe(remaining);
7292
}
7393

7494
/**
@@ -77,13 +97,14 @@ public ConsumptionProbe use() {
7797
* @return true if at least one token is available
7898
*/
7999
private boolean canConsume() {
80-
return this.records.size() < this.capacity;
100+
return this.tokens.size() < this.capacity;
81101
}
82102

83103
/**
84104
* Clear expired tokens
85105
*/
86106
private void clearExpired() {
87-
records.removeIf(pRecord -> pRecord.isExpired(this.duration));
107+
108+
tokens.removeIf(pToken -> pToken.isExpired(this.duration));
88109
}
89110
}

0 commit comments

Comments
 (0)