Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support information schema for external catalog #27359

Merged
merged 26 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
poc iceberg x metabase
Signed-off-by: Letian Jiang <letian.jiang@outlook.com>
  • Loading branch information
letian-jiang committed Jul 14, 2023
commit 0939abdcf8ad15348961efb979b692911471050e
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/scan/olap_schema_scan_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ Status OlapSchemaScanContext::prepare(RuntimeState* state) {

Status OlapSchemaScanContext::_prepare_params(RuntimeState* state) {
_param = std::make_shared<SchemaScannerParam>();
if (_tnode.schema_scan_node.__isset.catalog_name) {
_param->catalog = _obj_pool.add(new std::string(_tnode.schema_scan_node.catalog_name));
}

if (_tnode.schema_scan_node.__isset.db) {
_param->db = _obj_pool.add(new std::string(_tnode.schema_scan_node.db));
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace starrocks {

// scanner parameter from frontend
struct SchemaScannerParam {
const std::string* catalog{nullptr};
const std::string* db{nullptr};
const std::string* table{nullptr};
const std::string* wild{nullptr};
Expand Down
9 changes: 9 additions & 0 deletions be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ Status SchemaColumnsScanner::start(RuntimeState* state) {
}
// get all database
TGetDbsParams db_params;
if (nullptr != _param->catalog) {
db_params.__set_catalog_name(*(_param->catalog));
}
if (nullptr != _param->db) {
db_params.__set_pattern(*(_param->db));
}
Expand Down Expand Up @@ -488,6 +491,9 @@ Status SchemaColumnsScanner::fill_chunk(ChunkPtr* chunk) {

Status SchemaColumnsScanner::get_new_desc() {
TDescribeTableParams desc_params;
if (nullptr != _param->catalog) {
desc_params.__set_catalog_name(*(_param->catalog));
}
if (!_param->without_db_table) {
desc_params.__set_db(_db_result.dbs[_db_index - 1]);
desc_params.__set_table_name(_table_result.tables[_table_index++]);
Expand Down Expand Up @@ -523,6 +529,9 @@ Status SchemaColumnsScanner::get_new_table() {
}
TGetTablesParams table_params;
table_params.__set_db(_db_result.dbs[_db_index++]);
if (nullptr != _param->catalog) {
table_params.__set_catalog_name(*(_param->catalog));
}
if (nullptr != _param->table) {
table_params.__set_pattern(*(_param->table));
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner/schema_schemata_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ Status SchemaSchemataScanner::start(RuntimeState* state) {
return Status::InternalError("used before initial.");
}
TGetDbsParams db_params;
if (nullptr != _param->catalog) {
db_params.__set_catalog_name(*(_param->catalog));
}
if (nullptr != _param->wild) {
db_params.__set_pattern(*(_param->wild));
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner/schema_tables_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ SchemaTablesScanner::~SchemaTablesScanner() = default;
Status SchemaTablesScanner::start(RuntimeState* state) {
RETURN_IF_ERROR(SchemaScanner::start(state));
TAuthInfo auth_info;
if (nullptr != _param->catalog) {
auth_info.__set_catalog_name(*(_param->catalog));
}
if (nullptr != _param->db) {
auth_info.__set_pattern(*(_param->db));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class SystemTable extends Table {

private final TSchemaTableType schemaTableType;

private String catalogName;

public SystemTable(long id, String name, TableType type, List<Column> baseSchema, TSchemaTableType schemaTableType) {
super(id, name, type, baseSchema);
this.schemaTableType = schemaTableType;
Expand Down Expand Up @@ -75,6 +77,14 @@ public static Builder builder() {
return new Builder();
}

public String getCatalogName() {
return catalogName;
}

public void setCatalogName(String catalogName) {
this.catalogName = catalogName;
}

public static class Builder {
List<Column> columns;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.system.SystemId;
import com.starrocks.catalog.system.SystemTable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import static com.starrocks.catalog.InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME;

// Information schema used for MySQL compatible.
public class InfoSchemaDb extends Database {
public static final String DATABASE_NAME = "information_schema";

public InfoSchemaDb() {
super(SystemId.INFORMATION_SCHEMA_DB_ID, DATABASE_NAME);
this(DEFAULT_INTERNAL_CATALOG_NAME);
}

public InfoSchemaDb(String catalogName) {
super(SystemId.INFORMATION_SCHEMA_DB_ID, DATABASE_NAME); // do we need unique id for every catalog?
super.setCatalogName(catalogName);

super.registerTableUnlocked(TablesSystemTable.create());
super.registerTableUnlocked(PartitionsSystemTableSystemTable.create());
Expand Down Expand Up @@ -70,6 +78,10 @@ public InfoSchemaDb() {
super.registerTableUnlocked(BeBvarsSystemTable.create());
super.registerTableUnlocked(BeCloudNativeCompactionsSystemTable.create());
super.registerTableUnlocked(PipeFileSystemTable.create());

for (Table table : this.getTables()) {
((SystemTable) table).setCatalogName(catalogName);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.catalog.IcebergTable;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.system.information.InfoSchemaDb;
import com.starrocks.common.AlreadyExistsException;
import com.starrocks.common.DdlException;
import com.starrocks.common.MetaNotFoundException;
Expand Down Expand Up @@ -97,15 +98,24 @@ public class IcebergMetadata implements ConnectorMetadata {
private final Map<String, Database> databases = new ConcurrentHashMap<>();
private final Map<IcebergFilter, List<FileScanTask>> tasks = new ConcurrentHashMap<>();

private final InfoSchemaDb infoSchemaDb;

public IcebergMetadata(String catalogName, IcebergCatalog icebergCatalog) {
this.catalogName = catalogName;
this.icebergCatalog = icebergCatalog;
new IcebergMetricsReporter().setThreadLocalReporter();

// register information_schema database
this.infoSchemaDb = new InfoSchemaDb(catalogName);
this.databases.put(InfoSchemaDb.DATABASE_NAME, this.infoSchemaDb);
}

@Override
public List<String> listDbNames() {
return icebergCatalog.listAllDatabases();
List<String> dbs = icebergCatalog.listAllDatabases();
// can we move this to super class?
dbs.add(InfoSchemaDb.DATABASE_NAME);
return dbs;
}

@Override
Expand All @@ -129,6 +139,10 @@ public void dropDb(String dbName, boolean isForceDrop) throws MetaNotFoundExcept

@Override
public Database getDb(String dbName) {
if (InfoSchemaDb.isInfoSchemaDb(dbName)) {
return this.infoSchemaDb;
}

if (databases.containsKey(dbName)) {
return databases.get(dbName);
}
Expand All @@ -146,6 +160,10 @@ public Database getDb(String dbName) {

@Override
public List<String> listTableNames(String dbName) {
if (InfoSchemaDb.isInfoSchemaDb(dbName)) {
return infoSchemaDb.getTables().stream().map(Table::getName).collect(Collectors.toList());
}

return icebergCatalog.listTables(dbName);
}

Expand Down Expand Up @@ -174,6 +192,13 @@ public void dropTable(DropTableStmt stmt) {

@Override
public Table getTable(String dbName, String tblName) {
if (InfoSchemaDb.isInfoSchemaDb(dbName)) {
Table table = this.infoSchemaDb.getTable(tblName);
if (table != null) {
return table;
}
}

TableIdentifier identifier = TableIdentifier.of(dbName, tblName);
if (tables.containsKey(identifier)) {
return tables.get(identifier);
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/SchemaScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.common.collect.Lists;
import com.starrocks.analysis.Analyzer;
import com.starrocks.analysis.TupleDescriptor;
import com.starrocks.catalog.InternalCatalog;
import com.starrocks.catalog.system.SystemTable;
import com.starrocks.common.UserException;
import com.starrocks.qe.ConnectContext;
Expand All @@ -54,10 +55,13 @@

import java.util.List;

import static com.starrocks.catalog.InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME;

/**
* Full scan of an SCHEMA table.
*/
public class SchemaScanNode extends ScanNode {
private String catalogName;
private final String tableName;
private String schemaDb;
private String schemaTable;
Expand Down Expand Up @@ -155,6 +159,13 @@ public void finalizeStats(Analyzer analyzer) throws UserException {
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.SCHEMA_SCAN_NODE;
msg.schema_scan_node = new TSchemaScanNode(desc.getId().asInt(), tableName);

if (catalogName != null) {
msg.schema_scan_node.setCatalog_name(catalogName);
} else {
msg.schema_scan_node.setCatalog_name(DEFAULT_INTERNAL_CATALOG_NAME);
}

if (schemaDb != null) {
msg.schema_scan_node.setDb(schemaDb);
} else {
Expand Down Expand Up @@ -328,4 +339,11 @@ public boolean canUseRuntimeAdaptiveDop() {
return true;
}

public String getCatalogName() {
return catalogName;
}

public void setCatalogName(String catalogName) {
this.catalogName = catalogName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import com.starrocks.authentication.AuthenticationMgr;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.InternalCatalog;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
Expand Down Expand Up @@ -124,6 +123,7 @@
import com.starrocks.scheduler.mv.MaterializedViewMgr;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.MetadataMgr;
import com.starrocks.sql.analyzer.Analyzer;
import com.starrocks.sql.analyzer.AnalyzerUtils;
import com.starrocks.sql.analyzer.SemanticException;
Expand Down Expand Up @@ -264,6 +264,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.starrocks.catalog.InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME;
import static com.starrocks.thrift.TStatusCode.NOT_IMPLEMENTED_ERROR;
import static com.starrocks.thrift.TStatusCode.OK;
import static com.starrocks.thrift.TStatusCode.RUNTIME_ERROR;
Expand All @@ -285,7 +286,6 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException {
LOG.debug("get db request: {}", params);
TGetDbsResult result = new TGetDbsResult();

List<String> dbs = Lists.newArrayList();
PatternMatcher matcher = null;
if (params.isSetPattern()) {
try {
Expand All @@ -296,8 +296,13 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException {
}
}

GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
List<String> dbNames = globalStateMgr.getDbNames();
String catalogName = DEFAULT_INTERNAL_CATALOG_NAME;
if (params.isSetCatalog_name()) {
catalogName = params.getCatalog_name();
}

MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
List<String> dbNames = metadataMgr.listDbNames(catalogName);
LOG.debug("get db names: {}", dbNames);

UserIdentity currentUser = null;
Expand All @@ -306,6 +311,8 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException {
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}

List<String> dbs = new ArrayList<>();
for (String fullName : dbNames) {
if (!PrivilegeActions.checkAnyActionOnOrInDb(currentUser, null, fullName)) {
continue;
Expand Down Expand Up @@ -339,17 +346,35 @@ public TGetTablesResult getTableNames(TGetTablesParams params) throws TException
}

// database privs should be checked in analysis phase
Database db = GlobalStateMgr.getCurrentState().getDb(params.db);
String catalogName = DEFAULT_INTERNAL_CATALOG_NAME;
if (params.isSetCatalog_name()) {
catalogName = params.getCatalog_name();
}

MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
Database db = metadataMgr.getDb(catalogName, params.db);

UserIdentity currentUser = null;
if (params.isSetCurrent_user_ident()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}

if (db != null) {
for (String tableName : db.getTableNamesViewWithLock()) {
for (String tableName : metadataMgr.listTableNames(catalogName, params.db)) {
LOG.debug("get table: {}, wait to check", tableName);
Table tbl = db.getTable(tableName);
Table tbl = null;
try {
tbl = metadataMgr.getTable(catalogName, params.db, tableName);
} catch (Exception e) {
LOG.warn(e.getMessage());
}

if (tbl == null) {
continue;
}

if (tbl != null && !PrivilegeActions.checkAnyActionOnTableLikeObject(currentUser,
null, params.db, tbl)) {
continue;
Expand Down Expand Up @@ -840,18 +865,25 @@ public TDescribeTableResult describeTable(TDescribeTableParams params) throws TE
return result;
}

Database db = GlobalStateMgr.getCurrentState().getDb(params.db);
String catalogName = DEFAULT_INTERNAL_CATALOG_NAME;
if (params.isSetCatalog_name()) {
catalogName = params.getCatalog_name();
}

MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
Database db = metadataMgr.getDb(catalogName, params.db);

if (db != null) {
try {
db.readLock();
Table table = db.getTable(params.getTable_name());
Table table = metadataMgr.getTable(catalogName, params.db, params.table_name);
if (table == null) {
return result;
}
if (!PrivilegeActions.checkAnyActionOnTableLikeObject(currentUser, null, params.db, table)) {
return result;
}
setColumnDesc(columns, table, limit, false, params.db, params.getTable_name());
setColumnDesc(columns, table, limit, false, params.db, params.table_name);
} finally {
db.readUnlock();
}
Expand Down Expand Up @@ -1529,7 +1561,7 @@ public TRefreshTableResponse refreshTable(TRefreshTableRequest request) throws T
try {
// Adapt to the situation that the Fe node before upgrading sends a request to the Fe node after upgrading.
if (request.getCatalog_name() == null) {
request.setCatalog_name(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME);
request.setCatalog_name(DEFAULT_INTERNAL_CATALOG_NAME);
}
GlobalStateMgr.getCurrentState().refreshExternalTable(new TableName(request.getCatalog_name(),
request.getDb_name(), request.getTable_name()), request.getPartitions());
Expand Down
Loading