[refactor](dialect) make http sql converter plugin and audit loader as builtin plugin (apache#29692)

Follow-up to apache#28890.

Make HttpSqlConverterPlugin and AuditLoader builtin Doris plugins, so that users can use SQL dialect conversion and the audit loader without installing anything.

HttpSqlConverterPlugin

By default, nothing changes.

There is a new global variable sql_converter_service (default: empty). When it is set, the HttpSqlConverterPlugin is enabled:

set global sql_converter_service = "http://127.0.0.1:5001/api/v1/convert"
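
A quick usage sketch (the endpoint is the example above; t1 is a hypothetical table, and sql_dialect is the existing session variable that Dialect.getByName() resolves against the Dialect enum shown later in this diff):

set global sql_converter_service = "http://127.0.0.1:5001/api/v1/convert";
set sql_dialect = "presto";  -- any name from the Dialect enum
select * from t1 limit 10;   -- sent to the service for conversion to Doris syntax before parsing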

AuditLoader

By default, nothing changes.

There is a new global variable enable_audit_plugin (default: false). When it is set to true, the audit loader plugin is enabled.

Doris will create the audit_log table in __internal_schema at startup.

If enable_audit_plugin is true, audit logs will be inserted into the audit_log table.

Three other global variables are related to this plugin (see the sketch after the list):

audit_plugin_max_batch_interval_sec: the max interval at which the audit loader inserts a batch of audit logs.
audit_plugin_max_batch_bytes: the max size of one batch of audit logs inserted by the audit loader.
audit_plugin_max_sql_length: the max length of the statement recorded in the audit log.
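
A minimal sketch of enabling and inspecting the audit log (the values are arbitrary examples, not defaults; the column names come from the audit table schema added in this commit):

set global enable_audit_plugin = true;
set global audit_plugin_max_batch_interval_sec = 60;   -- example value
set global audit_plugin_max_batch_bytes = 52428800;    -- example value, 50 MB
set global audit_plugin_max_sql_length = 4096;         -- example value
select query_id, `time`, `user`, state, query_time, stmt
from __internal_schema.audit_log order by `time` desc limit 10;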
morningman authored Jan 12, 2024
1 parent 1bd1530 commit 2e9f888
Showing 34 changed files with 684 additions and 368 deletions.

@@ -2398,13 +2398,6 @@ public class Config extends ConfigBase {
"Whether to enable the function of getting log files through http interface"})
public static boolean enable_get_log_file_api = false;

// This config is deprecated and has not taken effect anymore,
// please use dialect plugin: fe_plugins/http-dialect-converter for instead
@Deprecated
@ConfField(description = {"用于SQL方言转换的服务地址。",
"The service address for SQL dialect conversion."})
public static String sql_convertor_service = "";

@ConfField(mutable = true)
public static boolean enable_profile_when_analyze = false;
@ConfField(mutable = true)

@@ -200,7 +200,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
if (view == null && !hasExplicitAlias()) {
String dialect = ConnectContext.get().getSessionVariable().getSqlDialect();
Dialect sqlDialect = Dialect.getByName(dialect);
if (Dialect.SPARK_SQL != sqlDialect) {
if (Dialect.SPARK != sqlDialect) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DERIVED_MUST_HAVE_ALIAS);
}
hasExplicitAlias = true;

@@ -24,6 +24,8 @@
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.KeysDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TypeDef;
import org.apache.doris.common.Config;
@@ -33,6 +35,7 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.plugin.audit.AuditLoaderPlugin;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.statistics.util.StatisticsUtil;

@@ -53,6 +56,33 @@ public class InternalSchemaInitializer extends Thread {

private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class);

public static final List<ColumnDef> AUDIT_TABLE_COLUMNS;

static {
AUDIT_TABLE_COLUMNS = new ArrayList<>();
AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id", TypeDef.createVarchar(48), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("time", TypeDef.create(PrimitiveType.DATETIME), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("user", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("db", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("state", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code", TypeDef.create(PrimitiveType.INT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message", TypeDef.create(PrimitiveType.STRING), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true));
}

public void run() {
if (!FeConstants.enableInternalSchemaDb) {
return;
@@ -83,6 +113,7 @@ public void run() {
Database database = op.get();
modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME);
modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME);
modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE);
}

public void modifyTblReplicaCount(Database database, String tblName) {
@@ -103,8 +134,8 @@ public void modifyTblReplicaCount(Database database, String tblName) {
>= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
return;
}
colStatsTbl.writeLock();
try {
colStatsTbl.writeLock();
Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props);
} finally {
colStatsTbl.writeUnlock();
@@ -123,8 +154,11 @@ public void modifyTblReplicaCount(Database database, String tblName) {
}

private void createTbl() throws UserException {
// statistics
Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt());
Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt());
// audit table
Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt());
}

@VisibleForTesting
@@ -212,7 +246,40 @@ public CreateTableStmt buildHistogramTblStmt() throws UserException {
return createTableStmt;
}

private CreateTableStmt buildAuditTblStmt() throws UserException {
TableName tableName = new TableName("",
FeConstants.INTERNAL_DB_NAME, AuditLoaderPlugin.AUDIT_LOG_TABLE);

String engineName = "olap";
ArrayList<String> dupKeys = Lists.newArrayList("query_id", "time", "client_ip");
KeysDesc keysDesc = new KeysDesc(KeysType.DUP_KEYS, dupKeys);
// partition
PartitionDesc partitionDesc = new RangePartitionDesc(Lists.newArrayList("time"), Lists.newArrayList());
// distribution
int bucketNum = 2;
DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, Lists.newArrayList("query_id"));
Map<String, String> properties = new HashMap<String, String>() {
{
put("dynamic_partition.time_unit", "DAY");
put("dynamic_partition.start", "-30");
put("dynamic_partition.end", "3");
put("dynamic_partition.prefix", "p");
put("dynamic_partition.buckets", String.valueOf(bucketNum));
put("dynamic_partition.enable", "true");
put("replication_num", String.valueOf(Math.max(1,
Config.min_replication_num_per_tablet)));
}
};
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc, partitionDesc, distributionDesc,
properties, null, "Doris internal audit table, DO NOT MODIFY IT", null);
StatisticsUtil.analyze(createTableStmt);
return createTableStmt;
}


private boolean created() {
// 1. check database exist
Optional<Database> optionalDatabase =
Env.getCurrentEnv().getInternalCatalog()
.getDb(FeConstants.INTERNAL_DB_NAME);
@@ -225,6 +292,7 @@
return false;
}

// 2. check statistic tables
Table statsTbl = optionalStatsTbl.get();
Optional<Column> optionalColumn =
statsTbl.fullSchema.stream().filter(c -> c.getName().equals("count")).findFirst();
@@ -238,7 +306,17 @@
}
return false;
}
return db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME).isPresent();
optionalStatsTbl = db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME);
if (!optionalStatsTbl.isPresent()) {
return false;
}

// 3. check audit table
optionalStatsTbl = db.getTable(AuditLoaderPlugin.AUDIT_LOG_TABLE);
if (!optionalStatsTbl.isPresent()) {
return false;
}
return true;
}

}
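
For reference, buildAuditTblStmt() above assembles a statement roughly equivalent to the following DDL (a sketch reconstructed from this diff; the exact syntax Doris generates may differ, and "replication_num" is really max(1, Config.min_replication_num_per_tablet)):

create table if not exists __internal_schema.audit_log (
    query_id varchar(48),
    `time` datetime,
    client_ip varchar(128),
    `user` varchar(128),
    `catalog` varchar(128),
    db varchar(128),
    state varchar(128),
    error_code int,
    error_message string,
    query_time bigint,
    scan_bytes bigint,
    scan_rows bigint,
    return_rows bigint,
    stmt_id bigint,
    is_query tinyint,
    frontend_ip varchar(128),
    cpu_time_ms bigint,
    sql_hash varchar(128),
    sql_digest varchar(128),
    peak_memory_bytes bigint,
    stmt string
) engine = olap
duplicate key(query_id, `time`, client_ip)
comment "Doris internal audit table, DO NOT MODIFY IT"
partition by range(`time`) ()
distributed by hash(query_id) buckets 2
properties (
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-30",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "2",
    "dynamic_partition.enable" = "true",
    "replication_num" = "1"
);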

@@ -26,6 +26,7 @@
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
@@ -104,21 +105,21 @@ public Object streamLoad(HttpServletRequest request,
return redirectToHttps(request);
}

try {
executeCheckPassword(request, response);
} catch (UnauthorizedException unauthorizedException) {
if (LOG.isDebugEnabled()) {
LOG.debug("Check password failed, going to check auth token, request: {}", request.toString());
}
if (!checkClusterToken(request)) {
throw unauthorizedException;
} else {
return executeWithClusterToken(request, db, table, true);
}
}
return executeWithoutPassword(request, response, db, table, true, groupCommit);

String authToken = request.getHeader("token");
// if auth token is not null, check it first
if (!Strings.isNullOrEmpty(authToken)) {
if (!checkClusterToken(authToken)) {
throw new UnauthorizedException("Invalid token: " + authToken);
}
return executeWithClusterToken(request, db, table, true);
} else {
try {
executeCheckPassword(request, response);
return executeWithoutPassword(request, response, db, table, true, groupCommit);
} finally {
ConnectContext.remove();
}
}
}

@RequestMapping(path = "/api/_http_stream",
@@ -363,18 +364,8 @@ private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadEx
// AuditlogPlugin should be re-designed carefully, and the method below focuses on
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenarios
private boolean checkClusterToken(HttpServletRequest request) {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking cluser token, request {}", request.toString());
}

String authToken = request.getHeader("token");

if (Strings.isNullOrEmpty(authToken)) {
return false;
}

return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken);
private boolean checkClusterToken(String token) {
return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
}

// NOTE: This function can only be used for AuditlogPlugin stream load for now.
@@ -388,6 +379,9 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(request.getRemoteAddr());
// set user to ADMIN_USER, so that we can get the proper resource tag
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();

String dbName = db;
String tableName = table;
@@ -444,8 +438,10 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,

return redirectView;
} catch (Exception e) {
LOG.warn("Failed to execute stream load with cluster token, {}", e);
LOG.warn("Failed to execute stream load with cluster token, {}", e.getMessage(), e);
return new RestBaseResult(e.getMessage());
} finally {
ConnectContext.remove();
}
}
}

@@ -28,9 +28,9 @@
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.plugin.StreamLoadAuditEvent;
import org.apache.doris.plugin.audit.AuditEvent;
import org.apache.doris.plugin.audit.AuditEvent.EventType;
import org.apache.doris.plugin.audit.StreamLoadAuditEvent;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;

@@ -41,8 +41,8 @@
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.LoadAuditEvent;
import org.apache.doris.plugin.audit.AuditEvent;
import org.apache.doris.plugin.audit.LoadAuditEvent;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;

@@ -36,25 +36,25 @@ public enum Dialect {
*/
PRESTO("presto"),
/**
* Spark sql parser dialect
* Spark3 sql parser dialect
*/
SPARK_SQL("spark_sql"),
SPARK("spark"),
/**
* Hive parser dialect
* Spark2 sql parser dialect
*/
HIVE("hive"),
SPARK2("spark2"),
/**
* Alibaba max compute parser dialect
* Flink sql parser dialect
*/
MAX_COMPUTE("max_compute"),
FLINK("flink"),
/**
* Mysql parser dialect
* Hive parser dialect
*/
MYSQL("mysql"),
HIVE("hive"),
/**
* Postgresql parser dialect
*/
POSTGRESQL("postgresql"),
POSTGRES("postgres"),
/**
* Sqlserver parser dialect
*/
@@ -64,13 +64,9 @@
*/
CLICKHOUSE("clickhouse"),
/**
* Sap hana parser dialect
*/
SAP_HANA("sap_hana"),
/**
* OceanBase parser dialect
* oracle parser dialect
*/
OCEANBASE("oceanbase");
ORACLE("oracle");

public static final int MAX_DIALECT_SIZE = Dialect.values().length;

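
Since Dialect.getByName() matches the lowercase name strings above, sessions using the old names need updating (a sketch of the renames visible in this hunk):

set sql_dialect = "spark";     -- formerly "spark_sql"
set sql_dialect = "postgres";  -- formerly "postgresql"
-- "spark2", "flink" and "oracle" are newly recognized here, while
-- "max_compute", "mysql", "sap_hana" and "oceanbase" no longer
-- appear in this enum.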

@@ -17,6 +17,8 @@

package org.apache.doris.plugin;

import org.apache.doris.plugin.audit.AuditEvent;

/**
* Audit plugin interface describe.
*/
26 changes: 23 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java
@@ -27,7 +27,9 @@
import org.apache.doris.nereids.parser.Dialect;
import org.apache.doris.plugin.PluginInfo.PluginType;
import org.apache.doris.plugin.PluginLoader.PluginStatus;
import org.apache.doris.qe.AuditLogBuilder;
import org.apache.doris.plugin.audit.AuditLoaderPlugin;
import org.apache.doris.plugin.audit.AuditLogBuilder;
import org.apache.doris.plugin.dialect.HttpDialectConverterPlugin;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
@@ -104,12 +106,24 @@ private boolean removeDynamicPluginName(String name) {
}

private void initBuiltinPlugins() {
// AuditLog
// AuditLog: log audit log to file
AuditLogBuilder auditLogBuilder = new AuditLogBuilder();
if (!registerBuiltinPlugin(auditLogBuilder.getPluginInfo(), auditLogBuilder)) {
LOG.warn("failed to register audit log builder");
}

// AuditLoader: log audit log to internal table
AuditLoaderPlugin auditLoaderPlugin = new AuditLoaderPlugin();
if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) {
LOG.warn("failed to register audit log builder");
}

// sql dialect converter
HttpDialectConverterPlugin httpDialectConverterPlugin = new HttpDialectConverterPlugin();
if (!registerBuiltinPlugin(httpDialectConverterPlugin.getPluginInfo(), httpDialectConverterPlugin)) {
LOG.warn("failed to register http dialect converter plugin");
}

// other builtin plugins
}

@@ -217,11 +231,17 @@ public boolean registerBuiltinPlugin(PluginInfo pluginInfo, Plugin plugin) {
}

PluginLoader loader = new BuiltinPluginLoader(Config.plugin_dir, pluginInfo, plugin);
PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader);
try {
loader.install();
} catch (Exception e) {
LOG.warn("failed to register builtin plugin {}", pluginInfo.getName(), e);
return false;
}
// add dialect plugin
if (plugin instanceof DialectConverterPlugin) {
addDialectPlugin((DialectConverterPlugin) plugin, pluginInfo);
}
PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader);
return checkLoader == null;
}

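
Because both plugins are now registered at startup, they should appear without any install step (a sketch; the exact builtin plugin names are not shown in this diff):

show plugins;
-- expect the builtin audit log builder, the audit loader, and the
-- HTTP dialect converter among the listed plugins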
