Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#184 from mshields822/beam-162
Browse files Browse the repository at this point in the history
Port incubator-beam/pull/118
  • Loading branch information
davorbonaci committed Apr 7, 2016
2 parents bf6536a + 31b5052 commit 697c5fb
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.Set;

Expand Down Expand Up @@ -92,9 +93,10 @@ void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeRe
}

/**
* Remove EPHEMERAL windows since we only need to know about them while processing new elements.
* Remove EPHEMERAL windows and remaining NEW windows since we only need to know about them
* while processing new elements.
*/
void removeEphemeralWindows();
void cleanupTemporaryWindows();

/**
* Save any state changes needed.
Expand All @@ -107,7 +109,7 @@ void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeRe
* yet been seen.
*/
@Nullable
W representative(W window);
W mergeResultWindow(W window);

/**
* Return (a view of) the set of currently ACTIVE windows.
Expand All @@ -120,16 +122,23 @@ void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeRe
boolean isActive(W window);

/**
* If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
* as NEW. All NEW windows will be accounted for as ACTIVE, MERGED or EPHEMERAL by a call
* to {@link #merge}.
* Called when an incoming element indicates it is a member of {@code window}, but before we
* have started processing that element. If {@code window} is not already known to be ACTIVE,
* MERGED or EPHEMERAL then add it as NEW.
*/
void ensureWindowExists(W window);

/**
* Called when a NEW or ACTIVE window is now known to be ACTIVE.
* Ensure that if it is NEW then it becomes ACTIVE (with itself as its only state address window).
*/
void addNew(W window);
void ensureWindowIsActive(W window);

/**
* If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
* as ACTIVE.
*/
@VisibleForTesting
void addActive(W window);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -72,6 +73,9 @@
*/
public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
private final WindowFn<Object, W> windowFn;
/**
* Map ACTIVE and NEW windows to their state address windows. Persisted.
*/
private final Map<W, Set<W>> activeWindowToStateAddressWindows;

/**
Expand All @@ -80,10 +84,8 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
private final Map<W, Set<W>> activeWindowToEphemeralWindows;

/**
* A map from window to the ACTIVE window it has been merged into.
*
* <p>Does not need to be persisted.
*
* A map from window to the ACTIVE window it has been merged into. Does not need to be persisted.
*
* <ul>
* <li>Key window may be ACTIVE, MERGED or EPHEMERAL.
* <li>ACTIVE windows map to themselves.
Expand All @@ -96,7 +98,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi

/**
* Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
*
* <p>Used to avoid writing to state if no changes have been made during the work unit.
*/
private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
Expand All @@ -122,7 +124,19 @@ public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals<?> st
}

@Override
public void removeEphemeralWindows() {
public void cleanupTemporaryWindows() {
// All NEW windows can be forgotten.
Iterator<Map.Entry<W, Set<W>>> iter =
activeWindowToStateAddressWindows.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<W, Set<W>> entry = iter.next();
if (entry.getValue().isEmpty()) {
windowToActiveWindow.remove(entry.getKey());
iter.remove();
}
}

// All EPHEMERAL windows can be forgotten.
for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) {
for (W ephemeral : entry.getValue()) {
windowToActiveWindow.remove(ephemeral);
Expand Down Expand Up @@ -158,7 +172,7 @@ public void persist() {

@Override
@Nullable
public W representative(W window) {
public W mergeResultWindow(W window) {
return windowToActiveWindow.get(window);
}

Expand All @@ -173,13 +187,30 @@ public boolean isActive(W window) {
}

@Override
public void addNew(W window) {
public void ensureWindowExists(W window) {
if (!windowToActiveWindow.containsKey(window)) {
Preconditions.checkState(!activeWindowToStateAddressWindows.containsKey(window));
activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>());
windowToActiveWindow.put(window, window);
}
}

/**
 * Promotes {@code window} from NEW to ACTIVE unless it is ACTIVE already.
 *
 * <p>A window with an empty state address window set is treated as NEW; it becomes ACTIVE by
 * recording itself as its only state address window. The precondition check fails if the
 * window has no state address window entry at all.
 */
@Override
public void ensureWindowIsActive(W window) {
  Set<W> addressWindows = activeWindowToStateAddressWindows.get(window);
  Preconditions.checkState(
      addressWindows != null,
      "Cannot ensure window %s is active since it is neither ACTIVE nor NEW",
      window);
  if (!addressWindows.isEmpty()) {
    // Already ACTIVE; nothing to change.
    return;
  }
  // Window was NEW: it must currently map to itself before it is made ACTIVE.
  boolean mapsToItself =
      windowToActiveWindow.containsKey(window) && windowToActiveWindow.get(window).equals(window);
  Preconditions.checkState(mapsToItself);
  addressWindows.add(window);
}

@Override
@VisibleForTesting
public void addActive(W window) {
if (!windowToActiveWindow.containsKey(window)) {
Set<W> stateAddressWindows = new LinkedHashSet<>();
Expand All @@ -191,21 +222,17 @@ public void addActive(W window) {

@Override
public void remove(W window) {
Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
if (stateAddressWindows == null) {
// Window is no longer active.
return;
}
for (W stateAddressWindow : stateAddressWindows) {
windowToActiveWindow.remove(stateAddressWindow);
Set<W> stateAddressWindows = activeWindowToStateAddressWindows.remove(window);
if (stateAddressWindows != null) {
for (W stateAddressWindow : stateAddressWindows) {
windowToActiveWindow.remove(stateAddressWindow);
}
}
activeWindowToStateAddressWindows.remove(window);
Set<W> ephemeralWindows = activeWindowToEphemeralWindows.get(window);
Set<W> ephemeralWindows = activeWindowToEphemeralWindows.remove(window);
if (ephemeralWindows != null) {
for (W ephemeralWindow : ephemeralWindows) {
windowToActiveWindow.remove(ephemeralWindow);
}
activeWindowToEphemeralWindows.remove(window);
}
windowToActiveWindow.remove(window);
}
Expand Down Expand Up @@ -291,15 +318,6 @@ public void merge(MergeCallback<W> mergeCallback) throws Exception {
// Actually do the merging and invoke the callbacks.
context.recordMerges();

// Any remaining NEW windows should become implicitly ACTIVE.
for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
if (entry.getValue().isEmpty()) {
// This window was NEW but since it survived merging must now become ACTIVE.
W window = entry.getKey();
entry.getValue().add(window);
windowToActiveWindow.put(window, window);
}
}
}

/**
Expand Down Expand Up @@ -433,12 +451,15 @@ public void checkInvariants() {
for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
W active = entry.getKey();
Preconditions.checkState(!entry.getValue().isEmpty(),
"Unexpected empty state address window set for ACTIVE window %s", active);
"Unexpected empty state address window set for ACTIVE window %s",
active);
for (W stateAddressWindow : entry.getValue()) {
Preconditions.checkState(knownStateAddressWindows.add(stateAddressWindow),
"%s is in more than one state address window set", stateAddressWindow);
"%s is in more than one state address window set",
stateAddressWindow);
Preconditions.checkState(active.equals(windowToActiveWindow.get(stateAddressWindow)),
"%s should have %s as its ACTIVE window", stateAddressWindow, active);
"%s should have %s as its ACTIVE window", stateAddressWindow,
active);
}
}
for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) {
Expand All @@ -449,14 +470,16 @@ public void checkInvariants() {
!entry.getValue().isEmpty(), "Unexpected empty EPHEMERAL set for %s", active);
for (W ephemeralWindow : entry.getValue()) {
Preconditions.checkState(knownStateAddressWindows.add(ephemeralWindow),
"%s is EPHEMERAL/state address of more than one ACTIVE window", ephemeralWindow);
"%s is EPHEMERAL/state address of more than one ACTIVE window",
ephemeralWindow);
Preconditions.checkState(active.equals(windowToActiveWindow.get(ephemeralWindow)),
"%s should have %s as its ACTIVE window", ephemeralWindow, active);
}
}
for (Map.Entry<W, W> entry : windowToActiveWindow.entrySet()) {
Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(entry.getValue()),
"%s should be ACTIVE since representative for %s", entry.getValue(), entry.getKey());
"%s should be ACTIVE since mergeResultWindow for %s",
entry.getValue(), entry.getKey());
}
}

Expand Down Expand Up @@ -519,7 +542,9 @@ private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> multimap)
}
}

/** Return a deep copy of {@code multimap}. */
/**
* Return a deep copy of {@code multimap}.
*/
private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
Map<W, Set<W>> newMultimap = new HashMap<>();
for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
Expand All @@ -528,15 +553,19 @@ private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
return newMultimap;
}

/** Return inversion of {@code multimap}, which must be invertible. */
/**
* Return inversion of {@code multimap}, which must be invertible.
*/
private static <W> Map<W, W> invert(Map<W, Set<W>> multimap) {
Map<W, W> result = new HashMap<>();
for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
W active = entry.getKey();
for (W target : entry.getValue()) {
W previous = result.put(target, active);
Preconditions.checkState(previous == null,
"Window %s has both %s and %s as representatives", target, previous, active);
Preconditions.checkState(
previous == null,
"Multimap is not invertible: Window %s has both %s and %s as representatives",
target, previous, active);
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;

import java.util.Collection;
Expand All @@ -30,13 +31,13 @@
*/
public class NonMergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
@Override
public void removeEphemeralWindows() {}
public void cleanupTemporaryWindows() {}

@Override
public void persist() {}

@Override
public W representative(W window) {
public W mergeResultWindow(W window) {
// Always represented by itself.
return window;
}
Expand All @@ -54,9 +55,13 @@ public boolean isActive(W window) {
}

@Override
public void addNew(W window) {}
public void ensureWindowExists(W window) {}

// No-op in this implementation: the method body is empty.
@Override
public void ensureWindowIsActive(W window) {}

// No-op in this implementation: the method body is empty.
@Override
@VisibleForTesting
public void addActive(W window) {}

@Override
Expand Down
Loading

0 comments on commit 697c5fb

Please sign in to comment.