Skip to content

Commit 25b4e87

Browse files
mwegieldavorbonaci
authored andcommitted
Let shuffle reader and writer update counters.
To this end, provide a reference to CounterSet.AddCounterMutator to ApplianceShuffleWriter and ApplianceShuffleReader. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=112828809
1 parent 4fa7bd3 commit 25b4e87

File tree

10 files changed

+67
-19
lines changed

10 files changed

+67
-19
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ApplianceShuffleReader.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.dataflow.sdk.runners.worker;
1818

19+
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
20+
1921
import java.io.IOException;
2022

2123
import javax.annotation.concurrent.ThreadSafe;
@@ -37,12 +39,21 @@ public final class ApplianceShuffleReader implements ShuffleReader {
3739
*/
3840
private long nativePointer;
3941

42+
/**
43+
* Mutator that can be used to update counters.
44+
*/
45+
private CounterSet.AddCounterMutator addCounterMutator;
46+
4047
/**
4148
* @param shuffleReaderConfig opaque configuration for creating a
4249
* shuffle reader
50+
* @param addCounterMutator mutator that can be used to update counters
4351
*/
44-
public ApplianceShuffleReader(byte[] shuffleReaderConfig) {
52+
public ApplianceShuffleReader(
53+
byte[] shuffleReaderConfig,
54+
CounterSet.AddCounterMutator addCounterMutator) {
4555
this.nativePointer = createFromConfig(shuffleReaderConfig);
56+
this.addCounterMutator = addCounterMutator;
4657
}
4758

4859
@Override

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ApplianceShuffleWriter.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.dataflow.sdk.runners.worker;
1818

19+
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
20+
1921
import java.io.IOException;
2022
import javax.annotation.concurrent.ThreadSafe;
2123

@@ -35,14 +37,23 @@ public final class ApplianceShuffleWriter implements ShuffleWriter {
3537
*/
3638
private long nativePointer;
3739

40+
/**
41+
* Mutator that can be used to update counters.
42+
*/
43+
private final CounterSet.AddCounterMutator addCounterMutator;
44+
3845
/**
3946
* @param shuffleWriterConfig opaque configuration for creating a
4047
* shuffle writer
4148
* @param bufferSize the writer buffer size
49+
* @param addCounterMutator mutator that can be used to update counters
4250
*/
43-
public ApplianceShuffleWriter(byte[] shuffleWriterConfig,
44-
long bufferSize) {
51+
public ApplianceShuffleWriter(
52+
byte[] shuffleWriterConfig,
53+
long bufferSize,
54+
CounterSet.AddCounterMutator addCounterMutator) {
4555
this.nativePointer = createFromConfig(shuffleWriterConfig, bufferSize);
56+
this.addCounterMutator = addCounterMutator;
4657
}
4758

4859
@Override

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/GroupingShuffleReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ protected StateKind getStateSamplerStateKind() {
127127
@Override
128128
public GroupingShuffleReaderIterator<K, V> iterator() throws IOException {
129129
Preconditions.checkArgument(shuffleReaderConfig != null);
130-
ApplianceShuffleReader asr = new ApplianceShuffleReader(shuffleReaderConfig);
130+
ApplianceShuffleReader asr =
131+
new ApplianceShuffleReader(shuffleReaderConfig, addCounterMutator);
131132
String datasetId = asr.getDatasetId();
132133
initCounter(datasetId);
133134

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/PartitioningShuffleReader.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.dataflow.sdk.util.CoderUtils;
2323
import com.google.cloud.dataflow.sdk.util.WindowedValue;
2424
import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
25+
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
2526
import com.google.cloud.dataflow.sdk.util.common.worker.BatchingShuffleEntryReader;
2627
import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
2728
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntry;
@@ -43,15 +44,18 @@ public class PartitioningShuffleReader<K, V> extends NativeReader<WindowedValue<
4344
final byte[] shuffleReaderConfig;
4445
final String startShufflePosition;
4546
final String stopShufflePosition;
47+
final CounterSet.AddCounterMutator addCounterMutator;
4648
Coder<K> keyCoder;
4749
WindowedValueCoder<V> windowedValueCoder;
4850

4951
public PartitioningShuffleReader(PipelineOptions options, byte[] shuffleReaderConfig,
50-
String startShufflePosition, String stopShufflePosition, Coder<WindowedValue<KV<K, V>>> coder)
52+
String startShufflePosition, String stopShufflePosition, Coder<WindowedValue<KV<K, V>>> coder,
53+
CounterSet.AddCounterMutator addCounterMutator)
5154
throws Exception {
5255
this.shuffleReaderConfig = shuffleReaderConfig;
5356
this.startShufflePosition = startShufflePosition;
5457
this.stopShufflePosition = stopShufflePosition;
58+
this.addCounterMutator = addCounterMutator;
5559
initCoder(coder);
5660
}
5761

@@ -79,7 +83,10 @@ private void initCoder(Coder<WindowedValue<KV<K, V>>> coder) throws Exception {
7983
public NativeReaderIterator<WindowedValue<KV<K, V>>> iterator() throws IOException {
8084
Preconditions.checkArgument(shuffleReaderConfig != null);
8185
return iterator(new BatchingShuffleEntryReader(
82-
new ChunkingShuffleBatchReader(new ApplianceShuffleReader(shuffleReaderConfig))));
86+
new ChunkingShuffleBatchReader(
87+
new ApplianceShuffleReader(
88+
shuffleReaderConfig,
89+
addCounterMutator))));
8390
}
8491

8592
PartitioningShuffleReaderIterator iterator(ShuffleEntryReader reader) {

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/PartitioningShuffleReaderFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,21 @@ public NativeReader<?> create(
5050
@SuppressWarnings({"unchecked", "rawtypes"})
5151
Coder<WindowedValue<KV<Object, Object>>> typedCoder =
5252
(Coder<WindowedValue<KV<Object, Object>>>) coder;
53-
return createTyped(spec, options, typedCoder);
53+
return createTyped(spec, options, typedCoder, addCounterMutator);
5454
}
5555

5656
public <K, V> PartitioningShuffleReader<K, V> createTyped(
5757
CloudObject spec,
5858
PipelineOptions options,
59-
Coder<WindowedValue<KV<K, V>>> coder)
59+
Coder<WindowedValue<KV<K, V>>> coder,
60+
CounterSet.AddCounterMutator addCounterMutator)
6061
throws Exception {
6162
return new PartitioningShuffleReader<K, V>(
6263
options,
6364
decodeBase64(getString(spec, PropertyNames.SHUFFLE_READER_CONFIG)),
6465
getString(spec, PropertyNames.START_SHUFFLE_POSITION, null),
6566
getString(spec, PropertyNames.END_SHUFFLE_POSITION, null),
66-
coder);
67+
coder,
68+
addCounterMutator);
6769
}
6870
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ShuffleSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ public void close() throws IOException {
283283
public SinkWriter<WindowedValue<T>> writer() throws IOException {
284284
Preconditions.checkArgument(shuffleWriterConfig != null);
285285
ApplianceShuffleWriter applianceWriter = new ApplianceShuffleWriter(
286-
shuffleWriterConfig, SHUFFLE_WRITER_BUFFER_SIZE);
286+
shuffleWriterConfig, SHUFFLE_WRITER_BUFFER_SIZE, addCounterMutator);
287287
String datasetId = applianceWriter.getDatasetId();
288288
return writer(new ChunkingShuffleEntryWriter(applianceWriter), datasetId);
289289
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/UngroupedShuffleReader.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.cloud.dataflow.sdk.coders.Coder;
2020
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
2121
import com.google.cloud.dataflow.sdk.util.CoderUtils;
22+
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
2223
import com.google.cloud.dataflow.sdk.util.common.worker.BatchingShuffleEntryReader;
2324
import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
2425
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntry;
@@ -41,21 +42,27 @@ public class UngroupedShuffleReader<T> extends NativeReader<T> {
4142
final String startShufflePosition;
4243
final String stopShufflePosition;
4344
final Coder<T> coder;
45+
final CounterSet.AddCounterMutator addCounterMutator;
4446

4547
public UngroupedShuffleReader(
4648
@SuppressWarnings("unused") PipelineOptions options, byte[] shuffleReaderConfig,
47-
@Nullable String startShufflePosition, @Nullable String stopShufflePosition, Coder<T> coder) {
49+
@Nullable String startShufflePosition, @Nullable String stopShufflePosition, Coder<T> coder,
50+
@Nullable CounterSet.AddCounterMutator addCounterMutator) {
4851
this.shuffleReaderConfig = shuffleReaderConfig;
4952
this.startShufflePosition = startShufflePosition;
5053
this.stopShufflePosition = stopShufflePosition;
5154
this.coder = coder;
55+
this.addCounterMutator = addCounterMutator;
5256
}
5357

5458
@Override
5559
public NativeReaderIterator<T> iterator() throws IOException {
5660
Preconditions.checkArgument(shuffleReaderConfig != null);
5761
return iterator(new BatchingShuffleEntryReader(
58-
new ChunkingShuffleBatchReader(new ApplianceShuffleReader(shuffleReaderConfig))));
62+
new ChunkingShuffleBatchReader(
63+
new ApplianceShuffleReader(
64+
shuffleReaderConfig,
65+
addCounterMutator))));
5966
}
6067

6168
UngroupedShuffleReaderIterator iterator(ShuffleEntryReader reader) {

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/UngroupedShuffleReaderFactory.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,27 @@ public NativeReader<?> create(
4343
@Nullable CounterSet.AddCounterMutator addCounterMutator,
4444
@Nullable String operationName)
4545
throws Exception {
46-
return create(spec, coder, options);
46+
return create(spec, coder, options, addCounterMutator);
4747
}
4848

4949
public <T> UngroupedShuffleReader<T> create(
50-
CloudObject spec, Coder<T> coder, PipelineOptions options) throws Exception {
51-
return create(options, spec, coder);
50+
CloudObject spec,
51+
Coder<T> coder,
52+
PipelineOptions options,
53+
CounterSet.AddCounterMutator addCounterMutator) throws Exception {
54+
return create(options, spec, coder, addCounterMutator);
5255
}
5356

5457
<T> UngroupedShuffleReader<T> create(
55-
PipelineOptions options, CloudObject spec, Coder<T> coder) throws Exception {
58+
PipelineOptions options,
59+
CloudObject spec,
60+
Coder<T> coder,
61+
CounterSet.AddCounterMutator addCounterMutator) throws Exception {
5662
return new UngroupedShuffleReader<>(options,
5763
decodeBase64(getString(spec, PropertyNames.SHUFFLE_READER_CONFIG)),
5864
getString(spec, PropertyNames.START_SHUFFLE_POSITION, null),
59-
getString(spec, PropertyNames.END_SHUFFLE_POSITION, null), coder);
65+
getString(spec, PropertyNames.END_SHUFFLE_POSITION, null),
66+
coder,
67+
addCounterMutator);
6068
}
6169
}

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/worker/PartitioningShuffleReaderTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ private void runTestReadFromShuffle(List<WindowedValue<KV<Integer, String>>> exp
9494
// Read from shuffle with PartitioningShuffleReader.
9595
PartitioningShuffleReader<Integer, String> partitioningShuffleReader =
9696
new PartitioningShuffleReader<>(
97-
PipelineOptionsFactory.create(), null, null, null, elemCoder);
97+
PipelineOptionsFactory.create(), null, null, null, elemCoder,
98+
addCounterMutator);
9899
ExecutorTestUtils.TestReaderObserver observer =
99100
new ExecutorTestUtils.TestReaderObserver(partitioningShuffleReader);
100101

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/worker/UngroupedShuffleReaderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ void runTestReadFromShuffle(List<Integer> expected) throws Exception {
7979
new UngroupedShuffleReader<>(
8080
PipelineOptionsFactory.create(),
8181
null, null, null,
82-
elemCoder);
82+
elemCoder, addCounterMutator);
8383
ExecutorTestUtils.TestReaderObserver observer =
8484
new ExecutorTestUtils.TestReaderObserver(ungroupedShuffleReader);
8585

0 commit comments

Comments
 (0)