Skip to content

Commit dea2fd1

Browse files
authored
Core: Add Variant implementation to read serialized objects (#11415)
1 parent cdf748e commit dea2fd1

26 files changed

+3813
-1
lines changed

api/src/main/java/org/apache/iceberg/io/CloseableIterable.java

+17
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@
3232

3333
public interface CloseableIterable<T> extends Iterable<T>, Closeable {
3434

35+
/**
36+
* Adapts an Iterable to CloseableIterable using a no-op close if it is not Closeable.
37+
*
38+
* @param iterable an Iterable
39+
* @return a CloseableIterable that closes Iterable if it is Closeable
40+
*/
41+
static <E> CloseableIterable<E> of(Iterable<E> iterable) {
42+
if (iterable instanceof CloseableIterable) {
43+
return (CloseableIterable<E>) iterable;
44+
} else if (iterable instanceof Closeable) {
45+
Closeable asCloseable = (Closeable) iterable;
46+
return combine(iterable, asCloseable);
47+
} else {
48+
return withNoopClose(iterable);
49+
}
50+
}
51+
3552
/**
3653
* Returns a closeable iterator over elements of type {@code T}.
3754
*

api/src/test/java/org/apache/iceberg/util/RandomUtil.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,10 @@ public static Object generateDictionaryEncodablePrimitive(
200200
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.!?";
201201

202202
private static String randomString(Random random) {
203-
int length = random.nextInt(50);
203+
return generateString(random.nextInt(50), random);
204+
}
205+
206+
public static String generateString(int length, Random random) {
204207
byte[] buffer = new byte[length];
205208

206209
for (int i = 0; i < length; i += 1) {

core/src/main/java/org/apache/iceberg/util/SortedMerge.java

+13
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.Closeable;
2222
import java.io.IOException;
2323
import java.io.UncheckedIOException;
24+
import java.util.Arrays;
2425
import java.util.Comparator;
2526
import java.util.Iterator;
2627
import java.util.List;
@@ -30,6 +31,7 @@
3031
import org.apache.iceberg.io.CloseableGroup;
3132
import org.apache.iceberg.io.CloseableIterable;
3233
import org.apache.iceberg.io.CloseableIterator;
34+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3335

3436
/**
3537
* An Iterable that merges the items from other Iterables in order.
@@ -39,6 +41,17 @@
3941
* @param <T> the type of objects produced by this Iterable
4042
*/
4143
public class SortedMerge<T> extends CloseableGroup implements CloseableIterable<T> {
44+
public static <C extends Comparable<C>> CloseableIterable<C> of(
45+
Iterable<C> left, Iterable<C> right) {
46+
return of(Arrays.asList(left, right));
47+
}
48+
49+
public static <C extends Comparable<C>> CloseableIterable<C> of(List<Iterable<C>> iterables) {
50+
List<CloseableIterable<C>> closeableIterables =
51+
Lists.transform(iterables, CloseableIterable::of);
52+
return new SortedMerge<>(Comparator.naturalOrder(), closeableIterables);
53+
}
54+
4255
private final Comparator<T> comparator;
4356
private final List<CloseableIterable<T>> iterables;
4457

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.variants;
20+
21+
import java.math.BigDecimal;
22+
import java.nio.ByteBuffer;
23+
import java.nio.ByteOrder;
24+
import java.nio.charset.StandardCharsets;
25+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
26+
import org.apache.iceberg.variants.Variants.Primitives;
27+
28+
class PrimitiveWrapper<T> implements VariantPrimitive<T> {
29+
private static final byte NULL_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_NULL);
30+
private static final byte TRUE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_TRUE);
31+
private static final byte FALSE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_FALSE);
32+
private static final byte INT8_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT8);
33+
private static final byte INT16_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT16);
34+
private static final byte INT32_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT32);
35+
private static final byte INT64_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_INT64);
36+
private static final byte FLOAT_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_FLOAT);
37+
private static final byte DOUBLE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DOUBLE);
38+
private static final byte DATE_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DATE);
39+
private static final byte TIMESTAMPTZ_HEADER =
40+
VariantUtil.primitiveHeader(Primitives.TYPE_TIMESTAMPTZ);
41+
private static final byte TIMESTAMPNTZ_HEADER =
42+
VariantUtil.primitiveHeader(Primitives.TYPE_TIMESTAMPNTZ);
43+
private static final byte DECIMAL4_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DECIMAL4);
44+
private static final byte DECIMAL8_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_DECIMAL8);
45+
private static final byte DECIMAL16_HEADER =
46+
VariantUtil.primitiveHeader(Primitives.TYPE_DECIMAL16);
47+
private static final byte BINARY_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_BINARY);
48+
private static final byte STRING_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_STRING);
49+
50+
private final Variants.PhysicalType type;
51+
private final T value;
52+
private ByteBuffer buffer = null;
53+
54+
PrimitiveWrapper(Variants.PhysicalType type, T value) {
55+
this.type = type;
56+
this.value = value;
57+
}
58+
59+
@Override
60+
public Variants.PhysicalType type() {
61+
return type;
62+
}
63+
64+
@Override
65+
public T get() {
66+
return value;
67+
}
68+
69+
@Override
70+
public int sizeInBytes() {
71+
switch (type()) {
72+
case NULL:
73+
case BOOLEAN_TRUE:
74+
case BOOLEAN_FALSE:
75+
return 1; // 1 header only
76+
case INT8:
77+
return 2; // 1 header + 1 value
78+
case INT16:
79+
return 3; // 1 header + 2 value
80+
case INT32:
81+
case DATE:
82+
case FLOAT:
83+
return 5; // 1 header + 4 value
84+
case INT64:
85+
case DOUBLE:
86+
case TIMESTAMPTZ:
87+
case TIMESTAMPNTZ:
88+
return 9; // 1 header + 8 value
89+
case DECIMAL4:
90+
return 6; // 1 header + 1 scale + 4 unscaled value
91+
case DECIMAL8:
92+
return 10; // 1 header + 1 scale + 8 unscaled value
93+
case DECIMAL16:
94+
return 18; // 1 header + 1 scale + 16 unscaled value
95+
case BINARY:
96+
return 5 + ((ByteBuffer) value).remaining(); // 1 header + 4 length + value length
97+
case STRING:
98+
if (null == buffer) {
99+
this.buffer = ByteBuffer.wrap(((String) value).getBytes(StandardCharsets.UTF_8));
100+
}
101+
102+
return 5 + buffer.remaining(); // 1 header + 4 length + value length
103+
}
104+
105+
throw new UnsupportedOperationException("Unsupported primitive type: " + type());
106+
}
107+
108+
@Override
109+
public int writeTo(ByteBuffer outBuffer, int offset) {
110+
Preconditions.checkArgument(
111+
outBuffer.order() == ByteOrder.LITTLE_ENDIAN, "Invalid byte order: big endian");
112+
switch (type()) {
113+
case NULL:
114+
outBuffer.put(offset, NULL_HEADER);
115+
return 1;
116+
case BOOLEAN_TRUE:
117+
outBuffer.put(offset, TRUE_HEADER);
118+
return 1;
119+
case BOOLEAN_FALSE:
120+
outBuffer.put(offset, FALSE_HEADER);
121+
return 1;
122+
case INT8:
123+
outBuffer.put(offset, INT8_HEADER);
124+
outBuffer.put(offset + 1, (Byte) value);
125+
return 2;
126+
case INT16:
127+
outBuffer.put(offset, INT16_HEADER);
128+
outBuffer.putShort(offset + 1, (Short) value);
129+
return 3;
130+
case INT32:
131+
outBuffer.put(offset, INT32_HEADER);
132+
outBuffer.putInt(offset + 1, (Integer) value);
133+
return 5;
134+
case INT64:
135+
outBuffer.put(offset, INT64_HEADER);
136+
outBuffer.putLong(offset + 1, (Long) value);
137+
return 9;
138+
case FLOAT:
139+
outBuffer.put(offset, FLOAT_HEADER);
140+
outBuffer.putFloat(offset + 1, (Float) value);
141+
return 5;
142+
case DOUBLE:
143+
outBuffer.put(offset, DOUBLE_HEADER);
144+
outBuffer.putDouble(offset + 1, (Double) value);
145+
return 9;
146+
case DATE:
147+
outBuffer.put(offset, DATE_HEADER);
148+
outBuffer.putInt(offset + 1, (Integer) value);
149+
return 5;
150+
case TIMESTAMPTZ:
151+
outBuffer.put(offset, TIMESTAMPTZ_HEADER);
152+
outBuffer.putLong(offset + 1, (Long) value);
153+
return 9;
154+
case TIMESTAMPNTZ:
155+
outBuffer.put(offset, TIMESTAMPNTZ_HEADER);
156+
outBuffer.putLong(offset + 1, (Long) value);
157+
return 9;
158+
case DECIMAL4:
159+
BigDecimal decimal4 = (BigDecimal) value;
160+
outBuffer.put(offset, DECIMAL4_HEADER);
161+
outBuffer.put(offset + 1, (byte) decimal4.scale());
162+
outBuffer.putInt(offset + 2, decimal4.unscaledValue().intValueExact());
163+
return 6;
164+
case DECIMAL8:
165+
BigDecimal decimal8 = (BigDecimal) value;
166+
outBuffer.put(offset, DECIMAL8_HEADER);
167+
outBuffer.put(offset + 1, (byte) decimal8.scale());
168+
outBuffer.putLong(offset + 2, decimal8.unscaledValue().longValueExact());
169+
return 10;
170+
case DECIMAL16:
171+
BigDecimal decimal16 = (BigDecimal) value;
172+
byte padding = (byte) (decimal16.signum() < 0 ? 0xFF : 0x00);
173+
byte[] bytes = decimal16.unscaledValue().toByteArray();
174+
outBuffer.put(offset, DECIMAL16_HEADER);
175+
outBuffer.put(offset + 1, (byte) decimal16.scale());
176+
for (int i = 0; i < 16; i += 1) {
177+
if (i < bytes.length) {
178+
// copy the big endian value and convert to little endian
179+
outBuffer.put(offset + 2 + i, bytes[bytes.length - i - 1]);
180+
} else {
181+
// pad with 0x00 or 0xFF depending on the sign
182+
outBuffer.put(offset + 2 + i, padding);
183+
}
184+
}
185+
return 18;
186+
case BINARY:
187+
ByteBuffer binary = (ByteBuffer) value;
188+
outBuffer.put(offset, BINARY_HEADER);
189+
outBuffer.putInt(offset + 1, binary.remaining());
190+
VariantUtil.writeBufferAbsolute(outBuffer, offset + 5, binary);
191+
return 5 + binary.remaining();
192+
case STRING:
193+
// TODO: use short string when possible
194+
if (null == buffer) {
195+
this.buffer = ByteBuffer.wrap(((String) value).getBytes(StandardCharsets.UTF_8));
196+
}
197+
198+
outBuffer.put(offset, STRING_HEADER);
199+
outBuffer.putInt(offset + 1, buffer.remaining());
200+
VariantUtil.writeBufferAbsolute(outBuffer, offset + 5, buffer);
201+
return 5 + buffer.remaining();
202+
}
203+
204+
throw new UnsupportedOperationException("Unsupported primitive type: " + type());
205+
}
206+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.variants;
20+
21+
import java.nio.ByteBuffer;
22+
import java.nio.ByteOrder;
23+
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
24+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
25+
26+
class SerializedArray extends Variants.SerializedValue implements VariantArray {
27+
private static final int OFFSET_SIZE_MASK = 0b1100;
28+
private static final int OFFSET_SIZE_SHIFT = 2;
29+
private static final int IS_LARGE = 0b10000;
30+
31+
@VisibleForTesting
32+
static SerializedArray from(SerializedMetadata metadata, byte[] bytes) {
33+
return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
34+
}
35+
36+
static SerializedArray from(SerializedMetadata metadata, ByteBuffer value, int header) {
37+
Preconditions.checkArgument(
38+
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
39+
Variants.BasicType basicType = VariantUtil.basicType(header);
40+
Preconditions.checkArgument(
41+
basicType == Variants.BasicType.ARRAY, "Invalid array, basic type: " + basicType);
42+
return new SerializedArray(metadata, value, header);
43+
}
44+
45+
private final SerializedMetadata metadata;
46+
private final ByteBuffer value;
47+
private final int offsetSize;
48+
private final int offsetListOffset;
49+
private final int dataOffset;
50+
private final VariantValue[] array;
51+
52+
private SerializedArray(SerializedMetadata metadata, ByteBuffer value, int header) {
53+
this.metadata = metadata;
54+
this.value = value;
55+
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
56+
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
57+
int numElements =
58+
VariantUtil.readLittleEndianUnsigned(value, Variants.HEADER_SIZE, numElementsSize);
59+
this.offsetListOffset = Variants.HEADER_SIZE + numElementsSize;
60+
this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
61+
this.array = new VariantValue[numElements];
62+
}
63+
64+
@VisibleForTesting
65+
int numElements() {
66+
return array.length;
67+
}
68+
69+
@Override
70+
public VariantValue get(int index) {
71+
if (null == array[index]) {
72+
int offset =
73+
VariantUtil.readLittleEndianUnsigned(
74+
value, offsetListOffset + (offsetSize * index), offsetSize);
75+
int next =
76+
VariantUtil.readLittleEndianUnsigned(
77+
value, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
78+
array[index] =
79+
Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
80+
}
81+
return array[index];
82+
}
83+
84+
@Override
85+
public ByteBuffer buffer() {
86+
return value;
87+
}
88+
}

0 commit comments

Comments
 (0)