
Spark: Support spec ID and partition metadata columns #2984

Merged

Conversation

aokolnychyi (Contributor):

This WIP PR adds support for the _spec_id and _partition metadata columns so that we can project them when writing deltas.

FILE_PATH.name(), FILE_PATH,
ROW_POSITION.name(), ROW_POSITION,
IS_DELETED.name(), IS_DELETED);

private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)

aokolnychyi (Contributor, Author):

Had to change this as the partition type is not static and is handled specially.

public static NestedField get(String name) {
return META_COLUMNS.get(name);
public static NestedField metadataColumn(Table table, String name) {
if (name.equals(PARTITION_COLUMN_NAME)) {
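The shape of that change can be sketched in isolation. In the sketch below, the `Field` record and string-valued types are hypothetical stand-ins for Iceberg's `NestedField` and type system; only the control flow mirrors the PR: static metadata columns stay in a map, while `_partition` is built per table because its struct type depends on the table's specs.

```java
import java.util.Map;

public class MetadataColumnLookup {
  // hypothetical stand-in for Iceberg's NestedField
  record Field(int id, String name, String type) {}

  static final String PARTITION_COLUMN_NAME = "_partition";
  static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;

  // static metadata columns: their types never change
  static final Map<String, Field> META_COLUMNS = Map.of(
      "_file", new Field(Integer.MAX_VALUE - 1, "_file", "string"),
      "_pos", new Field(Integer.MAX_VALUE - 2, "_pos", "long"));

  // _partition's struct type is derived from the table's specs, so the
  // caller passes it in (the real method takes a Table and computes it)
  static Field metadataColumn(String tablePartitionType, String name) {
    if (name.equals(PARTITION_COLUMN_NAME)) {
      return new Field(PARTITION_COLUMN_ID, PARTITION_COLUMN_NAME, tablePartitionType);
    }
    return META_COLUMNS.get(name);
  }

  public static void main(String[] args) {
    Field f = metadataColumn("struct<dep:string>", "_partition");
    System.out.println(f.name() + ": " + f.type()); // _partition: struct<dep:string>
  }
}
```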

aokolnychyi (Contributor, Author):

I kept the logic case sensitive, as before, but maybe we should reconsider it at some point.

// add _partition
if (partitionType.fields().isEmpty()) {
// use null as some query engines may not be able to handle empty structs
idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, null);

aokolnychyi (Contributor, Author):

Using null for unpartitioned tables.
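A minimal, self-contained sketch of this idea (plain Java stand-ins; the real code builds an idToConstant map keyed by field ID). A HashMap is used because it permits null values:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionConstantSketch {
  static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;

  // build the id-to-constant map for a scan task; for unpartitioned tables
  // the _partition constant is null because some query engines cannot
  // handle empty structs
  static Map<Integer, Object> constantsMap(boolean partitioned, Object partitionValue) {
    Map<Integer, Object> idToConstant = new HashMap<>();
    idToConstant.put(PARTITION_COLUMN_ID, partitioned ? partitionValue : null);
    return idToConstant;
  }

  public static void main(String[] args) {
    Map<Integer, Object> constants = constantsMap(false, null);
    System.out.println(constants.containsKey(PARTITION_COLUMN_ID)); // true
    System.out.println(constants.get(PARTITION_COLUMN_ID)); // null
  }
}
```

The key is still present in the map, so readers can distinguish "partition column requested, value is null" from "partition column not requested at all".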

StructType structType = (StructType) type;

if (structType.fields().isEmpty()) {
return new GenericInternalRow();

aokolnychyi (Contributor, Author):

We won't hit this clause with _partition as the passed value will be null.

@@ -149,7 +149,7 @@ private Schema schemaWithMetadataColumns() {
// metadata columns
List<Types.NestedField> fields = metaColumns.stream()
.distinct()
.map(MetadataColumns::get)
.map(name -> MetadataColumns.metadataColumn(table, name))

aokolnychyi (Contributor, Author):

Need to pass a Table object to figure out the partition type.

import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class SparkTestTable extends SparkTable {

aokolnychyi (Contributor, Author):

This is purely a temporary solution for testing purposes until we compile against Spark 3.2.

@@ -31,7 +30,14 @@

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
TestTables.TestTable table = TestTables.load(Spark3Util.identifierToTableIdentifier(ident).toString());
return new SparkTable(table, false);
String[] parts = ident.name().split("\\$", 2);

aokolnychyi (Contributor, Author):

I probably need to add a comment here. This is also for testing purposes until we consume Spark 3.2.
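The test-only naming trick can be illustrated with a self-contained sketch (the helper below is hypothetical; only the split logic mirrors the PR). A table identifier such as `tbl$_spec_id,_partition` smuggles the metadata columns to project through the name, until Spark 3.2's metadata-column API can be used directly:

```java
public class TestIdentifierParsing {
  // hypothetical helper: split "table$col1,col2" into the metadata columns
  // encoded after the '$'; a plain name yields no metadata columns
  static String[] metadataColumns(String identName) {
    String[] parts = identName.split("\\$", 2);
    return parts.length == 2 ? parts[1].split(",") : new String[0];
  }

  public static void main(String[] args) {
    String[] cols = metadataColumns("tbl$_spec_id,_partition");
    System.out.println(String.join("|", cols)); // _spec_id|_partition
  }
}
```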

@Test
public void testSpecAndPartitionMetadataColumns() {
// TODO: support metadata structs in vectorized ORC reads
Assume.assumeFalse(fileFormat == FileFormat.ORC && vectorized);

aokolnychyi (Contributor, Author):

ORC vectorized path does not support constant structs.

aokolnychyi (Contributor, Author) commented Aug 16, 2021:

}

public static boolean isMetadataColumn(String name) {
return META_COLUMNS.containsKey(name);
return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);

Contributor:

I am starting to wonder if it is better to just have a dummy column type mapping for _partition in META_COLUMNS and only separate the logic when necessary, which can avoid changes like this in quite a few places.

aokolnychyi (Contributor, Author):

I did that first, but META_COLUMNS is an immutable map that does not allow null values. I am reluctant to switch to a mutable map, so I added this condition here. I hope we will be able to use metadataColumn(table, name) in other places so that this workaround stays confined to MetadataColumns.

It looks ugly, though, I agree.
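The constraint mentioned here is easy to demonstrate with plain java.util maps; Guava's ImmutableMap, which META_COLUMNS actually uses, behaves the same way and rejects null values at construction:

```java
import java.util.HashMap;
import java.util.Map;

public class NullValueMapDemo {
  // Map.of, like Guava's ImmutableMap, throws NullPointerException when
  // given a null value, so a dummy "_partition -> null" entry cannot be
  // stored in an immutable map
  static boolean immutableMapRejectsNullValue() {
    try {
      Map.of("_partition", (Object) null);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(immutableMapRejectsNullValue()); // true

    // a mutable HashMap accepts a null value, at the cost of mutability
    Map<String, Object> mutable = new HashMap<>();
    mutable.put("_partition", null);
    System.out.println(mutable.containsKey("_partition")); // true
  }
}
```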

@aokolnychyi aokolnychyi force-pushed the spec-partition-metadata-cols-v2 branch from 71c3c64 to 480c1be on September 22, 2021 at 23:34
@github-actions github-actions bot added the API label Sep 22, 2021
@aokolnychyi aokolnychyi changed the title Core: Support spec and partition metadata columns Spark: Support spec ID and partition metadata columns Sep 22, 2021
aokolnychyi (Contributor, Author):

I've updated this PR to focus on Spark for now. Similar functionality can be added to other query engines later.

aokolnychyi (Contributor, Author):

I'd appreciate another review round.

@aokolnychyi aokolnychyi marked this pull request as ready for review September 23, 2021 01:13
aokolnychyi (Contributor, Author):

This is no longer WIP.

if (structPos != -1) {
return struct.get(structPos, javaClass);
} else {
return null;

Contributor:

Since this relies on a null for nestedProjections[pos], I think it should set nestedProjections[pos] to null in the constructor where positionMap[pos] is set to -1.

aokolnychyi (Contributor, Author):

I think it is actually always null as long as the field was not found. I'll add an explicit call, though.

aokolnychyi (Contributor, Author):

Added.
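The invariant being discussed can be modeled with a small self-contained sketch (arrays of strings stand in for the real projection internals): when a projected field is not found in the underlying struct, the constructor records -1 in positionMap and explicitly nulls the matching nestedProjections slot, and the getter then returns null for that position.

```java
public class ProjectionSketch {
  private final int[] positionMap;
  private final Object[] nestedProjections;

  ProjectionSketch(String[] projectedFields, String[] structFields) {
    positionMap = new int[projectedFields.length];
    nestedProjections = new Object[projectedFields.length];
    for (int pos = 0; pos < projectedFields.length; pos++) {
      int structPos = indexOf(structFields, projectedFields[pos]);
      positionMap[pos] = structPos;
      if (structPos == -1) {
        // set both together so a missing field is explicit in both arrays
        nestedProjections[pos] = null;
      }
    }
  }

  Object get(int pos, Object[] struct) {
    int structPos = positionMap[pos];
    return structPos != -1 ? struct[structPos] : null;
  }

  private static int indexOf(String[] arr, String name) {
    for (int i = 0; i < arr.length; i++) {
      if (arr[i].equals(name)) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    ProjectionSketch p = new ProjectionSketch(
        new String[] {"a", "missing"}, new String[] {"a", "b"});
    System.out.println(p.get(0, new Object[] {1, 2})); // 1
    System.out.println(p.get(1, new Object[] {1, 2})); // null
  }
}
```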

public static final NestedField SPEC_ID = NestedField.required(
Integer.MAX_VALUE - 4, "_spec_id", Types.IntegerType.get(), "Spec ID to which a row belongs to");
// the partition column type is not static and depends on all specs in the table
public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;

Contributor:

I think that new reserved columns need to be added to the spec. We should also make sure the spec notes the ranges that are reserved. I'm not sure if we did that or just added specific IDs.

aokolnychyi (Contributor, Author):

I'll do that in a follow-up as we are missing _deleted too.
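The ID scheme above counts down from Integer.MAX_VALUE, which suggests a range check for reserved IDs. A sketch (the cutoff of 200 reserved IDs below is an illustrative assumption, not a value taken from the spec at the time of this PR):

```java
public class ReservedFieldIds {
  static final int SPEC_ID_COLUMN_ID = Integer.MAX_VALUE - 4;   // _spec_id
  static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5; // _partition

  // assumption: the last 200 IDs are reserved for metadata columns
  static final int FIRST_RESERVED_ID = Integer.MAX_VALUE - 200;

  static boolean isReserved(int fieldId) {
    return fieldId >= FIRST_RESERVED_ID;
  }

  public static void main(String[] args) {
    System.out.println(isReserved(PARTITION_COLUMN_ID)); // true
    System.out.println(isReserved(1)); // false
  }
}
```

A range check like this is what lets the spec reserve room for future metadata columns instead of enumerating each ID individually.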

// add _partition
if (partitionType != null && partitionType.fields().size() > 0) {
StructLike coercedPartition = coercePartition(partitionType, spec, partitionData);
idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, convertConstant.apply(partitionType, coercedPartition));

Contributor:

This needs to call convertConstant on each partition value. The assumption in all of the implementations is that it is called on primitive values, not on structs.

Contributor:

Okay, I see what's happening. This PR updates the conversion for Spark and because most callers only use constantsMap(task, func) there's no way to get a partition passed through. I'm a little uneasy about this, but since we need to convert to a specific class for the reader I don't see a good way around it.

aokolnychyi (Contributor, Author) commented Sep 27, 2021:

Yeah, it is also related to unknown transforms.
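The per-field conversion the reviewer asked for might be sketched like this (simplified, self-contained stand-ins: a partition "struct" is an Object[], field types are strings, and the converter is an arbitrary BiFunction; the point is that convertConstant is applied to each primitive partition value, never to the struct as a whole):

```java
import java.util.Arrays;
import java.util.function.BiFunction;

public class PartitionConstantConversion {
  // convert each partition value individually, as the conversion functions
  // assume primitive inputs rather than structs
  static Object[] convertPartition(
      String[] fieldTypes,
      Object[] partition,
      BiFunction<String, Object, Object> convertConstant) {
    Object[] converted = new Object[partition.length];
    for (int i = 0; i < partition.length; i++) {
      converted[i] = convertConstant.apply(fieldTypes[i], partition[i]);
    }
    return converted;
  }

  public static void main(String[] args) {
    // hypothetical converter: engines often need e.g. String -> UTF8String
    BiFunction<String, Object, Object> convert =
        (type, value) -> type.equals("string") ? "UTF8:" + value : value;
    Object[] out = convertPartition(
        new String[] {"string", "int"}, new Object[] {"dep1", 3}, convert);
    System.out.println(Arrays.toString(out)); // [UTF8:dep1, 3]
  }
}
```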

protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
StructType partitionType = Partitioning.partitionType(table);
return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);

Contributor:

I'd prefer just to call the method rather than trying to optimize by not adding the partition entry.

aokolnychyi (Contributor, Author):

I'd prefer calling this all the time too. However, this would mean we can no longer query tables with unknown transforms. We need to know all transforms to build a common partition type. That's why I don't set the partition column if it is not requested.

Any thoughts on this, @rdblue?

Contributor:

Can we leave unknown transforms out of the partition type instead? We can just ignore them if they're unknown?

aokolnychyi (Contributor, Author):

I'd probably consider persisting the partition type in the metadata instead. It might be confusing to silently ignore a partition column.

Contributor:

I think it would be fine for unknown partitions right now. It would unblock this without much risk.
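The compromise in this thread can be sketched with a Supplier standing in for the partition-type computation (stand-in types; the real method is the reader's constantsMap): the expensive, potentially failing computation over all transforms only runs when _partition is actually projected, so tables with unknown transforms remain readable as long as the column is not requested.

```java
import java.util.function.Supplier;

public class LazyPartitionType {
  // only compute the common partition type when the _partition column is
  // projected; the computation needs every transform in the table and may
  // fail on unknown transforms
  static Object partitionTypeIfRequested(
      boolean partitionRequested, Supplier<Object> computePartitionType) {
    if (partitionRequested) {
      return computePartitionType.get();
    }
    // skip the computation entirely
    return null;
  }

  public static void main(String[] args) {
    Supplier<Object> failing = () -> {
      throw new IllegalStateException("unknown transform");
    };
    // the supplier never runs when the column is not requested
    System.out.println(partitionTypeIfRequested(false, failing)); // null
  }
}
```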

if (parts.length == 2) {
TestTables.TestTable table = TestTables.load(parts[0]);
String[] metadataColumns = parts[1].split(",");
return new SparkTestTable(table, metadataColumns, false);

Contributor:

This is just a different way to pass metadata columns in? Is there a better way to test it?

aokolnychyi (Contributor, Author):

I wanted to simply overload newScanBuilder in SparkTestTable but it did not work. We can get rid of this once we migrate to Spark 3.2 so I don't worry too much.

row(3, row(null, 2))
);
assertEquals("Rows must match", expected,
sql("SELECT _spec_id, _partition FROM `%s$_spec_id,_partition` ORDER BY _spec_id", TABLE_NAME));

Contributor:

This should probably have a TODO comment to fix it when we can rely on Spark 3.2.

aokolnychyi (Contributor, Author) commented Sep 27, 2021:

Added TODOs before all tests and in SparkTestTable.

rdblue (Contributor) left a review:

I noted a few minor things, but overall this is good. Please fix some of those things and then feel free to merge.

@aokolnychyi aokolnychyi merged commit 7cf96f0 into apache:master Sep 27, 2021
aokolnychyi (Contributor, Author):

Thanks for reviewing, @rdblue @jackye1995 @RussellSpitzer @kbendick @stevenzwu!

6 participants