Skip to content

Commit

Permalink
Spark: Use SerializableTableWithSize when optimizing metadata (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Oct 31, 2023
1 parent 50c5f26 commit da392f2
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
Expand All @@ -52,6 +51,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
Expand Down Expand Up @@ -220,7 +220,9 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {

private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));

Broadcast<Table> tableBroadcast =
sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);

Expand All @@ -246,7 +248,8 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {

Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
Broadcast<Table> tableBroadcast =
sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
Expand All @@ -52,6 +51,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
Expand Down Expand Up @@ -220,7 +220,9 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {

private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));

Broadcast<Table> tableBroadcast =
sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);

Expand All @@ -246,7 +248,8 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {

Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
Broadcast<Table> tableBroadcast =
sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
Expand All @@ -52,6 +51,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
Expand Down Expand Up @@ -220,7 +220,9 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {

private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));

Broadcast<Table> tableBroadcast =
sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);

Expand All @@ -246,7 +248,8 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {

Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
Broadcast<Table> tableBroadcast =
sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
Expand All @@ -52,6 +51,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
Expand Down Expand Up @@ -220,7 +220,9 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {

private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));

Broadcast<Table> tableBroadcast =
sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);

Expand All @@ -246,7 +248,8 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {

Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
Broadcast<Table> tableBroadcast =
sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);

Expand Down

0 comments on commit da392f2

Please sign in to comment.