Core: Add primary key spec. #2010
Conversation
public Builder addField(int sourceId) {
  Types.NestedField column = schema.findField(sourceId);
  Preconditions.checkNotNull(column, "Cannot find source column: %s", sourceId);
  Preconditions.checkArgument(column.isRequired(), "Cannot add optional source field to primary key: %s", sourceId);
Other databases (such as MySQL) also do not allow nullable columns in primary keys; see https://dev.mysql.com/doc/refman/8.0/en/create-table.html:
"A unique index where all key columns must be defined as NOT NULL. If they are not explicitly declared as NOT NULL, MySQL declares them so implicitly (and silently). A table can have only one PRIMARY KEY. The name of a PRIMARY KEY is always PRIMARY, which thus cannot be used as the name for any other kind of index."
Agree, the primary key should be non-null.
nit: duplicated logic with L183, can you refactor the checks?
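One possible shape for that refactor, as a sketch (the helper name is invented; L183 refers to a line in the PR diff, not shown here):

// Hypothetical shared helper so both call sites validate the same way:
private Types.NestedField checkAndGetRequiredField(int sourceId) {
  Types.NestedField column = schema.findField(sourceId);
  Preconditions.checkNotNull(column, "Cannot find source column: %s", sourceId);
  Preconditions.checkArgument(column.isRequired(),
      "Cannot add optional source field to primary key: %s", sourceId);
  return column;
}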
 */
public class PrimaryKey implements Serializable {

  private static final PrimaryKey NON_PRIMARY_KEY = new PrimaryKey(null, 0, false, ImmutableList.of());
Key id 0 is reserved for NON_PRIMARY_KEY; newly created primary keys will get key ids starting from 1.
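A small sketch of that id convention (the helper and the keyId() accessor are illustrative, not taken from the PR):

private static int nextKeyId(List<PrimaryKey> existingKeys) {
  int maxId = 0;  // keyId 0 is the NON_PRIMARY_KEY sentinel
  for (PrimaryKey key : existingKeys) {
    maxId = Math.max(maxId, key.keyId());
  }
  return maxId + 1;  // real keys start from 1
}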
To be consistent with PartitionSpec and SortOrder, better to use new PrimaryKey(new Schema(), ...)
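Concretely, the suggestion would change the sentinel to something like this (a sketch of the reviewer's proposal, mirroring how PartitionSpec.unpartitioned() and SortOrder.unsorted() avoid a null schema):

private static final PrimaryKey NON_PRIMARY_KEY =
    new PrimaryKey(new Schema(), 0, false, ImmutableList.of());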
private final Schema schema;
private final int keyId;
private final boolean enforceUniqueness;
private final Integer[] sourceIds;
I saw there'is customized Transform
for each fields in PartitionSpec
and SortOrder
. The PartitionSpec
needs it because of hidden partition feature, the SortOder
need it because of NullOrder
to determinate whether null should be sort first or last.
For primary key, we don't have null columns and I did not think of other reasons that we will need an extra Transform
, so I just add the sourceId
here.
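For contrast, a sketch of the three builders side by side; PartitionSpec and SortOrder are existing Iceberg APIs, while PrimaryKey.builderFor, addField, and build are this PR's draft API, and the schema is invented for illustration:

Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.required(2, "ts", Types.TimestampType.withZone()));

PartitionSpec spec = PartitionSpec.builderFor(schema)
    .day("ts")                        // transform: enables hidden partitioning
    .build();

SortOrder order = SortOrder.builderFor(schema)
    .asc("id", NullOrder.NULLS_LAST)  // transform plus explicit null ordering
    .build();

PrimaryKey key = PrimaryKey.builderFor(schema)
    .addField(schema.findField("id").fieldId())  // just a source id, no transform
    .build();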
One thing in the discussion was whether we should call this a primary key or something like a default equality key. Saying primary key would make people feel that uniqueness is enforced, but it actually cannot be enforced. I don't have a strong opinion on this, but I constantly hear customers using Redshift complain about this. Also, once we introduce a primary key, there will definitely be an ask for a foreign key concept. Just something to think about if we decide to go with this approach.
private PrimaryKey(Schema schema, int keyId, boolean enforceUniqueness, List<Integer> sourceIds) {
  this.schema = schema;
  this.keyId = keyId;
  this.enforceUniqueness = enforceUniqueness;
I don't think we should have enforceUniqueness in the Iceberg API, because even if a user specifies that the primary key must be enforced, it is hard to enforce at the engine level. A primary key should always be enforced on a "best effort" basis. If Flink can enforce it through upsert, I feel it should be a flag in Flink to do so.
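Under that suggestion, the constructor above would simply drop the flag; a minimal sketch:

private PrimaryKey(Schema schema, int keyId, List<Integer> sourceIds) {
  this.schema = schema;
  this.keyId = keyId;
  this.sourceIds = sourceIds.toArray(new Integer[0]);
  // uniqueness stays best-effort; an engine like Flink can opt into upsert behavior itself
}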
@@ -242,6 +253,7 @@ public static TableMetadata read(FileIO io, InputFile file) {
  }
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
Should probably refactor the method into methods like parsePartitionSpecs, parseSortOrders, and parsePrimaryKeys, but it is not urgent; we can do it in another PR.
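A rough sketch of the suggested split, assuming a PrimaryKeyParser counterpart to the existing PartitionSpecParser and SortOrderParser (the JSON field name and parser are hypothetical):

private static List<PrimaryKey> parsePrimaryKeys(JsonNode rootNode, Schema schema) {
  ImmutableList.Builder<PrimaryKey> keys = ImmutableList.builder();
  JsonNode keysNode = rootNode.get("primary-keys");  // hypothetical metadata field
  if (keysNode != null && keysNode.isArray()) {
    for (JsonNode keyNode : keysNode) {
      keys.add(PrimaryKeyParser.fromJson(schema, keyNode));  // hypothetical parser
    }
  }
  return keys.build();
}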
@@ -125,17 +126,21 @@ private Table loadMetadataTable(String location, String metadataTableName, Metad
 * Create a table using the FileSystem implementation resolve from
 * location.
 *
 * @param schema iceberg schema used to create the table
 * @param spec partitioning spec, if null the table will be unpartitioned
 * @param schema iceberg schema used to create the table
nit: unnecessary change in spacing
+1. Vertica strikes a good balance between the two: it enforces/checks constraints during reads and joins, but does not check them during inserts. It could be a good feature to have an ANALYZE_CONSTRAINTS command, with which a user can validate constraints on a table manually, for example after an insert. For enforcement, it could be a good idea to have an ENABLED/DISABLED flag on table-level constraints.
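If Iceberg went that route, such a flag could live in table properties; a hypothetical sketch using the existing updateProperties API (the property key is invented for illustration):

table.updateProperties()
    .set("constraints.primary-key.enforcement", "DISABLED")  // hypothetical property key
    .commit();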
Okay, after reconsidering the primary key uniqueness issue: it's hard to guarantee uniqueness in an embedded table format library. If both a Spark job and a Flink streaming job are writing to the same Iceberg table, I can't think of a good and efficient way to guarantee the uniqueness of the primary key. If we had an online server in front of those data files, it would be easy to guarantee uniqueness, because all write requests would be sent to the same server, and the server could decide how to reject duplicate writes; for an Iceberg table format, it's hard to synchronize between different computation jobs. So I'm fine with introducing the primary key without enforced uniqueness. @jackye1995 Did you start this work in your repo? Should we update this PR based on the above discussion? (Sorry about the delay.)
Thanks for the reply! Yup, I have a version locally that I am working on, which includes changes similar to this PR without the uniqueness constraint. I will publish it tomorrow or the day after tomorrow.
@jackye1995 Looks great! Then I will close this PR once you publish yours; I'd be glad to review your patch, thanks!
Hi @jackye1995, what is the status of this primary key spec?
Closing this now because we have PR #2354.
In this discussion, we are planning to introduce a primary key to support equality deletes. We introduced equality field ids when developing the equality deletes feature in Apache Iceberg core. The equality field ids have essentially the same meaning as a primary key, but we did not define them at the table level.
The primary key spec in this PR resolves that problem: we add the primary key spec at the table level, so users no longer need to specify the equality field ids (they can just use the SQL primary key) when writing SQL to export CDC events into an Iceberg table. Currently, I've added the basic changes in this PR; the unit tests are still in progress.