Skip to content

Commit

Permalink
feat: add E2E tests for cases that peers going offline (#3524)
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko authored Oct 21, 2024
1 parent 2793851 commit 071072f
Show file tree
Hide file tree
Showing 25 changed files with 510 additions and 56 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/e2e-v2-nydus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ jobs:
- name: Run E2E test
run: |
docker exec kind-control-plane curl -L -o nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz https://github.com/containerd/nerdctl/releases/download/v${NERDCTL_VER}/nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz
docker exec kind-control-plane tar xzf nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz
docker exec kind-control-plane install -D -m 755 nerdctl /usr/local/bin/nerdctl
docker exec kind-worker curl -L -o nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz https://github.com/containerd/nerdctl/releases/download/v${NERDCTL_VER}/nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz
docker exec kind-worker tar xzf nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz
docker exec kind-worker install -D -m 755 nerdctl /usr/local/bin/nerdctl
# this is only a simple test that run `date` in container
docker exec kind-control-plane /usr/local/bin/nerdctl run --snapshotter nydus --network=none ghcr.io/dragonflyoss/image-service/nginx:nydus-latest date
docker exec kind-worker /usr/local/bin/nerdctl run --snapshotter nydus --network=none ghcr.io/dragonflyoss/image-service/nginx:nydus-latest date
kubectl apply -f test/testdata/k8s/nydus.yaml
kubectl wait po nydus-pod --for=condition=ready --timeout=1m
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
}

host := &schedulerv1.PeerHost{
Id: idgen.HostIDV2(opt.Host.AdvertiseIP.String(), opt.Host.Hostname),
Id: idgen.HostIDV2(opt.Host.AdvertiseIP.String(), opt.Host.Hostname, opt.Scheduler.Manager.SeedPeer.Enable),
Ip: opt.Host.AdvertiseIP.String(),
RpcPort: int32(opt.Download.PeerGRPC.TCPListen.PortRange.Start),
DownPort: 0,
Expand Down
2 changes: 1 addition & 1 deletion deploy/docker-compose/template/scheduler.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ scheduler:
# then the task will also be reclaimed.
taskGCInterval: 30m
# hostGCInterval is the interval of host gc.
hostGCInterval: 6h
hostGCInterval: 5m
# hostTTL is time to live of host. If host announces message to scheduler,
# then HostTTl will be reset.
hostTTL: 1h
Expand Down
7 changes: 6 additions & 1 deletion manager/job/sync_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/types"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
)

Expand Down Expand Up @@ -196,7 +197,11 @@ func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler,

// If the peer exists in the sync peer results, update the peer data in the database with
// the sync peer results and delete the sync peer from the sync peers map.
id := idgen.HostIDV2(peer.IP, peer.Hostname)
isSeedPeer := false
if types.ParseHostType(peer.Type) != types.HostTypeNormal {
isSeedPeer = true
}
id := idgen.HostIDV2(peer.IP, peer.Hostname, isSeedPeer)
if syncPeer, ok := syncPeers[id]; ok {
if err := s.db.WithContext(ctx).First(&models.Peer{}, peer.ID).Updates(models.Peer{
Type: syncPeer.Type.Name(),
Expand Down
9 changes: 5 additions & 4 deletions pkg/idgen/host_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package idgen

import (
"fmt"

"d7y.io/dragonfly/v2/pkg/digest"
)

// HostIDV1 generates v1 version of host id.
Expand All @@ -28,6 +26,9 @@ func HostIDV1(hostname string, port int32) string {
}

// HostIDV2 generates v2 version of host id.
func HostIDV2(ip, hostname string) string {
return digest.SHA256FromStrings(ip, hostname)
func HostIDV2(ip, hostname string, isSeedPeer bool) string {
if isSeedPeer {
return fmt.Sprintf("%s-%s-seed", ip, hostname)
}
return fmt.Sprintf("%s-%s", ip, hostname)
}
48 changes: 31 additions & 17 deletions pkg/idgen/host_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,43 +67,57 @@ func TestHostIDV1(t *testing.T) {

func TestHostIDV2(t *testing.T) {
tests := []struct {
name string
ip string
hostname string
expect func(t *testing.T, d string)
name string
ip string
hostname string
isSeedPeer bool
expect func(t *testing.T, d string)
}{
{
name: "generate HostID",
ip: "127.0.0.1",
hostname: "foo",
name: "generate HostID for peer",
ip: "127.0.0.1",
hostname: "foo",
isSeedPeer: false,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Equal(d, "52727e8408e0ee1f999086f241ec43d5b3dbda666f1a06ef1fcbe75b4e90fa17")
assert.Equal(d, "127.0.0.1-foo")
},
},
{
name: "generate HostID with empty ip",
ip: "",
hostname: "foo",
name: "generate HostID for seed peer",
ip: "127.0.0.1",
hostname: "foo",
isSeedPeer: true,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Equal(d, "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae")
assert.Equal(d, "127.0.0.1-foo-seed")
},
},
{
name: "generate HostID with empty host",
ip: "127.0.0.1",
hostname: "",
name: "generate HostID with empty ip for seed peer",
ip: "",
hostname: "foo",
isSeedPeer: true,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Equal(d, "-foo-seed")
},
},
{
name: "generate HostID with empty host for seed peer",
ip: "127.0.0.1",
hostname: "",
isSeedPeer: true,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Equal(d, "12ca17b49af2289436f303e0166030a21e525d266e209267433801a8fd4071a0")
assert.Equal(d, "127.0.0.1--seed")
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, HostIDV2(tc.ip, tc.hostname))
tc.expect(t, HostIDV2(tc.ip, tc.hostname, tc.isSeedPeer))
})
}
}
2 changes: 1 addition & 1 deletion pkg/rpc/scheduler/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
}, nil
}

// GetV1ByAddr returns v2 version of the scheduler client by address.
// GetV1ByAddr returns v1 version of the scheduler client by address.
func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) {
conn, err := grpc.DialContext(
ctx,
Expand Down
16 changes: 16 additions & 0 deletions pkg/rpc/scheduler/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/protobuf/types/known/emptypb"

commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
Expand Down Expand Up @@ -145,6 +146,9 @@ type V2 interface {
// AnnounceHost announces host to scheduler.
AnnounceHost(context.Context, *schedulerv2.AnnounceHostRequest, ...grpc.CallOption) error

// ListHosts lists hosts in scheduler.
ListHosts(ctx context.Context, taskID string, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error)

// DeleteHost releases host in scheduler.
DeleteHost(context.Context, *schedulerv2.DeleteHostRequest, ...grpc.CallOption) error

Expand Down Expand Up @@ -250,6 +254,18 @@ func (v *v2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
return eg.Wait()
}

// ListHosts lists host in all schedulers.
func (v *v2) ListHosts(ctx context.Context, taskID string, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.SchedulerClient.ListHosts(
context.WithValue(ctx, pkgbalancer.ContextKey, taskID),
new(emptypb.Empty),
opts...,
)
}

// DeleteHost releases host in all schedulers.
func (v *v2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
Expand Down
20 changes: 20 additions & 0 deletions pkg/rpc/scheduler/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,20 @@ var (
}, []string{"os", "platform", "platform_family", "platform_version",
"kernel_version", "git_version", "git_commit", "go_version", "build_platform"})

ListHostsCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "list_hosts_total",
Help: "Counter of the number of the list hosts.",
})

ListHostsCountFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "list_hosts_failure_total",
Help: "Counter of the number of failed of the list hosts.",
})

LeaveHostCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
4 changes: 4 additions & 0 deletions scheduler/resource/standard/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,7 @@ func (h *Host) LeavePeers() {
func (h *Host) FreeUploadCount() int32 {
return h.ConcurrentUploadLimit.Load() - h.ConcurrentUploadCount.Load()
}

func (h *Host) IsSeedPeer() bool {
return h.Type == types.HostTypeSuperSeed || h.Type == types.HostTypeStrongSeed || h.Type == types.HostTypeWeakSeed
}
4 changes: 2 additions & 2 deletions scheduler/resource/standard/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ var (

mockAnnounceInterval = 5 * time.Minute

mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true)
mockHostLocation = "baz"
mockHostIDC = "bas"
)
Expand Down
4 changes: 2 additions & 2 deletions scheduler/resource/standard/seed_peer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package standard
import (
"context"
"fmt"
reflect "reflect"
"reflect"

"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"
Expand Down Expand Up @@ -156,7 +156,7 @@ func (sc *seedPeerClient) updateSeedPeersForHostManager(seedPeers []*managerv2.S
concurrentUploadLimit = int32(config.LoadLimit)
}

id := idgen.HostIDV2(seedPeer.Ip, seedPeer.Hostname)
id := idgen.HostIDV2(seedPeer.Ip, seedPeer.Hostname, true)
seedPeerHost, loaded := sc.hostManager.Load(id)
if !loaded {
options := []HostOption{WithNetwork(Network{
Expand Down
12 changes: 10 additions & 2 deletions scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,18 @@ func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.A
return new(emptypb.Empty), nil
}

// TODO Implement the following methods.
// ListHosts lists hosts in scheduler.
func (s *schedulerServerV2) ListHosts(ctx context.Context, _ *emptypb.Empty) (*schedulerv2.ListHostsResponse, error) {
return nil, nil
// Collect ListHostsCount metrics.
metrics.ListHostsCount.Inc()
resp, err := s.service.ListHosts(ctx)
if err != nil {
// Collect ListHostsFailureCount metrics.
metrics.ListHostsCountFailureCount.Inc()
return nil, err
}

return resp, nil
}

// DeleteHost releases host in scheduler.
Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduling/evaluator/evaluator_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ var (
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceLength int32 = 2048
mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true)
mockHostLocation = "bas"
mockHostIDC = "baz"
mockPeerID = idgen.PeerIDV2()
Expand Down
10 changes: 5 additions & 5 deletions scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ var (
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceLength int32 = 2048
mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true)
mockHostLocation = "baz"
mockHostIDC = "bas"
mockPeerID = idgen.PeerIDV2()
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func TestScheduling_FindCandidateParents(t *testing.T) {
var mockPeers []*resource.Peer
for i := 0; i < 11; i++ {
mockHost := resource.NewHost(
idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname,
idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
mockPeers = append(mockPeers, peer)
Expand Down Expand Up @@ -1357,7 +1357,7 @@ func TestScheduling_FindParentAndCandidateParents(t *testing.T) {
var mockPeers []*resource.Peer
for i := 0; i < 11; i++ {
mockHost := resource.NewHost(
idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname,
idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
mockPeers = append(mockPeers, peer)
Expand Down Expand Up @@ -1618,7 +1618,7 @@ func TestScheduling_FindSuccessParent(t *testing.T) {
var mockPeers []*resource.Peer
for i := 0; i < 11; i++ {
mockHost := resource.NewHost(
idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname,
idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
mockPeers = append(mockPeers, peer)
Expand Down
6 changes: 3 additions & 3 deletions scheduler/service/service_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ var (
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"}
mockTaskPieceLength int32 = 2048
mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true)
mockHostLocation = "bas"
mockHostIDC = "baz"
mockPeerID = idgen.PeerIDV2()
Expand Down Expand Up @@ -2559,7 +2559,7 @@ func TestServiceV1_LeaveHost(t *testing.T) {

tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT())
tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{
Id: idgen.HostIDV2(host.IP, host.Hostname),
Id: idgen.HostIDV2(host.IP, host.Hostname, true),
}))
})
}
Expand Down
Loading

0 comments on commit 071072f

Please sign in to comment.