Commit 7fb9ea5

Merge branch 'apache:master' into master
2 parents 53c5e83 + ef49a9e

17 files changed: +1027 -1525 lines

core/src/main/java/org/apache/iceberg/TableProperties.java (+12 -8)
@@ -57,25 +57,25 @@ private TableProperties() {
   public static final int COMMIT_MIN_RETRY_WAIT_MS_DEFAULT = 100;

   public static final String COMMIT_MAX_RETRY_WAIT_MS = "commit.retry.max-wait-ms";
-  public static final int COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 60000; // 1 minute
+  public static final int COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 60 * 1000; // 1 minute

   public static final String COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms";
-  public static final int COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 1800000; // 30 minutes
+  public static final int COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 30 * 60 * 1000; // 30 minutes

   public static final String COMMIT_NUM_STATUS_CHECKS = "commit.status-check.num-retries";
   public static final int COMMIT_NUM_STATUS_CHECKS_DEFAULT = 3;

   public static final String COMMIT_STATUS_CHECKS_MIN_WAIT_MS = "commit.status-check.min-wait-ms";
-  public static final long COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT = 1000L; // 1s
+  public static final long COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT = 1000; // 1 second

   public static final String COMMIT_STATUS_CHECKS_MAX_WAIT_MS = "commit.status-check.max-wait-ms";
-  public static final long COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT = 60000L; // 1 minute
+  public static final long COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT = 60 * 1000; // 1 minute

   public static final String COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS = "commit.status-check.total-timeout-ms";
-  public static final long COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT = 1800000; // 30 minutes
+  public static final long COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT = 30 * 60 * 1000; // 30 minutes

   public static final String MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes";
-  public static final long MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8388608; // 8 MB
+  public static final long MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024; // 8 MB

   public static final String MANIFEST_MIN_MERGE_COUNT = "commit.manifest.min-count-to-merge";
   public static final int MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100;
@@ -112,7 +112,7 @@ private TableProperties() {
   public static final String AVRO_COMPRESSION_DEFAULT = "gzip";

   public static final String SPLIT_SIZE = "read.split.target-size";
-  public static final long SPLIT_SIZE_DEFAULT = 134217728; // 128 MB
+  public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB

   public static final String METADATA_SPLIT_SIZE = "read.split.metadata-target-size";
   public static final long METADATA_SPLIT_SIZE_DEFAULT = 32 * 1024 * 1024; // 32 MB
@@ -195,7 +195,7 @@ private TableProperties() {

   public static final String WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes";
   public static final String DELETE_TARGET_FILE_SIZE_BYTES = "write.delete.target-file-size-bytes";
-  public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 536870912; // 512 MB
+  public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024; // 512 MB

   public static final String SPARK_WRITE_PARTITIONED_FANOUT_ENABLED = "write.spark.fanout.enabled";
   public static final boolean SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT = false;
@@ -210,6 +210,10 @@ private TableProperties() {
   public static final String WRITE_DISTRIBUTION_MODE_NONE = "none";
   public static final String WRITE_DISTRIBUTION_MODE_HASH = "hash";
   public static final String WRITE_DISTRIBUTION_MODE_RANGE = "range";
+  /**
+   * @deprecated will be removed in 0.14.0, use specific modes instead
+   */
+  @Deprecated
   public static final String WRITE_DISTRIBUTION_MODE_DEFAULT = WRITE_DISTRIBUTION_MODE_NONE;

   public static final String GC_ENABLED = "gc.enabled";
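
Note on the hunks above: every replaced literal is arithmetic-equivalent to the old value (60 * 1000 == 60000, 8 * 1024 * 1024 == 8388608, 512 * 1024 * 1024 == 536870912), Java folds these expressions to constants at compile time, and the dropped L suffixes (1000L -> 1000) still widen implicitly to long at assignment, so the rewrite is purely cosmetic. A minimal sketch of how such a default is consumed — the propertyAsLong helper below is written for this example and only mirrors the shape of Iceberg's PropertyUtil:

import java.util.Map;

class PropertyDefaults {
  // Example-only helper: look up a property, falling back to a compile-time default.
  static long propertyAsLong(Map<String, String> props, String property, long defaultValue) {
    String value = props.get(property);
    return value != null ? Long.parseLong(value) : defaultValue;
  }

  public static void main(String[] args) {
    // 60 * 1000 compiles to the same constant as the old literal 60000
    long maxWaitMs = propertyAsLong(Map.of(), "commit.retry.max-wait-ms", 60 * 1000);
    System.out.println(maxWaitMs); // 60000
  }
}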

flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java (+2 -2)
@@ -62,7 +62,7 @@
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;

@@ -403,7 +403,7 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
       // Fallback to use distribution mode parsed from table properties if don't specify in job level.
       String modeName = PropertyUtil.propertyAsString(properties,
           WRITE_DISTRIBUTION_MODE,
-          WRITE_DISTRIBUTION_MODE_DEFAULT);
+          WRITE_DISTRIBUTION_MODE_NONE);

       writeMode = DistributionMode.fromName(modeName);
     } else {

flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java (+2 -2)
@@ -62,7 +62,7 @@
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;

@@ -403,7 +403,7 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
       // Fallback to use distribution mode parsed from table properties if don't specify in job level.
       String modeName = PropertyUtil.propertyAsString(properties,
           WRITE_DISTRIBUTION_MODE,
-          WRITE_DISTRIBUTION_MODE_DEFAULT);
+          WRITE_DISTRIBUTION_MODE_NONE);

       writeMode = DistributionMode.fromName(modeName);
     } else {

flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java (+2 -2)
@@ -62,7 +62,7 @@
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;

@@ -403,7 +403,7 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
       // Fallback to use distribution mode parsed from table properties if don't specify in job level.
       String modeName = PropertyUtil.propertyAsString(properties,
           WRITE_DISTRIBUTION_MODE,
-          WRITE_DISTRIBUTION_MODE_DEFAULT);
+          WRITE_DISTRIBUTION_MODE_NONE);

       writeMode = DistributionMode.fromName(modeName);
     } else {
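
The same two-line substitution lands in all three Flink versions (1.12, 1.13, 1.14): the fallback now references WRITE_DISTRIBUTION_MODE_NONE directly rather than the newly deprecated WRITE_DISTRIBUTION_MODE_DEFAULT. Since TableProperties defines the deprecated constant as WRITE_DISTRIBUTION_MODE_NONE, the resolved mode is unchanged. A self-contained sketch of the fallback chain, with Map.getOrDefault standing in for PropertyUtil.propertyAsString:

import java.util.Map;

class DistributionModeFallback {
  // A job-level setting wins; otherwise fall back to the table property, then to "none".
  static String resolveMode(String jobLevelMode, Map<String, String> tableProps) {
    if (jobLevelMode != null) {
      return jobLevelMode;
    }
    return tableProps.getOrDefault("write.distribution-mode", "none");
  }

  public static void main(String[] args) {
    System.out.println(resolveMode(null, Map.of()));                                  // none
    System.out.println(resolveMode(null, Map.of("write.distribution-mode", "hash"))); // hash
    System.out.println(resolveMode("range", Map.of()));                               // range
  }
}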
…/org/apache/iceberg/actions/Actions.java (new file, +103)

@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * An API for interacting with actions in Spark.
+ *
+ * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction} in Spark 2.4 for backward compatibility.
+ * This implementation is no longer maintained, the new implementation is available with Spark 3.x
+ */
+@Deprecated
+public class Actions {
+
+  // Load the actual implementation of Actions via reflection to allow for differences
+  // between the major Spark APIs while still defining the API in this class.
+  private static final String IMPL_NAME = "SparkActions";
+  private static DynConstructors.Ctor<Actions> implConstructor;
+
+  private static String implClass() {
+    return Actions.class.getPackage().getName() + "." + IMPL_NAME;
+  }
+
+  private static DynConstructors.Ctor<Actions> actionConstructor() {
+    if (implConstructor == null) {
+      String className = implClass();
+      try {
+        implConstructor =
+            DynConstructors.builder().hiddenImpl(className, SparkSession.class, Table.class).buildChecked();
+      } catch (NoSuchMethodException e) {
+        throw new IllegalArgumentException("Cannot find appropriate Actions implementation on the classpath.", e);
+      }
+    }
+    return implConstructor;
+  }
+
+  private SparkSession spark;
+  private Table table;
+
+  protected Actions(SparkSession spark, Table table) {
+    this.spark = spark;
+    this.table = table;
+  }
+
+  /**
+   * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction}
+   * in Spark 2.4 for backward compatibility.
+   * This implementation is no longer maintained, the new implementation is available with Spark 3.x
+   */
+  @Deprecated
+  public static Actions forTable(SparkSession spark, Table table) {
+    return actionConstructor().newInstance(spark, table);
+  }
+
+  /**
+   * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction}
+   * in Spark 2.4 for backward compatibility.
+   * This implementation is no longer maintained, the new implementation is available with Spark 3.x
+   */
+  @Deprecated
+  public static Actions forTable(Table table) {
+    return forTable(SparkSession.active(), table);
+  }
+
+  /**
+   * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction}
+   * in Spark 2.4 for backward compatibility.
+   * This implementation is no longer maintained, the new implementation is available with Spark 3.x
+   */
+  @Deprecated
+  public RewriteDataFilesAction rewriteDataFiles() {
+    return new RewriteDataFilesAction(spark, table);
+  }
+
+  protected SparkSession spark() {
+    return spark;
+  }
+
+  protected Table table() {
+    return table;
+  }
+}
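
Actions resolves its concrete subclass lazily: the first call to actionConstructor() locates SparkActions in the same package through Iceberg's DynConstructors and caches the hidden (non-public) constructor. A sketch of the same pattern in plain java.lang.reflect — an illustration of the technique, not the DynConstructors API:

import java.lang.reflect.Constructor;

class LazyCtor {
  private static Constructor<?> cached;

  // Find and cache a possibly non-public constructor, as
  // DynConstructors.builder().hiddenImpl(...).buildChecked() does above.
  static synchronized Constructor<?> ctor(String className, Class<?>... argTypes) {
    if (cached == null) {
      try {
        cached = Class.forName(className).getDeclaredConstructor(argTypes);
        cached.setAccessible(true); // "hidden" impl: the target constructor is protected
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("Cannot find implementation: " + className, e);
      }
    }
    return cached;
  }
}

forTable() then amounts to ctor("org.apache.iceberg.actions.SparkActions", SparkSession.class, Table.class).newInstance(spark, table).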
…/org/apache/iceberg/actions/RewriteDataFilesAction.java (new file, +71)

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.RowDataRewriter;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * @deprecated since 0.12.0, keeping this in Spark 2.4 for backward compatibility.
+ * This implementation is no longer maintained, the new implementation is available with Spark 3.x
+ */
+@Deprecated
+public class RewriteDataFilesAction
+    extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+
+  RewriteDataFilesAction(SparkSession spark, Table table) {
+    super(table);
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+  }
+
+  @Override
+  protected RewriteDataFilesAction self() {
+    return this;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table()));
+    RowDataRewriter rowDataRewriter = new RowDataRewriter(tableBroadcast, spec(), caseSensitive());
+    return rowDataRewriter.rewriteDataForTasks(taskRDD);
+  }
+}
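
rewriteDataForTasks gives each CombinedScanTask its own Spark partition and ships the table to executors once as a broadcast variable instead of capturing it in every task closure. A toy, self-contained version of that broadcast-and-parallelize pattern, with strings standing in for scan tasks and table metadata:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public class BroadcastParallelizeSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

    List<String> tasks = Arrays.asList("task-1", "task-2", "task-3");
    // One partition per task, mirroring parallelize(combinedScanTasks, combinedScanTasks.size())
    JavaRDD<String> taskRDD = jsc.parallelize(tasks, tasks.size());
    // Read-only shared state is broadcast once per executor, not serialized per task
    Broadcast<String> table = jsc.broadcast("serializable-table");

    List<String> rewritten = taskRDD.map(t -> t + " rewritten against " + table.value()).collect();
    rewritten.forEach(System.out::println);
    spark.stop();
  }
}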
…/org/apache/iceberg/actions/SparkActions.java (new file, +34)

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.Table;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * @deprecated since 0.12.0, used for supporting {@link RewriteDataFilesAction} in Spark 2.4 for backward compatibility.
+ * This implementation is no longer maintained, the new implementation is available with Spark 3.x
+ */
+@Deprecated
+class SparkActions extends Actions {
+  protected SparkActions(SparkSession spark, Table table) {
+    super(spark, table);
+  }
+}
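
SparkActions is the package-private subclass that Actions discovers reflectively, which is why its constructor must keep the exact (SparkSession, Table) signature registered via hiddenImpl. A hedged usage sketch of the deprecated Spark 2.4 entry point — it assumes an active SparkSession and an already-loaded Table, and execute() is inherited from an action base class that is not part of this diff:

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.actions.RewriteDataFilesAction;
import org.apache.spark.sql.SparkSession;

class CompactTable {
  // spark and table are placeholders: obtain them from your own session and catalog.
  static void compact(SparkSession spark, Table table) {
    RewriteDataFilesAction action = Actions.forTable(spark, table).rewriteDataFiles();
    action.execute(); // assumed: the rewrite/commit entry point from the base action API
  }
}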
