diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 57faa5fcd9f5..854232a62d5b 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Partitioning; -import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; @@ -52,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.spark.SparkDataFile; +import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -220,7 +220,9 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) { private List<ManifestFile> writeManifestsForUnpartitionedTable( Dataset<Row> manifestEntryDF, int numManifests) { - Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table)); + + Broadcast
<Table> tableBroadcast = + sparkContext().broadcast(SerializableTableWithSize.copyOf(table)); StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); Types.StructType combinedPartitionType = Partitioning.partitionType(table); @@ -246,7 +248,8 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable( private List<ManifestFile> writeManifestsForPartitionedTable( Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) { - Broadcast
<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table)); + Broadcast
<Table> tableBroadcast = + sparkContext().broadcast(SerializableTableWithSize.copyOf(table)); StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); Types.StructType combinedPartitionType = Partitioning.partitionType(table); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 6ecdfac822f4..87fbe2de2fce 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Partitioning; -import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; @@ -52,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.spark.SparkDataFile; +import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -220,7 +220,9 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) { private List<ManifestFile> writeManifestsForUnpartitionedTable( Dataset<Row> manifestEntryDF, int numManifests) { - Broadcast
<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table)); + + Broadcast
<Table> tableBroadcast = + sparkContext().broadcast(SerializableTableWithSize.copyOf(table)); StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); Types.StructType combinedPartitionType = Partitioning.partitionType(table); @@ -246,7 +248,8 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable( private List<ManifestFile> writeManifestsForPartitionedTable( Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) { - Broadcast
<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table)); + Broadcast
<Table> tableBroadcast = + sparkContext().broadcast(SerializableTableWithSize.copyOf(table)); StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); Types.StructType combinedPartitionType = Partitioning.partitionType(table); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 6ecdfac822f4..87fbe2de2fce 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Partitioning; -import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; @@ -52,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.spark.SparkDataFile; +import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -220,7 +220,9 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) { private List<ManifestFile> writeManifestsForUnpartitionedTable( Dataset<Row> manifestEntryDF, int numManifests) { - Broadcast
<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table)); + + Broadcast
<Table> tableBroadcast = + sparkContext().broadcast(SerializableTableWithSize.copyOf(table)); StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); Types.StructType combinedPartitionType = Partitioning.partitionType(table); @@ -246,7 +248,8 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable( private List<ManifestFile> writeManifestsForPartitionedTable( Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) { - Broadcast
<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table)); + Broadcast
<Table> tableBroadcast = + sparkContext().broadcast(SerializableTableWithSize.copyOf(table)); StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); Types.StructType combinedPartitionType = Partitioning.partitionType(table); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 6ecdfac822f4..87fbe2de2fce 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Partitioning; -import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; @@ -52,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.spark.SparkDataFile; +import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -220,7 +220,9 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) { private List<ManifestFile> writeManifestsForUnpartitionedTable( Dataset<Row> manifestEntryDF, int numManifests) { - Broadcast
<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table)); + + Broadcast
<Table> tableBroadcast = + sparkContext().broadcast(SerializableTableWithSize.copyOf(table)); StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); Types.StructType combinedPartitionType = Partitioning.partitionType(table); @@ -246,7 +248,8 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable( private List<ManifestFile> writeManifestsForPartitionedTable( Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) { - Broadcast
<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table)); + Broadcast
<Table> tableBroadcast = + sparkContext().broadcast(SerializableTableWithSize.copyOf(table)); StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); Types.StructType combinedPartitionType = Partitioning.partitionType(table);