From b58f0c9fd7073ec4ba4626ed97443bcb764c9335 Mon Sep 17 00:00:00 2001 From: GuojunLi Date: Mon, 28 Aug 2023 15:06:43 +0800 Subject: [PATCH] [core] Support catalog options table (#1889) --- docs/content/how-to/system-tables.md | 16 ++ .../paimon/catalog/AbstractCatalog.java | 11 +- .../table/system/CatalogOptionsTable.java | 192 ++++++++++++++++++ .../table/system/SystemTableLoader.java | 8 +- .../table/system/CatalogOptionsTableTest.java | 83 ++++++++ .../paimon/flink/CatalogTableITCase.java | 11 +- 6 files changed, 317 insertions(+), 4 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java diff --git a/docs/content/how-to/system-tables.md b/docs/content/how-to/system-tables.md index 70a84e94b770..9ea6548a6bd5 100644 --- a/docs/content/how-to/system-tables.md +++ b/docs/content/how-to/system-tables.md @@ -280,3 +280,19 @@ SELECT * FROM sys.all_table_options; */ ``` +### Catalog Options Table +You can query the catalog's option information through catalog options table. The options not shown will be the default value. You can take reference to [Configuration]({{< ref "maintenance/configurations#coreoptions" >}}). + +```sql +SELECT * FROM sys.catalog_options; + +/* ++-----------+---------------------------+ +| key | value | ++-----------+---------------------------+ +| warehouse | hdfs:///path/to/warehouse | ++-----------+---------------------------+ +1 rows in set +*/ +``` + diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 102ef5a5d970..6d212e908a81 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -32,6 +32,7 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.Table; import org.apache.paimon.table.system.AllTableOptionsTable; +import org.apache.paimon.table.system.CatalogOptionsTable; import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.StringUtils; @@ -50,10 +51,12 @@ public abstract class AbstractCatalog implements Catalog { public static final String DB_SUFFIX = ".db"; protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; protected static final List GLOBAL_TABLES = - Arrays.asList(AllTableOptionsTable.ALL_TABLE_OPTIONS); + Arrays.asList( + AllTableOptionsTable.ALL_TABLE_OPTIONS, CatalogOptionsTable.CATALOG_OPTIONS); protected final FileIO fileIO; protected final Map tableDefaultOptions; + protected final Map catalogOptions; @Nullable protected final LineageMeta lineageMeta; @@ -61,6 +64,7 @@ protected AbstractCatalog(FileIO fileIO) { this.fileIO = fileIO; this.lineageMeta = null; this.tableDefaultOptions = new HashMap<>(); + this.catalogOptions = new HashMap<>(); } protected AbstractCatalog(FileIO fileIO, Map options) { @@ -69,6 +73,7 @@ protected AbstractCatalog(FileIO fileIO, Map options) { findAndCreateLineageMeta( Options.fromMap(options), AbstractCatalog.class.getClassLoader()); this.tableDefaultOptions = new HashMap<>(); + this.catalogOptions = options; options.keySet().stream() .filter(key -> key.startsWith(TABLE_DEFAULT_OPTION_PREFIX)) @@ -94,7 +99,9 @@ private LineageMeta findAndCreateLineageMeta(Options options, ClassLoader classL public Table getTable(Identifier identifier) throws TableNotExistException { if (isSystemDatabase(identifier.getDatabaseName())) { String tableName = identifier.getObjectName(); - Table table = SystemTableLoader.loadGlobal(tableName, fileIO, allTablePaths()); + Table table = + SystemTableLoader.loadGlobal( + tableName, fileIO, allTablePaths(), catalogOptions); if (table == null) { throw new TableNotExistException(identifier); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java new file mode 100644 index 000000000000..19ce658746f8 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.system; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.ReadonlyTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.ProjectedRow; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.SerializationUtils.newStringType; + +/** This is a system {@link Table} to display catalog options. */ +public class CatalogOptionsTable implements ReadonlyTable { + + public static final String CATALOG_OPTIONS = "catalog_options"; + + private final Map catalogOptions; + + public static final RowType TABLE_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "key", newStringType(false)), + new DataField(1, "value", newStringType(false)))); + + public CatalogOptionsTable(Map catalogOptions) { + this.catalogOptions = catalogOptions; + } + + /** A name to identify this table. */ + @Override + public String name() { + return CATALOG_OPTIONS; + } + + /** Returns the row type of this table. */ + @Override + public RowType rowType() { + return TABLE_TYPE; + } + + @Override + public InnerTableScan newScan() { + return new CatalogOptionsScan(); + } + + @Override + public InnerTableRead newRead() { + return new CatalogOptionsRead(); + } + + /** Primary keys of this table. */ + @Override + public List primaryKeys() { + return Collections.singletonList("key"); + } + + @Override + public Table copy(Map dynamicOptions) { + return new CatalogOptionsTable(catalogOptions); + } + + private class CatalogOptionsScan extends ReadOnceTableScan { + + @Override + public InnerTableScan withFilter(Predicate predicate) { + return this; + } + + @Override + public Plan innerPlan() { + return () -> + Collections.singletonList( + new CatalogOptionsTable.CatalogOptionsSplit(catalogOptions)); + } + } + + private static class CatalogOptionsSplit implements Split { + + private static final long serialVersionUID = 1L; + + private final Map catalogOptions; + + private CatalogOptionsSplit(Map catalogOptions) { + this.catalogOptions = catalogOptions; + } + + @Override + public long rowCount() { + return catalogOptions.size(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CatalogOptionsTable.CatalogOptionsSplit that = + (CatalogOptionsTable.CatalogOptionsSplit) o; + return catalogOptions.equals(that.catalogOptions); + } + + @Override + public int hashCode() { + return catalogOptions.hashCode(); + } + } + + private static class CatalogOptionsRead implements InnerTableRead { + + private int[][] projection; + + @Override + public InnerTableRead withFilter(Predicate predicate) { + return this; + } + + @Override + public InnerTableRead withProjection(int[][] projection) { + this.projection = projection; + return this; + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + return this; + } + + @Override + public RecordReader createReader(Split split) throws IOException { + if (!(split instanceof CatalogOptionsTable.CatalogOptionsSplit)) { + throw new IllegalArgumentException("Unsupported split: " + split.getClass()); + } + Iterator rows = + Iterators.transform( + ((CatalogOptionsSplit) split).catalogOptions.entrySet().iterator(), + this::toRow); + if (projection != null) { + rows = + Iterators.transform( + rows, row -> ProjectedRow.from(projection).replaceRow(row)); + } + return new IteratorRecordReader<>(rows); + } + + private InternalRow toRow(Map.Entry option) { + return GenericRow.of( + BinaryString.fromString(option.getKey()), + BinaryString.fromString(option.getValue())); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java index 3d7daea9c5d1..c31901af85a3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java @@ -29,6 +29,7 @@ import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS; import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG; +import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS; import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS; import static org.apache.paimon.table.system.FilesTable.FILES; import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS; @@ -70,10 +71,15 @@ public static Table load(String type, FileIO fileIO, FileStoreTable dataTable) { @Nullable public static Table loadGlobal( - String tableName, FileIO fileIO, Map> allTablePaths) { + String tableName, + FileIO fileIO, + Map> allTablePaths, + Map catalogOptions) { switch (tableName.toLowerCase()) { case ALL_TABLE_OPTIONS: return new AllTableOptionsTable(fileIO, allTablePaths); + case CATALOG_OPTIONS: + return new CatalogOptionsTable(catalogOptions); default: return null; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java new file mode 100644 index 000000000000..9ee1a63cb5e3 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.system; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.TableType; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; +import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CatalogOptionsTable}. */ +public class CatalogOptionsTableTest extends TableTestBase { + + private Catalog catalog; + + private CatalogOptionsTable catalogOptionsTable; + private Options catalogOptions; + @TempDir java.nio.file.Path tempDir; + + @BeforeEach + public void before() throws Exception { + catalogOptions = new Options(); + catalogOptions.set(CatalogOptions.TABLE_TYPE, TableType.MANAGED); + catalogOptions.set("table-default.scan.infer-parallelism", "false"); + catalogOptions.set(CatalogOptions.WAREHOUSE, tempDir.toUri().toString()); + catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions)); + catalogOptionsTable = + (CatalogOptionsTable) + catalog.getTable(new Identifier(SYSTEM_DATABASE_NAME, CATALOG_OPTIONS)); + } + + @Test + public void testCatalogOptionsTable() throws Exception { + List expectRow = getExceptedResult(); + List result = read(catalogOptionsTable); + assertThat(result).containsExactlyElementsOf(expectRow); + } + + private List getExceptedResult() { + List expectedRow = new ArrayList<>(); + for (Map.Entry option : catalogOptions.toMap().entrySet()) { + expectedRow.add( + GenericRow.of( + BinaryString.fromString(option.getKey()), + BinaryString.fromString(option.getValue()))); + } + return expectedRow; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 092c6ddba946..bc9e773aa791 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -24,6 +24,7 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.system.AllTableOptionsTable; +import org.apache.paimon.table.system.CatalogOptionsTable; import org.apache.paimon.types.IntType; import org.apache.paimon.utils.BlockingIterator; @@ -102,6 +103,12 @@ public void testAllTableOptions() { Row.of("default", "T", "c.cc.ccc", "val3")); } + @Test + public void testCatalogOptionsTable() { + List result = sql("SELECT * FROM sys.catalog_options"); + assertThat(result).containsExactly(Row.of("warehouse", path)); + } + @Test public void testDropSystemDatabase() { assertThatCode(() -> sql("DROP DATABASE sys")) @@ -125,7 +132,9 @@ public void testChangeTableInSystemDatabase() { public void testSystemDatabase() { sql("USE " + Catalog.SYSTEM_DATABASE_NAME); assertThat(sql("SHOW TABLES")) - .containsExactly(Row.of(AllTableOptionsTable.ALL_TABLE_OPTIONS)); + .containsExactly( + Row.of(AllTableOptionsTable.ALL_TABLE_OPTIONS), + Row.of(CatalogOptionsTable.CATALOG_OPTIONS)); } @Test