Skip to content

Commit

Permalink
[Wf-Diagnostics] Emit usage logs after workflow diagnostics run (#6316)
Browse files Browse the repository at this point in the history
* [Wf-Diagnostics] Emit usage logs after workflow diagnostics run

* linting

* Update module.go

* update messaging client initialisation
  • Loading branch information
sankari165 authored Oct 3, 2024
1 parent 3cacfbb commit ea3a90e
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 17 deletions.
2 changes: 2 additions & 0 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ func (s *server) startService() common.Daemon {
log.Fatalf("error creating async queue provider: %v", err)
}

params.KafkaConfig = s.cfg.Kafka

params.Logger.Info("Starting service " + s.name)

var daemon common.Daemon
Expand Down
26 changes: 14 additions & 12 deletions common/resource/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,19 @@ type (
ThrottledLogger log.Logger
HostName string

MetricScope tally.Scope
MembershipResolver membership.Resolver
RPCFactory common.RPCFactory
PProfInitializer common.PProfInitializer
PersistenceConfig config.Persistence
ClusterMetadata cluster.Metadata
ReplicatorConfig config.Replicator
MetricsClient metrics.Client
MessagingClient messaging.Client
BlobstoreClient blobstore.Client
ESClient es.GenericClient
ESConfig *config.ElasticSearchConfig
MetricScope tally.Scope
MembershipResolver membership.Resolver
RPCFactory common.RPCFactory
PProfInitializer common.PProfInitializer
PersistenceConfig config.Persistence
ClusterMetadata cluster.Metadata
ReplicatorConfig config.Replicator
MetricsClient metrics.Client
MessagingClient messaging.Client
BlobstoreClient blobstore.Client
ESClient es.GenericClient
ESConfig *config.ElasticSearchConfig

DynamicConfig dynamicconfig.Client
ClusterRedirectionPolicy *config.ClusterRedirectionPolicy
PublicClient workflowserviceclient.Interface
Expand All @@ -78,6 +79,7 @@ type (
IsolationGroupState isolationgroup.State // This can be nil, the default state store will be chosen if so
Partitioner partition.Partitioner
PinotConfig *config.PinotVisibilityConfig
KafkaConfig config.KafkaConfig
PinotClient pinot.GenericClient
OSClient es.GenericClient
OSConfig *config.ElasticSearchConfig
Expand Down
1 change: 1 addition & 0 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (wh *WorkflowHandler) DiagnoseWorkflowExecution(ctx context.Context, reques
Domain: request.GetDomain(),
WorkflowID: request.GetWorkflowExecution().GetWorkflowID(),
RunID: request.GetWorkflowExecution().GetRunID(),
Identity: request.Identity,
}
inputInBytes, err := json.Marshal(diagnosticWorkflowInput)
if err != nil {
Expand Down
28 changes: 27 additions & 1 deletion service/worker/diagnostics/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@ package diagnostics
import (
"context"

"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/messaging/kafka"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/analytics"
"github.com/uber/cadence/service/worker/diagnostics/invariants"
)

const linkToTimeoutsRunbook = "https://cadenceworkflow.io/docs/workflow-troubleshooting/timeouts/"
const (
// linkToTimeoutsRunbook is the URL of the timeouts troubleshooting runbook.
linkToTimeoutsRunbook = "https://cadenceworkflow.io/docs/workflow-troubleshooting/timeouts/"
// WfDiagnosticsAppName is the name passed to messaging.Client.NewProducer
// when creating the producer used to emit diagnostics usage data.
WfDiagnosticsAppName = "workflow-diagnostics"
)

type retrieveExecutionHistoryInputParams struct {
Domain string
Expand Down Expand Up @@ -72,3 +78,23 @@ func (w *dw) rootCauseTimeouts(ctx context.Context, info rootCauseTimeoutsParams
})
return timeoutInvariant.RootCause(ctx, info.Issues)
}

// emitUsageLogs is the activity that publishes the given diagnostics usage
// data. It builds a messaging client via newMessagingClient on every call and
// delegates the actual publishing to emit.
func (w *dw) emitUsageLogs(ctx context.Context, info analytics.WfDiagnosticsUsageData) error {
	return emit(ctx, info, w.newMessagingClient())
}

// newMessagingClient constructs a Kafka-backed messaging client from the
// Kafka config, metrics client, logger and tally scope held by the
// diagnostics workflow.
func (w *dw) newMessagingClient() messaging.Client {
	kafkaCfg := &w.kafkaCfg
	// NOTE(review): the trailing `true` is an opaque flag of kafka.NewKafkaClient;
	// confirm its meaning against that constructor before changing it.
	return kafka.NewKafkaClient(kafkaCfg, w.metricsClient, w.logger, w.tallyScope, true)
}

// emit publishes info through a producer obtained from client for the
// WfDiagnosticsAppName application.
//
// It returns the error from client.NewProducer unchanged if the producer
// cannot be created; otherwise it returns the result of the emitter's
// EmitUsageData call.
func emit(ctx context.Context, info analytics.WfDiagnosticsUsageData, client messaging.Client) error {
	usageProducer, producerErr := client.NewProducer(WfDiagnosticsAppName)
	if producerErr != nil {
		return producerErr
	}

	emitterParams := analytics.EmitterParams{Producer: usageProducer}
	return analytics.NewEmitter(emitterParams).EmitUsageData(ctx, info)
}
12 changes: 12 additions & 0 deletions service/worker/diagnostics/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import (
"github.com/uber/cadence/client"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/analytics"
"github.com/uber/cadence/service/worker/diagnostics/invariants"
)

Expand Down Expand Up @@ -124,6 +126,16 @@ func Test__rootCauseTimeouts(t *testing.T) {
require.Equal(t, expectedRootCause, result)
}

// Test__emit verifies the happy path of emit: a producer is requested for
// WfDiagnosticsAppName, a single Publish call succeeds, and no error is
// returned.
func Test__emit(t *testing.T) {
	mockCtrl := gomock.NewController(t)

	// The client must hand out a producer for the diagnostics app name.
	usageProducer := messaging.NewMockProducer(mockCtrl)
	msgClient := messaging.NewMockClient(mockCtrl)
	msgClient.EXPECT().NewProducer(WfDiagnosticsAppName).Return(usageProducer, nil)

	// The producer must be asked to publish exactly once, and succeeds.
	usageProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil)

	require.NoError(t, emit(context.Background(), analytics.WfDiagnosticsUsageData{}, msgClient))
}

func testDiagnosticWorkflow(t *testing.T) *dw {
ctrl := gomock.NewController(t)
mockClientBean := client.NewMockBean(ctrl)
Expand Down
9 changes: 9 additions & 0 deletions service/worker/diagnostics/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
)

Expand All @@ -46,15 +48,19 @@ type dw struct {
svcClient workflowserviceclient.Interface
clientBean client.Bean
metricsClient metrics.Client
logger log.Logger
tallyScope tally.Scope
worker worker.Worker
kafkaCfg config.KafkaConfig
}

// Params bundles the dependencies passed to New when constructing the
// diagnostics workflow.
type Params struct {
ServiceClient workflowserviceclient.Interface
ClientBean client.Bean
MetricsClient metrics.Client
// Logger is copied onto the diagnostics workflow by New.
Logger log.Logger
TallyScope tally.Scope
// KafkaCfg is the Kafka configuration copied onto the diagnostics workflow by New.
KafkaCfg config.KafkaConfig
}

// New creates a new diagnostics workflow.
Expand All @@ -64,6 +70,8 @@ func New(params Params) DiagnosticsWorkflow {
metricsClient: params.MetricsClient,
tallyScope: params.TallyScope,
clientBean: params.ClientBean,
logger: params.Logger,
kafkaCfg: params.KafkaCfg,
}
}

Expand All @@ -82,6 +90,7 @@ func (w *dw) Start() error {
newWorker.RegisterActivityWithOptions(w.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
newWorker.RegisterActivityWithOptions(w.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity})
newWorker.RegisterActivityWithOptions(w.rootCauseTimeouts, activity.RegisterOptions{Name: rootCauseTimeoutsActivity})
newWorker.RegisterActivityWithOptions(w.emitUsageLogs, activity.RegisterOptions{Name: emitUsageLogsActivity})
w.worker = newWorker
return newWorker.Start()
}
Expand Down
54 changes: 50 additions & 4 deletions service/worker/diagnostics/parent_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@ package diagnostics

import (
"fmt"
"time"

"go.uber.org/cadence/workflow"

"github.com/uber/cadence/service/worker/diagnostics/analytics"
)

const (
// diagnosticsStarterWorkflow is the name used to execute the starter workflow.
diagnosticsStarterWorkflow = "diagnostics-starter-workflow"
// emitUsageLogsActivity is the name under which the emitUsageLogs activity is registered.
emitUsageLogsActivity = "emitUsageLogs"
// queryDiagnosticsReport is the query type served by the starter workflow's query handler.
queryDiagnosticsReport = "query-diagnostics-report"

// issueTypeTimeouts is the issue type reported by getIssueType when the
// diagnostics result contains timeouts.
issueTypeTimeouts = "Timeout"
)

// DiagnosticsStarterWorkflowInput is the input to DiagnosticsStarterWorkflow.
// Domain, WorkflowID and RunID are forwarded to the child diagnostics
// workflow and, together with Identity, included in the emitted usage data.
type DiagnosticsStarterWorkflowInput struct {
Domain string
// Identity is the identity taken from the diagnose request.
Identity string
WorkflowID string
RunID string
}
Expand All @@ -43,7 +50,7 @@ type DiagnosticsStarterWorkflowResult struct {
DiagnosticsResult *DiagnosticsWorkflowResult
}

func (w *dw) DiagnosticsStarterWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) (*DiagnosticsStarterWorkflowResult, error) {
func (w *dw) DiagnosticsStarterWorkflow(ctx workflow.Context, params DiagnosticsStarterWorkflowInput) (*DiagnosticsStarterWorkflowResult, error) {
var result DiagnosticsWorkflowResult
err := workflow.SetQueryHandler(ctx, queryDiagnosticsReport, func() (DiagnosticsStarterWorkflowResult, error) {
return DiagnosticsStarterWorkflowResult{DiagnosticsResult: &result}, nil
Expand All @@ -52,14 +59,53 @@ func (w *dw) DiagnosticsStarterWorkflow(ctx workflow.Context, params Diagnostics
return nil, err
}

err = workflow.ExecuteChildWorkflow(ctx, w.DiagnosticsWorkflow, DiagnosticsWorkflowInput{
future := workflow.ExecuteChildWorkflow(ctx, w.DiagnosticsWorkflow, DiagnosticsWorkflowInput{
Domain: params.Domain,
WorkflowID: params.WorkflowID,
RunID: params.RunID,
}).Get(ctx, &result)
})

var childWfExec workflow.Execution
var childWfStart, childWfEnd time.Time
if err = future.GetChildWorkflowExecution().Get(ctx, &childWfExec); err != nil {
return nil, fmt.Errorf("Workflow Diagnostics start failed: %w", err)
}
childWfStart = workflow.Now(ctx)

err = future.Get(ctx, &result)
if err != nil {
return nil, fmt.Errorf("Workflow Diagnostics: %w", err)
return nil, fmt.Errorf("Workflow Diagnostics failed: %w", err)
}
childWfEnd = workflow.Now(ctx)

activityOptions := workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Second * 10,
ScheduleToStartTimeout: time.Second * 5,
StartToCloseTimeout: time.Second * 5,
}
activityCtx := workflow.WithActivityOptions(ctx, activityOptions)
err = workflow.ExecuteActivity(activityCtx, w.emitUsageLogs, analytics.WfDiagnosticsUsageData{
Domain: params.Domain,
WorkflowID: params.WorkflowID,
RunID: params.RunID,
Identity: params.Identity,
IssueType: getIssueType(result),
DiagnosticsWorkflowID: childWfExec.ID,
DiagnosticsRunID: childWfExec.RunID,
DiagnosticsStartTime: childWfStart,
DiagnosticsEndTime: childWfEnd,
}).Get(ctx, nil)
if err != nil {
return nil, fmt.Errorf("EmitUsageLogs: %w", err)
}

return &DiagnosticsStarterWorkflowResult{DiagnosticsResult: &result}, nil
}

// getIssueType maps a diagnostics result to the issue type recorded in the
// usage data: issueTypeTimeouts when the result carries timeouts, and the
// empty string otherwise.
func getIssueType(result DiagnosticsWorkflowResult) string {
	if result.Timeouts == nil {
		return ""
	}
	return issueTypeTimeouts
}
2 changes: 2 additions & 0 deletions service/worker/diagnostics/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (s *diagnosticsWorkflowTestSuite) SetupTest() {
s.workflowEnv.RegisterActivityWithOptions(s.dw.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
s.workflowEnv.RegisterActivityWithOptions(s.dw.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity})
s.workflowEnv.RegisterActivityWithOptions(s.dw.rootCauseTimeouts, activity.RegisterOptions{Name: rootCauseTimeoutsActivity})
s.workflowEnv.RegisterActivityWithOptions(s.dw.emitUsageLogs, activity.RegisterOptions{Name: emitUsageLogsActivity})
}

func (s *diagnosticsWorkflowTestSuite) TearDownTest() {
Expand Down Expand Up @@ -131,6 +132,7 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow() {
s.workflowEnv.OnActivity(retrieveWfExecutionHistoryActivity, mock.Anything, mock.Anything).Return(nil, nil)
s.workflowEnv.OnActivity(identifyTimeoutsActivity, mock.Anything, mock.Anything).Return(issues, nil)
s.workflowEnv.OnActivity(rootCauseTimeoutsActivity, mock.Anything, mock.Anything).Return(rootCause, nil)
s.workflowEnv.OnActivity(emitUsageLogsActivity, mock.Anything, mock.Anything).Return(nil)
s.workflowEnv.ExecuteWorkflow(diagnosticsStarterWorkflow, params)
s.True(s.workflowEnv.IsWorkflowCompleted())
var result DiagnosticsStarterWorkflowResult
Expand Down
4 changes: 4 additions & 0 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/domain"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -68,6 +69,7 @@ type (

// Config contains all the service config for worker
Config struct {
KafkaCfg config.KafkaConfig
ArchiverConfig *archiver.Config
IndexerCfg *indexer.Config
ScannerCfg *scanner.Config
Expand Down Expand Up @@ -344,6 +346,8 @@ func (s *Service) startDiagnostics() {
MetricsClient: s.GetMetricsClient(),
TallyScope: s.params.MetricScope,
ClientBean: s.GetClientBean(),
Logger: s.GetLogger(),
KafkaCfg: s.params.KafkaConfig,
}
if err := diagnostics.New(params).Start(); err != nil {
s.Stop()
Expand Down

0 comments on commit ea3a90e

Please sign in to comment.