Generic hooks for testing #6938

Merged · 16 commits · Jan 18, 2025
2 changes: 1 addition & 1 deletion Makefile
@@ -41,7 +41,7 @@ VISIBILITY_DB ?= temporal_visibility
# Always use "protolegacy" tag to allow disabling utf-8 validation on proto messages
# during proto library transition.
ALL_BUILD_TAGS := protolegacy,$(BUILD_TAG)
ALL_TEST_TAGS := $(ALL_BUILD_TAGS),$(TEST_TAG)
ALL_TEST_TAGS := $(ALL_BUILD_TAGS),errorinjector,$(TEST_TAG)
BUILD_TAG_FLAG := -tags $(ALL_BUILD_TAGS)
TEST_TAG_FLAG := -tags $(ALL_TEST_TAGS)

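In effect, errorinjector joins ALL_TEST_TAGS but not ALL_BUILD_TAGS, so the Makefile's test targets (via TEST_TAG_FLAG) compile the real injector while normal server builds keep the no-op stub. Running tests outside the Makefile would presumably need the tag passed explicitly, e.g. go test -tags errorinjector ./..., which is the concern raised in the review discussion below.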
7 changes: 6 additions & 1 deletion client/clientfactory.go
@@ -39,6 +39,7 @@ import (
"go.temporal.io/server/client/matching"
"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/errorinjector"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/metrics"
@@ -65,6 +66,7 @@ type (
monitor membership.Monitor,
metricsHandler metrics.Handler,
dc *dynamicconfig.Collection,
errorInjector errorinjector.ErrorInjector,
numberOfHistoryShards int32,
logger log.Logger,
throttledLogger log.Logger,
@@ -79,6 +81,7 @@ type (
monitor membership.Monitor
metricsHandler metrics.Handler
dynConfig *dynamicconfig.Collection
errorInjector errorinjector.ErrorInjector
numberOfHistoryShards int32
logger log.Logger
throttledLogger log.Logger
@@ -103,6 +106,7 @@ func (p *factoryProviderImpl) NewFactory(
monitor membership.Monitor,
metricsHandler metrics.Handler,
dc *dynamicconfig.Collection,
errorInjector errorinjector.ErrorInjector,
numberOfHistoryShards int32,
logger log.Logger,
throttledLogger log.Logger,
@@ -112,6 +116,7 @@ func (p *factoryProviderImpl) NewFactory(
monitor: monitor,
metricsHandler: metricsHandler,
dynConfig: dc,
errorInjector: errorInjector,
numberOfHistoryShards: numberOfHistoryShards,
logger: logger,
throttledLogger: throttledLogger,
@@ -159,7 +164,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
common.NewClientCache(keyResolver, clientProvider),
cf.metricsHandler,
cf.logger,
matching.NewLoadBalancer(namespaceIDToName, cf.dynConfig),
matching.NewLoadBalancer(namespaceIDToName, cf.dynConfig, cf.errorInjector),
)

if cf.metricsHandler != nil {
41 changes: 20 additions & 21 deletions client/matching/loadbalancer.go
@@ -29,6 +29,7 @@ import (
"sync"

"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/errorinjector"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/tqid"
)
@@ -57,11 +58,10 @@
}

defaultLoadBalancer struct {
namespaceIDToName func(id namespace.ID) (namespace.Name, error)
nReadPartitions dynamicconfig.IntPropertyFnWithTaskQueueFilter
nWritePartitions dynamicconfig.IntPropertyFnWithTaskQueueFilter
forceReadPartition dynamicconfig.IntPropertyFn
forceWritePartition dynamicconfig.IntPropertyFn
namespaceIDToName func(id namespace.ID) (namespace.Name, error)
nReadPartitions dynamicconfig.IntPropertyFnWithTaskQueueFilter
nWritePartitions dynamicconfig.IntPropertyFnWithTaskQueueFilter
errorInjector errorinjector.ErrorInjector

lock sync.RWMutex
taskQueueLBs map[tqid.TaskQueue]*tqLoadBalancer
@@ -85,23 +85,22 @@ type (
func NewLoadBalancer(
namespaceIDToName func(id namespace.ID) (namespace.Name, error),
dc *dynamicconfig.Collection,
errorInjector errorinjector.ErrorInjector,
) LoadBalancer {
lb := &defaultLoadBalancer{
namespaceIDToName: namespaceIDToName,
nReadPartitions: dynamicconfig.MatchingNumTaskqueueReadPartitions.Get(dc),
nWritePartitions: dynamicconfig.MatchingNumTaskqueueWritePartitions.Get(dc),
forceReadPartition: dynamicconfig.TestMatchingLBForceReadPartition.Get(dc),
forceWritePartition: dynamicconfig.TestMatchingLBForceWritePartition.Get(dc),
lock: sync.RWMutex{},
taskQueueLBs: make(map[tqid.TaskQueue]*tqLoadBalancer),
namespaceIDToName: namespaceIDToName,
nReadPartitions: dynamicconfig.MatchingNumTaskqueueReadPartitions.Get(dc),
nWritePartitions: dynamicconfig.MatchingNumTaskqueueWritePartitions.Get(dc),
errorInjector: errorInjector,
taskQueueLBs: make(map[tqid.TaskQueue]*tqLoadBalancer),
}
return lb
}

func (lb *defaultLoadBalancer) PickWritePartition(
taskQueue *tqid.TaskQueue,
) *tqid.NormalPartition {
if n := lb.forceWritePartition(); n >= 0 {
if n, ok := errorinjector.Get[int](lb.errorInjector, errorinjector.MatchingLBForceWritePartition); ok {
return taskQueue.NormalPartition(n)
}

@@ -130,7 +129,7 @@ func (lb *defaultLoadBalancer) PickReadPartition(
partitionCount = lb.nReadPartitions(string(namespaceName), taskQueue.Name(), taskQueue.TaskType())
}

return tqlb.pickReadPartition(partitionCount, lb.forceReadPartition())
return tqlb.pickReadPartition(partitionCount, lb.errorInjector)
}

func (lb *defaultLoadBalancer) getTaskQueueLoadBalancer(tq *tqid.TaskQueue) *tqLoadBalancer {
@@ -157,16 +156,16 @@ func newTaskQueueLoadBalancer(tq *tqid.TaskQueue) *tqLoadBalancer {
}
}

func (b *tqLoadBalancer) pickReadPartition(partitionCount int, forcedPartition int) *pollToken {
func (b *tqLoadBalancer) pickReadPartition(partitionCount int, ei errorinjector.ErrorInjector) *pollToken {
b.lock.Lock()
defer b.lock.Unlock()

// ensure we reflect dynamic config change if it ever happens
b.ensurePartitionCountLocked(max(partitionCount, forcedPartition+1))

partitionID := forcedPartition

if partitionID < 0 {
var partitionID int
if n, ok := errorinjector.Get[int](ei, errorinjector.MatchingLBForceReadPartition); ok {
b.ensurePartitionCountLocked(max(partitionCount, n+1)) // allow n to be >= partitionCount
partitionID = n
} else {
b.ensurePartitionCountLocked(partitionCount)
partitionID = b.pickReadPartitionWithFewestPolls(partitionCount)
}

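Note the lookup semantics: errorinjector.Get[int] returns ok only when a test has injected a value for the key, so the old -1 sentinel (and the forceReadPartition/forceWritePartition dynamic config knobs) is no longer needed. When nothing is injected, both PickWritePartition and pickReadPartition fall through to their normal partition-selection logic.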
17 changes: 0 additions & 17 deletions common/dynamicconfig/constants.go
@@ -1234,23 +1234,6 @@ these log lines can be noisy, we want to be able to turn on and sample selective
1000,
`MatchingMaxTaskQueuesInDeployment represents the maximum number of task-queues that can be registed in a single deployment`,
)
// for matching testing only:

TestMatchingDisableSyncMatch = NewGlobalBoolSetting(
"test.matching.disableSyncMatch",
false,
`TestMatchingDisableSyncMatch forces tasks to go through the db once`,
)
TestMatchingLBForceReadPartition = NewGlobalIntSetting(
"test.matching.lbForceReadPartition",
-1,
`TestMatchingLBForceReadPartition forces polls to go to a specific partition`,
)
TestMatchingLBForceWritePartition = NewGlobalIntSetting(
"test.matching.lbForceWritePartition",
-1,
`TestMatchingLBForceWritePartition forces adds to go to a specific partition`,
)

// keys for history

7 changes: 7 additions & 0 deletions common/errorinjector/constants.go
@@ -0,0 +1,7 @@
package errorinjector

const (
MatchingDisableSyncMatch = "matching.disableSyncMatch"
MatchingLBForceReadPartition = "matching.lbForceReadPartition"
MatchingLBForceWritePartition = "matching.lbForceWritePartition"
)
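These keys replace the three test.* dynamic config settings removed from common/dynamicconfig/constants.go above. As a rough sketch of the intended pattern for adding a new hook (the key name and call site below are invented for illustration, not part of this PR):

package errorinjector_example // hypothetical illustration, not part of the PR

import "go.temporal.io/server/common/errorinjector"

// 1. Define a key (in this PR the keys live in common/errorinjector/constants.go).
const HistoryDropFirstTask = "history.dropFirstTask" // hypothetical key

// 2. Consult the hook at the production call site. In builds without the
//    errorinjector tag, Get always reports "not set", so this branch is
//    effectively dead code.
func shouldDropFirstTask(ei errorinjector.ErrorInjector) bool {
	drop, _ := errorinjector.Get[bool](ei, HistoryDropFirstTask)
	return drop
}

// 3. In a test compiled with -tags errorinjector, inject a value and undo it:
//
//	cleanup := errorinjector.Set(ei, HistoryDropFirstTask, true)
//	defer cleanup()

Keeping the keys as plain strings in one small package avoids adding a dynamicconfig entry per test hook, which is exactly what the 17-line deletion above removes.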
18 changes: 18 additions & 0 deletions common/errorinjector/noop_impl.go
@@ -0,0 +1,18 @@
//go:build !errorinjector
@stephanos (Contributor) commented on Dec 5, 2024:
I'd like to consider flipping this, actually (i.e. the default being the real impl).
A while ago I experimented with build tags and assertions, and I feel confident that we can add a step for the actual binary build that verifies there's no trace of ErrorInjector to be found, as it was optimized away. That way we don't need to tell every single developer to run their tests with this flag (in their IDE etc.).

Member Author:
I like that, but I'm still a little concerned since we have so many different binary builds (docker images, goreleaser, internal ones), and then users that build their own binaries...

Contributor:
FWIW, docker-builds seems to already invoke the server Makefile.

package errorinjector

import "go.uber.org/fx"

var Module = fx.Options(
fx.Provide(func() (ei ErrorInjector) { return }),
)

type (
ErrorInjector struct{}
)

func Get[T any](ei ErrorInjector, key string) (T, bool) {
var zero T
return zero, false
}
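With this stub (the default, i.e. no build tag), ErrorInjector is an empty struct and Get unconditionally returns the zero value and false, so every hook check in production code collapses to a constant-false branch. This is what the build-verification idea from the review discussion above would rely on to confirm the injector leaves no trace in release binaries.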
57 changes: 57 additions & 0 deletions common/errorinjector/test_impl.go
@@ -0,0 +1,57 @@
//go:build errorinjector

package errorinjector

import (
"sync"

"go.uber.org/fx"
)

var Module = fx.Options(
fx.Provide(NewTestErrorInjector),
)

type (
ErrorInjector interface {
// private accessors; access must go through package-level Get/Set
get(string) (any, bool)
set(string, any)
del(string)
}

errorInjectorImpl struct {
m sync.Map
}
)

func Get[T any](ei ErrorInjector, key string) (T, bool) {
if val, ok := ei.get(key); ok {
// this is only used in test so we want to panic on type mismatch:
return val.(T), ok
}
var zero T
return zero, false
}

func Set[T any](ei ErrorInjector, key string, val T) func() {
ei.set(key, val)
return func() { ei.del(key) }
}

func NewTestErrorInjector() ErrorInjector {
return &errorInjectorImpl{}
}

func (ei *errorInjectorImpl) get(key string) (any, bool) {
val, ok := ei.m.Load(key)
return val, ok
}

func (ei *errorInjectorImpl) set(key string, val any) {
ei.m.Store(key, val)
}

func (ei *errorInjectorImpl) del(key string) {
ei.m.Delete(key)
}
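A minimal usage sketch, assuming a test compiled with the errorinjector build tag (the test name and the injected partition value are illustrative, not part of this PR):

//go:build errorinjector

package errorinjector_test

import (
	"testing"

	"go.temporal.io/server/common/errorinjector"
)

func TestForceReadPartition(t *testing.T) {
	ei := errorinjector.NewTestErrorInjector()

	// Inject a forced read partition; the returned func deletes the key again.
	cleanup := errorinjector.Set(ei, errorinjector.MatchingLBForceReadPartition, 3)
	defer cleanup()

	// Any component holding the same ErrorInjector (e.g. the matching load
	// balancer wired up through fx) now observes the injected value.
	if n, ok := errorinjector.Get[int](ei, errorinjector.MatchingLBForceReadPartition); !ok || n != 3 {
		t.Fatalf("expected forced partition 3, got %d (ok=%v)", n, ok)
	}
}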
4 changes: 4 additions & 0 deletions common/resource/fx.go
@@ -46,6 +46,7 @@ import (
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/deadlock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/errorinjector"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
@@ -129,6 +130,7 @@ var Module = fx.Options(
deadlock.Module,
config.Module,
utf8validator.Module,
errorinjector.Module,
fx.Invoke(func(*utf8validator.Validator) {}), // force this to be constructed even if not referenced elsewhere
)

@@ -227,6 +229,7 @@ func ClientFactoryProvider(
membershipMonitor membership.Monitor,
metricsHandler metrics.Handler,
dynamicCollection *dynamicconfig.Collection,
errorInjector errorinjector.ErrorInjector,
persistenceConfig *config.Persistence,
logger log.SnTaggedLogger,
throttledLogger log.ThrottledLogger,
@@ -236,6 +239,7 @@
membershipMonitor,
metricsHandler,
dynamicCollection,
errorInjector,
persistenceConfig.NumHistoryShards,
logger,
throttledLogger,
4 changes: 0 additions & 4 deletions service/matching/config.go
@@ -47,7 +47,6 @@ type (
PersistenceDynamicRateLimitingParams dynamicconfig.TypedPropertyFn[dynamicconfig.DynamicRateLimitingParams]
PersistenceQPSBurstRatio dynamicconfig.FloatPropertyFn
SyncMatchWaitDuration dynamicconfig.DurationPropertyFnWithTaskQueueFilter
TestDisableSyncMatch dynamicconfig.BoolPropertyFn
RPS dynamicconfig.IntPropertyFn
OperatorRPSRatio dynamicconfig.FloatPropertyFn
AlignMembershipChange dynamicconfig.DurationPropertyFn
@@ -132,7 +131,6 @@ type (
BacklogNegligibleAge func() time.Duration
MaxWaitForPollerBeforeFwd func() time.Duration
QueryPollerUnavailableWindow func() time.Duration
TestDisableSyncMatch func() bool
// Time to hold a poll request before returning an empty response if there are no tasks
LongPollExpirationInterval func() time.Duration
RangeSize int64
@@ -211,7 +209,6 @@
PersistenceDynamicRateLimitingParams: dynamicconfig.MatchingPersistenceDynamicRateLimitingParams.Get(dc),
PersistenceQPSBurstRatio: dynamicconfig.PersistenceQPSBurstRatio.Get(dc),
SyncMatchWaitDuration: dynamicconfig.MatchingSyncMatchWaitDuration.Get(dc),
TestDisableSyncMatch: dynamicconfig.TestMatchingDisableSyncMatch.Get(dc),
LoadUserData: dynamicconfig.MatchingLoadUserData.Get(dc),
HistoryMaxPageSize: dynamicconfig.MatchingHistoryMaxPageSize.Get(dc),
EnableDeployments: dynamicconfig.EnableDeployments.Get(dc),
@@ -303,7 +300,6 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
return config.MaxWaitForPollerBeforeFwd(ns.String(), taskQueueName, taskType)
},
QueryPollerUnavailableWindow: config.QueryPollerUnavailableWindow,
TestDisableSyncMatch: config.TestDisableSyncMatch,
LongPollExpirationInterval: func() time.Duration {
return config.LongPollExpirationInterval(ns.String(), taskQueueName, taskType)
},
3 changes: 3 additions & 0 deletions service/matching/handler.go
@@ -35,6 +35,7 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/errorinjector"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/metrics"
@@ -88,6 +89,7 @@ func NewHandler(
namespaceReplicationQueue persistence.NamespaceReplicationQueue,
visibilityManager manager.VisibilityManager,
nexusEndpointManager persistence.NexusEndpointManager,
errorInjector errorinjector.ErrorInjector,
) *Handler {
handler := &Handler{
config: config,
@@ -110,6 +112,7 @@
namespaceReplicationQueue,
visibilityManager,
nexusEndpointManager,
errorInjector,
),
namespaceRegistry: namespaceRegistry,
}
4 changes: 4 additions & 0 deletions service/matching/matching_engine.go
@@ -56,6 +56,7 @@ import (
hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/errorinjector"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
@@ -143,6 +144,7 @@ type (
partitions map[tqid.PartitionKey]taskQueuePartitionManager
gaugeMetrics gaugeMetrics // per-namespace task queue counters
config *Config
errorInjector errorinjector.ErrorInjector
// queryResults maps query TaskID (which is a UUID generated in QueryWorkflow() call) to a channel
// that QueryWorkflow() will block on. The channel is unblocked either by worker sending response through
// RespondQueryTaskCompleted() or through an internal service error causing temporal to be unable to dispatch
@@ -203,6 +205,7 @@ func NewEngine(
namespaceReplicationQueue persistence.NamespaceReplicationQueue,
visibilityManager manager.VisibilityManager,
nexusEndpointManager persistence.NexusEndpointManager,
errorInjector errorinjector.ErrorInjector,
) Engine {
scopedMetricsHandler := metricsHandler.WithTags(metrics.OperationTag(metrics.MatchingEngineScope))
e := &matchingEngineImpl{
@@ -233,6 +236,7 @@
loadedPhysicalTaskQueueCount: make(map[taskQueueCounterKey]int),
},
config: config,
errorInjector: errorInjector,
queryResults: collection.NewSyncMap[string, chan *queryResult](),
nexusResults: collection.NewSyncMap[string, chan *nexusResult](),
outstandingPollers: collection.NewSyncMap[string, context.CancelFunc](),
3 changes: 2 additions & 1 deletion service/matching/physical_task_queue_manager.go
@@ -46,6 +46,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/debug"
"go.temporal.io/server/common/errorinjector"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
@@ -529,7 +530,7 @@ func (c *physicalTaskQueueManagerImpl) TrySyncMatch(ctx context.Context, task *i
// request sent by history service
c.liveness.markAlive()
c.tasksAddedInIntervals.incrementTaskCount()
if c.config.TestDisableSyncMatch() {
if disable, _ := errorinjector.Get[bool](c.partitionMgr.engine.errorInjector, errorinjector.MatchingDisableSyncMatch); disable {
return false, nil
}
}
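So a functional test that wants every task to go through persistence can now call errorinjector.Set(ei, errorinjector.MatchingDisableSyncMatch, true) (and defer the returned cleanup) against the engine's injector, instead of flipping the removed test.matching.disableSyncMatch dynamic config setting; when the key is absent, TrySyncMatch behaves exactly as before.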