Skip to content

Commit ede56ef

Browse files
lukecwikdavorbonaci
authored and committed
Add increment support with positive infinity
Add support for compare from offset and common prefix length to comparator. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=113125854
1 parent 186a7ff commit ede56ef

File tree

2 files changed

+144
-11
lines changed

2 files changed

+144
-11
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RandomAccessData.java

Lines changed: 94 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
package com.google.cloud.dataflow.sdk.util;
1717

1818
import static com.google.common.base.Preconditions.checkArgument;
19+
import static com.google.common.base.Preconditions.checkNotNull;
1920

2021
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
2122
import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
2223
import com.google.cloud.dataflow.sdk.coders.Coder;
2324
import com.google.cloud.dataflow.sdk.coders.CoderException;
2425
import com.google.common.base.MoreObjects;
2526
import com.google.common.io.ByteStreams;
27+
import com.google.common.primitives.UnsignedBytes;
2628

2729
import com.fasterxml.jackson.annotation.JsonCreator;
2830

@@ -50,7 +52,9 @@ public class RandomAccessData {
5052
/**
5153
* A {@link Coder} which encodes the valid parts of this stream.
5254
* This follows the same encoding scheme as {@link ByteArrayCoder}.
53-
* This coder is deterministic and the consistent with equals.
55+
* This coder is deterministic and consistent with equals.
56+
*
57+
* <p>This coder does not support encoding positive infinity.
5458
*/
5559
public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
5660
private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
@@ -63,6 +67,9 @@ public static RandomAccessDataCoder of() {
6367
@Override
6468
public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
6569
throws CoderException, IOException {
70+
if (value == POSITIVE_INFINITY) {
71+
throw new CoderException("Positive infinity can not be encoded.");
72+
}
6673
if (!context.isWholeStream) {
6774
VarInt.encode(value.size, outStream);
6875
}
@@ -107,18 +114,45 @@ protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context c
107114
}
108115
}
109116

117+
public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
118+
new UnsignedLexicographicalComparator();
119+
110120
/**
111121
* A {@link Comparator} that compares two byte arrays lexicographically. It compares
112122
* values as a list of unsigned bytes. The first pair of bytes that differ after any common prefix
113123
* determines the order; when one array is a prefix of the other, the shorter array is the lesser.
114-
* For example, [] < [0x01] < [0x01, 0x7F] < [0x01, 0x80] < [0x02].
124+
* For example, [] < [0x01] < [0x01, 0x7F] < [0x01, 0x80] < [0x02] < POSITIVE INFINITY.
125+
*
126+
* <p>Note that a token type of positive infinity is supported and is greater than
127+
* all other {@link RandomAccessData}.
115128
*/
116-
public static final Comparator<RandomAccessData> UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
117-
new Comparator<RandomAccessData>() {
129+
public static final class UnsignedLexicographicalComparator
130+
implements Comparator<RandomAccessData> {
131+
// Do not instantiate
132+
private UnsignedLexicographicalComparator() {
133+
}
134+
118135
@Override
119136
public int compare(RandomAccessData o1, RandomAccessData o2) {
137+
return compare(o1, o2, 0 /* start from the beginning */);
138+
}
139+
140+
/**
141+
* Compare the two sets of bytes starting at the given offset.
142+
*/
143+
public int compare(RandomAccessData o1, RandomAccessData o2, int startOffset) {
144+
if (o1 == o2) {
145+
return 0;
146+
}
147+
if (o1 == POSITIVE_INFINITY) {
148+
return 1;
149+
}
150+
if (o2 == POSITIVE_INFINITY) {
151+
return -1;
152+
}
153+
120154
int minBytesLen = Math.min(o1.size, o2.size);
121-
for (int i = 0; i < minBytesLen; i++) {
155+
for (int i = startOffset; i < minBytesLen; i++) {
122156
// unsigned comparison
123157
int b1 = o1.buffer[i] & 0xFF;
124158
int b2 = o2.buffer[i] & 0xFF;
@@ -132,7 +166,45 @@ public int compare(RandomAccessData o1, RandomAccessData o2) {
132166
// If both lengths are equal, then both streams are equal.
133167
return o1.size - o2.size;
134168
}
135-
};
169+
170+
/**
171+
* Compute the length of the common prefix of the two provided sets of bytes.
172+
*/
173+
public int commonPrefixLength(RandomAccessData o1, RandomAccessData o2) {
174+
int minBytesLen = Math.min(o1.size, o2.size);
175+
for (int i = 0; i < minBytesLen; i++) {
176+
// unsigned comparison
177+
int b1 = o1.buffer[i] & 0xFF;
178+
int b2 = o2.buffer[i] & 0xFF;
179+
if (b1 != b2) {
180+
return i;
181+
}
182+
}
183+
return minBytesLen;
184+
}
185+
}
186+
187+
/** A token type representing positive infinity; it is recognized by reference identity (==). */
188+
static final RandomAccessData POSITIVE_INFINITY = new RandomAccessData(0);
189+
190+
/**
191+
* Returns a RandomAccessData that is the smallest value of same length which
192+
* is strictly greater than this. Note that if this is empty or is all 0xFF then
193+
* a token value of positive infinity is returned.
194+
*
195+
* The {@link UnsignedLexicographicalComparator} supports comparing {@link RandomAccessData}
196+
* with support for positive infinitiy.
197+
*/
198+
public RandomAccessData increment() throws IOException {
199+
RandomAccessData copy = copy();
200+
for (int i = copy.size - 1; i >= 0; --i) {
201+
if (copy.buffer[i] != UnsignedBytes.MAX_VALUE) {
202+
copy.buffer[i] = UnsignedBytes.checkedCast(UnsignedBytes.toInt(copy.buffer[i]) + 1);
203+
return copy;
204+
}
205+
}
206+
return POSITIVE_INFINITY;
207+
}
136208

137209
private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
138210

@@ -141,10 +213,17 @@ public RandomAccessData() {
141213
this(DEFAULT_INITIAL_BUFFER_SIZE);
142214
}
143215

216+
/**
 * Constructs a RandomAccessData that uses {@code initialBuffer} as its backing array.
 *
 * <p>The array is not copied, and its entire length is taken as the current size.
 *
 * @param initialBuffer the array to use as the buffer
 * @throws NullPointerException if {@code initialBuffer} is null
 */
public RandomAccessData(byte[] initialBuffer) {
  this.buffer = checkNotNull(initialBuffer);
  this.size = this.buffer.length;
}
222+
144223
/**
 * Constructs a RandomAccessData backed by a newly allocated buffer of the given size.
 *
 * @param initialBufferSize the length of the initial backing array; zero is permitted
 * @throws IllegalArgumentException if {@code initialBufferSize} is negative
 */
public RandomAccessData(int initialBufferSize) {
  // Zero is a legal size, so the message must not claim the value has to be positive.
  checkArgument(
      initialBufferSize >= 0,
      "Expected initial buffer size to be greater than or equal to zero.");
  this.buffer = new byte[initialBufferSize];
}
149228

150229
private byte[] buffer;
@@ -220,6 +299,13 @@ public void readFrom(InputStream inStream, int offset, int length) throws IOExce
220299
size = offset + length;
221300
}
222301

302+
/** Returns a copy of this RandomAccessData. */
303+
public RandomAccessData copy() throws IOException {
304+
RandomAccessData copy = new RandomAccessData(size);
305+
writeTo(copy.asOutputStream(), 0, size);
306+
return copy;
307+
}
308+
223309
@Override
224310
public boolean equals(Object other) {
225311
if (other == this) {
@@ -244,7 +330,7 @@ public int hashCode() {
244330
@Override
245331
public String toString() {
246332
return MoreObjects.toStringHelper(this)
247-
.add("buffer", buffer)
333+
.add("buffer", Arrays.copyOf(buffer, size))
248334
.add("size", size)
249335
.toString();
250336
}

sdk/src/test/java/com/google/cloud/dataflow/sdk/util/RandomAccessDataTest.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818
import static org.junit.Assert.assertArrayEquals;
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertNotEquals;
21+
import static org.junit.Assert.assertSame;
2122
import static org.junit.Assert.assertTrue;
2223

2324
import com.google.cloud.dataflow.sdk.coders.Coder.Context;
25+
import com.google.cloud.dataflow.sdk.coders.CoderException;
2426
import com.google.cloud.dataflow.sdk.testing.CoderProperties;
2527
import com.google.cloud.dataflow.sdk.util.RandomAccessData.RandomAccessDataCoder;
28+
import com.google.common.primitives.UnsignedBytes;
2629

30+
import org.junit.Rule;
2731
import org.junit.Test;
32+
import org.junit.rules.ExpectedException;
2833
import org.junit.runner.RunWith;
2934
import org.junit.runners.JUnit4;
3035

@@ -40,6 +45,9 @@
4045
public class RandomAccessDataTest {
4146
private static final byte[] TEST_DATA_A = new byte[]{ 0x01, 0x02, 0x03 };
4247
private static final byte[] TEST_DATA_B = new byte[]{ 0x06, 0x05, 0x04, 0x03 };
48+
private static final byte[] TEST_DATA_C = new byte[]{ 0x06, 0x05, 0x03, 0x03 };
49+
50+
@Rule public ExpectedException expectedException = ExpectedException.none();
4351

4452
@Test
4553
public void testCoder() throws Exception {
@@ -59,15 +67,41 @@ public void testCoder() throws Exception {
5967
assertEquals(3, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.OUTER));
6068
}
6169

70+
@Test
71+
public void testCoderWithPositiveInfinityIsError() throws Exception {
72+
expectedException.expect(CoderException.class);
73+
expectedException.expectMessage("Positive infinity can not be encoded");
74+
RandomAccessDataCoder.of().encode(
75+
RandomAccessData.POSITIVE_INFINITY, new ByteArrayOutputStream(), Context.OUTER);
76+
}
77+
6278
@Test
6379
public void testLexicographicalComparator() throws Exception {
6480
RandomAccessData streamA = new RandomAccessData();
6581
streamA.asOutputStream().write(TEST_DATA_A);
6682
RandomAccessData streamB = new RandomAccessData();
6783
streamB.asOutputStream().write(TEST_DATA_B);
68-
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(streamA, streamB) < 0);
69-
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(streamB, streamA) > 0);
70-
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(streamB, streamB) == 0);
84+
RandomAccessData streamC = new RandomAccessData();
85+
streamC.asOutputStream().write(TEST_DATA_C);
86+
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
87+
streamA, streamB) < 0);
88+
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
89+
streamB, streamA) > 0);
90+
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
91+
streamB, streamB) == 0);
92+
// Check common prefix length.
93+
assertEquals(2, RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.commonPrefixLength(
94+
streamB, streamC));
95+
// Check that we honor the start offset.
96+
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
97+
streamB, streamC, 3) == 0);
98+
// Test positive infinity comparisons.
99+
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
100+
streamA, RandomAccessData.POSITIVE_INFINITY) < 0);
101+
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
102+
RandomAccessData.POSITIVE_INFINITY, RandomAccessData.POSITIVE_INFINITY) == 0);
103+
assertTrue(RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
104+
RandomAccessData.POSITIVE_INFINITY, streamA) > 0);
71105
}
72106

73107
@Test
@@ -154,5 +188,18 @@ public void testThatRandomAccessDataGrowsWhenReading() throws Exception {
154188
Arrays.copyOf(stream.array(), TEST_DATA_A.length));
155189
}
156190

191+
@Test
192+
public void testIncrement() throws Exception {
193+
assertEquals(new RandomAccessData(new byte[]{ 0x00, 0x01 }),
194+
new RandomAccessData(new byte[]{ 0x00, 0x00 }).increment());
195+
assertEquals(new RandomAccessData(new byte[]{ 0x01, UnsignedBytes.MAX_VALUE }),
196+
new RandomAccessData(new byte[]{ 0x00, UnsignedBytes.MAX_VALUE }).increment());
197+
198+
// Test for positive infinity
199+
assertSame(RandomAccessData.POSITIVE_INFINITY, new RandomAccessData(new byte[0]).increment());
200+
assertSame(RandomAccessData.POSITIVE_INFINITY,
201+
new RandomAccessData(new byte[]{ UnsignedBytes.MAX_VALUE }).increment());
202+
assertSame(RandomAccessData.POSITIVE_INFINITY, RandomAccessData.POSITIVE_INFINITY.increment());
203+
}
157204
}
158205

0 commit comments

Comments
 (0)