Skip to content

Commit

Permalink
fix premigrate bug
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzhe1991 committed Jul 1, 2015
1 parent 67b6168 commit f162249
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 17 deletions.
4 changes: 2 additions & 2 deletions cmd/cconfig/migrate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (t *MigrateTask) migrateSingleSlot(slotId int, to int) error {
log.Error(err)
return err
}
if s.State.Status != models.SLOT_STATUS_ONLINE && s.State.Status != models.SLOT_STATUS_MIGRATE {
log.Warning("status is not online && migrate", s)
if s.State.Status == models.SLOT_STATUS_OFFLINE {
log.Warning("status is offline", s)
return nil
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/models/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func GetMigratingSlots(conn zkhelper.Conn, productName string) ([]*Slot, error)
}

for _, slot := range slots {
if slot.State.Status == SLOT_STATUS_MIGRATE {
if slot.State.Status == SLOT_STATUS_MIGRATE || slot.State.Status == SLOT_STATUS_PRE_MIGRATE {
migrateSlots = append(migrateSlots, slot)
}
}
Expand Down Expand Up @@ -246,17 +246,19 @@ func (s *Slot) SetMigrateStatus(zkConn zkhelper.Conn, fromGroup, toGroup int) er
return errors.Errorf("invalid group id, from %d, to %d", fromGroup, toGroup)
}
// wait until all proxy confirmed
err := NewAction(zkConn, s.ProductName, ACTION_TYPE_SLOT_PREMIGRATE, s, "", true)
s.State.Status = SLOT_STATUS_PRE_MIGRATE
err := s.Update(zkConn)
if err != nil {
return errors.Trace(err)
}
err = NewAction(zkConn, s.ProductName, ACTION_TYPE_SLOT_PREMIGRATE, s, "", true)
if err != nil {
return errors.Trace(err)
}

s.State.Status = SLOT_STATUS_MIGRATE
s.State.MigrateStatus.From = fromGroup
s.State.MigrateStatus.To = toGroup

s.GroupId = toGroup

return s.Update(zkConn)
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/proxy/router/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ import (
"strconv"
"strings"
"time"
"github.com/wandoulabs/codis/pkg/utils"

topo "github.com/ngaut/go-zookeeper/zk"
stats "github.com/ngaut/gostats"
log "github.com/ngaut/logging"

"github.com/juju/errors"
"github.com/wandoulabs/codis/pkg/models"
"github.com/wandoulabs/codis/pkg/proxy/group"
"github.com/wandoulabs/codis/pkg/proxy/parser"
"github.com/wandoulabs/codis/pkg/proxy/router/topology"
log "github.com/ngaut/logging"
"github.com/juju/errors"
topo "github.com/ngaut/go-zookeeper/zk"
stats "github.com/ngaut/gostats"
"github.com/wandoulabs/codis/pkg/utils"
"github.com/xiam/resp"
)

Expand Down
5 changes: 0 additions & 5 deletions pkg/proxy/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,6 @@ func (s *Server) dispatch(r *PipelineRequest) {
tr = s.pipeConns[s.slots[r.slotIdx].dst.Master()]
}
tr.in <- r

}

func (s *Server) handleTopoEvent() {
Expand All @@ -540,7 +539,6 @@ func (s *Server) handleTopoEvent() {
s.bufferedReq.PushBack(r)
continue
}

for e := s.bufferedReq.Front(); e != nil; {
next := e.Next()
blockedReq := e.Value.(*PipelineRequest)
Expand All @@ -550,7 +548,6 @@ func (s *Server) handleTopoEvent() {
}
e = next
}

s.dispatch(r)
case e := <-s.evtbus:
switch e.(type) {
Expand All @@ -570,9 +567,7 @@ func (s *Server) handleTopoEvent() {
continue
}
}

}

log.Infof("got event %s, %v, lastActionSeq %d", s.pi.Id, e, s.lastActionSeq)
s.processAction(e)
}
Expand Down

0 comments on commit f162249

Please sign in to comment.