Skip to content

mysql: stat activity #3142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
return
}

srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, info.config.SourceName)
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, a.CatalogPool, info.config.SourceName)
if err != nil {
if !errors.Is(err, errors.ErrUnsupported) {
logger.Error("Failed to create connector to handle slot info", slog.Any("error", err))
Expand Down Expand Up @@ -1086,8 +1086,11 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot
additionalTableMappings []*protos.TableMapping,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, cfg.Env, a.CatalogPool, cfg.SourceName)
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, cfg.Env, a.CatalogPool, cfg.SourceName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
return nil
}
return fmt.Errorf("failed to get source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)
Expand All @@ -1111,8 +1114,11 @@ func (a *FlowableActivity) RemoveTablesFromPublication(
removedTablesMapping []*protos.TableMapping,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, cfg.Env, a.CatalogPool, cfg.SourceName)
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, cfg.Env, a.CatalogPool, cfg.SourceName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
return nil
}
return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, fmt.Errorf("failed to get source connector: %w", err))
}
defer connectors.CloseConnector(ctx, srcConn)
Expand Down
73 changes: 2 additions & 71 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
Expand Down Expand Up @@ -253,81 +252,13 @@ func (h *FlowRequestHandler) GetStatInfo(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerStatResponse, error) {
peerConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, h.pool, req.PeerName)
peerConn, err := connectors.GetByNameAs[connectors.StatActivityConnector](ctx, nil, h.pool, req.PeerName)
if err != nil {
return nil, err
}
defer connectors.CloseConnector(ctx, peerConn)

peerUser := peerConn.Config.User

rows, err := peerConn.Conn().Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
"EXTRACT(epoch FROM(now()-query_start)) AS dur, state"+
" FROM pg_stat_activity WHERE "+
"usename=$1 AND application_name LIKE 'peerdb%';", peerUser)
if err != nil {
slog.Error("Failed to get stat info", slog.Any("error", err))
return nil, err
}

statInfoRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.StatInfo, error) {
var pid int64
var waitEvent sql.NullString
var waitEventType sql.NullString
var queryStart sql.NullString
var query sql.NullString
var duration sql.NullFloat64
// shouldn't be null
var state string

err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration, &state)
if err != nil {
slog.Error("Failed to scan row", slog.Any("error", err))
return nil, err
}

we := waitEvent.String
if !waitEvent.Valid {
we = ""
}

wet := waitEventType.String
if !waitEventType.Valid {
wet = ""
}

q := query.String
if !query.Valid {
q = ""
}

qs := queryStart.String
if !queryStart.Valid {
qs = ""
}

d := duration.Float64
if !duration.Valid {
d = -1
}

return &protos.StatInfo{
Pid: pid,
WaitEvent: we,
WaitEventType: wet,
QueryStart: qs,
Query: q,
Duration: float32(d),
State: state,
}, nil
})
if err != nil {
return nil, err
}

return &protos.PeerStatResponse{
StatData: statInfoRows,
}, nil
return peerConn.StatActivity(ctx, req)
}

func (h *FlowRequestHandler) GetPublications(
Expand Down
28 changes: 9 additions & 19 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peerdb/flow/alerting"
connbigquery "github.com/PeerDB-io/peerdb/flow/connectors/bigquery"
connclickhouse "github.com/PeerDB-io/peerdb/flow/connectors/clickhouse"
connelasticsearch "github.com/PeerDB-io/peerdb/flow/connectors/elasticsearch"
Expand Down Expand Up @@ -53,6 +52,12 @@ type MirrorDestinationValidationConnector interface {
ValidateMirrorDestination(context.Context, *protos.FlowConnectionConfigs, map[string]*protos.TableSchema) error
}

type StatActivityConnector interface {
Connector

StatActivity(context.Context, *protos.PostgresPeerActivityInfoRequest) (*protos.PeerStatResponse, error)
}

type GetTableSchemaConnector interface {
Connector

Expand Down Expand Up @@ -103,24 +108,6 @@ type CDCPullConnectorCore interface {

// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(ctx context.Context, jobName string) error

// HandleSlotInfo update monitoring info on slot size etc
HandleSlotInfo(
ctx context.Context,
alerter *alerting.Alerter,
catalogPool shared.CatalogPool,
alertKeys *alerting.AlertKeys,
slotMetricGauges otel_metrics.SlotMetricGauges,
) error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(ctx context.Context, slotName string) ([]*protos.SlotInfo, error)

// AddTablesToPublication adds additional tables added to a mirror to the publication also
AddTablesToPublication(ctx context.Context, req *protos.AddTablesToPublicationInput) error

// RemoveTablesFromPublication removes tables from the publication
RemoveTablesFromPublication(ctx context.Context, req *protos.RemoveTablesFromPublicationInput) error
}

type CDCPullConnector interface {
Expand Down Expand Up @@ -522,6 +509,9 @@ var (
_ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{}
_ CDCNormalizeConnector = &connclickhouse.ClickHouseConnector{}

_ StatActivityConnector = &connpostgres.PostgresConnector{}
_ StatActivityConnector = &connmysql.MySqlConnector{}

_ GetTableSchemaConnector = &connpostgres.PostgresConnector{}
_ GetTableSchemaConnector = &connmysql.MySqlConnector{}
_ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{}
Expand Down
23 changes: 0 additions & 23 deletions flow/connectors/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peerdb/flow/alerting"
metadataStore "github.com/PeerDB-io/peerdb/flow/connectors/external_metadata"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/internal"
Expand Down Expand Up @@ -194,28 +193,6 @@ func (c *MongoConnector) PullFlowCleanup(ctx context.Context, jobName string) er
return nil
}

func (c *MongoConnector) HandleSlotInfo(
ctx context.Context,
alerter *alerting.Alerter,
catalogPool shared.CatalogPool,
alertKeys *alerting.AlertKeys,
slotMetricGauges otel_metrics.SlotMetricGauges,
) error {
return nil
}

func (c *MongoConnector) GetSlotInfo(ctx context.Context, slotName string) ([]*protos.SlotInfo, error) {
return nil, nil
}

func (c *MongoConnector) AddTablesToPublication(ctx context.Context, req *protos.AddTablesToPublicationInput) error {
return nil
}

func (c *MongoConnector) RemoveTablesFromPublication(ctx context.Context, req *protos.RemoveTablesFromPublicationInput) error {
return nil
}

// end stubs

func (c *MongoConnector) PullRecords(
Expand Down
23 changes: 0 additions & 23 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
_ "github.com/pingcap/tidb/pkg/types/parser_driver"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peerdb/flow/alerting"
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
"github.com/PeerDB-io/peerdb/flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand Down Expand Up @@ -306,28 +305,6 @@ func (c *MySqlConnector) PullFlowCleanup(ctx context.Context, jobName string) er
return nil
}

func (c *MySqlConnector) HandleSlotInfo(
ctx context.Context,
alerter *alerting.Alerter,
catalogPool shared.CatalogPool,
alertKeys *alerting.AlertKeys,
slotMetricGauges otel_metrics.SlotMetricGauges,
) error {
return nil
}

func (c *MySqlConnector) GetSlotInfo(ctx context.Context, slotName string) ([]*protos.SlotInfo, error) {
return nil, nil
}

func (c *MySqlConnector) AddTablesToPublication(ctx context.Context, req *protos.AddTablesToPublicationInput) error {
return nil
}

func (c *MySqlConnector) RemoveTablesFromPublication(ctx context.Context, req *protos.RemoveTablesFromPublicationInput) error {
return nil
}

func (c *MySqlConnector) PullRecords(
ctx context.Context,
catalogPool shared.CatalogPool,
Expand Down
42 changes: 42 additions & 0 deletions flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,45 @@ func (c *MySqlConnector) GetVersion(ctx context.Context) (string, error) {
}
return "", errors.New("failed to connect")
}

func (c *MySqlConnector) StatActivity(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerStatResponse, error) {
rs, err := c.Execute(ctx, "SELECT ID,COMMAND,STATE,TIME,INFO FROM performance_schema.processlist WHERE USER=?", c.config.User)
if err != nil {
// 42S02 is ER_NO_SUCH_TABLE
var myErr *mysql.MyError
if errors.As(err, &myErr) && myErr.Code == 1146 && myErr.State == "42S02" {
// mariadb
rs, err = c.Execute(ctx,
"SELECT PROCESSLIST_ID,PROCESSLIST_COMMAND,PROCESSLIST_STATE,PROCESSLIST_TIME,PROCESSLIST_INFO"+
" FROM performance_schema.threads WHERE USER=?", c.config.User)
if errors.As(err, &myErr) && myErr.Code == 1146 && myErr.State == "42S02" {
rs, err = c.Execute(ctx,
"SELECT ID,COMMAND,STATE,TIME,INFO FROM information_schema.processlist WHERE USER=?", c.config.User)
}
}

if err != nil {
return nil, err
}
}

statInfoRows := make([]*protos.StatInfo, len(rs.Values))
for idx, row := range rs.Values {
statInfoRows[idx] = &protos.StatInfo{
Pid: row[0].AsInt64(),
WaitEvent: string(row[1].AsString()),
WaitEventType: "",
QueryStart: "",
Query: string(row[4].AsString()),
Duration: float32(row[3].AsUint64()),
State: string(row[2].AsString()),
}
}

return &protos.PeerStatResponse{
StatData: statInfoRows,
}, nil
}
72 changes: 72 additions & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,78 @@ func (c *PostgresConnector) CreateRawTable(ctx context.Context, req *protos.Crea
return nil, nil
}

func (c *PostgresConnector) StatActivity(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerStatResponse, error) {
rows, err := c.Conn().Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
"CAST(EXTRACT(epoch FROM(now()-query_start)) AS float4) AS dur, state"+
" FROM pg_stat_activity WHERE "+
"usename=$1 AND application_name LIKE 'peerdb%';", c.Config.User)
if err != nil {
slog.Error("Failed to get stat info", slog.Any("error", err))
return nil, err
}

statInfoRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.StatInfo, error) {
var pid int64
var waitEvent pgtype.Text
var waitEventType pgtype.Text
var queryStart pgtype.Text
var query pgtype.Text
var duration pgtype.Float4
// shouldn't be null
var state string

if err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration, &state); err != nil {
slog.Error("Failed to scan row", slog.Any("error", err))
return nil, err
}

we := waitEvent.String
if !waitEvent.Valid {
we = ""
}

wet := waitEventType.String
if !waitEventType.Valid {
wet = ""
}

q := query.String
if !query.Valid {
q = ""
}

qs := queryStart.String
if !queryStart.Valid {
qs = ""
}

d := duration.Float32
if !duration.Valid {
d = -1
}

return &protos.StatInfo{
Pid: pid,
WaitEvent: we,
WaitEventType: wet,
QueryStart: qs,
Query: q,
Duration: d,
State: state,
}, nil
})
if err != nil {
return nil, err
}

return &protos.PeerStatResponse{
StatData: statInfoRows,
}, nil
}

func (c *PostgresConnector) GetTableSchema(
ctx context.Context,
env map[string]string,
Expand Down
4 changes: 3 additions & 1 deletion ui/app/peers/[peerName]/stattable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ export default function StatTable({ peerName }: { peerName: string }) {
</Label>
</TableCell>
<TableCell>
<TimeLabel timeVal={stat.queryStart} fontSize={14} />
{stat.queryStart && (
<TimeLabel timeVal={stat.queryStart} fontSize={14} />
)}
</TableCell>
<TableCell variant='extended'>
<div
Expand Down
Loading