Skip to content

Commit e006292

Browse files
tgrohdavorbonaci
authored andcommitted
Add CounterSet#merge(CounterSet)
This method merges the contents of the two counter sets into the called CounterSet, using the underlying Counter#merge method. All counters that are present only in one of the CounterSets are added to the called set. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=113225321
1 parent ec6a695 commit e006292

File tree

2 files changed

+90
-0
lines changed

2 files changed

+90
-0
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/CounterSet.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.dataflow.sdk.util.common;
1818

19+
import static com.google.common.base.Preconditions.checkArgument;
20+
1921
import java.util.AbstractSet;
2022
import java.util.HashMap;
2123
import java.util.Iterator;
@@ -129,6 +131,29 @@ public synchronized boolean add(Counter<?> e) {
129131
return true;
130132
}
131133

134+
public synchronized void merge(CounterSet that) {
135+
for (Counter<?> theirCounter : that) {
136+
Counter<?> myCounter = counters.get(theirCounter.getName());
137+
if (myCounter != null) {
138+
mergeCounters(myCounter, theirCounter);
139+
} else {
140+
addCounter(theirCounter);
141+
}
142+
}
143+
}
144+
145+
private <T> void mergeCounters(Counter<T> mine, Counter<?> theirCounter) {
146+
checkArgument(
147+
mine.isCompatibleWith(theirCounter),
148+
"Can't merge CounterSets containing incompatible counters with the same name: "
149+
+ "%s (existing) and %s (merged)",
150+
mine,
151+
theirCounter);
152+
@SuppressWarnings("unchecked")
153+
Counter<T> theirs = (Counter<T>) theirCounter;
154+
mine.merge(theirs);
155+
}
156+
132157
/**
133158
* A nested class that supports adding additional counters into the
134159
* enclosing CounterSet. This is useful as a mutator, hiding other

sdk/src/test/java/com/google/cloud/dataflow/sdk/util/common/CounterSetTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind.MAX;
2020
import static com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind.SUM;
2121
import static org.hamcrest.Matchers.containsInAnyOrder;
22+
import static org.hamcrest.Matchers.equalTo;
2223
import static org.junit.Assert.assertFalse;
2324
import static org.junit.Assert.assertSame;
2425
import static org.junit.Assert.assertThat;
@@ -109,6 +110,70 @@ public void testAddOrReuseWithIncompatibleTypesThrowsException() {
109110
set.addOrReuseCounter(c1Incompatible);
110111
}
111112

113+
@Test
114+
public void testMergeWithDifferentNamesAddsAll() {
115+
Counter<?> c1 = Counter.longs("c1", SUM);
116+
Counter<?> c2 = Counter.ints("c2", MAX);
117+
118+
set.add(c1);
119+
set.add(c2);
120+
121+
CounterSet newSet = new CounterSet();
122+
newSet.merge(set);
123+
124+
assertThat(newSet, containsInAnyOrder(c1, c2));
125+
}
126+
127+
@SuppressWarnings("unchecked")
128+
@Test
129+
public void testMergeWithSameNamesMerges() {
130+
Counter<Long> c1 = Counter.longs("c1", SUM);
131+
Counter<Integer> c2 = Counter.ints("c2", MAX);
132+
133+
set.add(c1);
134+
set.add(c2);
135+
136+
c1.addValue(3L);
137+
c2.addValue(22);
138+
139+
CounterSet newSet = new CounterSet();
140+
Counter<Long> c1Prime = Counter.longs("c1", SUM);
141+
Counter<Integer> c2Prime = Counter.ints("c2", MAX);
142+
143+
c1Prime.addValue(7L);
144+
c2Prime.addValue(14);
145+
146+
newSet.add(c1Prime);
147+
newSet.add(c2Prime);
148+
149+
newSet.merge(set);
150+
151+
assertThat((Counter<Long>) newSet.getExistingCounter("c1"), equalTo(c1Prime));
152+
assertThat((Long) newSet.getExistingCounter("c1").getAggregate(), equalTo(10L));
153+
154+
assertThat((Counter<Integer>) newSet.getExistingCounter("c2"), equalTo(c2Prime));
155+
assertThat((Integer) newSet.getExistingCounter("c2").getAggregate(), equalTo(22));
156+
}
157+
158+
@SuppressWarnings("unchecked")
159+
@Test
160+
public void testMergeWithIncompatibleTypesThrowsException() {
161+
Counter<Long> c1 = Counter.longs("c1", SUM);
162+
163+
set.add(c1);
164+
165+
CounterSet newSet = new CounterSet();
166+
Counter<Long> c1Prime = Counter.longs("c1", MAX);
167+
168+
newSet.add(c1Prime);
169+
170+
thrown.expect(IllegalArgumentException.class);
171+
thrown.expectMessage("c1");
172+
thrown.expectMessage("incompatible counters with the same name");
173+
174+
newSet.merge(set);
175+
}
176+
112177
@Test
113178
public void testAddCounterMutatorAddCounterAddsCounter() {
114179
Counter<?> c1 = Counter.longs("c1", SUM);

0 commit comments

Comments
 (0)