Skip to content

Commit

Permalink
etcdserver: change "/downgrade/enabled" endpoint to serve linearized data
Browse files Browse the repository at this point in the history
  • Loading branch information
YoyinZyc committed Nov 16, 2019
1 parent 92c7e20 commit 6ce8fa9
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 143 deletions.
48 changes: 6 additions & 42 deletions etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,27 @@ const (

// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerHTTP) http.Handler {
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s)
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.DowngradeEnabledHandler())
}

func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler, ds etcdserver.ServerDowngradeHTTP) http.Handler {
func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler, downgradeEnabledHandler http.Handler) http.Handler {
peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
downgradeEnabledHandler := newDowngradeEnabledHandler(lg, s.Cluster(), ds)

mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(rafthttp.RaftPrefix, raftHandler)
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
mux.Handle(peerMembersPath, peerMembersHandler)
mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
mux.Handle(downgradeEnabledPath, downgradeEnabledHandler)
if leaseHandler != nil {
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}

if downgradeEnabledHandler != nil {
mux.Handle(downgradeEnabledPath, downgradeEnabledHandler)
}
mux.HandleFunc(versionPath, versionHandler(s.Cluster(), serveVersion))
return mux
}
Expand Down Expand Up @@ -88,20 +90,6 @@ type peerMemberPromoteHandler struct {
server etcdserver.Server
}

// newDowngradeEnabledHandler returns an http.Handler that reports the local
// member's downgrade enabled status on the peer endpoint.
// lg may be nil; cluster supplies the cluster ID for response headers and
// s exposes the downgrade information.
func newDowngradeEnabledHandler(lg *zap.Logger, cluster api.Cluster, s etcdserver.ServerDowngradeHTTP) http.Handler {
	return &downgradeEnabledHandler{
		lg:      lg,
		cluster: cluster,
		server:  s,
	}
}

// downgradeEnabledHandler serves GET requests for the downgrade enabled
// status of the local member.
type downgradeEnabledHandler struct {
	lg      *zap.Logger                    // optional structured logger; nil is tolerated
	cluster api.Cluster                    // source of the cluster ID reported to peers
	server  etcdserver.ServerDowngradeHTTP // provides DowngradeInfo()
}

func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, "GET") {
return
Expand Down Expand Up @@ -174,27 +162,3 @@ func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Requ
}
}
}

// ServeHTTP replies to GET requests on the downgrade-enabled path with the
// local member's downgrade enabled flag, JSON-encoded. Any other method is
// rejected by allowMethod; any other path gets 400 Bad Request.
func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r, "GET") {
		return
	}
	// Identify the cluster on every response, success or error.
	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())

	if r.URL.Path != downgradeEnabledPath {
		http.Error(w, "bad path", http.StatusBadRequest)
		return
	}

	enabled := h.server.DowngradeInfo().Enabled
	w.Header().Set("Content-Type", "application/json")
	b, err := json.Marshal(enabled)
	if err != nil {
		// NOTE(review): on a marshal failure this only logs, then falls
		// through and writes a nil body with status 200 — confirm intended.
		if h.lg != nil {
			h.lg.Warn("failed to marshal downgrade.Enabled to json", zap.Error(err))
		} else {
			plog.Warningf("failed to marshal downgrade.Enabled to json (%v)", err)
		}
	}
	w.Write(b)
}
56 changes: 0 additions & 56 deletions etcdserver/api/etcdhttp/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
func (s *fakeServer) Cluster() api.Cluster { return s.cluster }
func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil }

func (s *fakeServer) DowngradeInfo() *membership.Downgrade { return s.cluster.downgrade }

var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data"))
})
Expand Down Expand Up @@ -282,57 +280,3 @@ func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
}
}
}

// TestServeDowngradeEnabledGet verifies the request to get local downgrade enabled status
func TestServeDowngradeEnabledGet(t *testing.T) {
	// A cluster whose downgrade is enabled; the handler should echo this flag.
	d := &membership.Downgrade{Enabled: true}
	cluster := &fakeCluster{
		id:        1,
		downgrade: d,
	}
	s := fakeServer{cluster}
	h := newDowngradeEnabledHandler(nil, cluster, &s)
	// Expected JSON body is the bare boolean ("true").
	b, err := json.Marshal(d.Enabled)
	if err != nil {
		t.Fatal(err)
	}
	str := string(b)

	tests := []struct {
		name  string
		path  string
		wcode int    // expected HTTP status code
		wct   string // expected Content-Type header
		wbody string // expected response body
	}{
		{"Succeeded", downgradeEnabledPath, http.StatusOK, "application/json", str},
		{"Failed with bad path", path.Join(downgradeEnabledPath, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"},
	}

	for i, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			req, err := http.NewRequest("GET", testutil.MustNewURL(t, tt.path).String(), nil)
			if err != nil {
				t.Fatal(err)
			}
			rw := httptest.NewRecorder()
			h.ServeHTTP(rw, req)

			if rw.Code != tt.wcode {
				t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
			}
			if gct := rw.Header().Get("Content-Type"); gct != tt.wct {
				t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct)
			}
			if rw.Body.String() != tt.wbody {
				t.Errorf("#%d: body = %s, want %s", i, rw.Body.String(), tt.wbody)
			}
			// The cluster ID header must be present on every response.
			gcid := rw.Header().Get("X-Etcd-Cluster-ID")
			wcid := cluster.ID().String()
			if gcid != wcid {
				t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
			}
		})

	}
}
6 changes: 3 additions & 3 deletions etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,8 +918,8 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
return ids
}

// Downgrade returns the capability status of the cluster
func (c *RaftCluster) Downgrade() *Downgrade {
// DowngradeInfo returns the capability status of the cluster
func (c *RaftCluster) DowngradeInfo() *Downgrade {
c.Lock()
defer c.Unlock()
if c.downgradeInfo == nil {
Expand All @@ -929,7 +929,7 @@ func (c *RaftCluster) Downgrade() *Downgrade {
return d
}

func (c *RaftCluster) UpdateDowngrade(d *Downgrade) {
func (c *RaftCluster) SetDowngradeInfo(d *Downgrade) {
c.Lock()
defer c.Unlock()

Expand Down
8 changes: 3 additions & 5 deletions etcdserver/api/membership/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,13 +983,11 @@ func TestMustDetectDowngrade(t *testing.T) {

data, err := ioutil.ReadFile(logPath)
if err == nil {
t.Log(len(data))
if !bytes.Contains(data, []byte(tt.message)) {
t.Errorf("Expected to find %v in log", tt.message)
t.Log(string(data))
}
} else {
t.Log(err)
t.Fatal(err)
}

if !tt.success {
Expand All @@ -1000,7 +998,7 @@ func TestMustDetectDowngrade(t *testing.T) {
}

if tt.success && errCmd != nil {
t.Errorf("Expected not failure; Got %v", err)
t.Errorf("Expected not failure; Got %v", errCmd)
}
})
}
Expand Down Expand Up @@ -1094,7 +1092,7 @@ func TestGetDowngrade(t *testing.T) {
}
for i, tt := range tests {
t.Run(string(i), func(t *testing.T) {
d := tt.cluster.Downgrade()
d := tt.cluster.DowngradeInfo()
if d.Enabled != tt.expectedEnabled {
t.Errorf("Expected %v; Got %v", tt.expectedEnabled, d.Enabled)
}
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ func (a *applierV3backend) Downgrade(dr *pb.DowngradeRequest) (*pb.DowngradeResp
case pb.DowngradeRequest_CANCEL:
d = membership.Downgrade{Enabled: false}
}
a.s.cluster.UpdateDowngrade(&d)
a.s.cluster.SetDowngradeInfo(&d)
resp := &pb.DowngradeResponse{Version: a.s.ClusterVersion().String()}
return resp, nil
}
Expand Down
47 changes: 18 additions & 29 deletions etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,60 +178,47 @@ func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt
}

// decideAllowedVersionRange decides the available version range of the cluster that local server can join in;
// if the downgrade enable status of the cluster is not decided(some server allows downgrade some not),
// the version window is [localVersion, localVersion];
// if the downgrade enable status is true, the version window is [localVersion, oneMinorHigher]
// if the downgrade enabled status is true, the version window is [localVersion, oneMinorHigher]
// if the downgrade is not enabled, the version window is [MinClusterVersion, localVersion]
func decideAllowedVersionRange(enables []bool) (minV *semver.Version, maxV *semver.Version) {
func decideAllowedVersionRange(downgradeEnabled bool) (minV *semver.Version, maxV *semver.Version) {
minV = semver.Must(semver.NewVersion(version.MinClusterVersion))
maxV = semver.Must(semver.NewVersion(version.Version))
maxV = &semver.Version{Major: maxV.Major, Minor: maxV.Minor}

if len(enables) == 0 {
return minV, maxV
}
enable := enables[0]
for _, e := range enables {
// if the downgrade enable status of the cluster is not decided,
// the local server can only join into a cluster with exactly same version
if e != enable {
minV = &semver.Version{Major: maxV.Major, Minor: maxV.Minor}
return minV, maxV
}
}

if enable {
if downgradeEnabled {
minV = &semver.Version{Major: maxV.Major, Minor: maxV.Minor}
maxV.Minor = maxV.Minor + 1
}
return minV, maxV
}

func getDowngradableOfCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) []bool {
// decideDowngradeEnabled will decide the downgrade enabled status of the cluster.
func decideDowngradeEnabled(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
members := cl.Members()
var enables []bool

for _, m := range members {
if m.ID == local {
continue
}
enable, err := getDowngradable(lg, m, rt)
enable, err := getDowngradeEnabled(lg, m, rt)
if err != nil {
if lg != nil {
lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
} else {
plog.Warningf("cannot get the downgrade enabled status of member %s (%v)", m.ID, err)
}
enables = append(enables, false)
} else {
enables = append(enables, enable)
// Since the "/downgrade/enabled" serves linearized data,
// this function can return once it gets a non-error response from the endpoint.
return enable
}
}
return enables
return false
}

// getDowngradeStatus returns the downgrade status of the given member
// getDowngradeEnabled returns the downgrade enabled status of the given member
// via its peerURLs. Returns the last error if it fails to get it.
func getDowngradable(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
cc := &http.Client{
Transport: rt,
}
Expand All @@ -241,7 +228,7 @@ func getDowngradable(lg *zap.Logger, m *membership.Member, rt http.RoundTripper)
)

for _, u := range m.PeerURLs {
addr := u + "/downgrade/enable"
addr := u + "/downgrade/enabled"
resp, err = cc.Get(addr)
if err != nil {
if lg != nil {
Expand Down Expand Up @@ -338,7 +325,9 @@ func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *se
return cv
}

func decideDowngradeStatus(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
// isDowngradeFinished decides the cluster downgrade status based on versions map.
// Return true if all servers are downgraded to target version, otherwise return false.
func isDowngradeFinished(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
for mid, ver := range vers {
if ver == nil {
return false
Expand Down Expand Up @@ -372,7 +361,7 @@ func decideDowngradeStatus(lg *zap.Logger, targetVersion *semver.Version, vers m
// We set this rule since when the local member joins, another member might be offline.
func isCompatibleWithCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(lg, cl, local, rt)
minV, maxV := decideAllowedVersionRange(getDowngradableOfCluster(lg, cl, local, rt))
minV, maxV := decideAllowedVersionRange(decideDowngradeEnabled(lg, cl, local, rt))
return isCompatibleWithVers(lg, vers, local, minV, maxV)
}

Expand Down
58 changes: 55 additions & 3 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ const (

// Todo: need to be decided
monitorDowngradeInterval = time.Second

downgradeHTTPTimeout = 5 * time.Second
)

var (
Expand Down Expand Up @@ -881,9 +883,59 @@ type ServerPeerHTTP interface {
type ServerDowngradeHTTP interface {
// DowngradeInfo is the downgrade information of the cluster
DowngradeInfo() *membership.Downgrade
DowngradeEnabledHandler() http.Handler
}

func (s *EtcdServer) DowngradeInfo() *membership.Downgrade { return s.cluster.DowngradeInfo() }

// downgradeEnabledHandler serves the linearized downgrade enabled status of
// the local member over the peer HTTP endpoint.
type downgradeEnabledHandler struct {
	lg      *zap.Logger // optional structured logger; falls back to plog when nil
	cluster api.Cluster // source of the X-Etcd-Cluster-ID response header
	server  *EtcdServer // provides linearizableReadNotify and DowngradeInfo
}

// DowngradeEnabledHandler returns an http.Handler serving the local member's
// downgrade enabled status, backed by a linearizable read of the server.
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
	return &downgradeEnabledHandler{
		lg:      s.getLogger(),
		cluster: s.cluster,
		server:  s,
	}
}

func (s *EtcdServer) DowngradeInfo() *membership.Downgrade { return s.cluster.Downgrade() }
// ServeHTTP replies to GET requests on the "/downgrade/enabled" path with the
// member's downgrade enabled flag, JSON-encoded. Before answering it performs
// a linearizable read so callers never observe stale downgrade state; if the
// linearized read fails within downgradeHTTPTimeout, it responds 400.
func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != "GET" {
		// RFC 7231: a 405 response must carry an Allow header listing the
		// methods the resource supports — not the method the client sent.
		w.Header().Set("Allow", "GET")
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	// Identify the cluster on every response, success or error.
	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())

	if r.URL.Path != "/downgrade/enabled" {
		http.Error(w, "bad path", http.StatusBadRequest)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), downgradeHTTPTimeout)
	defer cancel()

	// serve with linearized downgrade info
	if err := h.server.linearizableReadNotify(ctx); err != nil {
		http.Error(w, "failed linearized read", http.StatusBadRequest)
		return
	}
	enabled := h.server.DowngradeInfo().Enabled
	w.Header().Set("Content-Type", "application/json")
	b, err := json.Marshal(enabled)
	if err != nil {
		if h.lg != nil {
			h.lg.Warn("failed to marshal downgrade.Enabled to json", zap.Error(err))
		} else {
			plog.Warningf("failed to marshal downgrade.Enabled to json (%v)", err)
		}
	}
	w.Write(b)
}

// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
Expand Down Expand Up @@ -2591,13 +2643,13 @@ func (s *EtcdServer) monitorDowngrade() {
continue
}

d := s.cluster.Downgrade()
d := s.cluster.DowngradeInfo()
if !d.Enabled {
continue
}

targetVersion := d.TargetVersion
if decideDowngradeStatus(s.getLogger(), targetVersion, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) {
if isDowngradeFinished(s.getLogger(), targetVersion, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) {
if lg != nil {
lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion.String()))
} else {
Expand Down
Loading

0 comments on commit 6ce8fa9

Please sign in to comment.