Skip to content

Commit 0bad670

Browse files
authored
Spark: Support namespaces in SparkCatalog (#1149)
1 parent a07c2b9 commit 0bad670

File tree

7 files changed

+561
-14
lines changed

7 files changed

+561
-14
lines changed

core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,13 +225,13 @@ public void createNamespace(Namespace namespace, Map<String, String> meta) {
225225
!namespace.isEmpty(),
226226
"Cannot create namespace with invalid name: %s", namespace);
227227
if (!meta.isEmpty()) {
228-
throw new UnsupportedOperationException("Cannot create namespace " + namespace + " : metadata is not supported");
228+
throw new UnsupportedOperationException("Cannot create namespace " + namespace + ": metadata is not supported");
229229
}
230230

231231
Path nsPath = new Path(warehouseLocation, SLASH.join(namespace.levels()));
232232

233233
if (isNamespace(nsPath)) {
234-
throw new AlreadyExistsException("Namespace '%s' already exists!", namespace);
234+
throw new AlreadyExistsException("Namespace already exists: %s", namespace);
235235
}
236236

237237
try {

core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public void testCreateNamespace() throws Exception {
177177

178178
AssertHelpers.assertThrows("Should fail to create when namespace already exist: " + tbl1.namespace(),
179179
org.apache.iceberg.exceptions.AlreadyExistsException.class,
180-
"Namespace '" + tbl1.namespace() + "' already exists!", () -> {
180+
"Namespace already exists: " + tbl1.namespace(), () -> {
181181
catalog.createNamespace(tbl1.namespace());
182182
});
183183
}

hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,7 @@ public List<Namespace> listNamespaces(Namespace namespace) {
227227
return ImmutableList.of();
228228
}
229229
try {
230-
return clients.run(
231-
HiveMetaStoreClient::getAllDatabases)
230+
return clients.run(HiveMetaStoreClient::getAllDatabases)
232231
.stream()
233232
.map(Namespace::of)
234233
.collect(Collectors.toList());
@@ -378,7 +377,9 @@ private Map<String, String> convertToMetadata(Database database) {
378377

379378
meta.putAll(database.getParameters());
380379
meta.put("location", database.getLocationUri());
381-
meta.put("comment", database.getDescription());
380+
if (database.getDescription() != null) {
381+
meta.put("comment", database.getDescription());
382+
}
382383

383384
return meta;
384385
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.spark;
21+
22+
import java.util.List;
23+
import java.util.stream.Collectors;
24+
import java.util.stream.IntStream;
25+
import org.apache.hadoop.hive.conf.HiveConf;
26+
import org.apache.iceberg.hive.HiveCatalog;
27+
import org.apache.iceberg.hive.TestHiveMetastore;
28+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
29+
import org.apache.spark.sql.Row;
30+
import org.apache.spark.sql.SparkSession;
31+
import org.junit.AfterClass;
32+
import org.junit.BeforeClass;
33+
34+
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
35+
36+
public class SparkTestBase {
37+
38+
private static TestHiveMetastore metastore = null;
39+
private static HiveConf hiveConf = null;
40+
protected static SparkSession spark = null;
41+
protected static HiveCatalog catalog = null;
42+
43+
@BeforeClass
44+
public static void startMetastoreAndSpark() {
45+
SparkTestBase.metastore = new TestHiveMetastore();
46+
metastore.start();
47+
SparkTestBase.hiveConf = metastore.hiveConf();
48+
49+
SparkTestBase.spark = SparkSession.builder()
50+
.master("local[2]")
51+
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
52+
.enableHiveSupport()
53+
.getOrCreate();
54+
55+
SparkTestBase.catalog = new HiveCatalog(spark.sessionState().newHadoopConf());
56+
}
57+
58+
@AfterClass
59+
public static void stopMetastoreAndSpark() {
60+
catalog.close();
61+
SparkTestBase.catalog = null;
62+
metastore.stop();
63+
SparkTestBase.metastore = null;
64+
spark.stop();
65+
SparkTestBase.spark = null;
66+
}
67+
68+
protected List<String[]> sql(String query, Object... args) {
69+
List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
70+
if (rows.size() < 1) {
71+
return ImmutableList.of();
72+
}
73+
74+
return rows.stream()
75+
.map(row -> IntStream.range(0, row.size())
76+
.mapToObj(pos -> row.isNullAt(pos) ? null : row.get(pos).toString())
77+
.toArray(String[]::new)
78+
).collect(Collectors.toList());
79+
}
80+
81+
protected static String dbPath(String dbName) {
82+
return metastore.getDatabasePath(dbName);
83+
}
84+
}

spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java

Lines changed: 155 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,33 @@
2121

2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.Set;
2425
import org.apache.hadoop.conf.Configuration;
2526
import org.apache.iceberg.CachingCatalog;
2627
import org.apache.iceberg.Schema;
2728
import org.apache.iceberg.Table;
2829
import org.apache.iceberg.Transaction;
2930
import org.apache.iceberg.catalog.Catalog;
3031
import org.apache.iceberg.catalog.Namespace;
32+
import org.apache.iceberg.catalog.SupportsNamespaces;
3133
import org.apache.iceberg.catalog.TableIdentifier;
3234
import org.apache.iceberg.exceptions.AlreadyExistsException;
3335
import org.apache.iceberg.hadoop.HadoopCatalog;
3436
import org.apache.iceberg.hive.HiveCatalog;
3537
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
38+
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
39+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
40+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
3641
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3742
import org.apache.iceberg.spark.source.SparkTable;
3843
import org.apache.iceberg.spark.source.StagedSparkTable;
3944
import org.apache.spark.sql.SparkSession;
45+
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
46+
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
4047
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
4148
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
4249
import org.apache.spark.sql.connector.catalog.Identifier;
50+
import org.apache.spark.sql.connector.catalog.NamespaceChange;
4351
import org.apache.spark.sql.connector.catalog.StagedTable;
4452
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
4553
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -50,13 +58,30 @@
5058
import org.apache.spark.sql.connector.expressions.Transform;
5159
import org.apache.spark.sql.types.StructType;
5260
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
61+
import org.glassfish.jersey.internal.guava.Sets;
62+
import org.sparkproject.guava.collect.Maps;
5363

5464
/**
55-
* A Spark TableCatalog implementation that wraps Iceberg's {@link Catalog} interface.
65+
* A Spark TableCatalog implementation that wraps an Iceberg {@link Catalog}.
66+
* <p>
67+
* This supports the following catalog configuration options:
68+
* <ul>
69+
* <li><tt>type</tt> - catalog type, "hive" or "hadoop"</li>
70+
* <li><tt>uri</tt> - the Hive Metastore URI (Hive catalog only)</li>
71+
* <li><tt>warehouse</tt> - the warehouse path (Hadoop catalog only)</li>
72+
* <li><tt>default-namespace</tt> - a namespace to use as the default</li>
73+
* </ul>
74+
* <p>
75+
* To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
76+
* {@link #buildIcebergCatalog(String, CaseInsensitiveStringMap)}.
5677
*/
57-
public class SparkCatalog implements StagingTableCatalog {
78+
public class SparkCatalog implements StagingTableCatalog, org.apache.spark.sql.connector.catalog.SupportsNamespaces {
79+
private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
80+
5881
private String catalogName = null;
5982
private Catalog icebergCatalog = null;
83+
private SupportsNamespaces asNamespaceCatalog = null;
84+
private String[] defaultNamespace = null;
6085

6186
/**
6287
* Build an Iceberg {@link Catalog} to be used by this Spark catalog adapter.
@@ -93,12 +118,6 @@ protected TableIdentifier buildIdentifier(Identifier identifier) {
93118
return TableIdentifier.of(Namespace.of(identifier.namespace()), identifier.name());
94119
}
95120

96-
@Override
97-
public Identifier[] listTables(String[] namespace) {
98-
// TODO: handle namespaces
99-
return new Identifier[0];
100-
}
101-
102121
@Override
103122
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
104123
try {
@@ -227,13 +246,141 @@ public void invalidateTable(Identifier ident) {
227246
}
228247
}
229248

249+
@Override
250+
public Identifier[] listTables(String[] namespace) {
251+
return icebergCatalog.listTables(Namespace.of(namespace)).stream()
252+
.map(ident -> Identifier.of(ident.namespace().levels(), ident.name()))
253+
.toArray(Identifier[]::new);
254+
}
255+
256+
@Override
257+
public String[] defaultNamespace() {
258+
if (defaultNamespace != null) {
259+
return defaultNamespace;
260+
}
261+
262+
return new String[0];
263+
}
264+
265+
@Override
266+
public String[][] listNamespaces() {
267+
if (asNamespaceCatalog != null) {
268+
return asNamespaceCatalog.listNamespaces().stream()
269+
.map(Namespace::levels)
270+
.toArray(String[][]::new);
271+
}
272+
273+
return new String[0][];
274+
}
275+
276+
@Override
277+
public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
278+
if (asNamespaceCatalog != null) {
279+
try {
280+
return asNamespaceCatalog.listNamespaces(Namespace.of(namespace)).stream()
281+
.map(Namespace::levels)
282+
.toArray(String[][]::new);
283+
} catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
284+
throw new NoSuchNamespaceException(namespace);
285+
}
286+
}
287+
288+
throw new NoSuchNamespaceException(namespace);
289+
}
290+
291+
@Override
292+
public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException {
293+
if (asNamespaceCatalog != null) {
294+
try {
295+
return asNamespaceCatalog.loadNamespaceMetadata(Namespace.of(namespace));
296+
} catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
297+
throw new NoSuchNamespaceException(namespace);
298+
}
299+
}
300+
301+
throw new NoSuchNamespaceException(namespace);
302+
}
303+
304+
@Override
305+
public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException {
306+
if (asNamespaceCatalog != null) {
307+
try {
308+
if (asNamespaceCatalog instanceof HadoopCatalog && DEFAULT_NS_KEYS.equals(metadata.keySet())) {
309+
// Hadoop catalog will reject metadata properties, but Spark automatically adds "owner".
310+
// If only the automatic properties are present, replace metadata with an empty map.
311+
asNamespaceCatalog.createNamespace(Namespace.of(namespace), ImmutableMap.of());
312+
} else {
313+
asNamespaceCatalog.createNamespace(Namespace.of(namespace), metadata);
314+
}
315+
} catch (AlreadyExistsException e) {
316+
throw new NamespaceAlreadyExistsException(namespace);
317+
}
318+
} else {
319+
throw new UnsupportedOperationException("Namespaces are not supported by catalog: " + catalogName);
320+
}
321+
}
322+
323+
@Override
324+
public void alterNamespace(String[] namespace, NamespaceChange... changes) throws NoSuchNamespaceException {
325+
if (asNamespaceCatalog != null) {
326+
Map<String, String> updates = Maps.newHashMap();
327+
Set<String> removals = Sets.newHashSet();
328+
for (NamespaceChange change : changes) {
329+
if (change instanceof NamespaceChange.SetProperty) {
330+
NamespaceChange.SetProperty set = (NamespaceChange.SetProperty) change;
331+
updates.put(set.property(), set.value());
332+
} else if (change instanceof NamespaceChange.RemoveProperty) {
333+
removals.add(((NamespaceChange.RemoveProperty) change).property());
334+
} else {
335+
throw new UnsupportedOperationException("Cannot apply unknown namespace change: " + change);
336+
}
337+
}
338+
339+
try {
340+
if (!updates.isEmpty()) {
341+
asNamespaceCatalog.setProperties(Namespace.of(namespace), updates);
342+
}
343+
344+
if (!removals.isEmpty()) {
345+
asNamespaceCatalog.removeProperties(Namespace.of(namespace), removals);
346+
}
347+
348+
} catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
349+
throw new NoSuchNamespaceException(namespace);
350+
}
351+
} else {
352+
throw new NoSuchNamespaceException(namespace);
353+
}
354+
}
355+
356+
@Override
357+
public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
358+
if (asNamespaceCatalog != null) {
359+
try {
360+
return asNamespaceCatalog.dropNamespace(Namespace.of(namespace));
361+
} catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
362+
throw new NoSuchNamespaceException(namespace);
363+
}
364+
}
365+
366+
return false;
367+
}
368+
230369
@Override
231370
public final void initialize(String name, CaseInsensitiveStringMap options) {
232371
boolean cacheEnabled = Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
233372
Catalog catalog = buildIcebergCatalog(name, options);
234373

235374
this.catalogName = name;
236375
this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(catalog) : catalog;
376+
if (catalog instanceof SupportsNamespaces) {
377+
this.asNamespaceCatalog = (SupportsNamespaces) catalog;
378+
if (options.containsKey("default-namespace")) {
379+
this.defaultNamespace = Splitter.on('.')
380+
.splitToList(options.get("default-namespace"))
381+
.toArray(new String[0]);
382+
}
383+
}
237384
}
238385

239386
@Override

0 commit comments

Comments (0)