
Commit 1b3e39f

Andrei Ionescu committed
Add unit test for Avro with nested partitions
1 parent e43ceae commit 1b3e39f

File tree

1 file changed: +143 −0

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.spark.source;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.apache.iceberg.types.Types.NestedField.optional;

public class TestAvroWrite {

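  // Temporary directory that holds the table location; JUnit removes it after each test.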
  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  private static SparkSession spark = null;

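  // One local SparkSession (two worker threads) shared by all tests in this class.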
  @BeforeClass
  public static void startSpark() {
    TestAvroWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
  }

  @AfterClass
  public static void stopSpark() {
    SparkSession currentSpark = TestAvroWrite.spark;
    TestAvroWrite.spark = null;
    currentSpark.stop();
  }

  @Test
  public void testNestedPartitioning() throws IOException {
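    // Table schema with explicit field IDs, including a struct column with nested fields.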
    Schema nestedSchema = new Schema(
        optional(1, "id", Types.IntegerType.get()),
        optional(2, "data", Types.StringType.get()),
        optional(3, "nestedData", Types.StructType.of(
            optional(4, "id", Types.IntegerType.get()),
            optional(5, "moreData", Types.StringType.get()))),
        optional(6, "timestamp", Types.TimestampType.withZone())
    );

    File parent = temp.newFolder("avro");
    File location = new File(parent, "test/iceberg");

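    // Partition on a top-level column, a day transform, and a field nested inside the struct.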
    HadoopTables tables = new HadoopTables(new Configuration());
    PartitionSpec spec = PartitionSpec.builderFor(nestedSchema)
        .identity("id")
        .day("timestamp")
        .identity("nestedData.moreData")
        .build();
    Table table = tables.create(nestedSchema, spec, ImmutableMap.of("write.format.default", "avro"),
        location.toString());

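    // Four input rows: two per nestedData.moreData value, each on a different day.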
    List<String> jsons = Lists.newArrayList(
        "{ \"id\": 1, \"data\": \"a\", \"nestedData\": { \"id\": 100, \"moreData\": \"p1\"}, " +
            "\"timestamp\": \"2017-12-01T10:12:55.034Z\" }",
        "{ \"id\": 2, \"data\": \"b\", \"nestedData\": { \"id\": 200, \"moreData\": \"p1\"}, " +
            "\"timestamp\": \"2017-12-02T10:12:55.034Z\" }",
        "{ \"id\": 3, \"data\": \"c\", \"nestedData\": { \"id\": 300, \"moreData\": \"p2\"}, " +
            "\"timestamp\": \"2017-12-03T10:12:55.034Z\" }",
        "{ \"id\": 4, \"data\": \"d\", \"nestedData\": { \"id\": 400, \"moreData\": \"p2\"}, " +
            "\"timestamp\": \"2017-12-04T10:12:55.034Z\" }"
    );
    Dataset<Row> df = spark.read().schema(SparkSchemaUtil.convert(nestedSchema))
        .json(spark.createDataset(jsons, Encoders.STRING()));

    // TODO: incoming columns must be ordered according to the table's schema
    df.select("id", "data", "nestedData", "timestamp").write()
        .format("iceberg")
        .mode("append")
        .save(location.toString());

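    // Refresh to pick up the snapshot committed by the write above.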
    table.refresh();

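    // Read everything back through the Iceberg source.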
    Dataset<Row> result = spark.read()
        .format("iceberg")
        .load(location.toString());

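    // Rows come back ordered by id; check every column, including nested struct fields and timestamps.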
    List<Row> actual = result.orderBy("id").collectAsList();
    Assert.assertEquals("Number of rows should match", jsons.size(), actual.size());
    Assert.assertEquals("Row 1 col 1 is 1", 1, actual.get(0).getInt(0));
    Assert.assertEquals("Row 1 col 2 is a", "a", actual.get(0).getString(1));
    Assert.assertEquals("Row 1 col 3,1 is 100", 100, actual.get(0).getStruct(2).getInt(0));
    Assert.assertEquals("Row 1 col 3,2 is p1", "p1", actual.get(0).getStruct(2).getString(1));
    Assert.assertEquals("Row 1 col 4 is 2017-12-01T10:12:55.034Z",
        0, actual.get(0).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-01T10:12:55.034Z"))));
    Assert.assertEquals("Row 2 col 1 is 2", 2, actual.get(1).getInt(0));
    Assert.assertEquals("Row 2 col 2 is b", "b", actual.get(1).getString(1));
    Assert.assertEquals("Row 2 col 3,1 is 200", 200, actual.get(1).getStruct(2).getInt(0));
    Assert.assertEquals("Row 2 col 3,2 is p1", "p1", actual.get(1).getStruct(2).getString(1));
    Assert.assertEquals("Row 2 col 4 is 2017-12-02T10:12:55.034Z",
        0, actual.get(1).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-02T10:12:55.034Z"))));
    Assert.assertEquals("Row 3 col 1 is 3", 3, actual.get(2).getInt(0));
    Assert.assertEquals("Row 3 col 2 is c", "c", actual.get(2).getString(1));
    Assert.assertEquals("Row 3 col 3,1 is 300", 300, actual.get(2).getStruct(2).getInt(0));
    Assert.assertEquals("Row 3 col 3,2 is p2", "p2", actual.get(2).getStruct(2).getString(1));
    Assert.assertEquals("Row 3 col 4 is 2017-12-03T10:12:55.034Z",
        0, actual.get(2).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-03T10:12:55.034Z"))));
    Assert.assertEquals("Row 4 col 1 is 4", 4, actual.get(3).getInt(0));
    Assert.assertEquals("Row 4 col 2 is d", "d", actual.get(3).getString(1));
    Assert.assertEquals("Row 4 col 3,1 is 400", 400, actual.get(3).getStruct(2).getInt(0));
    Assert.assertEquals("Row 4 col 3,2 is p2", "p2", actual.get(3).getStruct(2).getString(1));
    Assert.assertEquals("Row 4 col 4 is 2017-12-04T10:12:55.034Z",
        0, actual.get(3).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-04T10:12:55.034Z"))));
  }
}
