Skip to content

Commit

Permalink
Shuffle responses for replication messages (uber#4652)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Nov 23, 2021
1 parent 3b36196 commit ff71ae3
Showing 1 changed file with 19 additions and 6 deletions.
25 changes: 19 additions & 6 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package history

import (
"context"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -805,7 +806,8 @@ func (c *clientImpl) GetReplicationMessages(

var wg sync.WaitGroup
wg.Add(len(requestsByPeer))
respChan := make(chan *getReplicationMessagesWithSize, len(requestsByPeer))
var responseMutex sync.Mutex
peerResponses := make([]*getReplicationMessagesWithSize, 0, len(requestsByPeer))
errChan := make(chan error, 1)

for peer, req := range requestsByPeer {
Expand All @@ -826,26 +828,37 @@ func (c *clientImpl) GetReplicationMessages(
}
return
}
respChan <- &getReplicationMessagesWithSize{
responseMutex.Lock()
peerResponses = append(peerResponses, &getReplicationMessagesWithSize{
response: resp,
size: responseInfo.Size,
}
})
responseMutex.Unlock()
}(ctx, peer, req)
}

wg.Wait()
close(respChan)
close(errChan)

if len(errChan) > 0 {
err := <-errChan
return nil, err
}

// Peers with largest responses can be slowest to return data.
// They end up in the end of array and have a possibility of not fitting in the response message.
// Skipped peers grow their responses even more and next they will be even slower and end up in the end again.
// This can lead to starving peers.
// Shuffle the slice of responses to prevent such scenario. All peer will have equal chance to be pick up first.
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range peerResponses {
j := r.Intn(i + 1)
peerResponses[i], peerResponses[j] = peerResponses[j], peerResponses[i]
}

response := &types.GetReplicationMessagesResponse{MessagesByShard: make(map[int32]*types.ReplicationMessages)}
responseTotalSize := 0

for resp := range respChan {
for _, resp := range peerResponses {
// return partial response if the response size exceeded supported max size
responseTotalSize += resp.size
if responseTotalSize >= c.rpcMaxSizeInBytes() {
Expand Down

0 comments on commit ff71ae3

Please sign in to comment.