Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark;

import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.ExceptionUtil;

/**
* utility class to accept thread local commit properties
*/
/**
 * Utility class that holds commit properties in a thread-local variable so that callers can attach
 * extra metadata to any snapshot committed on the same thread.
 *
 * <p>This class is thread-local by design: properties set via {@link #withCommitProperties} are
 * visible only to commits performed on the calling thread, for the duration of the callable.
 */
public class CommitMetadata {

  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES =
      ThreadLocal.withInitial(ImmutableMap::of);

  // static utility class; not meant to be instantiated
  private CommitMetadata() {
  }

  /**
   * Runs the code wrapped as a callable; any snapshot committed within the callable on this thread
   * will be attached with the metadata defined in properties.
   *
   * @param properties extra commit metadata to attach to the snapshot committed within callable
   * @param callable the code to be executed
   * @param exClass the expected type of exception which would be thrown from callable
   * @param <R> the return type of the callable
   * @param <E> the type of exception expected from the callable
   * @return the result of the callable
   * @throws E if the callable throws an exception assignable to {@code exClass}
   */
  public static <R, E extends Exception> R withCommitProperties(
      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
    // defensive copy: later mutation of the caller's map must not affect an in-flight commit
    COMMIT_PROPERTIES.set(ImmutableMap.copyOf(properties));
    try {
      return callable.call();
    } catch (Throwable e) {
      ExceptionUtil.castAndThrow(e, exClass);
      return null; // unreachable: castAndThrow always rethrows
    } finally {
      // remove() instead of set(empty) so pooled/long-lived threads do not retain a stale
      // ThreadLocal map entry; withInitial restores the empty map on the next get()
      COMMIT_PROPERTIES.remove();
    }
  }

  /** Returns the commit properties attached to the current thread, or an empty map if none set. */
  public static Map<String, String> commitProperties() {
    return COMMIT_PROPERTIES.get();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.util.PropertyUtil;
Expand Down Expand Up @@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
extraSnapshotMetadata.forEach(operation::set);
}

if (!CommitMetadata.commitProperties().isEmpty()) {
CommitMetadata.commitProperties().forEach(operation::set);
}

if (isWapTable() && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
Expand All @@ -31,13 +32,16 @@
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -404,4 +408,41 @@ public void testExtraSnapshotMetadata() throws IOException {
Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
}

@Test
public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
  // create an unpartitioned table and seed it with two rows from the main (unnamed) thread
  String location = temp.newFolder("iceberg-table").toString();
  HadoopTables tables = new HadoopTables(CONF);
  Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location);

  List<SimpleRecord> seedRecords = Lists.newArrayList(
      new SimpleRecord(1, "a"),
      new SimpleRecord(2, "b")
  );
  Dataset<Row> seedDf = spark.createDataFrame(seedRecords, SimpleRecord.class);
  seedDf.select("id", "data")
      .write()
      .format("iceberg")
      .mode("append")
      .save(location);
  spark.read().format("iceberg").load(location).createOrReplaceTempView("target");

  // append two more rows from a named thread, tagging that commit with the thread name
  Thread writer = new Thread(() -> {
    Map<String, String> properties = Maps.newHashMap();
    properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
    CommitMetadata.withCommitProperties(properties, () -> {
      spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
      return 0;
    }, RuntimeException.class);
  });
  writer.setName("test-extra-commit-message-writer-thread");
  writer.start();
  writer.join();

  // collect the writer-thread summary entry of each snapshot: the seed commit has none (null),
  // the SQL commit carries the writer thread's name
  Set<String> observedThreads = Sets.newHashSet();
  for (Snapshot snapshot : table.snapshots()) {
    observedThreads.add(snapshot.summary().get("writer-thread"));
  }
  Assert.assertEquals(2, observedThreads.size());
  Assert.assertTrue(observedThreads.contains(null));
  Assert.assertTrue(observedThreads.contains("test-extra-commit-message-writer-thread"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark;

import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.ExceptionUtil;

/**
* utility class to accept thread local commit properties
*/
/**
 * Utility class that holds commit properties in a thread-local variable so that callers can attach
 * extra metadata to any snapshot committed on the same thread.
 *
 * <p>This class is thread-local by design: properties set via {@link #withCommitProperties} are
 * visible only to commits performed on the calling thread, for the duration of the callable.
 */
public class CommitMetadata {

  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES =
      ThreadLocal.withInitial(ImmutableMap::of);

  // static utility class; not meant to be instantiated
  private CommitMetadata() {
  }

  /**
   * Runs the code wrapped as a callable; any snapshot committed within the callable on this thread
   * will be attached with the metadata defined in properties.
   *
   * @param properties extra commit metadata to attach to the snapshot committed within callable
   * @param callable the code to be executed
   * @param exClass the expected type of exception which would be thrown from callable
   * @param <R> the return type of the callable
   * @param <E> the type of exception expected from the callable
   * @return the result of the callable
   * @throws E if the callable throws an exception assignable to {@code exClass}
   */
  public static <R, E extends Exception> R withCommitProperties(
      Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
    // defensive copy: later mutation of the caller's map must not affect an in-flight commit
    COMMIT_PROPERTIES.set(ImmutableMap.copyOf(properties));
    try {
      return callable.call();
    } catch (Throwable e) {
      ExceptionUtil.castAndThrow(e, exClass);
      return null; // unreachable: castAndThrow always rethrows
    } finally {
      // remove() instead of set(empty) so pooled/long-lived threads do not retain a stale
      // ThreadLocal map entry; withInitial restores the empty map on the next get()
      COMMIT_PROPERTIES.remove();
    }
  }

  /** Returns the commit properties attached to the current thread, or an empty map if none set. */
  public static Map<String, String> commitProperties() {
    return COMMIT_PROPERTIES.get();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.util.PropertyUtil;
Expand Down Expand Up @@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
extraSnapshotMetadata.forEach(operation::set);
}

if (!CommitMetadata.commitProperties().isEmpty()) {
CommitMetadata.commitProperties().forEach(operation::set);
}

if (isWapTable() && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
Expand All @@ -31,13 +32,16 @@
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -404,4 +408,41 @@ public void testExtraSnapshotMetadata() throws IOException {
Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
}

@Test
public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
  // create an unpartitioned table and seed it with two rows from the main (unnamed) thread
  String location = temp.newFolder("iceberg-table").toString();
  HadoopTables tables = new HadoopTables(CONF);
  Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location);

  List<SimpleRecord> seedRecords = Lists.newArrayList(
      new SimpleRecord(1, "a"),
      new SimpleRecord(2, "b")
  );
  Dataset<Row> seedDf = spark.createDataFrame(seedRecords, SimpleRecord.class);
  seedDf.select("id", "data")
      .write()
      .format("iceberg")
      .mode("append")
      .save(location);
  spark.read().format("iceberg").load(location).createOrReplaceTempView("target");

  // append two more rows from a named thread, tagging that commit with the thread name
  Thread writer = new Thread(() -> {
    Map<String, String> properties = Maps.newHashMap();
    properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
    CommitMetadata.withCommitProperties(properties, () -> {
      spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
      return 0;
    }, RuntimeException.class);
  });
  writer.setName("test-extra-commit-message-writer-thread");
  writer.start();
  writer.join();

  // collect the writer-thread summary entry of each snapshot: the seed commit has none (null),
  // the SQL commit carries the writer thread's name
  Set<String> observedThreads = Sets.newHashSet();
  for (Snapshot snapshot : table.snapshots()) {
    observedThreads.add(snapshot.summary().get("writer-thread"));
  }
  Assert.assertEquals(2, observedThreads.size());
  Assert.assertTrue(observedThreads.contains(null));
  Assert.assertTrue(observedThreads.contains("test-extra-commit-message-writer-thread"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark;

import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.ExceptionUtil;

/**
* utility class to accept thread local commit properties
*/
public class CommitMetadata {
Copy link
Contributor

@kbendick kbendick Jun 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this class need to be public? Could it be made package-private?

I have concerns around the usage of ThreadLocal for things that most cases don't need to be thread local. I don't want to give users too much room to hurt themselves because they don't consider that CommitMetadata is only threadlocal and then their writes not working properly in the common case of writes without user-side multithreading (e.g. it gets set in one thread somewhere, but another thread is used for commit).

EDIT - Since this takes a Callable, it's less of a concern. I would still name it in a way that's a bit more reflective of the thread-local nature (especially if we wanted a CommitMetadata class one day that doesn't require a callable and is persistent). That, and I always prefer things be package-private if possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, changed to CallerWithCommitMetadata....but...eh...not sure if it is better or worse....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this class need to be public? Could it be made package-private?

Yes, this does need to be public because it is a way for Iceberg users to pass metadata.


// private constructor: static utility class, not meant to be instantiated
private CommitMetadata() {

}

private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);

/**
 * Runs the code wrapped as a callable; any snapshot committed within the callable on this thread
 * will be attached with the metadata defined in properties.
 *
 * @param properties extra commit metadata to attach to the snapshot committed within callable
 * @param callable the code to be executed
 * @param exClass the expected type of exception which would be thrown from callable
 * @param <R> the return type of the callable
 * @param <E> the type of exception expected from the callable
 * @return the result of the callable
 * @throws E if the callable throws an exception assignable to {@code exClass}
 */
public static <R, E extends Exception> R withCommitProperties(
    Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E {
  // defensive copy: later mutation of the caller's map must not affect an in-flight commit
  COMMIT_PROPERTIES.set(ImmutableMap.copyOf(properties));
  try {
    return callable.call();
  } catch (Throwable e) {
    ExceptionUtil.castAndThrow(e, exClass);
    return null; // unreachable: castAndThrow always rethrows
  } finally {
    // remove() instead of set(empty) so pooled/long-lived threads do not retain a stale
    // ThreadLocal map entry; withInitial restores the empty map on the next get()
    COMMIT_PROPERTIES.remove();
  }
}

/** Returns the commit properties attached to the current thread, or an empty map if none set. */
public static Map<String, String> commitProperties() {
return COMMIT_PROPERTIES.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.iceberg.io.PositionDeltaWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -249,6 +250,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {

extraSnapshotMetadata.forEach(operation::set);

if (!CommitMetadata.commitProperties().isEmpty()) {
CommitMetadata.commitProperties().forEach(operation::set);
}

if (wapEnabled && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.util.PropertyUtil;
Expand Down Expand Up @@ -192,6 +193,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
extraSnapshotMetadata.forEach(operation::set);
}

if (!CommitMetadata.commitProperties().isEmpty()) {
CommitMetadata.commitProperties().forEach(operation::set);
}

if (wapEnabled && wapId != null) {
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
Expand Down
Loading