From 8f3e6bd9b924c29c271561dfe7cdc69882204006 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Tue, 20 Feb 2024 12:47:25 +0100 Subject: [PATCH] Refactor history packages (#5673) What changed? Moving structs in the service/history package to their own packages The handler is moved to it's own package with the interface (like in frontend) The handler wrappers (grpc, and thrift) are moved to their own packages under wrapped (like in frontend) The engine implementation is moved to it's own package as there was a circular dependency history (service) -> handler -> history (engine) and the engine implementation cannot be in the same package as the engine interface, as there is a dependency engineimpl -> shard -> engine (Interface) Moved all the error values in handler to a constants package as at least one of them (ErrRunIDNotValid) is referenced from two different packages (handler and engineimpl tests) Why? We should not have this much in the same package Also when service and handler are in the same package we cannot introduce wrappers around handler in any other package than history as that will create a circular dependency: history (service) -> wrapper -> history (handler) How did you test it? Ran unit tests Potential risks It just moves things around, so no real code changes Release notes Documentation Changes --- host/integration_test.go | 6 +- host/signal_workflow_test.go | 6 +- service/history/constants/constants.go | 38 ++++ .../{ => engine/engineimpl}/historyEngine.go | 2 +- .../engineimpl}/historyEngine2_test.go | 2 +- .../historyEngine3_eventsv2_test.go | 2 +- .../engineimpl}/historyEngine_test.go | 4 +- service/history/{ => handler}/handler.go | 191 +++++++++--------- service/history/{ => handler}/handler_test.go | 2 +- service/history/{ => handler}/interface.go | 8 +- .../history/{ => handler}/interface_mock.go | 4 +- service/history/service.go | 11 +- .../{ => wrappers/grpc}/grpc_handler.go | 2 +- .../grpc}/grpc_handler_generated.go | 9 +- .../{ => wrappers/thrift}/thrift_handler.go | 7 +- .../thrift}/thrift_handler_generated.go | 4 +- .../thrift}/thrift_handler_test.go | 5 +- 17 files changed, 168 insertions(+), 135 deletions(-) create mode 100644 service/history/constants/constants.go rename service/history/{ => engine/engineimpl}/historyEngine.go (99%) rename service/history/{ => engine/engineimpl}/historyEngine2_test.go (99%) rename service/history/{ => engine/engineimpl}/historyEngine3_eventsv2_test.go (99%) rename service/history/{ => engine/engineimpl}/historyEngine_test.go (99%) rename service/history/{ => handler}/handler.go (92%) rename service/history/{ => handler}/handler_test.go (99%) rename service/history/{ => handler}/interface.go (92%) rename service/history/{ => handler}/interface_mock.go (99%) rename service/history/{ => wrappers/grpc}/grpc_handler.go (99%) rename service/history/{ => wrappers/grpc}/grpc_handler_generated.go (98%) rename service/history/{ => wrappers/thrift}/thrift_handler.go (92%) rename service/history/{ => wrappers/thrift}/thrift_handler_generated.go (99%) rename service/history/{ => wrappers/thrift}/thrift_handler_test.go (99%) diff --git a/host/integration_test.go b/host/integration_test.go index 80c88ad1333..3f2c0e2fe7e 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -40,7 +40,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/types" - cadencehistory "github.com/uber/cadence/service/history" + "github.com/uber/cadence/service/history/engine/engineimpl" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/matching" ) @@ -309,8 +309,8 @@ GetHistoryLoop: } terminateEventAttributes := lastEvent.WorkflowExecutionTerminatedEventAttributes - s.Equal(cadencehistory.TerminateIfRunningReason, terminateEventAttributes.GetReason()) - s.Equal(fmt.Sprintf(cadencehistory.TerminateIfRunningDetailsTemplate, we1.GetRunID()), string(terminateEventAttributes.Details)) + s.Equal(engineimpl.TerminateIfRunningReason, terminateEventAttributes.GetReason()) + s.Equal(fmt.Sprintf(engineimpl.TerminateIfRunningDetailsTemplate, we1.GetRunID()), string(terminateEventAttributes.Details)) s.Equal(execution.IdentityHistoryService, terminateEventAttributes.GetIdentity()) executionTerminated = true break GetHistoryLoop diff --git a/host/signal_workflow_test.go b/host/signal_workflow_test.go index d9850a8f39b..6cc938224d6 100644 --- a/host/signal_workflow_test.go +++ b/host/signal_workflow_test.go @@ -37,7 +37,7 @@ import ( "github.com/uber/cadence/common/client" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/types" - cadencehistory "github.com/uber/cadence/service/history" + "github.com/uber/cadence/service/history/engine/engineimpl" "github.com/uber/cadence/service/history/execution" ) @@ -1655,8 +1655,8 @@ GetHistoryLoop: } terminateEventAttributes := lastEvent.WorkflowExecutionTerminatedEventAttributes - s.Equal(cadencehistory.TerminateIfRunningReason, terminateEventAttributes.GetReason()) - s.Equal(fmt.Sprintf(cadencehistory.TerminateIfRunningDetailsTemplate, resp1.GetRunID()), string(terminateEventAttributes.Details)) + s.Equal(engineimpl.TerminateIfRunningReason, terminateEventAttributes.GetReason()) + s.Equal(fmt.Sprintf(engineimpl.TerminateIfRunningDetailsTemplate, resp1.GetRunID()), string(terminateEventAttributes.Details)) s.Equal(execution.IdentityHistoryService, terminateEventAttributes.GetIdentity()) executionTerminated = true break GetHistoryLoop diff --git a/service/history/constants/constants.go b/service/history/constants/constants.go new file mode 100644 index 00000000000..848321b2c4d --- /dev/null +++ b/service/history/constants/constants.go @@ -0,0 +1,38 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package constants + +import "github.com/uber/cadence/common/types" + +var ( + ErrDomainNotSet = &types.BadRequestError{Message: "Domain not set on request."} + ErrWorkflowExecutionNotSet = &types.BadRequestError{Message: "WorkflowExecution not set on request."} + ErrTaskListNotSet = &types.BadRequestError{Message: "Tasklist not set."} + ErrRunIDNotValid = &types.BadRequestError{Message: "RunID is not valid UUID."} + ErrWorkflowIDNotSet = &types.BadRequestError{Message: "WorkflowId is not set on request."} + ErrSourceClusterNotSet = &types.BadRequestError{Message: "Source Cluster not set on request."} + ErrTimestampNotSet = &types.BadRequestError{Message: "Timestamp not set on request."} + ErrInvalidTaskType = &types.BadRequestError{Message: "Invalid task type"} + ErrHistoryHostThrottle = &types.ServiceBusyError{Message: "History host rps exceeded"} + ErrShuttingDown = &types.InternalServiceError{Message: "Shutting down"} +) diff --git a/service/history/historyEngine.go b/service/history/engine/engineimpl/historyEngine.go similarity index 99% rename from service/history/historyEngine.go rename to service/history/engine/engineimpl/historyEngine.go index 8a0d9d8d51c..309dbcb6eb2 100644 --- a/service/history/historyEngine.go +++ b/service/history/engine/engineimpl/historyEngine.go @@ -19,7 +19,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package engineimpl import ( "bytes" diff --git a/service/history/historyEngine2_test.go b/service/history/engine/engineimpl/historyEngine2_test.go similarity index 99% rename from service/history/historyEngine2_test.go rename to service/history/engine/engineimpl/historyEngine2_test.go index f9474f4f413..d279fba8857 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/engine/engineimpl/historyEngine2_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package engineimpl import ( "context" diff --git a/service/history/historyEngine3_eventsv2_test.go b/service/history/engine/engineimpl/historyEngine3_eventsv2_test.go similarity index 99% rename from service/history/historyEngine3_eventsv2_test.go rename to service/history/engine/engineimpl/historyEngine3_eventsv2_test.go index 0b6a7ac753d..ebce68c5349 100644 --- a/service/history/historyEngine3_eventsv2_test.go +++ b/service/history/engine/engineimpl/historyEngine3_eventsv2_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package engineimpl import ( "context" diff --git a/service/history/historyEngine_test.go b/service/history/engine/engineimpl/historyEngine_test.go similarity index 99% rename from service/history/historyEngine_test.go rename to service/history/engine/engineimpl/historyEngine_test.go index 5948f59019f..8270e4bf811 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/engine/engineimpl/historyEngine_test.go @@ -19,7 +19,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package engineimpl import ( "context" @@ -228,7 +228,7 @@ func (s *engineSuite) TestGetMutableState_IntestRunID() { DomainUUID: constants.TestDomainID, Execution: &execution, }) - s.Equal(errRunIDNotValid, err) + s.Equal(constants.ErrRunIDNotValid, err) } func (s *engineSuite) TestGetMutableState_EmptyRunID() { diff --git a/service/history/handler.go b/service/history/handler/handler.go similarity index 92% rename from service/history/handler.go rename to service/history/handler/handler.go index e773dd2c03d..860fa4b490b 100644 --- a/service/history/handler.go +++ b/service/history/handler/handler.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package handler import ( "context" @@ -46,7 +46,9 @@ import ( "github.com/uber/cadence/common/types" "github.com/uber/cadence/common/types/mapper/proto" "github.com/uber/cadence/service/history/config" + "github.com/uber/cadence/service/history/constants" "github.com/uber/cadence/service/history/engine" + "github.com/uber/cadence/service/history/engine/engineimpl" "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/failover" "github.com/uber/cadence/service/history/replication" @@ -85,19 +87,6 @@ type ( var _ Handler = (*handlerImpl)(nil) var _ shard.EngineFactory = (*handlerImpl)(nil) -var ( - errDomainNotSet = &types.BadRequestError{Message: "Domain not set on request."} - errWorkflowExecutionNotSet = &types.BadRequestError{Message: "WorkflowExecution not set on request."} - errTaskListNotSet = &types.BadRequestError{Message: "Tasklist not set."} - errWorkflowIDNotSet = &types.BadRequestError{Message: "WorkflowId is not set on request."} - errRunIDNotValid = &types.BadRequestError{Message: "RunID is not valid UUID."} - errSourceClusterNotSet = &types.BadRequestError{Message: "Source Cluster not set on request."} - errTimestampNotSet = &types.BadRequestError{Message: "Timestamp not set on request."} - errInvalidTaskType = &types.BadRequestError{Message: "Invalid task type"} - errHistoryHostThrottle = &types.ServiceBusyError{Message: "History host rps exceeded"} - errShuttingDown = &types.InternalServiceError{Message: "Shutting down"} -) - // NewHandler creates a thrift handler for the history service func NewHandler( resource resource.Resource, @@ -233,7 +222,7 @@ func (h *handlerImpl) isShuttingDown() bool { func (h *handlerImpl) CreateEngine( shardContext shard.Context, ) engine.Engine { - return NewEngineWithShardContext( + return engineimpl.NewEngineWithShardContext( shardContext, h.GetVisibilityManager(), h.GetMatchingClient(), @@ -271,11 +260,11 @@ func (h *handlerImpl) RecordActivityTaskHeartbeat( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } heartbeatRequest := wrappedRequest.HeartbeatRequest @@ -330,11 +319,11 @@ func (h *handlerImpl) RecordActivityTaskStarted( ) if recordRequest.GetDomainUUID() == "" { - return nil, h.error(errDomainNotSet, scope, domainID, workflowID, "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, workflowID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID, "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, workflowID, "") } engine, err1 := h.controller.GetEngine(workflowID) @@ -377,15 +366,15 @@ func (h *handlerImpl) RecordDecisionTaskStarted( ) if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, workflowID, runID) + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, workflowID, runID) } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID, runID) + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, workflowID, runID) } if recordRequest.PollRequest == nil || recordRequest.PollRequest.TaskList.GetName() == "" { - return nil, h.error(errTaskListNotSet, scope, domainID, workflowID, runID) + return nil, h.error(constants.ErrTaskListNotSet, scope, domainID, workflowID, runID) } engine, err1 := h.controller.GetEngine(workflowID) @@ -422,11 +411,11 @@ func (h *handlerImpl) RespondActivityTaskCompleted( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } completeRequest := wrappedRequest.CompleteRequest @@ -470,11 +459,11 @@ func (h *handlerImpl) RespondActivityTaskFailed( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } failRequest := wrappedRequest.FailedRequest @@ -518,11 +507,11 @@ func (h *handlerImpl) RespondActivityTaskCanceled( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } cancelRequest := wrappedRequest.CancelRequest @@ -566,11 +555,11 @@ func (h *handlerImpl) RespondDecisionTaskCompleted( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } completeRequest := wrappedRequest.CompleteRequest @@ -623,11 +612,11 @@ func (h *handlerImpl) RespondDecisionTaskFailed( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } failedRequest := wrappedRequest.FailedRequest @@ -690,11 +679,11 @@ func (h *handlerImpl) StartWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } startRequest := wrappedRequest.StartRequest @@ -779,7 +768,7 @@ func (h *handlerImpl) RemoveTask( TaskID: request.GetTaskID(), }) default: - return errInvalidTaskType + return constants.ErrInvalidTaskType } } @@ -817,7 +806,7 @@ func (h *handlerImpl) ResetQueue( case common.TaskTypeCrossCluster: err = engine.ResetCrossClusterQueue(ctx, request.GetClusterName()) default: - err = errInvalidTaskType + err = constants.ErrInvalidTaskType } if err != nil { @@ -851,7 +840,7 @@ func (h *handlerImpl) DescribeQueue( case common.TaskTypeCrossCluster: resp, err = engine.DescribeCrossClusterQueue(ctx, request.GetClusterName()) default: - err = errInvalidTaskType + err = constants.ErrInvalidTaskType } if err != nil { @@ -874,7 +863,7 @@ func (h *handlerImpl) DescribeMutableState( domainID := request.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } workflowExecution := request.Execution @@ -906,11 +895,11 @@ func (h *handlerImpl) GetMutableState( domainID := getRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := getRequest.Execution @@ -942,11 +931,11 @@ func (h *handlerImpl) PollMutableState( domainID := getRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := getRequest.Execution @@ -978,11 +967,11 @@ func (h *handlerImpl) DescribeWorkflowExecution( domainID := request.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := request.Request.Execution @@ -1013,16 +1002,16 @@ func (h *handlerImpl) RequestCancelWorkflowExecution( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } domainID := request.GetDomainUUID() if domainID == "" || request.CancelRequest.GetDomain() == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } cancelRequest := request.CancelRequest @@ -1061,16 +1050,16 @@ func (h *handlerImpl) SignalWorkflowExecution( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.SignalRequest.WorkflowExecution @@ -1106,16 +1095,16 @@ func (h *handlerImpl) SignalWithStartWorkflowExecution( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } signalWithStartRequest := wrappedRequest.SignalWithStartRequest @@ -1163,16 +1152,16 @@ func (h *handlerImpl) RemoveSignalMutableState( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.WorkflowExecution @@ -1205,16 +1194,16 @@ func (h *handlerImpl) TerminateWorkflowExecution( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.TerminateRequest.WorkflowExecution @@ -1247,16 +1236,16 @@ func (h *handlerImpl) ResetWorkflowExecution( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.ResetRequest.WorkflowExecution @@ -1287,16 +1276,16 @@ func (h *handlerImpl) QueryWorkflow( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } domainID := request.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowID := request.GetRequest().GetExecution().GetWorkflowID() @@ -1330,20 +1319,20 @@ func (h *handlerImpl) ScheduleDecisionTask( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } domainID := request.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } if request.WorkflowExecution == nil { - return h.error(errWorkflowExecutionNotSet, scope, domainID, "", "") + return h.error(constants.ErrWorkflowExecutionNotSet, scope, domainID, "", "") } workflowExecution := request.WorkflowExecution @@ -1376,20 +1365,20 @@ func (h *handlerImpl) RecordChildExecutionCompleted( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } domainID := request.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } if request.WorkflowExecution == nil { - return h.error(errWorkflowExecutionNotSet, scope, domainID, "", "") + return h.error(constants.ErrWorkflowExecutionNotSet, scope, domainID, "", "") } workflowExecution := request.WorkflowExecution @@ -1427,16 +1416,16 @@ func (h *handlerImpl) ResetStickyTaskList( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } domainID := resetRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowID := resetRequest.Execution.GetWorkflowID() @@ -1464,7 +1453,7 @@ func (h *handlerImpl) ReplicateEventsV2( h.startWG.Wait() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } scope, sw := h.startRequestProfile(ctx, metrics.HistoryReplicateEventsV2Scope) @@ -1472,11 +1461,11 @@ func (h *handlerImpl) ReplicateEventsV2( domainID := replicateRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := replicateRequest.WorkflowExecution @@ -1508,19 +1497,19 @@ func (h *handlerImpl) SyncShardStatus( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, "", "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, "", "", "") } if syncShardStatusRequest.SourceCluster == "" { - return h.error(errSourceClusterNotSet, scope, "", "", "") + return h.error(constants.ErrSourceClusterNotSet, scope, "", "", "") } if syncShardStatusRequest.Timestamp == nil { - return h.error(errTimestampNotSet, scope, "", "", "") + return h.error(constants.ErrTimestampNotSet, scope, "", "", "") } // shard ID is already provided in the request @@ -1550,24 +1539,24 @@ func (h *handlerImpl) SyncActivity( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } domainID := syncActivityRequest.GetDomainID() if syncActivityRequest.DomainID == "" || uuid.Parse(syncActivityRequest.GetDomainID()) == nil { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } if syncActivityRequest.WorkflowID == "" { - return h.error(errWorkflowIDNotSet, scope, domainID, "", "") + return h.error(constants.ErrWorkflowIDNotSet, scope, domainID, "", "") } if syncActivityRequest.RunID == "" || uuid.Parse(syncActivityRequest.GetRunID()) == nil { - return h.error(errRunIDNotValid, scope, domainID, "", "") + return h.error(constants.ErrRunIDNotValid, scope, domainID, "", "") } workflowID := syncActivityRequest.GetWorkflowID() @@ -1599,7 +1588,7 @@ func (h *handlerImpl) GetReplicationMessages( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } var wg sync.WaitGroup @@ -1673,7 +1662,7 @@ func (h *handlerImpl) GetDLQReplicationMessages( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } taskInfoPerExecution := map[definition.WorkflowIdentifier][]*types.ReplicationTaskInfo{} @@ -1749,7 +1738,7 @@ func (h *handlerImpl) ReapplyEvents( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } domainID := request.GetDomainUUID() @@ -1792,7 +1781,7 @@ func (h *handlerImpl) CountDLQMessages( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } g := &errgroup.Group{} @@ -1840,7 +1829,7 @@ func (h *handlerImpl) ReadDLQMessages( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) @@ -1864,7 +1853,7 @@ func (h *handlerImpl) PurgeDLQMessages( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) @@ -1885,7 +1874,7 @@ func (h *handlerImpl) MergeDLQMessages( h.startWG.Wait() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } scope, sw := h.startRequestProfile(ctx, metrics.HistoryMergeDLQMessagesScope) @@ -1908,7 +1897,7 @@ func (h *handlerImpl) RefreshWorkflowTasks( defer sw.Stop() if h.isShuttingDown() { - return errShuttingDown + return constants.ErrShuttingDown } domainID := request.DomainUIID @@ -1965,7 +1954,7 @@ func (h *handlerImpl) GetCrossClusterTasks( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } ctx, cancel := common.CreateChildContext(ctx, 0.05) @@ -2027,7 +2016,7 @@ func (h *handlerImpl) RespondCrossClusterTasksCompleted( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) @@ -2064,7 +2053,7 @@ func (h *handlerImpl) GetFailoverInfo( defer sw.Stop() if h.isShuttingDown() { - return nil, errShuttingDown + return nil, constants.ErrShuttingDown } resp, err := h.failoverCoordinator.GetFailoverInfo(request.GetDomainID()) @@ -2218,10 +2207,10 @@ func (h *handlerImpl) startRequestProfile(ctx context.Context, scope int) (metri func validateTaskToken(token *common.TaskToken) error { if token.WorkflowID == "" { - return errWorkflowIDNotSet + return constants.ErrWorkflowIDNotSet } if token.RunID != "" && uuid.Parse(token.RunID) == nil { - return errRunIDNotValid + return constants.ErrRunIDNotValid } return nil } diff --git a/service/history/handler_test.go b/service/history/handler/handler_test.go similarity index 99% rename from service/history/handler_test.go rename to service/history/handler/handler_test.go index 8b92f70c152..651af92e7cf 100644 --- a/service/history/handler_test.go +++ b/service/history/handler/handler_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package handler import ( "context" diff --git a/service/history/interface.go b/service/history/handler/interface.go similarity index 92% rename from service/history/interface.go rename to service/history/handler/interface.go index a30d0cff304..37991f6fc09 100644 --- a/service/history/interface.go +++ b/service/history/handler/interface.go @@ -18,11 +18,11 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -package history github.com/uber/cadence/service/history Handler -//go:generate gowrap gen -g -p . -i Handler -t ../templates/grpc.tmpl -o ./grpc_handler_generated.go -v handler=GRPC -v package=historyv1 -v path=github.com/uber/cadence/.gen/proto/history/v1 -v prefix=History -//go:generate gowrap gen -g -p ../../.gen/go/history/historyserviceserver -i Interface -t ../templates/thrift.tmpl -o ./thrift_handler_generated.go -v handler=Thrift -v prefix=History +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -package handler github.com/uber/cadence/service/history/handler Handler +//go:generate gowrap gen -g -p . -i Handler -t ../../templates/grpc.tmpl -o ../wrappers/grpc/grpc_handler_generated.go -v handler=GRPC -v package=historyv1 -v path=github.com/uber/cadence/.gen/proto/history/v1 -v prefix=History +//go:generate gowrap gen -g -p ../../../.gen/go/history/historyserviceserver -i Interface -t ../../templates/thrift.tmpl -o ../wrappers/thrift/thrift_handler_generated.go -v handler=Thrift -v prefix=History -package history +package handler import ( "context" diff --git a/service/history/interface_mock.go b/service/history/handler/interface_mock.go similarity index 99% rename from service/history/interface_mock.go rename to service/history/handler/interface_mock.go index b7a9b69a6d2..c162b36a83f 100644 --- a/service/history/interface_mock.go +++ b/service/history/handler/interface_mock.go @@ -23,8 +23,8 @@ // Code generated by MockGen. DO NOT EDIT. // Source: interface.go -// Package history is a generated GoMock package. -package history +// Package handler is a generated GoMock package. +package handler import ( context "context" diff --git a/service/history/service.go b/service/history/service.go index df47f987596..2fc0ffbfb05 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -30,7 +30,10 @@ import ( commonResource "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/service" "github.com/uber/cadence/service/history/config" + "github.com/uber/cadence/service/history/handler" "github.com/uber/cadence/service/history/resource" + "github.com/uber/cadence/service/history/wrappers/grpc" + "github.com/uber/cadence/service/history/wrappers/thrift" ) // Service represents the cadence-history service @@ -38,7 +41,7 @@ type Service struct { resource.Resource status int32 - handler Handler + handler handler.Handler stopC chan struct{} params *commonResource.Params config *config.Config @@ -90,12 +93,12 @@ func (s *Service) Start() { logger.Info("elastic search config", tag.ESConfig(s.params.ESConfig)) logger.Info("history starting") - s.handler = NewHandler(s.Resource, s.config) + s.handler = handler.NewHandler(s.Resource, s.config) - thriftHandler := NewThriftHandler(s.handler) + thriftHandler := thrift.NewThriftHandler(s.handler) thriftHandler.Register(s.GetDispatcher()) - grpcHandler := NewGRPCHandler(s.handler) + grpcHandler := grpc.NewGRPCHandler(s.handler) grpcHandler.Register(s.GetDispatcher()) // must start resource first diff --git a/service/history/grpc_handler.go b/service/history/wrappers/grpc/grpc_handler.go similarity index 99% rename from service/history/grpc_handler.go rename to service/history/wrappers/grpc/grpc_handler.go index dc5c666748e..6b0d13330b2 100644 --- a/service/history/grpc_handler.go +++ b/service/history/wrappers/grpc/grpc_handler.go @@ -20,7 +20,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -package history +package grpc import ( "context" diff --git a/service/history/grpc_handler_generated.go b/service/history/wrappers/grpc/grpc_handler_generated.go similarity index 98% rename from service/history/grpc_handler_generated.go rename to service/history/wrappers/grpc/grpc_handler_generated.go index 9b1a6fd735e..d211349ea3b 100644 --- a/service/history/grpc_handler_generated.go +++ b/service/history/wrappers/grpc/grpc_handler_generated.go @@ -20,10 +20,10 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -package history +package grpc // Code generated by gowrap. DO NOT EDIT. -// template: ../templates/grpc.tmpl +// template: ../../../templates/grpc.tmpl // gowrap: http://github.com/hexdigest/gowrap import ( @@ -31,13 +31,14 @@ import ( historyv1 "github.com/uber/cadence/.gen/proto/history/v1" "github.com/uber/cadence/common/types/mapper/proto" + "github.com/uber/cadence/service/history/handler" ) type GRPCHandler struct { - h Handler + h handler.Handler } -func NewGRPCHandler(h Handler) GRPCHandler { +func NewGRPCHandler(h handler.Handler) GRPCHandler { return GRPCHandler{h} } diff --git a/service/history/thrift_handler.go b/service/history/wrappers/thrift/thrift_handler.go similarity index 92% rename from service/history/thrift_handler.go rename to service/history/wrappers/thrift/thrift_handler.go index 964f5ed74ed..e2afa8b3669 100644 --- a/service/history/thrift_handler.go +++ b/service/history/wrappers/thrift/thrift_handler.go @@ -20,7 +20,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -package history +package thrift import ( "context" @@ -31,13 +31,14 @@ import ( "github.com/uber/cadence/.gen/go/health/metaserver" "github.com/uber/cadence/.gen/go/history/historyserviceserver" "github.com/uber/cadence/common/types/mapper/thrift" + "github.com/uber/cadence/service/history/handler" ) type ThriftHandler struct { - h Handler + h handler.Handler } -func NewThriftHandler(h Handler) ThriftHandler { +func NewThriftHandler(h handler.Handler) ThriftHandler { return ThriftHandler{h} } diff --git a/service/history/thrift_handler_generated.go b/service/history/wrappers/thrift/thrift_handler_generated.go similarity index 99% rename from service/history/thrift_handler_generated.go rename to service/history/wrappers/thrift/thrift_handler_generated.go index 47de50737c1..3cfb774b576 100644 --- a/service/history/thrift_handler_generated.go +++ b/service/history/wrappers/thrift/thrift_handler_generated.go @@ -20,10 +20,10 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -package history +package thrift // Code generated by gowrap. DO NOT EDIT. -// template: ../templates/thrift.tmpl +// template: ../../../templates/thrift.tmpl // gowrap: http://github.com/hexdigest/gowrap import ( diff --git a/service/history/thrift_handler_test.go b/service/history/wrappers/thrift/thrift_handler_test.go similarity index 99% rename from service/history/thrift_handler_test.go rename to service/history/wrappers/thrift/thrift_handler_test.go index ff61a1694db..9548e0a9f73 100644 --- a/service/history/thrift_handler_test.go +++ b/service/history/wrappers/thrift/thrift_handler_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package thrift import ( "context" @@ -33,12 +33,13 @@ import ( "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/history/handler" ) func TestThriftHandler(t *testing.T) { ctrl := gomock.NewController(t) - h := NewMockHandler(ctrl) + h := handler.NewMockHandler(ctrl) th := NewThriftHandler(h) ctx := context.Background() internalErr := &types.InternalServiceError{Message: "test"}