Core: add row key to format v2 #2354

Closed
wants to merge 1 commit into from

Conversation

Contributor

@jackye1995 jackye1995 commented Mar 21, 2021

This is the continuation of #2010, adding a concept that describes how a row in a table should be uniquely identified. I have the implementation ready up to the Spark SQL extension for updating the row key, and will separate it into multiple PRs for review. This PR should cover the same content as what openinx had in the old PR.

This PR adds RowKey to the Table and TableMetadata API, and writes the metadata information as something like:

  ...
  "default-row-key-id": 1,
  "row-keys": [
    {
      "key-id": 3,
      "fields": [
        {
          "source-id": 1
        },
        {
          "source-id": 3
        }
      ]
    }
  ],
  ...

I will add the reasoning behind the naming choices inline.

@openinx @rdblue @aokolnychyi

* Iceberg itself does not enforce row uniqueness based on this identifier.
* It is leveraged by operations such as streaming upsert.
*/
public class RowIdentifier implements Serializable {
Contributor Author

I have considered some alternatives for the name and discarded the following:

  1. primary key: used in database systems and implies enforced uniqueness, which does not fit the Iceberg use case
  2. upsert id: too specific to the upsert use case
  3. row id: conflicts with java.sql.RowId, and suggests a reference to one specific row
  4. default row id: defaultXxx is used a lot in metadata, so having a class with a default prefix makes the table metadata code confusing

Member

Yeah, the row identifier is the correct semantic to express. One minor thing for me: the name seems a bit long as part of the table spec, how about using RowKey as the name?

Contributor Author

RowKey sounds good to me, I will update based on that!

* 1. a required column in the table schema
* 2. a primitive type column
*/
public class RowIdentifyField implements Serializable {
Contributor Author
@jackye1995 jackye1995 Mar 21, 2021

The field only contains the source column id. Technically we could make fields() an Integer[] directly, but because this will be a public API it would be hard to change if we want to amend it in the future, so I decided to keep this separate class for the individual field information. This also aligns better with the structure of partition spec and sort order.

In SortOrder, I see a transform is also added as part of the field, but it seems that only the identity transform is permitted. It is the same situation here, so we could also add a transform here if anyone thinks there might be a use case for it.
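
For illustration, here is a minimal sketch of the shape described above (the field and accessor names are assumptions for illustration, not the exact code in this PR):

import java.io.Serializable;

// Each row identifier field wraps only the source column id, mirroring how
// PartitionField and SortField reference their source columns.
public class RowIdentifyField implements Serializable {
  private final int sourceId;

  RowIdentifyField(int sourceId) {
    this.sourceId = sourceId;
  }

  public int sourceId() {
    return sourceId;
  }
}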

Member

I agree that introducing a separate RowIdentifyField class keeps this extensible. For the transform function, I currently don't see any requirement for it on these row identifier fields, but I'm OK with making it extensible for future usage.

@@ -152,6 +154,7 @@ public static String toJson(TableMetadata metadata) {
}
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
Contributor Author

I will do another PR to fix all the code complexity complaints in this class after this is merged.

@rdblue
Contributor

rdblue commented Mar 22, 2021

Thanks for working on this, @jackye1995! I'll take a look.


RowIdentifyField(Types.NestedField column) {
ValidationException.check(column.isRequired(),
"Cannot add column %s to row identifier because it is not a required column", column);
Member

Nit: in this RowIdentifyField constructor, we throw an exception whose message says Cannot add column ..., which seems unreasonable because we are mixing column adding and instance construction together. How about moving those validation checks out of this constructor?

Contributor Author

makes sense, will update


@@ -111,6 +117,11 @@ static TableMetadata newTableMetadata(Schema schema,
int freshSortOrderId = sortOrder.isUnsorted() ? sortOrder.orderId() : INITIAL_SORT_ORDER_ID;
SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, sortOrder);

// rebuild the row identifier using the new column ids
int freshRowIdentifierVersion = rowIdentifier.isNotIdentified() ?
rowIdentifier.rowIdVersion() : INITIAL_ROW_IDENTIFIER_VERSION;
Member

If the RowIdentifier is not identified, then the row-id-version should be 0 rather than 1?

Contributor Author

Yes, because not identified is the default, which has ID 0. Otherwise the row key gets ID 1 in this method, and is then merged with the current metadata in the create/replace table transaction.

@jackye1995 jackye1995 force-pushed the row-id-api branch 2 times, most recently from 359bb24 to a79ccf5 on March 25, 2021 00:29
Comment on lines 999 to 1003
// reassign all row keys with fresh column IDs.
Types.NestedField column = schema.findField(columnName);
Preconditions.checkNotNull(column,
"Cannot find column in the fresh schema. name: %s, schema: %s", columnName, schema);
builder.addField(column.fieldId());
Member

Why not just use builder.addField(columnName) here? builder#addField validates the existence of the column internally, so we don't have to check it again here.

Contributor Author

good point, updated

* Notice that the order of each field matters.
* 2 keys with the same set of fields but different order are viewed as different.
* The fields of the key should ideally be ordered based on the importance of each field
* to be leveraged by features like secondary index.
Member

This comment looks great, thanks for the doc.


public Builder addField(String name) {
Types.NestedField column = schema.findField(name);
ValidationException.check(column != null, "Cannot find column with name %s in schema", name);
Member

How about appending the schema string to the end of the error message, so that we can easily find out what's wrong when we hit the validation exception?

Contributor Author

sounds good to me
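
A minimal sketch of what the agreed change could look like (the helper name is illustrative, not the PR's exact code):

import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Types;

class RowKeyValidation {
  // Include the schema in the message so a failed lookup is easy to diagnose.
  static Types.NestedField findColumn(Schema schema, String name) {
    Types.NestedField column = schema.findField(name);
    ValidationException.check(column != null,
        "Cannot find column with name %s in schema %s", name, schema);
    return column;
  }
}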

@openinx
Member

openinx commented Mar 25, 2021

I like this PR, just left several comments. We may need to change the title to Core: Add RowKey Specification in format v2.

@jackye1995 jackye1995 changed the title Core: add row identifier to format v2 Core: add row key to format v2 Mar 25, 2021
properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

public TableMetadata updateRowKey(RowKey newKey) {
Member

We will need to introduce a public Iceberg table API to update the RowKey specification, right? Maybe we could file a separate issue to address this; we can publish the interface in that issue.

Contributor Author

Yes, I have a separate PR for that after this is out.

Comment on lines 998 to 1001
// reassign all row keys with fresh column IDs.
Types.NestedField column = schema.findField(columnName);
Preconditions.checkNotNull(column,
"Cannot find column in the fresh schema. name: %s, schema: %s", columnName, schema);
Member

We don't have to do the null check here, because builder.addField(columnName) does the check inside its implementation. (I'm sorry I did not describe this clearly in the last comment.)

return writer.toString();

} catch (IOException e) {
throw new RuntimeIOException(e);
Member

In Iceberg we prefer to use UncheckedIOException rather than the deprecated RuntimeIOException, right?
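
A small sketch of the suggested pattern (the method here is illustrative, not the PR's exact code):

import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;

class JsonWriteExample {
  // Wrap the checked IOException in java.io.UncheckedIOException instead of
  // the deprecated RuntimeIOException.
  static String write(String content) {
    try (StringWriter writer = new StringWriter()) {
      writer.write(content);
      return writer.toString();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write JSON", e);
    }
  }
}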

@openinx
Member

openinx commented Mar 26, 2021

I checked this RowKey specification twice more; in my opinion it's good enough to get merged, but since it's a basic specification for the Iceberg table format, I'd like to invite @rdblue and @aokolnychyi to double-check. I would appreciate it if you two could review when you have time, thanks.

@rdblue
Contributor

rdblue commented Mar 27, 2021

@jackye1995 and @openinx, I have a few questions about this before I'm comfortable merging it. Thanks for working on this so far!

Why do we need to track multiple versions of the row identifier like we do for schema, partition spec, and sort order? I think of this as the "fields that identify a row". Is it helpful to have more than one view of how rows are identified?

To answer that, we need to consider whether two versions are ever valid at the same time, and how row IDs are going to evolve over time:

  • Row identifier columns may be set, either to initialize or to fix a mistake (e.g., used account_id instead of profile_id)
  • Row identifier columns may be added, when a new identifying column is added to the schema (e.g., adding profile_id to a table previously identified by only account_id)

I think both of those operations only require setting the current way of identifying rows, not keeping track of the previous ways. I'm interested to hear what everyone thinks about that and whether there is agreement.

If I'm correct, then I would probably not keep track of multiple versions here. If I'm not, then I think we should ask whether the row ID columns should be tracked in the schema itself rather than separately versioned, since they will probably change at the same time the schema does -- when adding a new column that is now part of the identifier.

It would be great to hear from @aokolnychyi on this as well.

@openinx
Member

openinx commented Mar 29, 2021

I think we should keep track of multiple versions in the Apache Iceberg schema. Let's discuss the case you described: adding profile_id to a table previously identified by only account_id.

t1: User defines account_id as the row identifier;
t2: Write a few records into the table;
t3: Write a few equality deletions (by account_id) into the table;
t4: Add profile_id to the row identifier, so the identifier is now account_id & profile_id;
t5: Write a few equality deletions (by account_id & profile_id) into the table;

In my opinion, the Iceberg table format's row identifier specification is introduced because we expect standard SQL's PRIMARY KEY to map to those row identifier columns automatically

(if we don't have the row identifier spec, then we don't know how to track those keys when creating a table like:

CREATE TABLE sample(id INT, data STRING, PRIMARY KEY (id) NOT ENFORCED);

)

Back to the above case: at timestamps t4 and t5, the table's row identifier is account_id & profile_id. If people want to read the snapshot at timestamp t3, then we should use the row identifier account_id. So if we don't track multiple versions of the identifier, how could we read the row identifier from old snapshots? Using the latest account_id & profile_id would confuse people a lot, because those rows were actually deleted only by the field account_id.

@jackye1995
Contributor Author

@openinx for your example, my understanding is that at t3 the equality delete file would have a row like account_id=xxx, and at t5 the new equality delete file would have a row like account_id=yyy, profile_id=zzz; basically the row key information at that point in time is already in the delete file written at that time. When time traveling to t4, the delete file written at that time still works without needing to consult the latest row key.

@rdblue I actually mostly agree with what you mentioned, as I don't see why the example openinx described would not work, but maybe I missed something there. I decided to go with the versioned approach because I think it could potentially be used to provide some uniqueness guarantee at read time in the future by merging rows, given that we now basically have a primary key concept through RowKey and a sort key concept through SortOrder. At that point we would need this information to be present in the specific snapshot that we time travel to.

@rdblue
Contributor

rdblue commented Mar 29, 2021

@openinx, I think that @jackye1995 is right about how the case you described would be encoded. The delete files themselves always encode what columns are used for the equality delete.

There is no requirement that a delete file's delete columns match the table's row identifier fields. That's one reason why we can encode deletes right now, before we've added the row identifier tracking. That also enables deleting rows by different fields than the row identifier fields, which is what makes the evolution case possible.

The row identifier fields are related to deletes only in that in situations where we don't have explicit delete columns in the operation, we can default the delete columns to the row identifier fields. That's to support the UPSERT case, where we define the identity fields in table metadata rather than in the sink configuration.

From @jackye1995's second comment, I think there is at least some agreement that the row identifier columns don't need to be tracked over time. That's because there is no way to go back to an older snapshot and then manipulate that data. Time travel is read-only and data manipulation is always applied to the current snapshot, so it is reasonable that there is only ever one version of the row identifier that matters: the one that is configured at the start of the operation.

Before moving ahead with this, I think we should simplify it and remove the versioning.

I'm also wondering about the field ordering mentioned in the code. Is that relevant? I think of the row identifier fields as unordered and simply used to produce a projection of the table schema that is a row identifier, in whatever field order the schema had. So I would model this as an unordered set of IDs rather than as an ordered collection.

@jackye1995
Contributor Author

@rdblue for the ordering, I was thinking about potential features like a clustering index, where the order of the primary key matters for index performance. But I am okay with starting simple by removing versioning and making the fields unordered for now.

@openinx are you okay with those changes?

@rdblue
Contributor

rdblue commented Mar 29, 2021

@jackye1995, I don't think that the row identifier fields are strongly related to a clustering index. Records in an Iceberg table are physically sorted according to some order used at write time, and also distributed by a hash or ordered distribution. That logic is independent. And a future index type that has some order for its contents would probably be separately configured and would also not use the row identifier fields.

@jackye1995
Contributor Author

@rdblue sounds good to me, I will make the update, thank you!

@openinx
Member

openinx commented Mar 30, 2021

basically the row key information at that point of time is already in the delete file written at that time.

@jackye1995 @rdblue Yes, I understand that we maintain the equality field ids inside each equality delete file, and iterating records from an old snapshot should be correct because the apply process accounts for the different equality field ids.

But in this comment I'm not discussing data correctness (the data correctness has no problem). I mean people should be able to read the old RowKey from an older snapshot. In issue #1029 we are proposing to attach the schema to the snapshot so that users can create a complete table based on an existing snapshot by using CTAS or RTAS; in that scenario I think we also need to use the old PartitionSpec, SortOrder, and RowKey in the newly created table. Then, at the Iceberg table level, people see consistent table metadata.

Another case is rolling back to an older snapshot by replacing the latest snapshot. We currently do not support replacing the schema/partition-spec/sort-order with the old one, but in my mind we should, because providing a uniform view of data files, schema, and other table metadata at the old timestamp t3 matches the expectations of end users.

That's why I recommended tracking multiple versions of RowKey in the Iceberg table metadata.

@openinx
Member

openinx commented Mar 31, 2021

@rdblue Yes, if we only consider time travel over records among different snapshots, the old versions of PartitionSpec, SortOrder, and RowIdentifier are indeed useless. Maybe I'm considering a slightly different case (still using the same example):

t1: User defines account_id as the row identifier;
t2: Write a few records into the table;
t3: Write a few equality deletions (by account_id) into the table;
t4: Add profile_id to the row identifier, so the identifier is now account_id & profile_id;
t5: Write a few equality deletions (by account_id & profile_id) into the table;

People modified the RowIdentifier from account_id to (account_id, profile_id) at timestamp t4, then wrote a few records at timestamp t5. They find that the (account_id, profile_id) deletions lead to a business error, so they plan to revert this Iceberg table to t3 and replay all deletions whose identifier is (account_id).

My question is: after reverting the table to t3, should people still see the incorrect row identifier (account_id, profile_id) by default, or should they see the correct row identifier (account_id) by default?

Currently, our implementation is the first one, which means people need to manually change (account_id, profile_id) back to (account_id). If we are very clear that we will keep this behavior in the future, then we really do not need to maintain multiple versions of the row key. Otherwise, maintaining multiple versions is necessary. From my understanding, the second behavior is actually more user-friendly.

@rdblue
Contributor

rdblue commented Mar 31, 2021

My question is: after reverting the table to t3, should people still see the incorrect row identifier (account_id, profile_id) by default, or should they see the correct row identifier (account_id) by default?

I think I see the miscommunication. I don't think there is a way to roll back to t3. There is a snapshot created at t2, t3, and t5. Those snapshots are accessible via time travel and rollback. The rest of the table metadata is independent so rolling back doesn't change it. To revert both the bad write and the configuration change, the user should roll back and then set the row identifier fields to just account_id.

Keeping table metadata and data separate (and only versioning data) is the right behavior, I think. Data is constantly evolving and we don't want to accidentally revert metadata changes -- like updating table properties -- when the data snapshot is rolled back.

Consider a slightly different scenario where the rollback to t3 was needed because the source was producing bad data. Why should the profile_id be removed from the row identifier in that case? If Iceberg did that implicitly, then after the corrected data is turned back on, Iceberg would start deleting rows incorrectly using the wrong key.

I think the right approach is to keep data a separate dimension. Since we want Iceberg to be a coordination layer between multiple services that don't know about one another, I think it would be bad for actions that fix data to also make possibly unknown changes to metadata.

@aokolnychyi
Contributor

Let me catch up with the discussion today.

@aokolnychyi
Contributor

I support the idea of a row identifier as long as Iceberg does not enforce it. I see its primary usage in UPSERT statements where we don't know the upsert columns unless they are provided in the command.

I also think it is important not to limit equality deletes to the row identifier alone, which is currently handled by the spec since each delete file is associated with arbitrary column ids. We plan to leverage it in some MERGE INTO use cases, where we can derive the delete columns from the ON clause and the merge columns can vary from operation to operation.

W.r.t. versioning, I'd go simple. I think the current rollback semantics applies only to snapshots. We don't revert table properties or sort order. I believe we should treat row identifiers in the same way.

That said, @openinx's use case is also valid. I have seen scenarios where users want to roll back the table state completely rather than just the current snapshot. I think that should be done by replacing the current pointer in the catalog with an old JSON file rather than by calling the table rollback API. Do we want to expose ways of rolling back table state to users? I think that may be handy and should cover the use case that @openinx brought up.

*/
public class RowKey implements Serializable {

private static final RowKey NOT_IDENTIFIED = new RowKey(null, 0, ImmutableList.of());
Contributor

Guava collections are not Kryo friendly. We have to be careful.

Member

That's a great point; we've suffered from Kryo serialization issues here: https://github.com/apache/iceberg/pull/2343/files. Maybe we could provide a unit test to cover the RowKey Kryo serialization cases (similar to TestDataFileSerialization).

Contributor Author

cool, will do that
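
For reference, a rough sketch of what such a round-trip test could look like (the RowKey builder call is assumed from the Builder#addField(String) discussion above, and the real test would likely mirror TestDataFileSerialization):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

public class TestRowKeySerialization {
  private static final Schema SCHEMA = new Schema(
      Types.NestedField.required(1, "x", Types.LongType.get()));

  @Test
  public void testKryoRoundTrip() {
    // Assumed builder API for this sketch.
    RowKey key = RowKey.builderFor(SCHEMA).addField("x").build();

    Kryo kryo = new Kryo();
    kryo.setRegistrationRequired(false);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (Output out = new Output(bytes)) {
      kryo.writeClassAndObject(out, key);
    }

    try (Input in = new Input(new ByteArrayInputStream(bytes.toByteArray()))) {
      RowKey copy = (RowKey) kryo.readClassAndObject(in);
      Assert.assertEquals("Row key should survive a Kryo round-trip", key, copy);
    }
  }
}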

* Iceberg itself does not enforce row uniqueness based on this key.
* It is leveraged by operations such as streaming upsert.
*/
public class RowKey implements Serializable {
Contributor

I don't have a strong opinion, but I'd go for RowIdentifier or RowId. I think Key implies uniqueness, but I'll be fine this way too as long as we agree Iceberg does not ensure uniqueness.

Member

A key does not always mean uniqueness in my mind. From the MySQL documentation, an index can be created on top of key columns, and the index can be either unique or non-unique.

Contributor Author

I think we have gone back and forth on this naming; for now I would prefer Key, because Id is heavily used in table metadata for concepts such as spec-id, schema-id, order-id, etc., which are the increasing IDs of different specs. Using a different keyword, Key, provides more clarity in the table metadata.

Contributor
@rdblue rdblue Apr 1, 2021

I agree with Jack's logic that "id" is typically used in Iceberg to refer to a numeric identifier. It would be odd to use RowId, especially given the overlap with the JDBC one. But we have had a significant number of people who find "key" confusing when it is a non-unique "key".

What about shifting the focus from the "key" or "identifier" to the fields? We could use identifier-field-ids to hold the collection and add identifierFieldIds to APIs.

Contributor Author

I renamed class RowKeyField to RowKeyIdentifierField, and fields to identifier-fields in metadata. Please let me know if that feels better.

Member

Personally, I don't have a strong opinion about RowKeyField vs. RowKeyIdentifierField. I'm okay with either one.

Contributor

To be more clear, I don't think we should ignore the near consensus from our sync discussion that "key" is misleading. I think we should instead call this class IdentityFields (or something similar) and store identifier-field-ids in table metadata.

@openinx
Member

openinx commented Apr 1, 2021

Okay, I think everyone has reached a consensus on this issue: keeping table metadata and data separate (and only versioning data) is the right behavior. Let's keep that consensus. @aokolnychyi's suggestion of replacing the current pointer in the catalog with an old JSON file rather than calling the table rollback API looks good to me; this way we can also achieve rollback of the table metadata (for now, the priority does not sound that high, because people can change table state as they want by calling the table API).

I support the idea of a row identifier as long as Iceberg does not enforce it

As a common Iceberg table specification, the row identifier doesn't have to be enforced. (I've left a comment here.)

We plan to leverage it in some MERGE INTO use cases, where we can derive the delete columns from the ON clause and the merge columns can vary from operation to operation.

I don't know much about this point; I guess you may want to use the row identifier to achieve some optimizations at the Spark engine level. Can you provide more information?

@jackye1995 , I think we could update this PR now, thanks for the great work

@jackye1995
Contributor Author

I have seen scenarios where users want to roll back the table state completely rather than just the current snapshot. I think that should be done by replacing the current pointer in the catalog with an old JSON file rather than by calling the table rollback API.

+1 on this

@jackye1995 , I think we could update this PR now, thanks for the great work

sounds good, will do it now.

@jackye1995 jackye1995 force-pushed the row-id-api branch 2 times, most recently from e88c211 to 8acb134 on April 1, 2021 07:40
@jackye1995
Contributor Author

@openinx @rdblue should be ready for review

@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowKey;
Member

Do we need to introduce a createTable method that accepts a RowKey in this interface? I raise this question because when I was adding Flink SQL primary key support, I found such a method necessary. Of course, we could publish a separate PR for this if possible.

Member

Opening a separate issue for this is good enough for me now.

Contributor Author

That's a good point. I thought about that, and there were 2 reasons that led me to not add it:

  1. currently neither the Spark nor the Hive SQL specification allows a primary key clause. I would expect the user to run something like ALTER TABLE ... ADD ROW KEY (actual syntax TBD) to add this row key in Spark.

  2. it's becoming hard to iterate through all the combinations of parameters in createTable; we do not even have methods with a SortOrder parameter yet, although it can be mapped to the SORTED BY clause. I would prefer us to switch to the table builder instead of adding all those overloaded methods (a sketch of that direction is below).
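
A rough sketch of that builder-based direction (this assumes the Catalog.TableBuilder API; the row key call is hypothetical and not an existing method):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class CreateTableExample {
  static Table create(Catalog catalog, Schema schema, PartitionSpec spec, SortOrder order) {
    return catalog.buildTable(TableIdentifier.of("db", "sample"), schema)
        .withPartitionSpec(spec)
        .withSortOrder(order)
        // .withRowKey(rowKey)  // hypothetical addition instead of another createTable overload
        .create();
  }
}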

Member

Okay, I agree it's good to use TableBuilder for createTable going forward, as we are introducing more and more arguments when creating tables.

@aokolnychyi
Contributor

@openinx, what I meant is that we can figure out the upsert columns from the ON condition of MERGE INTO.

MERGE INTO t USING s
ON t.id = s.id
....
MERGE INTO t USING s
ON t.id = s.id AND t.account_id = s.account_id
....

That's different from upsert use cases where we don't have the upsert columns in the command itself.

@rdblue
Contributor

rdblue commented Apr 1, 2021

I think this is now unrelated to this PR, but since discussion is happening here I want to mention it:

I have seen scenarios where users want to roll back the table state completely rather than just the current snapshot. I think that should be done by replacing the current pointer in the catalog with an old JSON file rather than by calling the table rollback API.

I don't agree with the direction of adding an API to roll back the JSON file itself. That approach discards relevant history, like the fact that after t5, the table had a bad snapshot. That history is relevant and valuable. Here are a few examples:

  1. Users can see how long the bad snapshot was the current snapshot
  2. Users can see what the snapshot ID was and find out which jobs read the bad snapshot
  3. Users can see what other changes were rolled back at the same time (e.g. a compaction was also rolled back)

If we want to support this use case, then I think we need to make an API that will roll back a table to some point in time. That would roll back the snapshot (preserving the snapshot log) and revert metadata changes. We could do this by having a rollback API that actually uses a transaction to make multiple different changes. I've been thinking about updating the SnapshotManager to use a transaction as well, which would allow cherry-picking multiple commits as a single operation.

There's a bit more discussion that should happen here, but I'm open to the transaction approach. I just don't think rolling back the metadata file instead of moving forward and keeping history is a good idea.

RowKey actualKey = identifiedByX.rowKey();
Assert.assertEquals("Row key must have 1 field", 1, actualKey.identifierFields().size());
Assert.assertEquals("Row key must have the expected field",
org.apache.commons.compress.utils.Sets.newHashSet(1),
Contributor

nit: wrong import? Also it may be worth confirming that "x" in the schema is indeed assigned id 1

Contributor Author

oh yeah, IntelliJ keeps giving me this; I fixed a few and missed this one, thanks

@@ -482,6 +506,7 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
// rebuild all of the partition specs and sort orders for the new current schema
List<PartitionSpec> updatedSpecs = Lists.transform(specs, spec -> updateSpecSchema(newSchema, spec));
List<SortOrder> updatedSortOrders = Lists.transform(sortOrders, order -> updateSortOrderSchema(newSchema, order));
RowKey updatedRowKey = updateRowKeySchema(newSchema, rowKey);
Contributor

should we fail if a schema update drops a column defined in the row key? We probably want to add a test for that too

Contributor Author

That's a good point. I was thinking about having another PR after this one to block dropping a column that is in the row key; do you think it's better to have it here?

Contributor

I think it would be a relatively small code change in this class and some testing which might be easy to include in this same PR, unless you are thinking about something different?

Contributor Author
@jackye1995 jackye1995 Apr 1, 2021

I was thinking about the following: 1 PR for the update-related changes, including the update row key API, its implementation, and this fix in schema update; 1 PR for the Spark SQL extension work; 1 PR in Flink for the primary key clause (if no one has done it yet).

Member

@jackye1995 your plan looks great !

@jackye1995
Contributor Author

@rdblue I think all other people are fine with the PR, please let me know if there are any other comments.

Member
@openinx openinx left a comment

LGTM

@openinx
Member

openinx commented Apr 7, 2021

@rdblue, please take a final look at this specification when you have a chance. It would be great if we could reach consensus on this PR and merge it into the master branch. (Some other PRs are blocked by this PR.)

* 1. a required column in the table schema
* 2. a primitive type column
*/
public class RowKeyIdentifierField implements Serializable {
Contributor
@rdblue rdblue Apr 8, 2021

This seems to be a class that wraps a single ID. Could we get rid of it? Instead, IdentifierFields could expose an idSet or ids method that returns a Collection<Integer>.

@rdblue
Contributor

rdblue commented Apr 9, 2021

@jackye1995, looking at this, I think that we can simplify it quite a bit since we are no longer tracking multiple versions of the identifier fields.

I started by looking at RowKeyIdentifierField, which is basically an Integer, then at RowKey, which is basically a Collection&lt;Integer&gt; or Set&lt;Integer&gt;. I think we can remove those two classes fairly easily. Next, this requires quite a few API changes to expose the row key alongside a schema, when I think most SQL tables consider those part of the same thing. If we also consider the identifier fields to be part of the schema, then we no longer need so many changes.

For example, we can reuse the UpdateSchema operation to set the schema's identifier fields, setIdentifierFields(String...). We can also expose Schema.identifierFieldIds() to access them. Not requiring a new operation on Table or modifying TableMetadata is a plus. But it also makes sense to do this: then we could have a single operation that adds profile_id and also adds it to the table's identifier fields at the same time.

Combining the identifier fields with schema also causes them to be versioned with schema -- we'd need to update Schema so that the identifier fields are part of equality -- but then we would be able to detect cases like @openinx raised. While we wouldn't roll back the schema/identifier update at the same time as the snapshot without a transaction, this would allow us to easily detect that the bad snapshot used the updated identifier, or detect which snapshots in a table used an updated identifier.
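
A quick sketch of how the direction above could look for a caller (setIdentifierFields and identifierFieldIds are the proposed additions from this comment; exact signatures are assumptions):

import java.util.Set;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

class IdentifierFieldsExample {
  static Set<Integer> evolve(Table table) {
    // Add a new column and mark it as part of the row identifier in one operation.
    table.updateSchema()
        .addColumn("profile_id", Types.LongType.get())
        .setIdentifierFields("account_id", "profile_id")  // proposed API
        .commit();

    return table.schema().identifierFieldIds();  // proposed accessor
  }
}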

@jackye1995
Contributor Author

Thanks for the comment, Ryan. I agree that this can be simplified into part of the schema itself, something like:

{
  "type": "struct",
  "schema-id": 0,
  "row-identifiers": [
    1
  ],
  "fields": [
    {
      "id": 1,
      "name": "x",
      "required": true,
      "type": "long"
    }
  ]
}

Originally we were trying to add this as a component of table metadata because of (1) the benefit of versioning, and (2) the possibility of extending each row-identifying field for more potential use cases. Since we all agree that (2) is not likely, and we can also get versioning by adding it to the schema, I think this is a good approach.

I will update this PR tomorrow; @openinx, meanwhile please let me know if you have any concerns with the new approach.

@openinx
Member

openinx commented Apr 12, 2021

Thanks @rdblue and @jackye1995 for the comments; I'm okay with the simpler way to implement the row identifier specification. Let's push this work forward. Thanks.
