Skip to content

Commit

Permalink
Create naive isolation group matching loadbalancer
Browse files Browse the repository at this point in the history
This is an intentionally naive implementation of a load balancer that assigns isolation groups to specific partitions and routes pollers/tasks to those partitions based on their isolation group. Any time there are multiple options one is picked randomly.

The goal of this implementation is to benchmark how significantly isolation-group-based routing improves task latencies and isolation group containment for non-extremely-skewed scenarios. It additionally provides a baseline to see how much a more sophisticated solution (storing the isolation group assignment with the partitions and dynamically rebalancing them) might improve these metrics.

This isn't intended to be used in production as-is.
  • Loading branch information
natemort committed Dec 18, 2024
1 parent f4e219a commit 666723d
Show file tree
Hide file tree
Showing 16 changed files with 839 additions and 297 deletions.
5 changes: 5 additions & 0 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type (
metricsClient metrics.Client
dynConfig *dynamicconfig.Collection
numberOfHistoryShards int
allIsolationGroups func() []string
logger log.Logger
}
)
Expand All @@ -83,6 +84,7 @@ func NewRPCClientFactory(
metricsClient metrics.Client,
dc *dynamicconfig.Collection,
numberOfHistoryShards int,
allIsolationGroups func() []string,
logger log.Logger,
) Factory {
return &rpcClientFactory{
Expand All @@ -91,6 +93,7 @@ func NewRPCClientFactory(
metricsClient: metricsClient,
dynConfig: dc,
numberOfHistoryShards: numberOfHistoryShards,
allIsolationGroups: allIsolationGroups,
logger: logger,
}
}
Expand Down Expand Up @@ -155,10 +158,12 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
defaultLoadBalancer := matching.NewLoadBalancer(partitionConfigProvider)
roundRobinLoadBalancer := matching.NewRoundRobinLoadBalancer(partitionConfigProvider)
weightedLoadBalancer := matching.NewWeightedLoadBalancer(roundRobinLoadBalancer, partitionConfigProvider, cf.logger)
igLoadBalancer := matching.NewIsolationLoadBalancer(weightedLoadBalancer, partitionConfigProvider, cf.allIsolationGroups)
loadBalancers := map[string]matching.LoadBalancer{
"random": defaultLoadBalancer,
"round-robin": roundRobinLoadBalancer,
"weighted": weightedLoadBalancer,
"isolation": igLoadBalancer,
}
client := matching.NewClient(
rawClient,
Expand Down
31 changes: 10 additions & 21 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ func (c *clientImpl) AddActivityTask(
opts ...yarpc.CallOption,
) (*types.AddActivityTaskResponse, error) {
partition := c.loadBalancer.PickWritePartition(
request.GetDomainUUID(),
*request.GetTaskList(),
persistence.TaskListTypeActivity,
request.GetForwardedFrom(),
request,
)
originalTaskListName := request.TaskList.GetName()
request.TaskList.Name = partition
Expand Down Expand Up @@ -91,10 +89,8 @@ func (c *clientImpl) AddDecisionTask(
opts ...yarpc.CallOption,
) (*types.AddDecisionTaskResponse, error) {
partition := c.loadBalancer.PickWritePartition(
request.GetDomainUUID(),
*request.GetTaskList(),
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
request,
)
originalTaskListName := request.TaskList.GetName()
request.TaskList.Name = partition
Expand Down Expand Up @@ -122,10 +118,9 @@ func (c *clientImpl) PollForActivityTask(
opts ...yarpc.CallOption,
) (*types.MatchingPollForActivityTaskResponse, error) {
partition := c.loadBalancer.PickReadPartition(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeActivity,
request.GetForwardedFrom(),
request,
request.GetIsolationGroup(),
)
originalTaskListName := request.PollRequest.GetTaskList().GetName()
request.PollRequest.TaskList.Name = partition
Expand All @@ -145,10 +140,8 @@ func (c *clientImpl) PollForActivityTask(
resp.PartitionConfig,
)
c.loadBalancer.UpdateWeight(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeActivity,
request.GetForwardedFrom(),
request,
partition,
resp.LoadBalancerHints,
)
Expand All @@ -163,10 +156,9 @@ func (c *clientImpl) PollForDecisionTask(
opts ...yarpc.CallOption,
) (*types.MatchingPollForDecisionTaskResponse, error) {
partition := c.loadBalancer.PickReadPartition(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
request,
request.GetIsolationGroup(),
)
originalTaskListName := request.PollRequest.GetTaskList().GetName()
request.PollRequest.TaskList.Name = partition
Expand All @@ -186,10 +178,8 @@ func (c *clientImpl) PollForDecisionTask(
resp.PartitionConfig,
)
c.loadBalancer.UpdateWeight(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
request,
partition,
resp.LoadBalancerHints,
)
Expand All @@ -204,10 +194,9 @@ func (c *clientImpl) QueryWorkflow(
opts ...yarpc.CallOption,
) (*types.QueryWorkflowResponse, error) {
partition := c.loadBalancer.PickReadPartition(
request.GetDomainUUID(),
*request.GetTaskList(),
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
request,
"",
)
request.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
Expand Down
34 changes: 17 additions & 17 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestClient_withResponse(t *testing.T) {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeActivity, testAddActivityTaskRequest()).Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddActivityTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, nil)
Expand All @@ -173,7 +173,7 @@ func TestClient_withResponse(t *testing.T) {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeActivity, testAddActivityTaskRequest()).Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
},
wantError: true,
Expand All @@ -184,7 +184,7 @@ func TestClient_withResponse(t *testing.T) {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeActivity, testAddActivityTaskRequest()).Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
Expand All @@ -196,7 +196,7 @@ func TestClient_withResponse(t *testing.T) {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeDecision, testAddDecisionTaskRequest()).Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddDecisionTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, nil)
Expand All @@ -209,7 +209,7 @@ func TestClient_withResponse(t *testing.T) {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeDecision, testAddDecisionTaskRequest()).Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
},
wantError: true,
Expand All @@ -220,7 +220,7 @@ func TestClient_withResponse(t *testing.T) {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeDecision, testAddDecisionTaskRequest()).Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
Expand All @@ -232,11 +232,11 @@ func TestClient_withResponse(t *testing.T) {
return c.PollForActivityTask(context.Background(), testMatchingPollForActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeActivity, testMatchingPollForActivityTaskRequest(), "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForActivityTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, nil)
balancer.EXPECT().UpdateWeight(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "", _testPartition, nil)
balancer.EXPECT().UpdateWeight(persistence.TaskListTypeActivity, testMatchingPollForActivityTaskRequest(), _testPartition, nil)
},
want: &types.MatchingPollForActivityTaskResponse{},
},
Expand All @@ -246,7 +246,7 @@ func TestClient_withResponse(t *testing.T) {
return c.PollForActivityTask(context.Background(), testMatchingPollForActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeActivity, testMatchingPollForActivityTaskRequest(), "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
},
want: nil,
Expand All @@ -258,7 +258,7 @@ func TestClient_withResponse(t *testing.T) {
return c.PollForActivityTask(context.Background(), testMatchingPollForActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeActivity, testMatchingPollForActivityTaskRequest(), "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
Expand All @@ -271,11 +271,11 @@ func TestClient_withResponse(t *testing.T) {
return c.PollForDecisionTask(context.Background(), testMatchingPollForDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingPollForDecisionTaskRequest(), "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForDecisionTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, nil)
balancer.EXPECT().UpdateWeight(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "", _testPartition, nil)
balancer.EXPECT().UpdateWeight(persistence.TaskListTypeDecision, testMatchingPollForDecisionTaskRequest(), _testPartition, nil)
},
want: &types.MatchingPollForDecisionTaskResponse{},
},
Expand All @@ -285,7 +285,7 @@ func TestClient_withResponse(t *testing.T) {
return c.PollForDecisionTask(context.Background(), testMatchingPollForDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingPollForDecisionTaskRequest(), "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
},
want: nil,
Expand All @@ -297,7 +297,7 @@ func TestClient_withResponse(t *testing.T) {
return c.PollForDecisionTask(context.Background(), testMatchingPollForDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingPollForDecisionTaskRequest(), "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
Expand All @@ -310,7 +310,7 @@ func TestClient_withResponse(t *testing.T) {
return c.QueryWorkflow(context.Background(), testMatchingQueryWorkflowRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingQueryWorkflowRequest(), "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.QueryWorkflowResponse{}, nil)
},
Expand All @@ -322,7 +322,7 @@ func TestClient_withResponse(t *testing.T) {
return c.QueryWorkflow(context.Background(), testMatchingQueryWorkflowRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingQueryWorkflowRequest(), "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
},
want: nil,
Expand All @@ -334,7 +334,7 @@ func TestClient_withResponse(t *testing.T) {
return c.QueryWorkflow(context.Background(), testMatchingQueryWorkflowRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingQueryWorkflowRequest(), "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
Expand Down
Loading

0 comments on commit 666723d

Please sign in to comment.