Skip to content

Commit

Permalink
Log replication messages that did not fit (uber#4844)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored May 25, 2022
1 parent 3cfcaea commit 471e6d1
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
24 changes: 20 additions & 4 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type (
getReplicationMessagesWithSize struct {
response *types.GetReplicationMessagesResponse
size int
peer string
}
)

Expand Down Expand Up @@ -831,6 +832,7 @@ func (c *clientImpl) GetReplicationMessages(
peerResponses = append(peerResponses, &getReplicationMessagesWithSize{
response: resp,
size: responseInfo.Size,
peer: peer,
})
responseMutex.Unlock()
return nil
Expand All @@ -854,13 +856,27 @@ func (c *clientImpl) GetReplicationMessages(

response := &types.GetReplicationMessagesResponse{MessagesByShard: make(map[int32]*types.ReplicationMessages)}
responseTotalSize := 0
rpcMaxResponseSize := c.rpcMaxSizeInBytes()
for _, resp := range peerResponses {
// return partial response if the response size exceeded supported max size
responseTotalSize += resp.size
if responseTotalSize >= c.rpcMaxSizeInBytes() {
return response, nil
if (responseTotalSize + resp.size) >= rpcMaxResponseSize {
// Log shards that did not fit for debugging purposes
for shardID := range resp.response.GetMessagesByShard() {
c.logger.Warn("Replication messages did not fit in the response",
tag.ShardID(int(shardID)),
tag.Address(resp.peer),
tag.ResponseSize(resp.size),
tag.ResponseTotalSize(responseTotalSize),
tag.ResponseMaxSize(rpcMaxResponseSize),
)
}

// return partial response if the response size exceeded supported max size
// but continue with next peer response, as it may still fit
continue
}

responseTotalSize += resp.size

for shardID, tasks := range resp.response.GetMessagesByShard() {
response.MessagesByShard[shardID] = tasks
}
Expand Down
15 changes: 15 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,21 @@ func TokenLastEventVersion(version int64) Tag {
return newInt64("xdc-token-last-event-version", version)
}

// ResponseSize returns tag for ResponseSize
func ResponseSize(size int) Tag {
return newInt("response-size", size)
}

// ResponseTotalSize returns tag for ResponseTotalSize
func ResponseTotalSize(size int) Tag {
return newInt("response-total-size", size)
}

// ResponseMaxSize returns tag for ResponseMaxSize
func ResponseMaxSize(size int) Tag {
return newInt("response-max-size", size)
}

/////////////////// Archival tags defined here: archival- ///////////////////
// archival request tags

Expand Down

0 comments on commit 471e6d1

Please sign in to comment.