diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 907c69827ef..e14d3eeb559 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -255,6 +255,8 @@ func (s *server) startService() common.Daemon { log.Fatalf("error creating async queue provider: %v", err) } + params.KafkaConfig = s.cfg.Kafka + params.Logger.Info("Starting service " + s.name) var daemon common.Daemon diff --git a/common/resource/params.go b/common/resource/params.go index 9406fe65cb8..1fe6f3348b0 100644 --- a/common/resource/params.go +++ b/common/resource/params.go @@ -55,18 +55,19 @@ type ( ThrottledLogger log.Logger HostName string - MetricScope tally.Scope - MembershipResolver membership.Resolver - RPCFactory common.RPCFactory - PProfInitializer common.PProfInitializer - PersistenceConfig config.Persistence - ClusterMetadata cluster.Metadata - ReplicatorConfig config.Replicator - MetricsClient metrics.Client - MessagingClient messaging.Client - BlobstoreClient blobstore.Client - ESClient es.GenericClient - ESConfig *config.ElasticSearchConfig + MetricScope tally.Scope + MembershipResolver membership.Resolver + RPCFactory common.RPCFactory + PProfInitializer common.PProfInitializer + PersistenceConfig config.Persistence + ClusterMetadata cluster.Metadata + ReplicatorConfig config.Replicator + MetricsClient metrics.Client + MessagingClient messaging.Client + BlobstoreClient blobstore.Client + ESClient es.GenericClient + ESConfig *config.ElasticSearchConfig + DynamicConfig dynamicconfig.Client ClusterRedirectionPolicy *config.ClusterRedirectionPolicy PublicClient workflowserviceclient.Interface @@ -78,6 +79,7 @@ type ( IsolationGroupState isolationgroup.State // This can be nil, the default state store will be chosen if so Partitioner partition.Partitioner PinotConfig *config.PinotVisibilityConfig + KafkaConfig config.KafkaConfig PinotClient pinot.GenericClient OSClient es.GenericClient OSConfig *config.ElasticSearchConfig diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go index 4dccace860a..1cc90004da5 100644 --- a/service/frontend/api/handler.go +++ b/service/frontend/api/handler.go @@ -225,6 +225,7 @@ func (wh *WorkflowHandler) DiagnoseWorkflowExecution(ctx context.Context, reques Domain: request.GetDomain(), WorkflowID: request.GetWorkflowExecution().GetWorkflowID(), RunID: request.GetWorkflowExecution().GetRunID(), + Identity: request.Identity, } inputInBytes, err := json.Marshal(diagnosticWorkflowInput) if err != nil { diff --git a/service/worker/diagnostics/activities.go b/service/worker/diagnostics/activities.go index e6b97382aa4..ef47a8abcce 100644 --- a/service/worker/diagnostics/activities.go +++ b/service/worker/diagnostics/activities.go @@ -25,11 +25,17 @@ package diagnostics import ( "context" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/messaging/kafka" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/worker/diagnostics/analytics" "github.com/uber/cadence/service/worker/diagnostics/invariants" ) -const linkToTimeoutsRunbook = "https://cadenceworkflow.io/docs/workflow-troubleshooting/timeouts/" +const ( + linkToTimeoutsRunbook = "https://cadenceworkflow.io/docs/workflow-troubleshooting/timeouts/" + WfDiagnosticsAppName = "workflow-diagnostics" +) type retrieveExecutionHistoryInputParams struct { Domain string @@ -72,3 +78,23 @@ func (w *dw) rootCauseTimeouts(ctx context.Context, info rootCauseTimeoutsParams }) return timeoutInvariant.RootCause(ctx, info.Issues) } + +func (w *dw) emitUsageLogs(ctx context.Context, info analytics.WfDiagnosticsUsageData) error { + client := w.newMessagingClient() + return emit(ctx, info, client) +} + +func (w *dw) newMessagingClient() messaging.Client { + return kafka.NewKafkaClient(&w.kafkaCfg, w.metricsClient, w.logger, w.tallyScope, true) +} + +func emit(ctx context.Context, info analytics.WfDiagnosticsUsageData, client messaging.Client) error { + producer, err := client.NewProducer(WfDiagnosticsAppName) + if err != nil { + return err + } + emitter := analytics.NewEmitter(analytics.EmitterParams{ + Producer: producer, + }) + return emitter.EmitUsageData(ctx, info) +} diff --git a/service/worker/diagnostics/activities_test.go b/service/worker/diagnostics/activities_test.go index d8b96ad5bc1..abcefcf3197 100644 --- a/service/worker/diagnostics/activities_test.go +++ b/service/worker/diagnostics/activities_test.go @@ -34,7 +34,9 @@ import ( "github.com/uber/cadence/client" "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/worker/diagnostics/analytics" "github.com/uber/cadence/service/worker/diagnostics/invariants" ) @@ -124,6 +126,16 @@ func Test__rootCauseTimeouts(t *testing.T) { require.Equal(t, expectedRootCause, result) } +func Test__emit(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := messaging.NewMockClient(ctrl) + mockProducer := messaging.NewMockProducer(ctrl) + mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil) + mockClient.EXPECT().NewProducer(WfDiagnosticsAppName).Return(mockProducer, nil) + err := emit(context.Background(), analytics.WfDiagnosticsUsageData{}, mockClient) + require.NoError(t, err) +} + func testDiagnosticWorkflow(t *testing.T) *dw { ctrl := gomock.NewController(t) mockClientBean := client.NewMockBean(ctrl) diff --git a/service/worker/diagnostics/module.go b/service/worker/diagnostics/module.go index 7e67c7ce5e0..4c9e7db56d4 100644 --- a/service/worker/diagnostics/module.go +++ b/service/worker/diagnostics/module.go @@ -34,6 +34,8 @@ import ( "github.com/uber/cadence/client" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/metrics" ) @@ -46,15 +48,19 @@ type dw struct { svcClient workflowserviceclient.Interface clientBean client.Bean metricsClient metrics.Client + logger log.Logger tallyScope tally.Scope worker worker.Worker + kafkaCfg config.KafkaConfig } type Params struct { ServiceClient workflowserviceclient.Interface ClientBean client.Bean MetricsClient metrics.Client + Logger log.Logger TallyScope tally.Scope + KafkaCfg config.KafkaConfig } // New creates a new diagnostics workflow. @@ -64,6 +70,8 @@ func New(params Params) DiagnosticsWorkflow { metricsClient: params.MetricsClient, tallyScope: params.TallyScope, clientBean: params.ClientBean, + logger: params.Logger, + kafkaCfg: params.KafkaCfg, } } @@ -82,6 +90,7 @@ func (w *dw) Start() error { newWorker.RegisterActivityWithOptions(w.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity}) newWorker.RegisterActivityWithOptions(w.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity}) newWorker.RegisterActivityWithOptions(w.rootCauseTimeouts, activity.RegisterOptions{Name: rootCauseTimeoutsActivity}) + newWorker.RegisterActivityWithOptions(w.emitUsageLogs, activity.RegisterOptions{Name: emitUsageLogsActivity}) w.worker = newWorker return newWorker.Start() } diff --git a/service/worker/diagnostics/parent_workflow.go b/service/worker/diagnostics/parent_workflow.go index 28296e28c89..b4a0321143a 100644 --- a/service/worker/diagnostics/parent_workflow.go +++ b/service/worker/diagnostics/parent_workflow.go @@ -24,17 +24,24 @@ package diagnostics import ( "fmt" + "time" "go.uber.org/cadence/workflow" + + "github.com/uber/cadence/service/worker/diagnostics/analytics" ) const ( diagnosticsStarterWorkflow = "diagnostics-starter-workflow" + emitUsageLogsActivity = "emitUsageLogs" queryDiagnosticsReport = "query-diagnostics-report" + + issueTypeTimeouts = "Timeout" ) type DiagnosticsStarterWorkflowInput struct { Domain string + Identity string WorkflowID string RunID string } @@ -43,7 +50,7 @@ type DiagnosticsStarterWorkflowResult struct { DiagnosticsResult *DiagnosticsWorkflowResult } -func (w *dw) DiagnosticsStarterWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) (*DiagnosticsStarterWorkflowResult, error) { +func (w *dw) DiagnosticsStarterWorkflow(ctx workflow.Context, params DiagnosticsStarterWorkflowInput) (*DiagnosticsStarterWorkflowResult, error) { var result DiagnosticsWorkflowResult err := workflow.SetQueryHandler(ctx, queryDiagnosticsReport, func() (DiagnosticsStarterWorkflowResult, error) { return DiagnosticsStarterWorkflowResult{DiagnosticsResult: &result}, nil @@ -52,14 +59,53 @@ func (w *dw) DiagnosticsStarterWorkflow(ctx workflow.Context, params Diagnostics return nil, err } - err = workflow.ExecuteChildWorkflow(ctx, w.DiagnosticsWorkflow, DiagnosticsWorkflowInput{ + future := workflow.ExecuteChildWorkflow(ctx, w.DiagnosticsWorkflow, DiagnosticsWorkflowInput{ Domain: params.Domain, WorkflowID: params.WorkflowID, RunID: params.RunID, - }).Get(ctx, &result) + }) + + var childWfExec workflow.Execution + var childWfStart, childWfEnd time.Time + if err = future.GetChildWorkflowExecution().Get(ctx, &childWfExec); err != nil { + return nil, fmt.Errorf("Workflow Diagnostics start failed: %w", err) + } + childWfStart = workflow.Now(ctx) + + err = future.Get(ctx, &result) if err != nil { - return nil, fmt.Errorf("Workflow Diagnostics: %w", err) + return nil, fmt.Errorf("Workflow Diagnostics failed: %w", err) + } + childWfEnd = workflow.Now(ctx) + + activityOptions := workflow.ActivityOptions{ + ScheduleToCloseTimeout: time.Second * 10, + ScheduleToStartTimeout: time.Second * 5, + StartToCloseTimeout: time.Second * 5, + } + activityCtx := workflow.WithActivityOptions(ctx, activityOptions) + err = workflow.ExecuteActivity(activityCtx, w.emitUsageLogs, analytics.WfDiagnosticsUsageData{ + Domain: params.Domain, + WorkflowID: params.WorkflowID, + RunID: params.RunID, + Identity: params.Identity, + IssueType: getIssueType(result), + DiagnosticsWorkflowID: childWfExec.ID, + DiagnosticsRunID: childWfExec.RunID, + DiagnosticsStartTime: childWfStart, + DiagnosticsEndTime: childWfEnd, + }).Get(ctx, nil) + if err != nil { + return nil, fmt.Errorf("EmitUsageLogs: %w", err) } return &DiagnosticsStarterWorkflowResult{DiagnosticsResult: &result}, nil } + +func getIssueType(result DiagnosticsWorkflowResult) string { + var issueType string + if result.Timeouts != nil { + issueType = issueTypeTimeouts + } + return issueType +} diff --git a/service/worker/diagnostics/workflow_test.go b/service/worker/diagnostics/workflow_test.go index 87ef008d644..9530e0ae27a 100644 --- a/service/worker/diagnostics/workflow_test.go +++ b/service/worker/diagnostics/workflow_test.go @@ -74,6 +74,7 @@ func (s *diagnosticsWorkflowTestSuite) SetupTest() { s.workflowEnv.RegisterActivityWithOptions(s.dw.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity}) s.workflowEnv.RegisterActivityWithOptions(s.dw.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity}) s.workflowEnv.RegisterActivityWithOptions(s.dw.rootCauseTimeouts, activity.RegisterOptions{Name: rootCauseTimeoutsActivity}) + s.workflowEnv.RegisterActivityWithOptions(s.dw.emitUsageLogs, activity.RegisterOptions{Name: emitUsageLogsActivity}) } func (s *diagnosticsWorkflowTestSuite) TearDownTest() { @@ -131,6 +132,7 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow() { s.workflowEnv.OnActivity(retrieveWfExecutionHistoryActivity, mock.Anything, mock.Anything).Return(nil, nil) s.workflowEnv.OnActivity(identifyTimeoutsActivity, mock.Anything, mock.Anything).Return(issues, nil) s.workflowEnv.OnActivity(rootCauseTimeoutsActivity, mock.Anything, mock.Anything).Return(rootCause, nil) + s.workflowEnv.OnActivity(emitUsageLogsActivity, mock.Anything, mock.Anything).Return(nil) s.workflowEnv.ExecuteWorkflow(diagnosticsStarterWorkflow, params) s.True(s.workflowEnv.IsWorkflowCompleted()) var result DiagnosticsStarterWorkflowResult diff --git a/service/worker/service.go b/service/worker/service.go index 24641382de3..ca665b160f7 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -29,6 +29,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/domain" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log/tag" @@ -68,6 +69,7 @@ type ( // Config contains all the service config for worker Config struct { + KafkaCfg config.KafkaConfig ArchiverConfig *archiver.Config IndexerCfg *indexer.Config ScannerCfg *scanner.Config @@ -344,6 +346,8 @@ func (s *Service) startDiagnostics() { MetricsClient: s.GetMetricsClient(), TallyScope: s.params.MetricScope, ClientBean: s.GetClientBean(), + Logger: s.GetLogger(), + KafkaCfg: s.params.KafkaConfig, } if err := diagnostics.New(params).Start(); err != nil { s.Stop()