Commit c7323f9

Merge branch 'master' into tablemetadatabuilder
2 parents afa1cca + ec9a86b

16 files changed: +708 -473 lines

.java-version (+1)

@@ -0,0 +1 @@
+1.8

build.gradle (+2)

@@ -557,8 +557,10 @@ project(':iceberg-nessie') {
   testImplementation "org.projectnessie:nessie-jaxrs-testextension"
   // Need to "pull in" el-api explicitly :(
   testImplementation "jakarta.el:jakarta.el-api"
+  testImplementation 'org.assertj:assertj-core'

   compileOnly "org.apache.hadoop:hadoop-common"
+  testImplementation "org.apache.avro:avro"

   testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
 }

core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java (+8 -2)

@@ -22,6 +22,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.CommitFailedException;
@@ -166,6 +167,12 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) {

   protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
                                              int numRetries) {
+    refreshFromMetadataLocation(newLocation, shouldRetry, numRetries,
+        metadataLocation -> TableMetadataParser.read(io(), metadataLocation));
+  }
+
+  protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
+                                             int numRetries, Function<String, TableMetadata> metadataLoader) {
     // use null-safe equality check because new tables have a null metadata location
     if (!Objects.equal(currentMetadataLocation, newLocation)) {
       LOG.info("Refreshing table metadata from new version: {}", newLocation);
@@ -175,8 +182,7 @@ protected void refreshFromMetadataLocation(String newLocation, Predicate<Excepti
           .retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
           .throwFailureWhenFinished()
           .shouldRetryTest(shouldRetry)
-          .run(metadataLocation -> newMetadata.set(
-              TableMetadataParser.read(io(), metadataLocation)));
+          .run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation)));

       String newUUID = newMetadata.get().uuid();
       if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
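
The new overload lets a subclass plug in its own metadata loader instead of the default TableMetadataParser.read. A minimal sketch of how an implementation might call it from doRefresh(); the subclass and the fetchLatestMetadataLocation() helper are hypothetical, only the overload's signature comes from this change:

  // inside a hypothetical BaseMetastoreTableOperations subclass
  @Override
  protected void doRefresh() {
    String latestLocation = fetchLatestMetadataLocation();  // hypothetical metastore lookup
    refreshFromMetadataLocation(latestLocation,
        ex -> ex instanceof RuntimeException,                // retry predicate (illustrative)
        2,                                                   // numRetries
        metadataLocation -> {
          // custom loader hook: delegates to the default parser here, but a subclass
          // could add validation or caching around the read
          return TableMetadataParser.read(io(), metadataLocation);
        });
  }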

core/src/main/java/org/apache/iceberg/TableMetadata.java (+58)

@@ -677,6 +677,64 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
         .build();
   }

+  /**
+   * Returns an updated {@link TableMetadata} with the current-snapshot-ID set to the given
+   * snapshot-ID and the snapshot-log reset to contain only the snapshot with the given snapshot-ID.
+   *
+   * @param snapshotId ID of a snapshot that must exist, or {@code -1L} to remove the current snapshot
+   *                   and return an empty snapshot log.
+   * @return {@link TableMetadata} with updated {@link #currentSnapshotId} and {@link #snapshotLog}
+   */
+  public TableMetadata withCurrentSnapshotOnly(long snapshotId) {
+    if ((currentSnapshotId == -1L && snapshotId == -1L && snapshots.isEmpty()) ||
+        (currentSnapshotId == snapshotId && snapshots.size() == 1)) {
+      return this;
+    }
+    List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
+    if (snapshotId != -1L) {
+      Snapshot snapshot = snapshotsById.get(snapshotId);
+      Preconditions.checkArgument(snapshot != null, "Non-existent snapshot");
+      newSnapshotLog.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshotId));
+    }
+    return new TableMetadata(null, formatVersion, uuid, location,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
+        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshotId,
+        snapshots, newSnapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+  }
+
+  public TableMetadata withCurrentSchema(int schemaId) {
+    if (currentSchemaId == schemaId) {
+      return this;
+    }
+    Preconditions.checkArgument(schemasById.containsKey(schemaId), "Non-existent schema");
+    return new TableMetadata(null, formatVersion, uuid, location,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schemaId, schemas, defaultSpecId, specs,
+        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
+        snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+  }
+
+  public TableMetadata withDefaultSortOrder(int sortOrderId) {
+    if (defaultSortOrderId == sortOrderId) {
+      return this;
+    }
+    Preconditions.checkArgument(sortOrdersById.containsKey(sortOrderId), "Non-existent sort-order");
+    return new TableMetadata(null, formatVersion, uuid, location,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
+        lastAssignedPartitionId, sortOrderId, sortOrders, properties, currentSnapshotId,
+        snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+  }
+
+  public TableMetadata withDefaultSpec(int specId) {
+    if (defaultSpecId == specId) {
+      return this;
+    }
+    Preconditions.checkArgument(specsById.containsKey(specId), "Non-existent partition spec");
+    return new TableMetadata(null, formatVersion, uuid, location,
+        lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, specId, specs,
+        lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
+        snapshots, snapshotLog, addPreviousFile(metadataFileLocation, lastUpdatedMillis));
+  }
+
   private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
     PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(partitionSpec.schema())
         .withSpecId(partitionSpec.specId());
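
Usage sketch for the new with* helpers: each call validates that the referenced ID exists and returns a new immutable TableMetadata, so they can be chained. The base metadata and the IDs below are illustrative.

  TableMetadata updated = base
      .withCurrentSnapshotOnly(snapshotId)   // keep only this snapshot in the snapshot log
      .withCurrentSchema(schemaId)           // make an existing schema the current one
      .withDefaultSpec(specId)               // switch the default partition spec
      .withDefaultSortOrder(sortOrderId);    // switch the default sort order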

dev/.rat-excludes (+1)

@@ -1,6 +1,7 @@
 version.txt
 versions.lock
 versions.props
+.java-version
 books.json
 new-books.json
 build

nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java (+70 -53)

@@ -41,25 +41,27 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.util.Tasks;
-import org.projectnessie.api.TreeApi;
-import org.projectnessie.api.params.EntriesParams;
-import org.projectnessie.client.NessieClient;
 import org.projectnessie.client.NessieConfigConstants;
+import org.projectnessie.client.api.CommitMultipleOperationsBuilder;
+import org.projectnessie.client.api.NessieApiV1;
+import org.projectnessie.client.http.HttpClientBuilder;
 import org.projectnessie.client.http.HttpClientException;
 import org.projectnessie.error.BaseNessieClientServerException;
 import org.projectnessie.error.NessieConflictException;
 import org.projectnessie.error.NessieNotFoundException;
 import org.projectnessie.model.Branch;
-import org.projectnessie.model.Contents;
+import org.projectnessie.model.Content;
+import org.projectnessie.model.ContentKey;
 import org.projectnessie.model.IcebergTable;
-import org.projectnessie.model.ImmutableDelete;
-import org.projectnessie.model.ImmutableOperations;
-import org.projectnessie.model.ImmutablePut;
-import org.projectnessie.model.Operations;
+import org.projectnessie.model.Operation;
 import org.projectnessie.model.Reference;
+import org.projectnessie.model.TableReference;
+import org.projectnessie.model.Tag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -75,7 +77,7 @@
 public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable, SupportsNamespaces, Configurable {
   private static final Logger logger = LoggerFactory.getLogger(NessieCatalog.class);
   private static final Joiner SLASH = Joiner.on("/");
-  private NessieClient client;
+  private NessieApiV1 api;
   private String warehouseLocation;
   private Configuration config;
   private UpdateableReference reference;
@@ -95,7 +97,8 @@ public void initialize(String inputName, Map<String, String> options) {
     // remove nessie prefix
     final Function<String, String> removePrefix = x -> x.replace("nessie.", "");

-    this.client = NessieClient.builder().fromConfig(x -> options.get(removePrefix.apply(x))).build();
+    this.api = HttpClientBuilder.builder().fromConfig(x -> options.get(removePrefix.apply(x)))
+        .build(NessieApiV1.class);

     this.warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
     if (warehouseLocation == null) {
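
The NessieClient builder is replaced by HttpClientBuilder, which materializes a NessieApiV1 client. A minimal initialization sketch, assuming the standard Nessie catalog option keys with the "nessie." prefix stripped ("uri", "ref"); the endpoint, branch name, and warehouse path are illustrative:

  Map<String, String> options = ImmutableMap.of(
      "uri", "http://localhost:19120/api/v1",                  // Nessie REST endpoint (assumed local default)
      "ref", "main",                                           // branch the catalog works against
      CatalogProperties.WAREHOUSE_LOCATION, "/tmp/warehouse");
  NessieCatalog catalog = new NessieCatalog();
  catalog.initialize("nessie", options);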
@@ -120,12 +123,12 @@ public void initialize(String inputName, Map<String, String> options) {
       throw new IllegalStateException("Parameter 'warehouse' not set, Nessie can't store data.");
     }
     final String requestedRef = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF));
-    this.reference = loadReference(requestedRef);
+    this.reference = loadReference(requestedRef, null);
   }

   @Override
   public void close() {
-    client.close();
+    api.close();
   }

   @Override
@@ -135,15 +138,17 @@ public String name() {

   @Override
   protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
-    TableReference pti = TableReference.parse(tableIdentifier);
+    TableReference tr = TableReference.parse(tableIdentifier.name());
+    Preconditions.checkArgument(!tr.hasTimestamp(), "Invalid table name: # is only allowed for hashes (reference by " +
+        "timestamp is not supported)");
     UpdateableReference newReference = this.reference;
-    if (pti.reference() != null) {
-      newReference = loadReference(pti.reference());
+    if (tr.getReference() != null) {
+      newReference = loadReference(tr.getReference(), tr.getHash());
     }
     return new NessieTableOperations(
-        NessieUtil.toKey(pti.tableIdentifier()),
+        ContentKey.of(org.projectnessie.model.Namespace.of(tableIdentifier.namespace().levels()), tr.getName()),
         newReference,
-        client,
+        api,
         fileIO,
         catalogOptions);
   }
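
Table names are now parsed with Nessie's TableReference, so a single identifier can carry its own reference. A usage sketch, assuming the parser's "@" syntax for selecting a branch and "#" for pinning a hash (the check above rejects timestamps); catalog and identifiers are illustrative:

  // resolved against the catalog's configured reference
  Table onDefault = catalog.loadTable(TableIdentifier.of("db", "events"));
  // resolved against the "etl" branch instead (syntax assumed from TableReference)
  Table onEtl = catalog.loadTable(TableIdentifier.of("db", "events@etl"));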
@@ -170,23 +175,27 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
       return false;
     }

-    Operations contents = ImmutableOperations.builder()
-        .addOperations(ImmutableDelete.builder().key(NessieUtil.toKey(identifier)).build())
-        .commitMeta(NessieUtil.buildCommitMetadata(String.format("iceberg delete table '%s'", identifier),
+    if (purge) {
+      logger.info("Purging data for table {} was set to true but is ignored", identifier.toString());
+    }
+
+    CommitMultipleOperationsBuilder commitBuilderBase = api.commitMultipleOperations()
+        .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg delete table %s", identifier),
             catalogOptions))
-        .build();
+        .operation(Operation.Delete.of(NessieUtil.toKey(identifier)));

     // We try to drop the table. Simple retry after ref update.
     boolean threw = true;
     try {
-      Tasks.foreach(contents)
+      Tasks.foreach(commitBuilderBase)
           .retry(5)
           .stopRetryOn(NessieNotFoundException.class)
           .throwFailureWhenFinished()
-          .onFailure((c, exception) -> refresh())
-          .run(c -> {
-            Branch branch = client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(),
-                reference.getHash(), c);
+          .onFailure((o, exception) -> refresh())
+          .run(commitBuilder -> {
+            Branch branch = commitBuilder
+                .branch(reference.getAsBranch())
+                .commit();
             reference.updateReference(branch);
           }, BaseNessieClientServerException.class);
       threw = false;
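
Drops and renames now go through the fluent commitMultipleOperations() builder instead of the old TreeApi call. Stripped of the Tasks retry wrapper, the same commit looks roughly like this sketch; the key and message are illustrative, and CommitMeta.fromMessage (org.projectnessie.model) stands in for NessieUtil.buildCommitMetadata:

  Branch head = api.getDefaultBranch();
  // commit() throws NessieNotFoundException / NessieConflictException and returns the new branch head
  Branch newHead = api.commitMultipleOperations()
      .branch(head)
      .commitMeta(CommitMeta.fromMessage("Iceberg delete table db.events"))
      .operation(Operation.Delete.of(ContentKey.of("db", "events")))
      .commit();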
@@ -215,24 +224,22 @@ public void renameTable(TableIdentifier from, TableIdentifier toOriginal) {
       throw new AlreadyExistsException("table %s already exists", to.name());
     }

-    Operations contents = ImmutableOperations.builder()
-        .addOperations(
-            ImmutablePut.builder().key(NessieUtil.toKey(to)).contents(existingFromTable).build(),
-            ImmutableDelete.builder().key(NessieUtil.toKey(from)).build())
-        .commitMeta(NessieUtil.buildCommitMetadata(String.format("iceberg rename table from '%s' to '%s'",
-            from, to),
-            catalogOptions))
-        .build();
+    CommitMultipleOperationsBuilder operations = api.commitMultipleOperations()
+        .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg rename table from '%s' to '%s'",
+            from, to), catalogOptions))
+        .operation(Operation.Put.of(NessieUtil.toKey(to), existingFromTable, existingFromTable))
+        .operation(Operation.Delete.of(NessieUtil.toKey(from)));

     try {
-      Tasks.foreach(contents)
+      Tasks.foreach(operations)
           .retry(5)
           .stopRetryOn(NessieNotFoundException.class)
           .throwFailureWhenFinished()
-          .onFailure((c, exception) -> refresh())
-          .run(c -> {
-            Branch branch = client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(),
-                reference.getHash(), c);
+          .onFailure((o, exception) -> refresh())
+          .run(ops -> {
+            Branch branch = ops
+                .branch(reference.getAsBranch())
+                .commit();
             reference.updateReference(branch);
           }, BaseNessieClientServerException.class);
     } catch (NessieNotFoundException e) {
@@ -322,37 +329,46 @@ public Configuration getConf() {
     return config;
   }

-  TreeApi getTreeApi() {
-    return client.getTreeApi();
-  }
-
   public void refresh() throws NessieNotFoundException {
-    reference.refresh();
+    reference.refresh(api);
   }

   public String currentHash() {
     return reference.getHash();
   }

+  @VisibleForTesting
   String currentRefName() {
     return reference.getName();
   }

+  @VisibleForTesting
+  FileIO fileIO() {
+    return fileIO;
+  }
+
   private IcebergTable table(TableIdentifier tableIdentifier) {
     try {
-      Contents table = client.getContentsApi()
-          .getContents(NessieUtil.toKey(tableIdentifier), reference.getName(), reference.getHash());
-      return table.unwrap(IcebergTable.class).orElse(null);
+      ContentKey key = NessieUtil.toKey(tableIdentifier);
+      Content table = api.getContent().key(key).reference(reference.getReference()).get().get(key);
+      return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null;
     } catch (NessieNotFoundException e) {
       return null;
     }
   }

-  private UpdateableReference loadReference(String requestedRef) {
+  private UpdateableReference loadReference(String requestedRef, String hash) {
     try {
-      Reference ref = requestedRef == null ? client.getTreeApi().getDefaultBranch()
-          : client.getTreeApi().getReferenceByName(requestedRef);
-      return new UpdateableReference(ref, client.getTreeApi());
+      Reference ref = requestedRef == null ? api.getDefaultBranch()
+          : api.getReference().refName(requestedRef).get();
+      if (hash != null) {
+        if (ref instanceof Branch) {
+          ref = Branch.of(ref.getName(), hash);
+        } else {
+          ref = Tag.of(ref.getName(), hash);
+        }
+      }
+      return new UpdateableReference(ref, hash != null);
     } catch (NessieNotFoundException ex) {
       if (requestedRef != null) {
         throw new IllegalArgumentException(String.format(
@@ -369,8 +385,9 @@ private UpdateableReference loadReference(String requestedRef) {

   private Stream<TableIdentifier> tableStream(Namespace namespace) {
     try {
-      return client.getTreeApi()
-          .getEntries(reference.getName(), EntriesParams.builder().hashOnRef(reference.getHash()).build())
+      return api.getEntries()
+          .reference(reference.getReference())
+          .get()
           .getEntries()
           .stream()
           .filter(NessieUtil.namespacePredicate(namespace))
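
Listing now goes through api.getEntries() with the catalog's current reference; from the caller's side the standard Iceberg listing API is unchanged. Usage sketch (namespace name is illustrative):

  // tables under the "db" namespace on the catalog's current reference
  List<TableIdentifier> tables = catalog.listTables(Namespace.of("db"));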
