Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions query/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"time"

"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxql"
"github.com/stretchr/testify/require"
)

var errUnexpected = errors.New("unexpected error")
Expand Down Expand Up @@ -354,6 +356,126 @@ func TestQueryExecutor_ShowQueries(t *testing.T) {
}
}

// TestQueryExecutor_ShowQueries_NonAdminFiltering verifies that SHOW QUERIES
// filters results based on the requesting user's database-level read
// permissions. A non-admin user should only see queries running against
// databases they have read access to, while an admin should see all queries.
func TestQueryExecutor_ShowQueries_NonAdminFiltering(t *testing.T) {
const (
dbColumn = 2
allowedDB = "mydb"
forbiddenDB = "secretdb"
nonAdminUser = "bar"
adminUser = "alice"
)

e := NewQueryExecutor()

// blockCh keeps the "long-running" queries alive so they appear in SHOW QUERIES.
blockCh := make(chan struct{})
defer close(blockCh)

e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *query.ExecutionContext) error {
switch stmt.(type) {
case *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(ctx, stmt)
case *influxql.SelectStatement:
// Block until the test completes so this query stays visible.
<-blockCh
return nil
}
t.Errorf("unexpected statement: %s", stmt)
return errUnexpected
},
}

// Start a "long-running" query on the forbidden database (by the admin user).
q1, err := influxql.ParseQuery("SELECT * FROM cpu")
require.NoError(t, err)
go func() {
discardOutput(e.ExecuteQuery(q1, query.ExecutionOptions{
Database: forbiddenDB,
UserID: adminUser,
}, nil))
}()

// Start a "long-running" query on the allowed database (by the non-admin user).
q2, err := influxql.ParseQuery("SELECT * FROM mem")
require.NoError(t, err)
go func() {
discardOutput(e.ExecuteQuery(q2, query.ExecutionOptions{
Database: allowedDB,
UserID: nonAdminUser,
}, nil))
}()

// Give the background queries a moment to register with the TaskManager.
time.Sleep(50 * time.Millisecond)

// Verify both queries are tracked (sanity check).
allQueries := e.TaskManager.Queries()
require.GreaterOrEqual(t, len(allQueries), 2, "expected at least 2 running queries")

// Helper to collect database names from SHOW QUERIES result rows.
databases := func(rows [][]interface{}) []string {
var dbs []string
for _, row := range rows {
if db, ok := row[dbColumn].(string); ok && db != "" {
dbs = append(dbs, db)
}
}
return dbs
}

t.Run("non-admin user only sees allowed databases", func(t *testing.T) {
showQ, err := influxql.ParseQuery("SHOW QUERIES")
require.NoError(t, err)

nonAdminUserInfo := &meta.UserInfo{
Name: nonAdminUser,
Admin: false,
Privileges: map[string]influxql.Privilege{
allowedDB: influxql.AllPrivileges,
},
}

results := e.ExecuteQuery(showQ, query.ExecutionOptions{
UserID: nonAdminUser,
CoarseAuthorizer: nonAdminUserInfo,
}, nil)
result := <-results
require.NoError(t, result.Err)
require.Len(t, result.Series, 1)

dbs := databases(result.Series[0].Values)
require.Contains(t, dbs, allowedDB, "non-admin user should see queries on databases they have read access to")
require.NotContains(t, dbs, forbiddenDB, "non-admin user should not see queries on databases they lack read access to")
})

t.Run("admin user sees all databases", func(t *testing.T) {
showQ, err := influxql.ParseQuery("SHOW QUERIES")
require.NoError(t, err)

adminUserInfo := &meta.UserInfo{
Name: adminUser,
Admin: true,
}

results := e.ExecuteQuery(showQ, query.ExecutionOptions{
UserID: adminUser,
CoarseAuthorizer: adminUserInfo,
}, nil)
result := <-results
require.NoError(t, result.Err)
require.Len(t, result.Series, 1)

dbs := databases(result.Series[0].Values)
require.Contains(t, dbs, allowedDB, "admin user should see all queries")
require.Contains(t, dbs, forbiddenDB, "admin user should see all queries")
})
}

func TestQueryExecutor_Limit_Timeout(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions query/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewTaskManager() *TaskManager {
func (t *TaskManager) ExecuteStatement(ctx *ExecutionContext, stmt influxql.Statement) error {
switch stmt := stmt.(type) {
case *influxql.ShowQueriesStatement:
rows, err := t.executeShowQueriesStatement(stmt)
rows, err := t.executeShowQueriesStatement(stmt, ctx.CoarseAuthorizer)
if err != nil {
return err
}
Expand Down Expand Up @@ -133,14 +133,18 @@ func (t *TaskManager) executeKillQueryStatement(stmt *influxql.KillQueryStatemen
return t.KillQuery(stmt.QueryID)
}

func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStatement) (models.Rows, error) {
func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStatement, authorizer CoarseAuthorizer) (models.Rows, error) {
t.mu.RLock()
defer t.mu.RUnlock()

now := time.Now()

values := make([][]interface{}, 0, len(t.queries))
for id, qi := range t.queries {
if authorizer != nil && qi.database != "" && !authorizer.AuthorizeDatabase(influxql.ReadPrivilege, qi.database) {
continue
}

d := now.Sub(qi.startTime)

d = prettyTime(d)
Expand Down