Skip to content

Commit

Permalink
Refactor LeapArray to reuse code for current bucket
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Zhao <sczyh16@gmail.com>
  • Loading branch information
sczyh30 committed Sep 12, 2018
1 parent ca2f4d9 commit 5490549
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;

import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* Basic data structure for statistic metrics.
*
* @param <T> type of data wrapper
* @author jialiang.linjl
* @author Eric Zhao
Expand All @@ -32,27 +35,86 @@ public abstract class LeapArray<T> {
protected int sampleCount;
protected int intervalInMs;

protected AtomicReferenceArray<WindowWrap<T>> array;
protected final AtomicReferenceArray<WindowWrap<T>> array;

private final ReentrantLock updateLock = new ReentrantLock();

public LeapArray(int windowLength, int intervalInSec) {
this.windowLength = windowLength;
this.sampleCount = intervalInSec * 1000 / windowLength;
this.intervalInMs = intervalInSec * 1000;
this.sampleCount = intervalInMs / windowLength;

this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
}

/**
* Get the window at current timestamp.
*
* @return the window at current timestamp
*/
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}

/**
* Create a new bucket.
*
* @return the new empty bucket
*/
public abstract T newEmptyBucket();

/**
* Reset current window to provided start time and reset all counters.
*
* @param startTime the start time of the window
* @param windowWrap current window
* @return new clean window wrap
*/
protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);

/**
* Get window at provided timestamp.
*
* @param time a valid timestamp
* @return the window at provided timestamp
*/
abstract public WindowWrap<T> currentWindow(long time);
public WindowWrap<T> currentWindow(long time) {
long timeId = time / windowLength;
// Calculate current index.
int idx = (int)(timeId % array.length());

// Cut the time to current window start.
time = time - time % windowLength;

while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLength, time, newEmptyBucket());
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (time == old.windowStart()) {
return old;
} else if (time > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// if (old is deprecated) then [LOCK] resetTo currentTime.
return resetWindowTo(old, time);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}

} else if (time < old.windowStart()) {
// Cannot go through here.
return new WindowWrap<T>(windowLength, time, newEmptyBucket());
}
}
}

public WindowWrap<T> getPreviousWindow(long time) {
long timeId = (time - windowLength) / windowLength;
Expand Down Expand Up @@ -87,16 +149,12 @@ public T getWindowValue(long time) {
return old.value();
}

AtomicReferenceArray<WindowWrap<T>> array() {
return array;
}

private boolean isWindowDeprecated(WindowWrap<T> windowWrap) {
return TimeUtil.currentTimeMillis() - windowWrap.windowStart() >= intervalInMs;
}

public List<WindowWrap<T>> list() {
ArrayList<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>();
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>();

for (int i = 0; i < array.length(); i++) {
WindowWrap<T> windowWrap = array.get(i);
Expand All @@ -110,7 +168,7 @@ public List<WindowWrap<T>> list() {
}

public List<T> values() {
ArrayList<T> result = new ArrayList<T>();
List<T> result = new ArrayList<T>();

for (int i = 0; i < array.length(); i++) {
WindowWrap<T> windowWrap = array.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @author jialiang.linjl
* @author Eric Zhao
*/
public class Window {
public class MetricBucket {

private final LongAdder pass = new LongAdder();
private final LongAdder block = new LongAdder();
Expand All @@ -33,7 +33,7 @@ public class Window {

private volatile long minRt;

public Window() {
public MetricBucket() {
initMinRt();
}

Expand All @@ -46,7 +46,7 @@ private void initMinRt() {
*
* @return new clean window
*/
public Window reset() {
public MetricBucket reset() {
pass.reset();
block.reset();
exception.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,27 @@

import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.node.metric.MetricNode;
import com.alibaba.csp.sentinel.slots.statistic.base.Window;
import com.alibaba.csp.sentinel.slots.statistic.base.MetricBucket;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;

/**
* The basic metric class in Sentinel using a {@link WindowLeapArray} internal.
* The basic metric class in Sentinel using a {@link MetricsLeapArray} internal.
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class ArrayMetric implements Metric {

private final WindowLeapArray data;
private final MetricsLeapArray data;

public ArrayMetric(int windowLength, int interval) {
this.data = new WindowLeapArray(windowLength, interval);
this.data = new MetricsLeapArray(windowLength, interval);
}

/**
* For unit test.
*/
public ArrayMetric(WindowLeapArray array) {
public ArrayMetric(MetricsLeapArray array) {
this.data = array;
}

Expand All @@ -49,8 +49,8 @@ public long success() {
data.currentWindow();
long success = 0;

List<Window> list = data.values();
for (Window window : list) {
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
success += window.success();
}
return success;
Expand All @@ -61,8 +61,8 @@ public long maxSuccess() {
data.currentWindow();
long success = 0;

List<Window> list = data.values();
for (Window window : list) {
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
if (window.success() > success) {
success = window.success();
}
Expand All @@ -74,8 +74,8 @@ public long maxSuccess() {
public long exception() {
data.currentWindow();
long exception = 0;
List<Window> list = data.values();
for (Window window : list) {
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
exception += window.exception();
}
return exception;
Expand All @@ -85,8 +85,8 @@ public long exception() {
public long block() {
data.currentWindow();
long block = 0;
List<Window> list = data.values();
for (Window window : list) {
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
block += window.block();
}
return block;
Expand All @@ -96,9 +96,9 @@ public long block() {
public long pass() {
data.currentWindow();
long pass = 0;
List<Window> list = data.values();
List<MetricBucket> list = data.values();

for (Window window : list) {
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
Expand All @@ -108,8 +108,8 @@ public long pass() {
public long rt() {
data.currentWindow();
long rt = 0;
List<Window> list = data.values();
for (Window window : list) {
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
rt += window.rt();
}
return rt;
Expand All @@ -119,8 +119,8 @@ public long rt() {
public long minRt() {
data.currentWindow();
long rt = Constants.TIME_DROP_VALVE;
List<Window> list = data.values();
for (Window window : list) {
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
if (window.minRt() < rt) {
rt = window.minRt();
}
Expand All @@ -133,7 +133,7 @@ public long minRt() {
public List<MetricNode> details() {
List<MetricNode> details = new ArrayList<MetricNode>();
data.currentWindow();
for (WindowWrap<Window> window : data.list()) {
for (WindowWrap<MetricBucket> window : data.list()) {
if (window == null) {
continue;
}
Expand All @@ -156,38 +156,38 @@ public List<MetricNode> details() {
}

@Override
public Window[] windows() {
public MetricBucket[] windows() {
data.currentWindow();
return data.values().toArray(new Window[data.values().size()]);
return data.values().toArray(new MetricBucket[data.values().size()]);
}

@Override
public void addException() {
WindowWrap<Window> wrap = data.currentWindow();
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addException();
}

@Override
public void addBlock() {
WindowWrap<Window> wrap = data.currentWindow();
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addBlock();
}

@Override
public void addSuccess() {
WindowWrap<Window> wrap = data.currentWindow();
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addSuccess();
}

@Override
public void addPass() {
WindowWrap<Window> wrap = data.currentWindow();
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass();
}

@Override
public void addRT(long rt) {
WindowWrap<Window> wrap = data.currentWindow();
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addRT(rt);
}

Expand All @@ -196,7 +196,7 @@ public void debugQps() {
data.currentWindow();
StringBuilder sb = new StringBuilder();
sb.append(Thread.currentThread().getId()).append("_");
for (WindowWrap<Window> windowWrap : data.list()) {
for (WindowWrap<MetricBucket> windowWrap : data.list()) {

sb.append(windowWrap.windowStart()).append(":").append(windowWrap.value().pass()).append(":")
.append(windowWrap.value().block());
Expand All @@ -208,7 +208,7 @@ public void debugQps() {

@Override
public long previousWindowBlock() {
WindowWrap<Window> wrap = data.currentWindow();
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap = data.getPreviousWindow();
if (wrap == null) {
return 0;
Expand All @@ -218,12 +218,11 @@ public long previousWindowBlock() {

@Override
public long previousWindowPass() {
WindowWrap<Window> wrap = data.currentWindow();
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap = data.getPreviousWindow();
if (wrap == null) {
return 0;
}
return wrap.value().pass();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.List;

import com.alibaba.csp.sentinel.node.metric.MetricNode;
import com.alibaba.csp.sentinel.slots.statistic.base.Window;
import com.alibaba.csp.sentinel.slots.statistic.base.MetricBucket;

/**
* Represents a basic structure recording invocation metrics of protected resources.
Expand Down Expand Up @@ -79,7 +79,7 @@ public interface Metric {
*
* @return window metric array
*/
Window[] windows();
MetricBucket[] windows();

/**
* Increment by one the current exception count.
Expand Down
Loading

0 comments on commit 5490549

Please sign in to comment.