Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client Side Session Support #695

Merged
merged 22 commits into from
Jun 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@ import (
"go.uber.org/cadence/workflow"
)

// QueryTypeStackTrace is the build in query type for Client.QueryWorkflow() call. Use this query type to get the call
// stack of the workflow. The result will be a string encoded in the encoded.Value.
const QueryTypeStackTrace string = internal.QueryTypeStackTrace
const (
// QueryTypeStackTrace is the build in query type for Client.QueryWorkflow() call. Use this query type to get the call
// stack of the workflow. The result will be a string encoded in the encoded.Value.
QueryTypeStackTrace string = internal.QueryTypeStackTrace

// QueryTypeOpenSessions is the build in query type for Client.QueryWorkflow() call. Use this query type to get all open
// sessions in the workflow. The result will be a list of SessionInfo encoded in the encoded.Value.
QueryTypeOpenSessions string = internal.QueryTypeOpenSessions
)

type (
// Options are optional parameters for Client creation.
Expand Down
1 change: 1 addition & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ type ServiceInvoker interface {
// Returns ActivityTaskCanceledError if activity is cancelled
Heartbeat(details []byte) error
Close(flushBufferedHeartbeat bool)
GetClient(domain string, options *ClientOptions) Client
}

// WithActivityTask adds activity specific information into context.
Expand Down
15 changes: 10 additions & 5 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,23 @@ import (
"fmt"
"time"

"go.uber.org/cadence/encoded"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
s "go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/encoded"
"go.uber.org/cadence/internal/common/metrics"
)

// QueryTypeStackTrace is the build in query type for Client.QueryWorkflow() call. Use this query type to get the call
// stack of the workflow. The result will be a string encoded in the EncodedValue.
const QueryTypeStackTrace string = "__stack_trace"
const (
// QueryTypeStackTrace is the build in query type for Client.QueryWorkflow() call. Use this query type to get the call
// stack of the workflow. The result will be a string encoded in the EncodedValue.
QueryTypeStackTrace string = "__stack_trace"

// QueryTypeOpenSessions is the build in query type for Client.QueryWorkflow() call. Use this query type to get all open
// sessions in the workflow. The result will be a list of SessionInfo encoded in the EncodedValue.
QueryTypeOpenSessions string = "__open_sessions"
)

type (
// Client is the client for starting and getting information about a workflow executions as well as
Expand Down
26 changes: 24 additions & 2 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type (
pendingLaTasks map[string]*localActivityTask
mutableSideEffect map[string][]byte
unstartedLaTasks map[string]struct{}
openSessions map[string]*SessionInfo

counterID int32 // To generate sequence IDs for activity/timer etc.
currentReplayTime time.Time // Indicates current replay time of the decision.
Expand Down Expand Up @@ -184,6 +185,7 @@ func newWorkflowExecutionEventHandler(
changeVersions: make(map[string]Version),
pendingLaTasks: make(map[string]*localActivityTask),
unstartedLaTasks: make(map[string]struct{}),
openSessions: make(map[string]*SessionInfo),
completeHandler: completeHandler,
enableLoggingInReplay: enableLoggingInReplay,
hostEnv: hostEnv,
Expand Down Expand Up @@ -631,6 +633,22 @@ func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, data []byt
return newEncodedValue(data, wc.GetDataConverter())
}

func (wc *workflowEnvironmentImpl) AddSession(sessionInfo *SessionInfo) {
wc.openSessions[sessionInfo.SessionID] = sessionInfo
}

func (wc *workflowEnvironmentImpl) RemoveSession(sessionID string) {
delete(wc.openSessions, sessionID)
}

func (wc *workflowEnvironmentImpl) getOpenSessions() []*SessionInfo {
openSessions := make([]*SessionInfo, 0, len(wc.openSessions))
for _, info := range wc.openSessions {
openSessions = append(openSessions, info)
}
return openSessions
}

func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
event *m.HistoryEvent,
isReplay bool,
Expand Down Expand Up @@ -802,10 +820,14 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
}

func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(queryType string, queryArgs []byte) ([]byte, error) {
if queryType == QueryTypeStackTrace {
switch queryType {
case QueryTypeStackTrace:
return weh.encodeArg(weh.StackTrace())
case QueryTypeOpenSessions:
return weh.encodeArg(weh.getOpenSessions())
default:
return weh.queryHandler(queryType, queryArgs)
}
return weh.queryHandler(queryType, queryArgs)
}

func (weh *workflowExecutionEventHandlerImpl) StackTrace() string {
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,10 @@ func (i *cadenceInvoker) Close(flushBufferedHeartbeat bool) {
}
}

func (i *cadenceInvoker) GetClient(domain string, options *ClientOptions) Client {
return NewClient(i.service, domain, options)
}

func newServiceInvoker(
taskToken []byte,
identity string,
Expand Down
123 changes: 117 additions & 6 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

"github.com/apache/thrift/lib/go/thrift"
"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
"github.com/uber-go/tally"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
Expand Down Expand Up @@ -70,6 +71,8 @@ const (

defaultPollerRate = 1000

defaultMaxConcurrentSessionExecutionSize = 1000 // Large concurrent session execution size (1k)

testTagsContextKey = "cadence-testTags"
)

Expand Down Expand Up @@ -101,6 +104,14 @@ type (
identity string
}

// sessionWorker wraps the code for hosting session creation, completion and
// activities within a session. The creationWorker polls from a global tasklist,
// while the activityWorker polls from a resource specific tasklist.
sessionWorker struct {
creationWorker Worker
activityWorker Worker
}

// Worker overrides.
workerOverrides struct {
workflowTaskHandler WorkflowTaskHandler
Expand Down Expand Up @@ -170,6 +181,9 @@ type (
// WorkerStopChannel is a read only channel listen on worker close. The worker will close the channel before exit.
WorkerStopChannel <-chan struct{}

// SessionResourceID is a unique identifier of the resource the session will consume
SessionResourceID string

ContextPropagators []ContextPropagator

Tracer opentracing.Tracer
Expand Down Expand Up @@ -290,6 +304,7 @@ func newWorkflowTaskWorkerInternal(
params.Logger,
params.MetricsScope,
nil,
nil,
)

// laTunnel is the glue that hookup 3 parts
Expand All @@ -316,6 +331,7 @@ func newWorkflowTaskWorkerInternal(
params.Logger,
params.MetricsScope,
nil,
nil,
)

// 3) the result pushed to laTunnel will be send as task to workflow worker to process.
Expand Down Expand Up @@ -359,23 +375,84 @@ func (ww *workflowWorker) Stop() {
ww.worker.Stop()
}

func newSessionWorker(service workflowserviceclient.Interface,
domain string,
params workerExecutionParameters,
overrides *workerOverrides,
env *hostEnvImpl,
maxConCurrentSessionExecutionSize int,
) Worker {
if params.Identity == "" {
params.Identity = getWorkerIdentity(params.TaskList)
}
// For now resourceID is hiden from user so we will always create a unique one for each worker.
if params.SessionResourceID == "" {
params.SessionResourceID = uuid.New()
}
sessionEnvironment := newSessionEnvironment(params.SessionResourceID, maxConCurrentSessionExecutionSize)

creationTasklist := getCreationTasklist(params.TaskList)
params.UserContext = context.WithValue(params.UserContext, sessionEnvironmentContextKey, sessionEnvironment)
params.TaskList = sessionEnvironment.GetResourceSpecificTasklist()
activityWorker := newActivityWorker(service, domain, params, overrides, env, nil)

params.ConcurrentPollRoutineSize = 1
params.TaskList = creationTasklist
creationWorker := newActivityWorker(service, domain, params, overrides, env, sessionEnvironment.GetTokenBucket())

return &sessionWorker{
creationWorker: creationWorker,
activityWorker: activityWorker,
}
}

func (sw *sessionWorker) Start() error {
err := sw.creationWorker.Start()
if err != nil {
return err
}

err = sw.activityWorker.Start()
if err != nil {
sw.creationWorker.Stop()
return err
}
return nil
}

func (sw *sessionWorker) Run() error {
err := sw.creationWorker.Start()
if err != nil {
return err
}
return sw.activityWorker.Run()
}

func (sw *sessionWorker) Stop() {
sw.creationWorker.Stop()
sw.activityWorker.Stop()
}

func newActivityWorker(
service workflowserviceclient.Interface,
domain string,
params workerExecutionParameters,
overrides *workerOverrides,
env *hostEnvImpl,
activityShutdownCh chan struct{},
sessionTokenBucket *sessionTokenBucket,
) Worker {
workerStopChannel := make(chan struct{}, 1)
params.WorkerStopChannel = getReadOnlyChannel(workerStopChannel)
ensureRequiredParams(&params)

// Get a activity task handler.
var taskHandler ActivityTaskHandler
if overrides != nil && overrides.activityTaskHandler != nil {
taskHandler = overrides.activityTaskHandler
} else {
taskHandler = newActivityTaskHandler(service, params, env)
}
return newActivityTaskWorker(taskHandler, service, domain, params, activityShutdownCh)
return newActivityTaskWorker(taskHandler, service, domain, params, workerStopChannel, sessionTokenBucket)
}

func newActivityTaskWorker(
Expand All @@ -384,6 +461,7 @@ func newActivityTaskWorker(
domain string,
workerParams workerExecutionParameters,
workerStopCh chan struct{},
sessionTokenBucket *sessionTokenBucket,
) (worker Worker) {
ensureRequiredParams(&workerParams)

Expand All @@ -408,6 +486,7 @@ func newActivityTaskWorker(
workerParams.Logger,
workerParams.MetricsScope,
workerStopCh,
sessionTokenBucket,
)

return &activityWorker{
Expand Down Expand Up @@ -899,6 +978,7 @@ func getDataConverterFromActivityCtx(ctx context.Context) encoded.DataConverter
type aggregatedWorker struct {
workflowWorker Worker
activityWorker Worker
sessionWorker Worker
logger *zap.Logger
hostEnv *hostEnvImpl
}
Expand Down Expand Up @@ -932,6 +1012,20 @@ func (aw *aggregatedWorker) Start() error {
return err
}
}

if !isInterfaceNil(aw.sessionWorker) {
if err := aw.sessionWorker.Start(); err != nil {
// stop workflow worker and activity worker.
if !isInterfaceNil(aw.workflowWorker) {
aw.workflowWorker.Stop()
}
if !isInterfaceNil(aw.activityWorker) {
aw.activityWorker.Stop()
}
return err
}
}

aw.logger.Info("Started Worker")
return nil
}
Expand Down Expand Up @@ -994,6 +1088,9 @@ func (aw *aggregatedWorker) Stop() {
if !isInterfaceNil(aw.activityWorker) {
aw.activityWorker.Stop()
}
if !isInterfaceNil(aw.sessionWorker) {
aw.sessionWorker.Stop()
}
aw.logger.Info("Stopped Worker")
}

Expand All @@ -1007,8 +1104,6 @@ func newAggregatedWorker(
options WorkerOptions,
) (worker Worker) {
wOptions := augmentWorkerOptions(options)
workerStopChannel := make(chan struct{}, 1)
readOnlyWorkerStopCh := getReadOnlyChannel(workerStopChannel)
ctx := wOptions.BackgroundActivityContext
if ctx == nil {
ctx = context.Background()
Expand Down Expand Up @@ -1036,7 +1131,6 @@ func newAggregatedWorker(
NonDeterministicWorkflowPolicy: wOptions.NonDeterministicWorkflowPolicy,
DataConverter: wOptions.DataConverter,
WorkerStopTimeout: wOptions.WorkerStopTimeout,
WorkerStopChannel: readOnlyWorkerStopCh,
ContextPropagators: wOptions.ContextPropagators,
Tracer: wOptions.Tracer,
}
Expand Down Expand Up @@ -1087,12 +1181,26 @@ func newAggregatedWorker(
workerParams,
nil,
hostEnv,
workerStopChannel,
nil,
)
}

var sessionWorker Worker
if wOptions.EnableSessionWorker {
sessionWorker = newSessionWorker(
service,
domain,
workerParams,
nil,
hostEnv,
wOptions.MaxConCurrentSessionExecutionSize,
)
}

return &aggregatedWorker{
workflowWorker: workflowWorker,
activityWorker: activityWorker,
sessionWorker: sessionWorker,
logger: logger,
hostEnv: hostEnv,
}
Expand Down Expand Up @@ -1278,6 +1386,9 @@ func augmentWorkerOptions(options WorkerOptions) WorkerOptions {
if options.DataConverter == nil {
options.DataConverter = getDefaultDataConverter()
}
if options.MaxConCurrentSessionExecutionSize == 0 {
options.MaxConCurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize
}

// if the user passes in a tracer then add a tracing context propagator
if options.Tracer != nil {
Expand Down
Loading