diff --git a/.github/workflows/e2e-v2-nydus.yml b/.github/workflows/e2e-v2-nydus.yml index 3bef9bb804d..79c18df6c72 100644 --- a/.github/workflows/e2e-v2-nydus.yml +++ b/.github/workflows/e2e-v2-nydus.yml @@ -124,12 +124,12 @@ jobs: - name: Run E2E test run: | - docker exec kind-control-plane curl -L -o nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz https://github.com/containerd/nerdctl/releases/download/v${NERDCTL_VER}/nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz - docker exec kind-control-plane tar xzf nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz - docker exec kind-control-plane install -D -m 755 nerdctl /usr/local/bin/nerdctl + docker exec kind-worker curl -L -o nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz https://github.com/containerd/nerdctl/releases/download/v${NERDCTL_VER}/nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz + docker exec kind-worker tar xzf nerdctl-${NERDCTL_VER}-linux-amd64.tar.gz + docker exec kind-worker install -D -m 755 nerdctl /usr/local/bin/nerdctl # this is only a simple test that run `date` in container - docker exec kind-control-plane /usr/local/bin/nerdctl run --snapshotter nydus --network=none ghcr.io/dragonflyoss/image-service/nginx:nydus-latest date + docker exec kind-worker /usr/local/bin/nerdctl run --snapshotter nydus --network=none ghcr.io/dragonflyoss/image-service/nginx:nydus-latest date kubectl apply -f test/testdata/k8s/nydus.yaml kubectl wait po nydus-pod --for=condition=ready --timeout=1m diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 704652fe31a..2841c985953 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -128,7 +128,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { } host := &schedulerv1.PeerHost{ - Id: idgen.HostIDV2(opt.Host.AdvertiseIP.String(), opt.Host.Hostname), + Id: idgen.HostIDV2(opt.Host.AdvertiseIP.String(), opt.Host.Hostname, opt.Scheduler.Manager.SeedPeer.Enable), Ip: opt.Host.AdvertiseIP.String(), RpcPort: int32(opt.Download.PeerGRPC.TCPListen.PortRange.Start), DownPort: 0, diff --git a/deploy/docker-compose/template/scheduler.template.yaml b/deploy/docker-compose/template/scheduler.template.yaml index 178dd1ff71f..625f734cba6 100644 --- a/deploy/docker-compose/template/scheduler.template.yaml +++ b/deploy/docker-compose/template/scheduler.template.yaml @@ -71,7 +71,7 @@ scheduler: # then the task will also be reclaimed. taskGCInterval: 30m # hostGCInterval is the interval of host gc. - hostGCInterval: 6h + hostGCInterval: 5m # hostTTL is time to live of host. If host announces message to scheduler, # then HostTTl will be reset. hostTTL: 1h diff --git a/manager/job/sync_peers.go b/manager/job/sync_peers.go index 5f4fb494a7e..e99f56404dc 100644 --- a/manager/job/sync_peers.go +++ b/manager/job/sync_peers.go @@ -33,6 +33,7 @@ import ( "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/types" resource "d7y.io/dragonfly/v2/scheduler/resource/standard" ) @@ -196,7 +197,11 @@ func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, // If the peer exists in the sync peer results, update the peer data in the database with // the sync peer results and delete the sync peer from the sync peers map. 
- id := idgen.HostIDV2(peer.IP, peer.Hostname) + isSeedPeer := false + if types.ParseHostType(peer.Type) != types.HostTypeNormal { + isSeedPeer = true + } + id := idgen.HostIDV2(peer.IP, peer.Hostname, isSeedPeer) if syncPeer, ok := syncPeers[id]; ok { if err := s.db.WithContext(ctx).First(&models.Peer{}, peer.ID).Updates(models.Peer{ Type: syncPeer.Type.Name(), diff --git a/pkg/idgen/host_id.go b/pkg/idgen/host_id.go index 0c0861fe086..4a2eb389cc7 100644 --- a/pkg/idgen/host_id.go +++ b/pkg/idgen/host_id.go @@ -18,8 +18,6 @@ package idgen import ( "fmt" - - "d7y.io/dragonfly/v2/pkg/digest" ) // HostIDV1 generates v1 version of host id. @@ -28,6 +26,9 @@ func HostIDV1(hostname string, port int32) string { } // HostIDV2 generates v2 version of host id. -func HostIDV2(ip, hostname string) string { - return digest.SHA256FromStrings(ip, hostname) +func HostIDV2(ip, hostname string, isSeedPeer bool) string { + if isSeedPeer { + return fmt.Sprintf("%s-%s-seed", ip, hostname) + } + return fmt.Sprintf("%s-%s", ip, hostname) } diff --git a/pkg/idgen/host_id_test.go b/pkg/idgen/host_id_test.go index af500904923..2f7a6f10cbe 100644 --- a/pkg/idgen/host_id_test.go +++ b/pkg/idgen/host_id_test.go @@ -67,43 +67,57 @@ func TestHostIDV1(t *testing.T) { func TestHostIDV2(t *testing.T) { tests := []struct { - name string - ip string - hostname string - expect func(t *testing.T, d string) + name string + ip string + hostname string + isSeedPeer bool + expect func(t *testing.T, d string) }{ { - name: "generate HostID", - ip: "127.0.0.1", - hostname: "foo", + name: "generate HostID for peer", + ip: "127.0.0.1", + hostname: "foo", + isSeedPeer: false, expect: func(t *testing.T, d string) { assert := assert.New(t) - assert.Equal(d, "52727e8408e0ee1f999086f241ec43d5b3dbda666f1a06ef1fcbe75b4e90fa17") + assert.Equal(d, "127.0.0.1-foo") }, }, { - name: "generate HostID with empty ip", - ip: "", - hostname: "foo", + name: "generate HostID for seed peer", + ip: "127.0.0.1", + hostname: "foo", + isSeedPeer: true, expect: func(t *testing.T, d string) { assert := assert.New(t) - assert.Equal(d, "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae") + assert.Equal(d, "127.0.0.1-foo-seed") }, }, { - name: "generate HostID with empty host", - ip: "127.0.0.1", - hostname: "", + name: "generate HostID with empty ip for seed peer", + ip: "", + hostname: "foo", + isSeedPeer: true, + expect: func(t *testing.T, d string) { + assert := assert.New(t) + assert.Equal(d, "-foo-seed") + }, + }, + { + name: "generate HostID with empty host for seed peer", + ip: "127.0.0.1", + hostname: "", + isSeedPeer: true, expect: func(t *testing.T, d string) { assert := assert.New(t) - assert.Equal(d, "12ca17b49af2289436f303e0166030a21e525d266e209267433801a8fd4071a0") + assert.Equal(d, "127.0.0.1--seed") }, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, HostIDV2(tc.ip, tc.hostname)) + tc.expect(t, HostIDV2(tc.ip, tc.hostname, tc.isSeedPeer)) }) } } diff --git a/pkg/rpc/scheduler/client/client_v1.go b/pkg/rpc/scheduler/client/client_v1.go index 18a6fa12215..f288020efc5 100644 --- a/pkg/rpc/scheduler/client/client_v1.go +++ b/pkg/rpc/scheduler/client/client_v1.go @@ -90,7 +90,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt }, nil } -// GetV1ByAddr returns v2 version of the scheduler client by address. +// GetV1ByAddr returns v1 version of the scheduler client by address. 
func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) { conn, err := grpc.DialContext( ctx, diff --git a/pkg/rpc/scheduler/client/client_v2.go b/pkg/rpc/scheduler/client/client_v2.go index 58b5d999433..1e3f9bf379c 100644 --- a/pkg/rpc/scheduler/client/client_v2.go +++ b/pkg/rpc/scheduler/client/client_v2.go @@ -30,6 +30,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/balancer" + "google.golang.org/protobuf/types/known/emptypb" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2" @@ -145,6 +146,9 @@ type V2 interface { // AnnounceHost announces host to scheduler. AnnounceHost(context.Context, *schedulerv2.AnnounceHostRequest, ...grpc.CallOption) error + // ListHosts lists hosts in scheduler. + ListHosts(ctx context.Context, taskID string, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error) + // DeleteHost releases host in scheduler. DeleteHost(context.Context, *schedulerv2.DeleteHostRequest, ...grpc.CallOption) error @@ -250,6 +254,18 @@ func (v *v2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ return eg.Wait() } +// ListHosts lists host in all schedulers. +func (v *v2) ListHosts(ctx context.Context, taskID string, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error) { + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + return v.SchedulerClient.ListHosts( + context.WithValue(ctx, pkgbalancer.ContextKey, taskID), + new(emptypb.Empty), + opts..., + ) +} + // DeleteHost releases host in all schedulers. func (v *v2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest, opts ...grpc.CallOption) error { ctx, cancel := context.WithTimeout(ctx, contextTimeout) diff --git a/pkg/rpc/scheduler/client/mocks/client_v2_mock.go b/pkg/rpc/scheduler/client/mocks/client_v2_mock.go index dff5dcfbf55..afabcaabfd2 100644 --- a/pkg/rpc/scheduler/client/mocks/client_v2_mock.go +++ b/pkg/rpc/scheduler/client/mocks/client_v2_mock.go @@ -152,6 +152,26 @@ func (mr *MockV2MockRecorder) DeleteTask(arg0, arg1 any, arg2 ...any) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockV2)(nil).DeleteTask), varargs...) } +// ListHosts mocks base method. +func (m *MockV2) ListHosts(ctx context.Context, taskID string, opts ...grpc.CallOption) (*scheduler.ListHostsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, taskID} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListHosts", varargs...) + ret0, _ := ret[0].(*scheduler.ListHostsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListHosts indicates an expected call of ListHosts. +func (mr *MockV2MockRecorder) ListHosts(ctx, taskID any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, taskID}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListHosts", reflect.TypeOf((*MockV2)(nil).ListHosts), varargs...) +} + // StatPeer mocks base method. 
func (m *MockV2) StatPeer(arg0 context.Context, arg1 *scheduler.StatPeerRequest, arg2 ...grpc.CallOption) (*common.Peer, error) { m.ctrl.T.Helper() diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index ee6708e32ab..00bf4bcdfd9 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -224,6 +224,20 @@ var ( }, []string{"os", "platform", "platform_family", "platform_version", "kernel_version", "git_version", "git_commit", "go_version", "build_platform"}) + ListHostsCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "list_hosts_total", + Help: "Counter of the number of the list hosts.", + }) + + ListHostsCountFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "list_hosts_failure_total", + Help: "Counter of the number of failed of the list hosts.", + }) + LeaveHostCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, diff --git a/scheduler/resource/standard/host.go b/scheduler/resource/standard/host.go index a17746106a5..515329f50ac 100644 --- a/scheduler/resource/standard/host.go +++ b/scheduler/resource/standard/host.go @@ -451,3 +451,7 @@ func (h *Host) LeavePeers() { func (h *Host) FreeUploadCount() int32 { return h.ConcurrentUploadLimit.Load() - h.ConcurrentUploadCount.Load() } + +func (h *Host) IsSeedPeer() bool { + return h.Type == types.HostTypeSuperSeed || h.Type == types.HostTypeStrongSeed || h.Type == types.HostTypeWeakSeed +} diff --git a/scheduler/resource/standard/host_test.go b/scheduler/resource/standard/host_test.go index e9b5810ec1f..d2edc49c2c2 100644 --- a/scheduler/resource/standard/host_test.go +++ b/scheduler/resource/standard/host_test.go @@ -132,8 +132,8 @@ var ( mockAnnounceInterval = 5 * time.Minute - mockHostID = idgen.HostIDV2("127.0.0.1", "foo") - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar") + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) mockHostLocation = "baz" mockHostIDC = "bas" ) diff --git a/scheduler/resource/standard/seed_peer_client.go b/scheduler/resource/standard/seed_peer_client.go index 0d53f98fd14..ee5d662d234 100644 --- a/scheduler/resource/standard/seed_peer_client.go +++ b/scheduler/resource/standard/seed_peer_client.go @@ -21,7 +21,7 @@ package standard import ( "context" "fmt" - reflect "reflect" + "reflect" "github.com/hashicorp/go-multierror" "google.golang.org/grpc" @@ -156,7 +156,7 @@ func (sc *seedPeerClient) updateSeedPeersForHostManager(seedPeers []*managerv2.S concurrentUploadLimit = int32(config.LoadLimit) } - id := idgen.HostIDV2(seedPeer.Ip, seedPeer.Hostname) + id := idgen.HostIDV2(seedPeer.Ip, seedPeer.Hostname, true) seedPeerHost, loaded := sc.hostManager.Load(id) if !loaded { options := []HostOption{WithNetwork(Network{ diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go index a36062b7f5f..41a24352589 100644 --- a/scheduler/rpcserver/scheduler_server_v2.go +++ b/scheduler/rpcserver/scheduler_server_v2.go @@ -136,10 +136,18 @@ func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.A return new(emptypb.Empty), nil } -// TODO Implement the following methods. // ListHosts lists hosts in scheduler. 
func (s *schedulerServerV2) ListHosts(ctx context.Context, _ *emptypb.Empty) (*schedulerv2.ListHostsResponse, error) { - return nil, nil + // Collect ListHostsCount metrics. + metrics.ListHostsCount.Inc() + resp, err := s.service.ListHosts(ctx) + if err != nil { + // Collect ListHostsFailureCount metrics. + metrics.ListHostsCountFailureCount.Inc() + return nil, err + } + + return resp, nil } // DeleteHost releases host in scheduler. diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index 0fc7ab0a51f..7d7264d74b7 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -141,8 +141,8 @@ var ( mockTaskFilteredQueryParams = []string{"bar"} mockTaskHeader = map[string]string{"content-length": "100"} mockTaskPieceLength int32 = 2048 - mockHostID = idgen.HostIDV2("127.0.0.1", "foo") - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar") + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) mockHostLocation = "bas" mockHostIDC = "baz" mockPeerID = idgen.PeerIDV2() diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 140add6f187..01668709bb0 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -170,8 +170,8 @@ var ( mockTaskFilteredQueryParams = []string{"bar"} mockTaskHeader = map[string]string{"content-length": "100"} mockTaskPieceLength int32 = 2048 - mockHostID = idgen.HostIDV2("127.0.0.1", "foo") - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar") + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) mockHostLocation = "baz" mockHostIDC = "bas" mockPeerID = idgen.PeerIDV2() @@ -1040,7 +1040,7 @@ func TestScheduling_FindCandidateParents(t *testing.T) { var mockPeers []*resource.Peer for i := 0; i < 11; i++ { mockHost := resource.NewHost( - idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname, + idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost) mockPeers = append(mockPeers, peer) @@ -1357,7 +1357,7 @@ func TestScheduling_FindParentAndCandidateParents(t *testing.T) { var mockPeers []*resource.Peer for i := 0; i < 11; i++ { mockHost := resource.NewHost( - idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname, + idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost) mockPeers = append(mockPeers, peer) @@ -1618,7 +1618,7 @@ func TestScheduling_FindSuccessParent(t *testing.T) { var mockPeers []*resource.Peer for i := 0; i < 11; i++ { mockHost := resource.NewHost( - idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname, + idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost) mockPeers = append(mockPeers, peer) diff --git a/scheduler/service/service_v1_test.go 
b/scheduler/service/service_v1_test.go index 85a4651639a..3d3884aa63a 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -192,8 +192,8 @@ var ( mockTaskFilteredQueryParams = []string{"bar"} mockTaskHeader = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"} mockTaskPieceLength int32 = 2048 - mockHostID = idgen.HostIDV2("127.0.0.1", "foo") - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar") + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) mockHostLocation = "bas" mockHostIDC = "baz" mockPeerID = idgen.PeerIDV2() @@ -2559,7 +2559,7 @@ func TestServiceV1_LeaveHost(t *testing.T) { tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT()) tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{ - Id: idgen.HostIDV2(host.IP, host.Hostname), + Id: idgen.HostIDV2(host.IP, host.Hostname, true), })) }) } diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 3f6442bc7a8..e96bdddcc02 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -699,6 +699,82 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ return nil } +// ListHosts lists hosts in scheduler. +func (v *V2) ListHosts(ctx context.Context) (*schedulerv2.ListHostsResponse, error) { + hosts := v.resource.HostManager().LoadAll() + + respHosts := make([]*commonv2.Host, len(hosts)) + for i, host := range hosts { + respHosts[i] = &commonv2.Host{ + Id: host.ID, + Type: uint32(host.Type), + Hostname: host.Hostname, + Ip: host.IP, + Port: host.Port, + DownloadPort: host.DownloadPort, + Os: host.OS, + Platform: host.Platform, + PlatformFamily: host.PlatformFamily, + PlatformVersion: host.PlatformVersion, + KernelVersion: host.KernelVersion, + Cpu: &commonv2.CPU{ + LogicalCount: host.CPU.LogicalCount, + PhysicalCount: host.CPU.PhysicalCount, + Percent: host.CPU.Percent, + ProcessPercent: host.CPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: host.CPU.Times.User, + System: host.CPU.Times.System, + Idle: host.CPU.Times.Idle, + Nice: host.CPU.Times.Nice, + Iowait: host.CPU.Times.Iowait, + Irq: host.CPU.Times.Irq, + Softirq: host.CPU.Times.Softirq, + Steal: host.CPU.Times.Steal, + Guest: host.CPU.Times.Guest, + GuestNice: host.CPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: host.Memory.Total, + Available: host.Memory.Available, + Used: host.Memory.Used, + UsedPercent: host.Memory.UsedPercent, + ProcessUsedPercent: host.Memory.ProcessUsedPercent, + Free: host.Memory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: host.Network.TCPConnectionCount, + UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount, + Location: &host.Network.Location, + Idc: &host.Network.IDC, + }, + Disk: &commonv2.Disk{ + Total: host.Disk.Total, + Free: host.Disk.Free, + Used: host.Disk.Used, + UsedPercent: host.Disk.UsedPercent, + InodesTotal: host.Disk.InodesTotal, + InodesUsed: host.Disk.InodesUsed, + InodesFree: host.Disk.InodesFree, + InodesUsedPercent: host.Disk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: host.Build.GitVersion, + GitCommit: &host.Build.GitCommit, + GoVersion: &host.Build.GoVersion, + Platform: &host.Build.Platform, + }, + SchedulerClusterId: host.SchedulerClusterID, + DisableShared: host.DisableShared, + } + } + + return &schedulerv2.ListHostsResponse{ + Hosts: respHosts, + }, nil +} + // DeleteHost 
releases host in scheduler. func (v *V2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest) error { log := logger.WithHostID(req.GetHostId()) @@ -713,6 +789,7 @@ func (v *V2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest) // Leave peers in host. host.LeavePeers() + v.resource.HostManager().Delete(req.GetHostId()) return nil } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 52baef10455..82d9235dc68 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -882,6 +882,110 @@ func TestServiceV2_AnnounceHost(t *testing.T) { } } +func TestServiceV2_ListHosts(t *testing.T) { + tests := []struct { + name string + mock func(host []*resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) + expect func(t *testing.T, host *resource.Host, resp []*commonv2.Host, err error) + }{ + { + name: "host loaded successfully", + mock: func(host []*resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadAll().Return(host).Times(1), + ) + }, + expect: func(t *testing.T, host *resource.Host, resp []*commonv2.Host, err error) { + assert := assert.New(t) + assert.NoError(err) + assert.Equal(len(resp), 1) + assert.EqualValues(resp[0], &commonv2.Host{ + Id: mockHostID, + Type: uint32(pkgtypes.HostTypeNormal), + Hostname: "foo", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: mockRawHost.DownloadPort, + Cpu: &commonv2.CPU{ + LogicalCount: mockCPU.LogicalCount, + PhysicalCount: mockCPU.PhysicalCount, + Percent: mockCPU.Percent, + ProcessPercent: mockCPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: mockCPU.Times.User, + System: mockCPU.Times.System, + Idle: mockCPU.Times.Idle, + Nice: mockCPU.Times.Nice, + Iowait: mockCPU.Times.Iowait, + Irq: mockCPU.Times.Irq, + Softirq: mockCPU.Times.Softirq, + Steal: mockCPU.Times.Steal, + Guest: mockCPU.Times.Guest, + GuestNice: mockCPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: mockMemory.Total, + Available: mockMemory.Available, + Used: mockMemory.Used, + UsedPercent: mockMemory.UsedPercent, + ProcessUsedPercent: mockMemory.ProcessUsedPercent, + Free: mockMemory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: mockNetwork.TCPConnectionCount, + UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, + Location: &mockNetwork.Location, + Idc: &mockNetwork.IDC, + DownloadRate: mockNetwork.DownloadRate, + DownloadRateLimit: mockNetwork.DownloadRateLimit, + UploadRate: mockNetwork.UploadRate, + UploadRateLimit: mockNetwork.UploadRateLimit, + }, + Disk: &commonv2.Disk{ + Total: mockDisk.Total, + Free: mockDisk.Free, + Used: mockDisk.Used, + UsedPercent: mockDisk.UsedPercent, + InodesTotal: mockDisk.InodesTotal, + InodesUsed: mockDisk.InodesUsed, + InodesFree: mockDisk.InodesFree, + InodesUsedPercent: mockDisk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: mockBuild.GitVersion, + GitCommit: &mockBuild.GitCommit, + GoVersion: &mockBuild.GoVersion, + Platform: &mockBuild.Platform, + }, + }) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := schedulingmocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + 
storage := storagemocks.NewMockStorage(ctl) + hostManager := resource.NewMockHostManager(ctl) + host := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type, + resource.WithCPU(mockCPU), resource.WithMemory(mockMemory), resource.WithNetwork(mockNetwork), resource.WithDisk(mockDisk), resource.WithBuild(mockBuild)) + hosts := []*resource.Host{host} + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + + tc.mock(hosts, hostManager, res.EXPECT(), hostManager.EXPECT()) + resp, err := svc.ListHosts(context.Background()) + tc.expect(t, host, resp.Hosts, err) + }) + } +} + func TestServiceV2_DeleteHost(t *testing.T) { tests := []struct { name string @@ -907,6 +1011,8 @@ func TestServiceV2_DeleteHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -922,6 +1028,8 @@ func TestServiceV2_DeleteHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { diff --git a/test/e2e/v2/e2e_test.go b/test/e2e/v2/e2e_test.go index 23e08eb59cf..67f8b422846 100644 --- a/test/e2e/v2/e2e_test.go +++ b/test/e2e/v2/e2e_test.go @@ -21,6 +21,7 @@ import ( "strconv" "strings" "testing" + "time" . "github.com/onsi/ginkgo/v2" //nolint . "github.com/onsi/gomega" //nolint @@ -86,6 +87,8 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) gitCommit := strings.Fields(string(rawGitCommit))[0] fmt.Printf("git commit: %s\n", gitCommit) + // Wait for peers to start and announce. + time.Sleep(5 * time.Minute) }) // TestE2E is the root of e2e test function diff --git a/test/e2e/v2/leave_host_test.go b/test/e2e/v2/leave_host_test.go new file mode 100644 index 00000000000..4f7b3850761 --- /dev/null +++ b/test/e2e/v2/leave_host_test.go @@ -0,0 +1,125 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" //nolint + . "github.com/onsi/gomega" //nolint + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" + "d7y.io/dragonfly/v2/pkg/types" + "d7y.io/dragonfly/v2/test/e2e/v2/util" +) + +var _ = Describe("Clients Leaving", func() { + Context("graceful exit", func() { + It("number of hosts should be ok", Label("host", "leave"), func() { + // Create scheduler GRPC client. 
+ schedulerClient, err := schedulerclient.GetV2ByAddr(context.Background(), ":8002", grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(err).NotTo(HaveOccurred())
+
+ // Get host count.
+ hostCount := util.Servers[util.ClientServerName].Replicas
+ Expect(calculateHostCountFromScheduler(schedulerClient)).To(Equal(hostCount))
+
+ // Get client pod name in master node.
+ podName, err := util.GetClientPodNameInMaster()
+ Expect(err).NotTo(HaveOccurred())
+
+ // Taint master node.
+ out, err := util.KubeCtlCommand("-n", util.DragonflyNamespace, "taint", "nodes", "kind-control-plane", "master:NoSchedule").CombinedOutput()
+ fmt.Println(string(out))
+ Expect(err).NotTo(HaveOccurred())
+
+ // Delete client pod in master, client will leave gracefully with cleanup.
+ out, err = util.KubeCtlCommand("-n", util.DragonflyNamespace, "delete", "pod", podName, "--grace-period=30").CombinedOutput()
+ fmt.Println(string(out))
+ Expect(err).NotTo(HaveOccurred())
+
+ // Wait for the client to leave gracefully.
+ time.Sleep(1 * time.Minute)
+ Expect(calculateHostCountFromScheduler(schedulerClient)).To(Equal(hostCount - 1))
+
+ // Remove taint in master node.
+ out, err = util.KubeCtlCommand("-n", util.DragonflyNamespace, "taint", "nodes", "kind-control-plane", "master:NoSchedule-").CombinedOutput()
+ fmt.Println(string(out))
+ Expect(err).NotTo(HaveOccurred())
+
+ // Wait for the client to start again.
+ time.Sleep(1 * time.Minute)
+ })
+ })
+
+ Context("force delete", func() {
+ It("number of hosts should be ok", Label("host", "leave"), func() {
+ // Create scheduler GRPC client.
+ schedulerClient, err := schedulerclient.GetV2ByAddr(context.Background(), ":8002", grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(err).NotTo(HaveOccurred())
+
+ // Get host count.
+ hostCount := util.Servers[util.ClientServerName].Replicas
+ Expect(calculateHostCountFromScheduler(schedulerClient)).To(Equal(hostCount))
+
+ // Get client pod name in master node.
+ podName, err := util.GetClientPodNameInMaster()
+ Expect(err).NotTo(HaveOccurred())
+
+ // Taint master node.
+ out, err := util.KubeCtlCommand("-n", util.DragonflyNamespace, "taint", "nodes", "kind-control-plane", "master:NoSchedule").CombinedOutput()
+ fmt.Println(string(out))
+ Expect(err).NotTo(HaveOccurred())
+
+ // Force delete client pod in master, client will leave without cleanup.
+ out, err = util.KubeCtlCommand("-n", util.DragonflyNamespace, "delete", "pod", podName, "--force", "--grace-period=0").CombinedOutput()
+ fmt.Println(string(out))
+ Expect(err).NotTo(HaveOccurred())
+
+ // Wait for host gc.
+ time.Sleep(2 * time.Minute)
+ Expect(calculateHostCountFromScheduler(schedulerClient)).To(Equal(hostCount - 1))
+
+ // Remove taint in master node.
+ out, err = util.KubeCtlCommand("-n", util.DragonflyNamespace, "taint", "nodes", "kind-control-plane", "master:NoSchedule-").CombinedOutput()
+ fmt.Println(string(out))
+ Expect(err).NotTo(HaveOccurred())
+
+ // Wait for the client to start again.
+ time.Sleep(1 * time.Minute) + }) + }) +}) + +func calculateHostCountFromScheduler(schedulerClient schedulerclient.V2) (hostCount int) { + response, err := schedulerClient.ListHosts(context.Background(), "") + fmt.Println(response, err) + Expect(err).NotTo(HaveOccurred()) + + hosts := response.Hosts + for _, host := range hosts { + hostType := types.HostType(host.Type) + if hostType != types.HostTypeSuperSeed && hostType != types.HostTypeStrongSeed && hostType != types.HostTypeWeakSeed { + hostCount++ + } + } + return +} diff --git a/test/e2e/v2/util/constants.go b/test/e2e/v2/util/constants.go index 1e220300c36..30f6e6f2344 100644 --- a/test/e2e/v2/util/constants.go +++ b/test/e2e/v2/util/constants.go @@ -57,6 +57,6 @@ var Servers = map[string]server{ Name: ClientServerName, Namespace: DragonflyNamespace, LogDirName: "dfdaemon", - Replicas: 1, + Replicas: 2, }, } diff --git a/test/e2e/v2/util/exec.go b/test/e2e/v2/util/exec.go index 1d69789e163..37c67b3bbb0 100644 --- a/test/e2e/v2/util/exec.go +++ b/test/e2e/v2/util/exec.go @@ -24,7 +24,7 @@ import ( ) const ( - kindDockerContainer = "kind-control-plane" + kindDockerContainer = "kind-worker" ) func DockerCommand(arg ...string) *exec.Cmd { @@ -103,37 +103,73 @@ func KubeCtlCopyCommand(ns, pod, source, target string) *exec.Cmd { } func ClientExec() (*PodExec, error) { - out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=client", - "-o", fmt.Sprintf("jsonpath='{range .items[0]}{.metadata.name}{end}'")).CombinedOutput() + podName, err := GetClientPodNameInWorker() + if err != nil { + return nil, err + } + return NewPodExec(DragonflyNamespace, podName, "client"), nil +} + +func SeedClientExec(n int) (*PodExec, error) { + podName, err := GetSeedClientPodName(n) + if err != nil { + return nil, err + } + return NewPodExec(DragonflyNamespace, podName, "seed-client"), nil +} + +func ManagerExec(n int) (*PodExec, error) { + podName, err := GetManagerPodName(n) if err != nil { return nil, err } + return NewPodExec(DragonflyNamespace, podName, "manager"), nil +} + +func GetClientPodNameInMaster() (string, error) { + out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=client", + "-o", "jsonpath='{.items[?(@.spec.nodeName==\"kind-control-plane\")].metadata.name}'").CombinedOutput() + if err != nil { + return "", err + } podName := strings.Trim(string(out), "'") fmt.Println(podName) - return NewPodExec(DragonflyNamespace, podName, "client"), nil + return podName, nil } -func SeedClientExec(n int) (*PodExec, error) { +func GetClientPodNameInWorker() (string, error) { + out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=client", + "-o", "jsonpath='{.items[?(@.spec.nodeName==\"kind-worker\")].metadata.name}'").CombinedOutput() + if err != nil { + return "", err + } + + podName := strings.Trim(string(out), "'") + fmt.Println(podName) + return podName, nil +} + +func GetSeedClientPodName(n int) (string, error) { out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=seed-client", "-o", fmt.Sprintf("jsonpath='{range .items[%d]}{.metadata.name}{end}'", n)).CombinedOutput() if err != nil { - return nil, err + return "", err } podName := strings.Trim(string(out), "'") fmt.Println(podName) - return NewPodExec(DragonflyNamespace, podName, "seed-client"), nil + return podName, nil } -func ManagerExec(n int) (*PodExec, error) { +func GetManagerPodName(n int) (string, error) { out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", 
"pod", "-l", "component=manager", "-o", fmt.Sprintf("jsonpath='{range .items[%d]}{.metadata.name}{end}'", n)).CombinedOutput() if err != nil { - return nil, err + return "", err } podName := strings.Trim(string(out), "'") fmt.Println(podName) - return NewPodExec(DragonflyNamespace, podName, "manager"), nil + return podName, nil } diff --git a/test/testdata/charts/config-v2.yaml b/test/testdata/charts/config-v2.yaml index 601785a94c8..4b54a62492e 100644 --- a/test/testdata/charts/config-v2.yaml +++ b/test/testdata/charts/config-v2.yaml @@ -43,6 +43,9 @@ scheduler: limits: cpu: "2" memory: "4Gi" + service: + type: NodePort + nodePort: 30802 extraVolumeMounts: - name: logs mountPath: "/var/log/" @@ -59,6 +62,9 @@ scheduler: enableHost: true config: verbose: true + scheduler: + gc: + hostGCInterval: 2m seedClient: enable: true @@ -111,6 +117,11 @@ client: limits: cpu: "2" memory: "4Gi" + # Allow client daemonSet to create a pod on master node for testing when the daemon goes offline. + tolerations: + - key: "node-role.kubernetes.io/master" + operator: "Exists" + effect: "NoSchedule" extraVolumeMounts: - name: logs mountPath: "/var/log/" diff --git a/test/testdata/kind/config-v2.yaml b/test/testdata/kind/config-v2.yaml index 644e910e230..3fce6f525eb 100644 --- a/test/testdata/kind/config-v2.yaml +++ b/test/testdata/kind/config-v2.yaml @@ -4,6 +4,15 @@ networking: ipFamily: dual nodes: - role: control-plane + image: kindest/node:v1.23.4 + extraMounts: + - hostPath: ./test/testdata/containerd/config-v2.toml + containerPath: /etc/containerd/config.toml + - hostPath: /tmp/artifact + containerPath: /tmp/artifact + - hostPath: /dev/fuse + containerPath: /dev/fuse + - role: worker image: kindest/node:v1.23.4 extraPortMappings: - containerPort: 4001 @@ -12,6 +21,9 @@ nodes: - containerPort: 4003 hostPort: 4003 protocol: TCP + - containerPort: 30802 + hostPort: 8002 + protocol: TCP extraMounts: - hostPath: ./test/testdata/containerd/config-v2.toml containerPath: /etc/containerd/config.toml