Spark: Support spec ID and partition metadata columns #2984
Conversation
      FILE_PATH.name(), FILE_PATH,
      ROW_POSITION.name(), ROW_POSITION,
      IS_DELETED.name(), IS_DELETED);

  private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
Had to change this as the partition type is not static and is handled specially.
- public static NestedField get(String name) {
-   return META_COLUMNS.get(name);
+ public static NestedField metadataColumn(Table table, String name) {
+   if (name.equals(PARTITION_COLUMN_NAME)) {
I kept the logic case sensitive as before, but maybe we should reconsider that at some point.
  // add _partition
  if (partitionType.fields().isEmpty()) {
    // use null as some query engines may not be able to handle empty structs
    idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, null);
Using null for unpartitioned tables.
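A hedged test-style sketch of that behavior, reusing the helpers that appear later in this PR's tests; the test name, table, and expected list shape are assumptions:

  @Test
  public void testPartitionMetadataColumnOnUnpartitionedTable() {
    // hypothetical: for an unpartitioned table, _partition should project
    // as NULL rather than as an empty struct
    List<Object[]> expected = ImmutableList.of(row((Object) null));
    assertEquals("Rows must match", expected,
        sql("SELECT _partition FROM %s", TABLE_NAME));
  }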
  StructType structType = (StructType) type;

  if (structType.fields().isEmpty()) {
    return new GenericInternalRow();
We won't hit this clause with _partition as the passed value will be null.
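A minimal sketch of the conversion path being described; the method shape and the null guard are assumptions, only the empty-struct clause comes from the diff above:

  // Hypothetical outline: a null constant (what _partition passes for
  // unpartitioned tables) returns before the empty-struct clause is reached.
  private static Object convertConstant(Type type, Object value) {
    if (value == null) {
      return null; // _partition for unpartitioned tables takes this path
    }

    if (type.isStructType()) {
      StructType structType = (StructType) type;
      if (structType.fields().isEmpty()) {
        return new GenericInternalRow(); // only non-null empty structs land here
      }
      // ... convert each field of the struct recursively
    }

    // ... primitive conversions handled elsewhere
    return value;
  }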
@@ -149,7 +149,7 @@ private Schema schemaWithMetadataColumns() {
     // metadata columns
     List<Types.NestedField> fields = metaColumns.stream()
         .distinct()
-        .map(MetadataColumns::get)
+        .map(name -> MetadataColumns.metadataColumn(table, name))
Need to pass a Table object to figure out the partition type.
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class SparkTestTable extends SparkTable {
This is purely a temp solution for testing purposes until we compile against 3.2.
@@ -31,7 +30,14 @@
   @Override
   public Table loadTable(Identifier ident) throws NoSuchTableException {
-    TestTables.TestTable table = TestTables.load(Spark3Util.identifierToTableIdentifier(ident).toString());
-    return new SparkTable(table, false);
+    String[] parts = ident.name().split("\\$", 2);
I probably need to add a comment here. This is also for testing purposes until we consume Spark 3.2.
  @Test
  public void testSpecAndPartitionMetadataColumns() {
    // TODO: support metadata structs in vectorized ORC reads
    Assume.assumeFalse(fileFormat == FileFormat.ORC && vectorized);
ORC vectorized path does not support constant structs.
  }

  public static boolean isMetadataColumn(String name) {
-   return META_COLUMNS.containsKey(name);
+   return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);
I am starting to wonder if it is better to just have a dummy column type mapping for _partition in META_COLUMNS and only separate the logic when necessary, which can avoid changes like this in quite a few places.
I did that first, but META_COLUMNS is an immutable map that does not allow null values. I am reluctant to switch to a mutable map, so I added this condition here. I hope we will be able to use metadataColumn(table, name) in other places so this workaround will stay confined to MetadataColumns.
It looks ugly, though, I agree.
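For context, here is roughly what the dummy-mapping alternative would look like; the placeholder field below is an assumption, not code from the PR:

  // Hypothetical: give _partition a placeholder entry so META_COLUMNS can stay
  // an ImmutableMap (which rejects null keys and values), and resolve the real
  // partition type only where a Table is available.
  private static final NestedField PARTITION_PLACEHOLDER = NestedField.optional(
      PARTITION_COLUMN_ID, PARTITION_COLUMN_NAME, Types.StructType.of(),
      "Partition to which a row belongs");

  private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
      FILE_PATH.name(), FILE_PATH,
      ROW_POSITION.name(), ROW_POSITION,
      IS_DELETED.name(), IS_DELETED,
      PARTITION_COLUMN_NAME, PARTITION_PLACEHOLDER);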
Force-pushed from 71c3c64 to 480c1be.
I've updated this PR to focus on Spark for now. Similar functionality can be added to other query engines later.
I'd appreciate another review round.
This is no longer WIP.
  if (structPos != -1) {
    return struct.get(structPos, javaClass);
  } else {
    return null;
Since this relies on a null for nestedProjections[pos], I think it should set nestedProjections[pos] to null in the constructor where positionMap[pos] is set to -1.
I think it is actually always null as long as the field was not found. I'll add an explicit call, though.
Added.
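A sketch of the constructor change being discussed; the surrounding loop and variable names are assumed from the fragments above:

  // Hypothetical constructor fragment: when the projected field is missing
  // from the data struct, record -1 and set the nested projection to null
  // explicitly so that get() can rely on it.
  if (structPos >= 0) {
    positionMap[pos] = structPos;
  } else {
    positionMap[pos] = -1;
    nestedProjections[pos] = null;
  }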
  public static final NestedField SPEC_ID = NestedField.required(
      Integer.MAX_VALUE - 4, "_spec_id", Types.IntegerType.get(), "Spec ID to which a row belongs");
  // the partition column type is not static and depends on all specs in the table
  public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
I think that new reserved columns need to be added to the spec. We should also make sure the spec notes the ranges that are reserved. I'm not sure if we did that or just added specific IDs.
I'll do that in a follow-up as we are missing _deleted too.
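For reference, the reserved ID layout implied by this diff; the first three entries are assumptions based on the existing metadata columns, only the last two appear in this PR:

  // Reserved metadata column IDs, counting down from Integer.MAX_VALUE
  // (values for _file, _pos, and _deleted are assumed, not from this diff):
  //   Integer.MAX_VALUE - 1  _file       (FILE_PATH)
  //   Integer.MAX_VALUE - 2  _pos        (ROW_POSITION)
  //   Integer.MAX_VALUE - 3  _deleted    (IS_DELETED)
  //   Integer.MAX_VALUE - 4  _spec_id    (SPEC_ID, this PR)
  //   Integer.MAX_VALUE - 5  _partition  (PARTITION_COLUMN_ID, this PR)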
  // add _partition
  if (partitionType != null && partitionType.fields().size() > 0) {
    StructLike coercedPartition = coercePartition(partitionType, spec, partitionData);
    idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, convertConstant.apply(partitionType, coercedPartition));
This needs to call convertConstant on each partition value. The assumption in all of the implementations is that it is called on primitive values, not on structs.
Okay, I see what's happening. This PR updates the conversion for Spark, and because most callers only use constantsMap(task, func), there's no way to get a partition passed through. I'm a little uneasy about this, but since we need to convert to a specific class for the reader, I don't see a good way around it.
Yeah, it is also related to unknown transforms.
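A sketch of the per-field conversion the review suggests, as opposed to converting the struct in one call; variable names are assumptions:

  // Hypothetical: apply convertConstant to each primitive partition value so
  // that implementations never see a struct.
  Object[] values = new Object[partitionType.fields().size()];
  for (int pos = 0; pos < values.length; pos += 1) {
    Types.NestedField field = partitionType.fields().get(pos);
    values[pos] = convertConstant.apply(field.type(), coercedPartition.get(pos, Object.class));
  }
  idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, values);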
  protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
    if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
      StructType partitionType = Partitioning.partitionType(table);
      return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);
I'd prefer just to call the method rather than trying to optimize by not adding the partition entry.
I'd prefer calling this all the time too. However, this would mean we can no longer query tables with unknown transforms. We need to know all transforms to build a common partition type. That's why I don't set the partition column if it is not requested.
Any thoughts on this, @rdblue?
Can we leave unknown transforms out of the partition type instead? We could just ignore them if they're unknown.
I'd probably consider persisting the partition type in the metadata instead. It might be confusing to silently ignore a partition column.
I think it would be fine for unknown partitions right now. It would unblock this without much risk.
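A sketch of the direction agreed on here, skipping unknown transforms when building the common partition type; the method shape and the UnknownTransform check are assumptions:

  // Hypothetical: ignore fields with unknown transforms instead of failing,
  // so tables written with newer transforms can still be queried.
  List<NestedField> commonFields = Lists.newArrayList();
  for (PartitionSpec spec : table.specs().values()) {
    for (PartitionField field : spec.fields()) {
      if (field.transform() instanceof UnknownTransform) {
        continue; // cannot derive a result type for this field
      }
      // ... merge the field into commonFields, validating conflicts across specs
    }
  }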
  if (parts.length == 2) {
    TestTables.TestTable table = TestTables.load(parts[0]);
    String[] metadataColumns = parts[1].split(",");
    return new SparkTestTable(table, metadataColumns, false);
This is just a different way to pass metadata columns in? Is there a better way to test it?
I wanted to simply overload newScanBuilder in SparkTestTable but it did not work. We can get rid of this once we migrate to Spark 3.2, so I'm not too worried about it.
      row(3, row(null, 2))
  );
  assertEquals("Rows must match", expected,
      sql("SELECT _spec_id, _partition FROM `%s$_spec_id,_partition` ORDER BY _spec_id", TABLE_NAME));
This should probably have a TODO comment to fix it when we can rely on Spark 3.2.
Added TODOs before all tests and in SparkTestTable.
I noted a few minor things, but overall this is good. Please fix some of those things and then feel free to merge.
Thanks for reviewing, @rdblue @jackye1995 @RussellSpitzer @kbendick @stevenzwu!
This WIP PR adds support for _spec_id and _partition metadata columns so that we can project them for writing deltas.