Skip to content

Commit

Permalink
Do not acquire all shard when doing start up. (uber#1047)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Aug 23, 2018
1 parent 87b12c2 commit 59e184f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 53 deletions.
49 changes: 0 additions & 49 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ package history
import (
"time"

"github.com/uber-common/bark"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
Expand Down Expand Up @@ -231,10 +228,6 @@ func (s *Service) Start() {
shardMgr = persistence.NewShardPersistenceRateLimitedClient(shardMgr, persistenceRateLimiter, log)
shardMgr = persistence.NewShardPersistenceMetricsClient(shardMgr, base.GetMetricsClient(), log)

// Hack to create shards for bootstrap purposes
// TODO: properly pre-create all shards before deployment.
s.createAllShards(p.CassandraConfig.NumHistoryShards, shardMgr, log)

metadata, err := persistence.NewMetadataManagerProxy(p.CassandraConfig.Hosts,
p.CassandraConfig.Port,
p.CassandraConfig.User,
Expand Down Expand Up @@ -318,45 +311,3 @@ func (s *Service) Stop() {
}
s.params.Logger.Infof("%v stopped", common.HistoryServiceName)
}

func (s *Service) createAllShards(numShards int, shardMgr persistence.ShardManager, log bark.Logger) {
policy := backoff.NewExponentialRetryPolicy(50 * time.Millisecond)
policy.SetMaximumInterval(time.Second)
policy.SetExpirationInterval(5 * time.Second)

log.Infof("Starting check for shard creation of '%v' shards.", numShards)
for shardID := 0; shardID < numShards; shardID++ {
getShardOperation := func() error {
_, err := shardMgr.GetShard(&persistence.GetShardRequest{
ShardID: shardID,
})

return err
}

err := backoff.Retry(getShardOperation, policy, common.IsPersistenceTransientError)
if err != nil {
if _, ok := err.(*shared.EntityNotExistsError); !ok {
log.Fatalf("failed to get shard for ShardId: %v, with error: %v", shardID, err)
}

// Shard not found. Let's create shard for the very first time
createShardOperation := func() error {
return shardMgr.CreateShard(&persistence.CreateShardRequest{
ShardInfo: &persistence.ShardInfo{
ShardID: shardID,
RangeID: 0,
TransferAckLevel: 0,
}})
}

err := backoff.Retry(createShardOperation, policy, common.IsPersistenceTransientError)
if err != nil {
if _, ok := err.(*persistence.ShardAlreadyExistError); !ok {
log.Fatalf("failed to create shard for ShardId: %v, with error: %v", shardID, err)
}
}
}
}
log.Infof("All '%v' shards are created.", numShards)
}
49 changes: 45 additions & 4 deletions service/history/shardContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/uber-common/bark"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/service"
)

Expand Down Expand Up @@ -785,12 +786,52 @@ func acquireShard(shardID int, svc service.Service, shardManager persistence.Sha
historyMgr persistence.HistoryManager, executionMgr persistence.ExecutionManager, domainCache cache.DomainCache,
owner string, closeCh chan<- int, config *Config, logger bark.Logger, metricsClient metrics.Client) (ShardContext,
error) {
response, err0 := shardManager.GetShard(&persistence.GetShardRequest{ShardID: shardID})
if err0 != nil {
return nil, err0

var shardInfo *persistence.ShardInfo

retryPolicy := backoff.NewExponentialRetryPolicy(50 * time.Millisecond)
retryPolicy.SetMaximumInterval(time.Second)
retryPolicy.SetExpirationInterval(5 * time.Second)

retryPredicate := func(err error) bool {
if common.IsPersistenceTransientError(err) {
return true
}
_, ok := err.(*persistence.ShardAlreadyExistError)
return ok

}

getShard := func() error {
resp, err := shardManager.GetShard(&persistence.GetShardRequest{
ShardID: shardID,
})
if err == nil {
shardInfo = resp.ShardInfo
return nil
}
if _, ok := err.(*shared.EntityNotExistsError); !ok {
return err
}

// EntityNotExistsError error
shardInfo = &persistence.ShardInfo{
ShardID: shardID,
RangeID: 0,
TransferAckLevel: 0,
}
return shardManager.CreateShard(&persistence.CreateShardRequest{ShardInfo: shardInfo})
}

err := backoff.Retry(getShard, retryPolicy, retryPredicate)
if err != nil {
logger.WithFields(bark.Fields{
logging.TagHistoryShardID: shardID,
logging.TagErr: err,
}).Error("Fail to acquire shard.")
return nil, err
}

shardInfo := response.ShardInfo
updatedShardInfo := copyShardInfo(shardInfo)
updatedShardInfo.Owner = owner

Expand Down

0 comments on commit 59e184f

Please sign in to comment.