Skip to content

Commit

Permalink
Optimize for statistic data structures (alibaba#47)
Browse files Browse the repository at this point in the history
* Optimize for leap array

- Fix a bug: old position is not cleaned when inserting into a new (empty) position
- Reuse buckets for optimization
- The strategy is now changed: deprecated buckets will not be reset until newer time triggered. LeapArray is responsible for filtering the deprecated buckets (e.g. in `list` or `values`)
- Update test cases

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
  • Loading branch information
sczyh30 authored Aug 8, 2018
1 parent b51c3ad commit a65d160
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
/**
* @param <T> type of data wrapper
* @author jialiang.linjl
* @author Eric Zhao
*/
public abstract class LeapArray<T> {

Expand All @@ -45,6 +46,12 @@ public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}

/**
* Get window at provided timestamp.
*
* @param time a valid timestamp
* @return the window at provided timestamp
*/
abstract public WindowWrap<T> currentWindow(long time);

public WindowWrap<T> getPreviousWindow(long time) {
Expand All @@ -53,8 +60,8 @@ public WindowWrap<T> getPreviousWindow(long time) {
time = time - windowLength;
WindowWrap<T> wrap = array.get(idx);

if (wrap == null) {
return wrap;
if (wrap == null || isWindowDeprecated(wrap)) {
return null;
}

if (wrap.windowStart() + windowLength < (time)) {
Expand All @@ -73,23 +80,27 @@ public T getWindowValue(long time) {
int idx = (int)(timeId % array.length());

WindowWrap<T> old = array.get(idx);
if (old == null) {
if (old == null || isWindowDeprecated(old)) {
return null;
}

return old.value();
}

public AtomicReferenceArray<WindowWrap<T>> array() {
AtomicReferenceArray<WindowWrap<T>> array() {
return array;
}

private boolean isWindowDeprecated(WindowWrap<T> windowWrap) {
return TimeUtil.currentTimeMillis() - windowWrap.windowStart() >= intervalInMs;
}

public List<WindowWrap<T>> list() {
ArrayList<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>();

for (int i = 0; i < array.length(); i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null) {
if (windowWrap == null || isWindowDeprecated(windowWrap)) {
continue;
}
result.add(windowWrap);
Expand All @@ -103,7 +114,7 @@ public List<T> values() {

for (int i = 0; i < array.length(); i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null) {
if (windowWrap == null || isWindowDeprecated(windowWrap)) {
continue;
}
result.add(windowWrap.value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,26 @@ public class Window {
private final LongAdder minRt = new LongAdder();

public Window() {
initMinRt();
}

private void initMinRt() {
minRt.add(4900);
}

/**
* Clean the adders and reset window to provided start time.
*
* @param startTime the start time of the window
* @return new clean window
*/
Window resetTo(long startTime) {
public Window reset() {
pass.reset();
block.reset();
exception.reset();
rt.reset();
success.reset();
minRt.reset();
initMinRt();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,18 @@ public T value() {
public void setValue(T value) {
this.value = value;
}

public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}

@Override
public String toString() {
return "WindowWrap{" +
"windowLength=" + windowLength +
", windowStart=" + windowStart +
", value=" + value +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,26 @@
*/
public class WindowLeapArray extends LeapArray<Window> {

private final int timeLength;

public WindowLeapArray(int windowLengthInMs, int intervalInSec) {
super(windowLengthInMs, intervalInSec);
timeLength = intervalInSec * 1000;
}

private ReentrantLock addLock = new ReentrantLock();

/**
* Reset current window to provided start time and reset all counters.
*
* @param startTime the start time of the window
* @return new clean window wrap
*/
private WindowWrap<Window> resetWindowTo(WindowWrap<Window> w, long startTime) {
w.resetTo(startTime);
w.value().reset();
return w;
}

@Override
public WindowWrap<Window> currentWindow(long time) {

long timeId = time / windowLength;
// Calculate current index.
int idx = (int)(timeId % array.length());
Expand All @@ -62,29 +70,17 @@ public WindowWrap<Window> currentWindow(long time) {
} else if (time > old.windowStart()) {
if (addLock.tryLock()) {
try {
WindowWrap<Window> window = new WindowWrap<Window>(windowLength, time, new Window());
if (array.compareAndSet(idx, old, window)) {
for (int i = 0; i < array.length(); i++) {
WindowWrap<Window> tmp = array.get(i);
if (tmp == null) {
continue;
} else {
if (tmp.windowStart() < time - timeLength) {
array.set(i, null);
}
}
}
return window;
}
// if (old is deprecated) then [LOCK] resetTo currentTime.
return resetWindowTo(old, time);
} finally {
addLock.unlock();
}

} else {
Thread.yield();
}

} else if (time < old.windowStart()) {
// Cannot go through here.
return new WindowWrap<Window>(windowLength, time, new Window());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testWindowAfterOneInterval() {
assertEquals(0L, currentWindow.block());
}

@Test
@Deprecated
public void testWindowDeprecatedRefresh() {
WindowLeapArray leapArray = new WindowLeapArray(windowLengthInMs, intervalInSec);
final int len = intervalInSec * 1000 / windowLengthInMs;
Expand Down Expand Up @@ -160,7 +160,34 @@ public void testGetPreviousWindow() {
}

@Test
public void testListWindows() {
public void testListWindowsResetOld() throws Exception {
final int windowLengthInMs = 100;
final int intervalInSec = 1;
final int intervalInMs = intervalInSec * 1000;

WindowLeapArray leapArray = new WindowLeapArray(windowLengthInMs, intervalInSec);
long time = TimeUtil.currentTimeMillis();

Set<WindowWrap<Window>> windowWraps = new HashSet<WindowWrap<Window>>();

windowWraps.add(leapArray.currentWindow(time));
windowWraps.add(leapArray.currentWindow(time + windowLengthInMs));

List<WindowWrap<Window>> list = leapArray.list();
for (WindowWrap<Window> wrap : list) {
assertTrue(windowWraps.contains(wrap));
}

Thread.sleep(windowLengthInMs + intervalInMs);

// This will replace the deprecated bucket, so all deprecated buckets will be reset.
leapArray.currentWindow(time + windowLengthInMs + intervalInMs).value().addPass();

assertEquals(1, leapArray.list().size());
}

@Test
public void testListWindowsNewBucket() throws Exception {
final int windowLengthInMs = 100;
final int intervalInSec = 1;

Expand All @@ -172,12 +199,16 @@ public void testListWindows() {
windowWraps.add(leapArray.currentWindow(time));
windowWraps.add(leapArray.currentWindow(time + windowLengthInMs));

Thread.sleep(intervalInSec * 1000 + windowLengthInMs * 3);

List<WindowWrap<Window>> list = leapArray.list();
for (WindowWrap<Window> wrap : list) {
assertTrue(windowWraps.contains(wrap));
}

leapArray.currentWindow(time + windowLengthInMs * 20 + intervalInSec * 1000).value().addPass();
// This won't hit deprecated bucket, so no deprecated buckets will be reset.
// But deprecated buckets can be filtered when collecting list.
leapArray.currentWindow(TimeUtil.currentTimeMillis()).value().addPass();

assertEquals(1, leapArray.list().size());
}
Expand Down

0 comments on commit a65d160

Please sign in to comment.