Skip to content

Commit

Permalink
Improved domain specific metrics in workflow handler (#2468)
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan authored Aug 28, 2019
1 parent e0808e5 commit 356259c
Showing 1 changed file with 73 additions and 38 deletions.
111 changes: 73 additions & 38 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
) (resp *gen.RecordActivityTaskHeartbeatResponse, retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRecordActivityTaskHeartbeatScope)
defer sw.Stop()
scope := wh.getDefaultScope(metrics.FrontendRecordActivityTaskHeartbeatScope)

if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
return nil, wh.error(err, scope)
Expand Down Expand Up @@ -599,8 +598,13 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
return nil, wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRecordActivityTaskHeartbeatScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
},
)
defer sw.Stop()

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
Expand Down Expand Up @@ -650,7 +654,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
) (resp *gen.RecordActivityTaskHeartbeatResponse, retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRecordActivityTaskHeartbeatByIDScope)
scope, sw := wh.startRequestProfileWithDomain(metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest)
defer sw.Stop()

if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
Expand Down Expand Up @@ -757,9 +761,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(
) (retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRespondActivityTaskCompletedScope)
defer sw.Stop()

scope := wh.getDefaultScope(metrics.FrontendRespondActivityTaskCompletedScope)
if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
return wh.error(err, scope)
}
Expand Down Expand Up @@ -790,8 +792,13 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(
return wh.error(errIdentityTooLong, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondActivityTaskCompletedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
},
)
defer sw.Stop()

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
Expand Down Expand Up @@ -840,7 +847,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
) (retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRespondActivityTaskCompletedByIDScope)
scope, sw := wh.startRequestProfileWithDomain(metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest)
defer sw.Stop()

if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
Expand Down Expand Up @@ -949,9 +956,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
) (retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRespondActivityTaskFailedScope)
defer sw.Stop()

scope := wh.getDefaultScope(metrics.FrontendRespondActivityTaskFailedScope)
if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
return wh.error(err, scope)
}
Expand Down Expand Up @@ -979,8 +984,13 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
return wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondActivityTaskFailedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
},
)
defer sw.Stop()

if len(failedRequest.GetIdentity()) > wh.config.MaxIDLengthLimit() {
return wh.error(errIdentityTooLong, scope)
Expand Down Expand Up @@ -1021,7 +1031,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
) (retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRespondActivityTaskFailedByIDScope)
scope, sw := wh.startRequestProfileWithDomain(metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest)
defer sw.Stop()

if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
Expand Down Expand Up @@ -1118,9 +1128,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(
) (retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRespondActivityTaskCanceledScope)
defer sw.Stop()

scope := wh.getDefaultScope(metrics.FrontendRespondActivityTaskCanceledScope)
if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
return wh.error(err, scope)
}
Expand Down Expand Up @@ -1148,8 +1156,13 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(
return wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondActivityTaskCanceledScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
},
)
defer sw.Stop()

if len(cancelRequest.GetIdentity()) > wh.config.MaxIDLengthLimit() {
return wh.error(errIdentityTooLong, scope)
Expand Down Expand Up @@ -1202,7 +1215,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
) (retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRespondActivityTaskCanceledScope)
scope, sw := wh.startRequestProfileWithDomain(metrics.FrontendRespondActivityTaskCanceledScope, cancelRequest)
defer sw.Stop()

if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
Expand Down Expand Up @@ -1310,9 +1323,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
) (resp *gen.RespondDecisionTaskCompletedResponse, retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRespondDecisionTaskCompletedScope)
defer sw.Stop()

scope := wh.getDefaultScope(metrics.FrontendRespondDecisionTaskCompletedScope)
if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
return nil, wh.error(err, scope)
}
Expand Down Expand Up @@ -1340,8 +1351,13 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
return nil, wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondDecisionTaskCompletedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
},
)
defer sw.Stop()

histResp, err := wh.history.RespondDecisionTaskCompleted(ctx, &h.RespondDecisionTaskCompletedRequest{
DomainUUID: common.StringPtr(taskToken.DomainID),
Expand Down Expand Up @@ -1389,9 +1405,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed(
) (retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRespondDecisionTaskFailedScope)
defer sw.Stop()

scope := wh.getDefaultScope(metrics.FrontendRespondDecisionTaskFailedScope)
if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
return wh.error(err, scope)
}
Expand Down Expand Up @@ -1419,8 +1433,13 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed(
return wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondDecisionTaskFailedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
},
)
defer sw.Stop()

if len(failedRequest.GetIdentity()) > wh.config.MaxIDLengthLimit() {
return wh.error(errIdentityTooLong, scope)
Expand Down Expand Up @@ -1460,9 +1479,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(
) (retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

scope, sw := wh.startRequestProfile(metrics.FrontendRespondQueryTaskCompletedScope)
defer sw.Stop()

scope := wh.getDefaultScope(metrics.FrontendRespondQueryTaskCompletedScope)
if err := wh.versionChecker.checkClientVersion(ctx); err != nil {
return wh.error(err, scope)
}
Expand Down Expand Up @@ -1490,8 +1507,13 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(
return wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondQueryTaskCompletedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
},
)
defer sw.Stop()

matchingRequest := &m.RespondQueryTaskCompletedRequest{
DomainUUID: common.StringPtr(queryTaskToken.DomainID),
Expand Down Expand Up @@ -2989,6 +3011,11 @@ func (wh *WorkflowHandler) startRequestProfileWithDomain(scope int, d domainGett
return metricsScope, sw
}

// getDefaultScope returns a default scope to use for request metrics
func (wh *WorkflowHandler) getDefaultScope(scope int) metrics.Scope {
return wh.metricsClient.Scope(scope).Tagged(metrics.DomainUnknownTag())
}

func (wh *WorkflowHandler) error(err error, scope metrics.Scope) error {
switch err := err.(type) {
case *gen.InternalServiceError:
Expand Down Expand Up @@ -3325,3 +3352,11 @@ func (wh *WorkflowHandler) GetReplicationMessages(
}
return resp, nil
}

type domainWrapper struct {
domain string
}

func (d domainWrapper) GetDomain() string {
return d.domain
}

0 comments on commit 356259c

Please sign in to comment.