Core: Add primary key spec. #2010

Closed
wants to merge 6 commits into from
219 changes: 219 additions & 0 deletions api/src/main/java/org/apache/iceberg/PrimaryKey.java
@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/**
* A primary key that defines which columns must be unique in this table.
*/
public class PrimaryKey implements Serializable {

private static final PrimaryKey NON_PRIMARY_KEY = new PrimaryKey(null, 0, false, ImmutableList.of());
Member Author:

0 is reserved for NON_PRIMARY_KEY; the key ID of a newly created primary key will start from 1.

Contributor:

To be consistent with PartitionSpec and SortOrder, it would be better to use new PrimaryKey(new Schema(), ...)
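
For illustration, a minimal sketch of that ID scheme against the builder defined later in this file; the single-column schema is only an assumption for the example:

import org.apache.iceberg.PrimaryKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PrimaryKeyIdExample {
  public static void main(String[] args) {
    // Assumed schema with one required primitive column to use as the key.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()));

    // The builder defaults keyId to 1; 0 stays reserved for the non-primary key.
    PrimaryKey key = PrimaryKey.builderFor(schema)
        .addField("id")
        .build();

    System.out.println(key.keyId());                                  // 1
    System.out.println(PrimaryKey.nonPrimaryKey().keyId());           // 0
    System.out.println(PrimaryKey.nonPrimaryKey().isNonPrimaryKey()); // true
  }
}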


private final Schema schema;
private final int keyId;
private final boolean enforceUniqueness;
private final Integer[] sourceIds;
Member Author:

I saw there is a customized Transform for each field in PartitionSpec and SortOrder. PartitionSpec needs it for the hidden-partitioning feature, and SortOrder needs it for NullOrder, to decide whether nulls sort first or last.

For the primary key we don't have null columns, and I couldn't think of another reason we would need an extra Transform, so I just added the sourceId here.


private transient volatile List<Integer> sourceIdList;

private PrimaryKey(Schema schema, int keyId, boolean enforceUniqueness, List<Integer> sourceIds) {
this.schema = schema;
this.keyId = keyId;
this.enforceUniqueness = enforceUniqueness;
Contributor:

I don't think we should have enforceUniqueness in the Iceberg API, because even if a user specifies that the primary key must be enforced, it is hard to enforce it at the engine level. Primary keys should always be enforced on a "best effort" basis. If Flink can enforce it through upsert, I feel that should be a flag in Flink.

this.sourceIds = sourceIds.toArray(new Integer[0]);
}

/**
* Returns the {@link Schema} for this primary key.
*/
public Schema schema() {
return schema;
}

/**
* Returns the ID of this primary key.
*/
public int keyId() {
return keyId;
}

/**
* Returns true if uniqueness should be guaranteed when writing to the Iceberg table.
*/
public boolean enforceUniqueness() {
return enforceUniqueness;
}

/**
* Returns the list of source field ids for this primary key.
*/
public List<Integer> sourceIds() {
if (sourceIdList == null) {
synchronized (this) {
if (sourceIdList == null) {
this.sourceIdList = ImmutableList.copyOf(sourceIds);
}
}
}
return sourceIdList;
}

/**
* Returns true if the primary key has no columns.
*/
public boolean isNonPrimaryKey() {
return sourceIds.length == 0;
}

/**
* Returns a dummy primary key that has no columns.
*/
public static PrimaryKey nonPrimaryKey() {
return NON_PRIMARY_KEY;
}

/**
* Checks whether this primary key is equivalent to another primary key while ignoring the primary key id.
*
* @param other a different primary key.
* @return true if this key is equivalent to the given key.
*/
public boolean samePrimaryKey(PrimaryKey other) {
return Arrays.equals(sourceIds, other.sourceIds) && enforceUniqueness == other.enforceUniqueness;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
} else if (!(other instanceof PrimaryKey)) {
return false;
}

PrimaryKey that = (PrimaryKey) other;
if (this.keyId != that.keyId) {
return false;
}

if (this.enforceUniqueness != that.enforceUniqueness) {
return false;
}

return Arrays.equals(sourceIds, that.sourceIds);
}

@Override
public int hashCode() {
return Objects.hashCode(keyId, enforceUniqueness ? 1 : 0, Arrays.hashCode(sourceIds));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("keyId", keyId)
.add("enforceUniqueness", enforceUniqueness)
.add("sourceIds", sourceIds())
.toString();
}

/**
* Creates a new {@link Builder primary key builder} for the given {@link Schema}.
*
* @param schema a schema
* @return a primary key builder for the given schema.
*/
public static Builder builderFor(Schema schema) {
return new Builder(schema);
}

/**
* A builder to create valid {@link PrimaryKey primary keys}. Call {@link #builderFor(Schema)} to create a new
* builder.
*/
public static class Builder {
private final Schema schema;
private final List<Integer> sourceIds = Lists.newArrayList();
// Default ID to 1, as 0 is reserved for the non-primary key.
private int keyId = 1;
private boolean enforceUniqueness = false;

private Builder(Schema schema) {
this.schema = schema;
}

public Builder withKeyId(int newKeyId) {
this.keyId = newKeyId;
return this;
}

public Builder withEnforceUniqueness(boolean enable) {
this.enforceUniqueness = enable;
return this;
}

public Builder addField(String name) {
Types.NestedField column = schema.findField(name);

Preconditions.checkNotNull(column, "Cannot find source column: %s", name);
Preconditions.checkArgument(column.isRequired(), "Cannot add optional source field to primary key: %s", name);

Type sourceType = column.type();
ValidationException.check(sourceType.isPrimitiveType(), "Cannot add non-primitive field: %s", sourceType);

sourceIds.add(column.fieldId());
return this;
}

public Builder addField(int sourceId) {
Types.NestedField column = schema.findField(sourceId);
Preconditions.checkNotNull(column, "Cannot find source column: %s", sourceId);
Preconditions.checkArgument(column.isRequired(), "Cannot add optional source field to primary key: %s", sourceId);
Member Author:

Other databases (such as MySQL) also do not allow nullable columns in primary keys; see https://dev.mysql.com/doc/refman/8.0/en/create-table.html:

A unique index where all key columns must be defined as NOT NULL. If they are not explicitly declared as NOT NULL, MySQL declares them so implicitly (and silently). A table can have only one PRIMARY KEY. The name of a PRIMARY KEY is always PRIMARY, which thus cannot be used as the name for any other kind of index.

Contributor:

Agree, primary key columns should be non-null.
Nit: this duplicates the logic at L183; can you refactor the checks?
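
One possible shape for that refactor: a private helper both addField overloads delegate to (the helper name and message wording here are illustrative, not part of this PR):

private Builder addSourceColumn(Types.NestedField column, Object ref) {
  Preconditions.checkNotNull(column, "Cannot find source column: %s", ref);
  Preconditions.checkArgument(column.isRequired(),
      "Cannot add optional source field to primary key: %s", ref);
  ValidationException.check(column.type().isPrimitiveType(),
      "Cannot add non-primitive field: %s", column.type());
  sourceIds.add(column.fieldId());
  return this;
}

public Builder addField(String name) {
  return addSourceColumn(schema.findField(name), name);
}

public Builder addField(int sourceId) {
  return addSourceColumn(schema.findField(sourceId), sourceId);
}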


Type sourceType = column.type();
ValidationException.check(sourceType.isPrimitiveType(), "Cannot add non-primitive field: %s", sourceType);

sourceIds.add(sourceId);
return this;
}

public PrimaryKey build() {
if (keyId == 0 && sourceIds.size() != 0) {
throw new IllegalArgumentException("Primary key ID 0 is reserved for non-primary key");
}
if (sourceIds.size() == 0 && keyId != 0) {
throw new IllegalArgumentException("Non-primary key ID must be 0");
}

return new PrimaryKey(schema, keyId, enforceUniqueness, sourceIds);
}
}
}
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
@@ -88,6 +88,20 @@ default String name() {
*/
Map<Integer, SortOrder> sortOrders();

/**
* Return the {@link PrimaryKey primary key} for this table.
*
* @return this table's primary key.
*/
PrimaryKey primaryKey();

/**
* Return a map of primary key IDs to {@link PrimaryKey primary keys} for this table.
*
* @return this table's primary keys map.
*/
Map<Integer, PrimaryKey> primaryKeys();

/**
* Return a map of string properties for this table.
*
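A short sketch of how a caller might use the two new accessors; the already-loaded table variable is an assumption for the example:

// Assumes 'table' is an org.apache.iceberg.Table loaded elsewhere (e.g. via a catalog).
PrimaryKey key = table.primaryKey();
if (key.isNonPrimaryKey()) {
  System.out.println("Table has no primary key");
} else {
  for (int sourceId : key.sourceIds()) {
    System.out.println("Key column: " + table.schema().findField(sourceId).name());
  }
}

// Every primary key known to the table metadata, indexed by key ID.
Map<Integer, PrimaryKey> keysById = table.primaryKeys();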
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/Tables.java
@@ -38,12 +38,13 @@ default Table create(Schema schema, PartitionSpec spec, String tableIdentifier)
}

default Table create(Schema schema, PartitionSpec spec, Map<String, String> properties, String tableIdentifier) {
return create(schema, spec, SortOrder.unsorted(), properties, tableIdentifier);
return create(schema, spec, SortOrder.unsorted(), PrimaryKey.nonPrimaryKey(), properties, tableIdentifier);
}

default Table create(Schema schema,
PartitionSpec spec,
SortOrder order,
PrimaryKey primaryKey,
Map<String, String> properties,
String tableIdentifier) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement create with a sort order");
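For context, a sketch of calling the widened create; HadoopTables is assumed here as the Tables implementation, and it would need to override the six-argument variant rather than fall through to the UnsupportedOperationException default above:

// Sketch only: assumes a Tables implementation that overrides the six-argument create.
Tables tables = new HadoopTables(new Configuration());

Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.optional(2, "data", Types.StringType.get()));

PrimaryKey key = PrimaryKey.builderFor(schema).addField("id").build();

Table table = tables.create(
    schema,
    PartitionSpec.unpartitioned(),
    SortOrder.unsorted(),
    key,
    Collections.emptyMap(),                      // table properties
    "hdfs://namenode:8020/warehouse/db/sample"); // location used as the identifier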
9 changes: 9 additions & 0 deletions api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PrimaryKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
@@ -360,6 +361,14 @@ interface TableBuilder {
*/
TableBuilder withSortOrder(SortOrder sortOrder);

/**
* Sets a primary key for this table.
*
* @param primaryKey a primary key
* @return this for method chaining
*/
TableBuilder withPrimaryKey(PrimaryKey primaryKey);

/**
* Sets a location for the table.
*
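And the equivalent catalog path against the builder contract above; the catalog instance and schema are assumptions, and any Catalog whose TableBuilder implements withPrimaryKey should read the same way:

// Sketch only: 'catalog' is an assumed Catalog whose TableBuilder supports withPrimaryKey.
TableIdentifier identifier = TableIdentifier.of("db", "events");

PrimaryKey key = PrimaryKey.builderFor(schema)
    .addField("event_id")   // assumed to be a required primitive column in 'schema'
    .build();

Table table = catalog.buildTable(identifier, schema)
    .withPartitionSpec(PartitionSpec.unpartitioned())
    .withSortOrder(SortOrder.unsorted())
    .withPrimaryKey(key)
    .create();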
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -29,6 +29,7 @@
abstract class BaseMetadataTable implements Table {
private final PartitionSpec spec = PartitionSpec.unpartitioned();
private final SortOrder sortOrder = SortOrder.unsorted();
private final PrimaryKey primaryKey = PrimaryKey.nonPrimaryKey();

abstract Table table();

@@ -77,6 +78,16 @@ public Map<Integer, SortOrder> sortOrders() {
return ImmutableMap.of(sortOrder.orderId(), sortOrder);
}

@Override
public PrimaryKey primaryKey() {
return primaryKey;
}

@Override
public Map<Integer, PrimaryKey> primaryKeys() {
return ImmutableMap.of(primaryKey.keyId(), primaryKey);
}

@Override
public Map<String, String> properties() {
return ImmutableMap.of();
19 changes: 15 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -159,6 +159,7 @@ protected class BaseMetastoreCatalogTableBuilder implements TableBuilder {
private final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
private PartitionSpec spec = PartitionSpec.unpartitioned();
private SortOrder sortOrder = SortOrder.unsorted();
private PrimaryKey primaryKey = PrimaryKey.nonPrimaryKey();
private String location = null;

public BaseMetastoreCatalogTableBuilder(TableIdentifier identifier, Schema schema) {
@@ -180,6 +181,12 @@ public TableBuilder withSortOrder(SortOrder newSortOrder) {
return this;
}

@Override
public TableBuilder withPrimaryKey(PrimaryKey newPrimaryKey) {
this.primaryKey = newPrimaryKey != null ? newPrimaryKey : PrimaryKey.nonPrimaryKey();
return this;
}

@Override
public TableBuilder withLocation(String newLocation) {
this.location = newLocation;
@@ -209,7 +216,8 @@ public Table create() {

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
Map<String, String> properties = propertiesBuilder.build();
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, primaryKey,
baseLocation, properties);

try {
ops.commit(null, metadata);
@@ -229,7 +237,8 @@ public Transaction createTransaction() {

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
Map<String, String> properties = propertiesBuilder.build();
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, primaryKey,
baseLocation, properties);
return Transactions.createTableTransaction(identifier.toString(), ops, metadata);
}

@@ -252,10 +261,12 @@ private Transaction newReplaceTableTransaction(boolean orCreate) {
TableMetadata metadata;
if (ops.current() != null) {
String baseLocation = location != null ? location : ops.current().location();
metadata = ops.current().buildReplacement(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
metadata = ops.current().buildReplacement(schema, spec, sortOrder, primaryKey,
baseLocation, propertiesBuilder.build());
} else {
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, primaryKey,
baseLocation, propertiesBuilder.build());
}

if (orCreate) {
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -84,6 +84,16 @@ public Map<Integer, SortOrder> sortOrders() {
return ops.current().sortOrdersById();
}

@Override
public PrimaryKey primaryKey() {
return ops.current().primaryKey();
}

@Override
public Map<Integer, PrimaryKey> primaryKeys() {
return ops.current().primaryKeysById();
}

@Override
public Map<String, String> properties() {
return ops.current().properties();
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -540,6 +540,16 @@ public Map<Integer, SortOrder> sortOrders() {
return current.sortOrdersById();
}

@Override
public PrimaryKey primaryKey() {
return current.primaryKey();
}

@Override
public Map<Integer, PrimaryKey> primaryKeys() {
return current.primaryKeysById();
}

@Override
public Map<String, String> properties() {
return current.properties();