Skip to content

Commit

Permalink
feat: update new task type(TaskType_STANDARD, TaskType_PERSIST, TaskT…
Browse files Browse the repository at this point in the history
…ype_PERSIST_CACHE)

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Sep 26, 2024
1 parent 1afe79e commit 846fa3e
Show file tree
Hide file tree
Showing 18 changed files with 182 additions and 182 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.159
d7y.io/api/v2 v2.0.160
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.159 h1:xSLq0GjqV0F8TgfZ13EDJa+eqaWcqhrEepybAoT9OnI=
d7y.io/api/v2 v2.0.159/go.mod h1:VOnTWgLrGtivgyyofZCfiSDTAKDJ9ohVqM6l3S8EPCE=
d7y.io/api/v2 v2.0.160 h1:YMfvacpzBn24WuiymkORtBR6lfIvOPhXJRcmkPEOXco=
d7y.io/api/v2 v2.0.160/go.mod h1:VOnTWgLrGtivgyyofZCfiSDTAKDJ9ohVqM6l3S8EPCE=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
18 changes: 9 additions & 9 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,25 +140,25 @@ func ParseHostType(name string) HostType {
func TaskTypeV1ToV2(typ commonv1.TaskType) commonv2.TaskType {
switch typ {
case commonv1.TaskType_Normal:
return commonv2.TaskType_DFDAEMON
case commonv1.TaskType_DfCache:
return commonv2.TaskType_DFCACHE
return commonv2.TaskType_STANDARD
case commonv1.TaskType_DfStore:
return commonv2.TaskType_DFSTORE
return commonv2.TaskType_PERSIST
case commonv1.TaskType_DfCache:
return commonv2.TaskType_PERSIST_CACHE
}

return commonv2.TaskType_DFDAEMON
return commonv2.TaskType_STANDARD
}

// TaskTypeV2ToV1 converts task type from v2 to v1.
func TaskTypeV2ToV1(typ commonv2.TaskType) commonv1.TaskType {
switch typ {
case commonv2.TaskType_DFDAEMON:
case commonv2.TaskType_STANDARD:
return commonv1.TaskType_Normal
case commonv2.TaskType_DFCACHE:
return commonv1.TaskType_DfCache
case commonv2.TaskType_DFSTORE:
case commonv2.TaskType_PERSIST:
return commonv1.TaskType_DfStore
case commonv2.TaskType_PERSIST_CACHE:
return commonv1.TaskType_DfCache
}

return commonv1.TaskType_Normal
Expand Down
4 changes: 2 additions & 2 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_DFDAEMON,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Priority: commonv2.Priority(req.Priority),
Expand Down Expand Up @@ -424,7 +424,7 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre
Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_DFDAEMON,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Priority: commonv2.Priority(req.Priority),
Expand Down
2 changes: 1 addition & 1 deletion scheduler/resource/host_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func TestHostManager_RunGC(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func TestHost_LoadPeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -661,7 +661,7 @@ func TestHost_StorePeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(tc.peerID, mockResourceConfig, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -707,7 +707,7 @@ func TestHost_DeletePeer(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host)

host.StorePeer(mockPeer)
Expand Down Expand Up @@ -759,7 +759,7 @@ func TestHost_LeavePeers(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host)

tc.expect(t, host, mockPeer)
Expand Down Expand Up @@ -811,7 +811,7 @@ func TestHost_FreeUploadCount(t *testing.T) {
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, host)

tc.expect(t, host, mockTask, mockPeer)
Expand Down
10 changes: 5 additions & 5 deletions scheduler/resource/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestPeerManager_Load(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestPeerManager_Store(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestPeerManager_LoadOrStore(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestPeerManager_Delete(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
if err != nil {
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestPeerManager_RunGC(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
peerManager, err := newPeerManager(tc.gcConfig, gc)
if err != nil {
Expand Down
Loading

0 comments on commit 846fa3e

Please sign in to comment.