Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] add show processlist from xxx command to show process list of specify frondend #13326

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,43 @@ Lists the operations currently being performed by threads executing within the s
## Syntax

```SQL
SHOW [FULL] PROCESSLIST;
SHOW [FULL] [ALL] PROCESSLIST;
```
or

```SQL
SHOW [FULL] PROCESSLIST FROM ALL;
```

or

```SQL
SHOW [FULL] PROCESSLIST FROM 'fe_host';
```

## Parameter

| Parameter | Required | Description |
| --------- | -------- | ------------------------------------------------------------ |
| FULL | No | If you specify this parameter, all operations are listed. Otherwise, only the first 100 operations are listed. |
| Parameter | Required | Description |
|-----------| -------- |------------------------------------------------------------------------------------------------------------------------------------------------|
| FULL | No | If you specify this parameter, will output full SQL. Otherwise, only the first 100 chars of the sql are listed. |
| ALL | No | If you specify this parameter, the process of all frontend will be listed. Otherwise, only the process of the current frontend will be listed. |
| fe_host | No | If you specify this parameter, the process of specify frontend will be listed. |

## Return

| Return | Description |
| ------------------- | ------------------------------------------------------------ |
| Id | Connection ID. |
| User | The name of the user who runs the operation. |
| Host | The hostname of the client which runs the operation. |
| Db | The name of the database where the operation is executed. |
| Command | The type of the command. |
| ConnectionStartTime | Time when the connection starts. |
| Return | Description |
|---------------------|------------------------------------------------------------------------|
| FeHost | the hostname of frontend. |
| Id | Connection ID. |
| User | The name of the user who runs the operation. |
| ClientHost | The hostname of the client which runs the operation. |
| Db | The name of the database where the operation is executed. |
| Command | The type of the command. |
| ConnectionStartTime | Time when the connection starts. |
| Time | The time (in second) since the operation has entered the current state. |
| State | The state of the operation. |
| Info | The command that the operation is executing. |
| State | The state of the operation. |
| Info | The command that the operation is executing. |
| IsPending | Whether the operation is being queued. |

## Usage note

Expand All @@ -40,9 +55,33 @@ Example 1: lists the operations state via the user `root`.

```Plain
SHOW PROCESSLIST;
+------+------+---------------------+-------+---------+---------------------+------+-------+------------------+
| Id | User | Host | Db | Command | ConnectionStartTime | Time | State | Info |
+------+------+---------------------+-------+---------+---------------------+------+-------+------------------+
| 0 | root | x.x.x.x:xxxx | tpcds | Query | 2022-10-09 19:58:25 | 0 | OK | SHOW PROCESSLIST |
+------+------+---------------------+-------+---------+---------------------+------+-------+------------------+
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
| FeHost | Id | User | ClientHost | Db | Command | ConnectionStartTime | Time | State | Info | IsPending |
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
| x.x.x.1 | 0 | root | x.x.x.x:xxxx | ssb | Query | 2022-11-13 21:18:19 | 0 | OK | show processlist | false |
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
```

Example 2: lists the operations state of all frontend via the user `root`.

```Plain
SHOW PROCESSLIST FROM ALL;
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
| FeHost | Id | User | ClientHost | Db | Command | ConnectionStartTime | Time | State | Info | IsPending |
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
| x.x.x.1 | 0 | root | x.x.x.x:xxxx | ssb | Query | 2022-11-13 21:18:19 | 0 | OK | show processlist | false |
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
| x.x.x.2 | 0 | root | x.x.x.x:xxxx | ssb | Sleep | 2022-11-13 21:19:00 | 0 | OK | show processlist from all | false |
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
```

Example 3: lists the operations state of specify frontend via the user `root`.

```Plain
SHOW PROCESSLIST FROM 'x.x.x.1';
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
| FeHost | Id | User | ClientHost | Db | Command | ConnectionStartTime | Time | State | Info | IsPending |
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
| x.x.x.1 | 0 | root | x.x.x.x:xxxx | ssb | Query | 2022-11-13 21:18:19 | 0 | OK | show processlist | false |
+---------------+------+------+---------------------+------+---------+---------------------+------+-------+---------------------------+-----------+
```
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
package com.starrocks.http.action;

import com.google.common.collect.Lists;
import com.starrocks.analysis.RedirectStatus;
import com.starrocks.http.ActionController;
import com.starrocks.http.BaseRequest;
import com.starrocks.http.BaseResponse;
import com.starrocks.http.IllegalArgException;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.FrontendOpExecutor;
import com.starrocks.qe.OriginStatement;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.service.ExecuteEnv;
import com.starrocks.system.Frontend;
import io.netty.handler.codec.http.HttpMethod;

import java.util.ArrayList;
Expand All @@ -38,20 +43,24 @@ public class SessionAction extends WebBaseAction {
private static final ArrayList<String> SESSION_TABLE_HEADER = Lists.newArrayList();

static {
SESSION_TABLE_HEADER.add("FeHost");
SESSION_TABLE_HEADER.add("Id");
SESSION_TABLE_HEADER.add("User");
SESSION_TABLE_HEADER.add("Host");
SESSION_TABLE_HEADER.add("Cluster");
SESSION_TABLE_HEADER.add("ClientHost");
SESSION_TABLE_HEADER.add("Db");
SESSION_TABLE_HEADER.add("Command");
SESSION_TABLE_HEADER.add("ConnectionStartTime");
SESSION_TABLE_HEADER.add("Time");
SESSION_TABLE_HEADER.add("State");
SESSION_TABLE_HEADER.add("Info");
SESSION_TABLE_HEADER.add("IsPending");
}

private boolean isShowAll;

public SessionAction(ActionController controller) {
super(controller);
isShowAll = false;
}

public static void registerAction(ActionController controller) throws IllegalArgException {
Expand All @@ -60,6 +69,7 @@ public static void registerAction(ActionController controller) throws IllegalArg

@Override
public void executeGet(BaseRequest request, BaseResponse response) {
isShowAll = Boolean.parseBoolean(request.getSingleParameter("all"));
getPageHeader(request, response.getContent());
appendSessionInfo(response.getContent());
getPageFooter(response.getContent());
Expand All @@ -69,12 +79,31 @@ public void executeGet(BaseRequest request, BaseResponse response) {
private void appendSessionInfo(StringBuilder buffer) {
buffer.append("<h2>Session Info</h2>");

List<ConnectContext.ThreadInfo> threadInfos = ExecuteEnv.getInstance().getScheduler().listConnection("root");
List<List<String>> rowSet = Lists.newArrayList();
List<ConnectContext.ThreadInfo> threadInfos = ExecuteEnv.getInstance().getScheduler().listConnection(
ConnectContext.get().getQualifiedUser());
long nowMs = System.currentTimeMillis();
for (ConnectContext.ThreadInfo info : threadInfos) {
rowSet.add(info.toRow(nowMs, false));
}
if (isShowAll) {
List<Frontend> frontends = GlobalStateMgr.getCurrentState().getFrontends(null);
for (Frontend frontend : frontends) {
if (frontend.getHost().equals(GlobalStateMgr.getCurrentState().getSelfNode().first)) {
continue;
}
String showProcStmt = "SHOW PROCESSLIST";
// ConnectContext build in RestBaseAction
ConnectContext context = ConnectContext.get();
FrontendOpExecutor frontendOpExecutor = new FrontendOpExecutor(frontend.getHost(), frontend.getRpcPort(),
new OriginStatement(showProcStmt, 0), context, RedirectStatus.FORWARD_NO_SYNC);
try {
frontendOpExecutor.execute();
rowSet.addAll(frontendOpExecutor.getProxyResultSet().getResultRows());
} catch (Exception ignored) {
}
}
}

buffer.append("<p>This page lists the session info, there are "
+ rowSet.size()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ public boolean isRunning() {

public List<String> toRow(long nowMs, boolean full) {
List<String> row = Lists.newArrayList();
row.add(GlobalStateMgr.getCurrentState().getSelfNode().first);
row.add("" + connectionId);
row.add(ClusterNamespace.getNameFromFullName(qualifiedUser));
row.add(getMysqlChannel().getRemoteHostPortString());
Expand Down
191 changes: 191 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/FrontendOpExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// This file is made available under Elastic License 2.0.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.starrocks.qe;

import com.starrocks.analysis.RedirectStatus;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.util.UUIDUtil;
import com.starrocks.mysql.MysqlChannel;
import com.starrocks.qe.QueryState.MysqlStateType;
import com.starrocks.rpc.FrontendServiceProxy;
import com.starrocks.sql.analyzer.AST2SQL;
import com.starrocks.sql.ast.SetStmt;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TMasterOpRequest;
import com.starrocks.thrift.TMasterOpResult;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TQueryOptions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;

public class FrontendOpExecutor {
private static final Logger LOG = LogManager.getLogger(FrontendOpExecutor.class);

private final OriginStatement originStmt;
private StatementBase parsedStmt;
private final ConnectContext ctx;
private TMasterOpResult result;

private int waitTimeoutMs;
// the total time of thrift connectTime add readTime and writeTime
private int thriftTimeoutMs;
// frontend host
private String feHost;
// frontend rpc port
private int feRpcPort;

public FrontendOpExecutor(String feHost, int feRpcPort, OriginStatement originStmt,
ConnectContext ctx, RedirectStatus status) {
this(feHost, feRpcPort, null, originStmt, ctx, status);
}

public FrontendOpExecutor(String feHost, int feRpcPort, StatementBase parsedStmt, OriginStatement originStmt,
ConnectContext ctx, RedirectStatus status) {
this.feHost = feHost;
this.feRpcPort = feRpcPort;
this.originStmt = originStmt;
this.ctx = ctx;
if (status.isNeedToWaitJournalSync()) {
this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
} else {
this.waitTimeoutMs = 0;
}
// set thriftTimeoutMs to query_timeout + thrift_rpc_timeout_ms
// so that we can return an execution timeout instead of a network timeout
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + Config.thrift_rpc_timeout_ms;
this.parsedStmt = parsedStmt;
}

public void execute() throws Exception {
forward();
LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId);
ctx.getGlobalStateMgr().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs);

if (result.state != null) {
MysqlStateType state = MysqlStateType.fromString(result.state);
if (state != null) {
ctx.getState().setStateType(state);
if (state == MysqlStateType.EOF || state == MysqlStateType.OK) {
afterForward();
}
}
}
}

private void afterForward() throws DdlException {
if (parsedStmt != null) {
if (parsedStmt instanceof SetStmt) {
SetExecutor executor = new SetExecutor(ctx, (SetStmt) parsedStmt);
try {
executor.setSessionVars();
} catch (DdlException e) {
LOG.warn("set session variables after forward failed", e);
// set remote result to null, so that mysql protocol will show the error message
result = null;
throw new DdlException("Global level variables are set successfully, " +
"but session level variables are set failed with error: " + e.getMessage() + ". " +
"Please check if the version of fe currently connected is the same as the version of master, " +
"or re-establish the connection and you will see the new variables");
}
}
}
}

// Send request to specified frontend
private void forward() throws Exception {
TNetworkAddress thriftAddress = new TNetworkAddress(feHost, feRpcPort);
TMasterOpRequest params = new TMasterOpRequest();
params.setCluster(SystemInfoService.DEFAULT_CLUSTER);
params.setSql(originStmt.originStmt);
params.setStmtIdx(originStmt.idx);
params.setUser(ctx.getQualifiedUser());
params.setDb(ctx.getDatabase());
params.setSqlMode(ctx.getSessionVariable().getSqlMode());
params.setUser_ip(ctx.getRemoteIP());
params.setTime_zone(ctx.getSessionVariable().getTimeZone());
params.setStmt_id(ctx.getStmtId());
params.setEnableStrictMode(ctx.getSessionVariable().getEnableInsertStrict());
params.setCurrent_user_ident(ctx.getCurrentUserIdentity().toThrift());
params.setIsLastStmt(ctx.getIsLastStmt());

TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setMem_limit(ctx.getSessionVariable().getMaxExecMemByte());
queryOptions.setQuery_timeout(ctx.getSessionVariable().getQueryTimeoutS());
queryOptions.setLoad_mem_limit(ctx.getSessionVariable().getLoadMemLimit());
params.setQuery_options(queryOptions);

params.setQueryId(UUIDUtil.toTUniqueId(ctx.getQueryId()));
// forward all session variables
SetStmt setStmt = ctx.getModifiedSessionVariables();
if (setStmt != null) {
params.setModified_variables_sql(AST2SQL.toString(setStmt));
}
LOG.info("Forward statement {} to Leader {}", ctx.getStmtId(), thriftAddress);

result = FrontendServiceProxy.call(thriftAddress,
thriftTimeoutMs,
Config.thrift_rpc_retry_times,
client -> client.forward(params));
}

public ByteBuffer getOutputPacket() {
if (result == null) {
return null;
}
return result.packet;
}

public ShowResultSet getProxyResultSet() {
if (result == null) {
return null;
}
if (result.isSetResultSet()) {
return new ShowResultSet(result.resultSet);
} else {
return null;
}
}

/**
* if a query statement is forwarded to the leader, or if a show statement is automatically rewrote,
* the result of the query will be returned in thrift body and should write into mysql channel.
**/
public boolean sendResultToChannel(MysqlChannel channel) throws IOException {
if (!result.isSetChannelBufferList() || result.channelBufferList.isEmpty()) {
return false;
}
for (ByteBuffer byteBuffer : result.channelBufferList) {
channel.sendOnePacket(byteBuffer);
}
return true;
}

public void setResult(TMasterOpResult result) {
this.result = result;
}
}

Loading