Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add persistent cache task for scheduler #3545

Merged
merged 2 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/dflog/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ func WithTask(taskID, url string) *SugaredLoggerOnWith {
}
}

func WithPersistentCacheTask(taskID string) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"taskID", taskID},
}
}

func WithHost(hostID, hostname, ip string) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"hostID", hostID, "hostname", hostname, "ip", ip},
Expand Down
41 changes: 41 additions & 0 deletions pkg/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ const (
// SchedulersNamespace prefix of schedulers namespace cache key.
SchedulersNamespace = "schedulers"

// SchedulerClustersNamespace prefix of scheduler clusters namespace cache key.
SchedulerClustersNamespace = "scheduler-clusters"

// TasksNamespace prefix of tasks namespace cache key.
PersistentCacheTasksNamespace = "persistent-cache-tasks"

// PersistentCachePeersNamespace prefix of persistent cache peers namespace cache key.
PersistentCachePeersNamespace = "persistent-cache-peers"

// PersistentCacheHostsNamespace prefix of persistent cache hosts namespace cache key.
PersistentCacheHostsNamespace = "persistent-cache-hosts"

// ApplicationsNamespace prefix of applications namespace cache key.
ApplicationsNamespace = "applications"

Expand Down Expand Up @@ -137,6 +149,35 @@ func MakeKeyInScheduler(namespace, id string) string {
return fmt.Sprintf("%s:%s", MakeNamespaceKeyInScheduler(namespace), id)
}

// MakeSchedulerClusterKeyInManager make scheduler cluster key in manager.
func MakePersistentCacheTaskKeyInScheduler(schedulerClusterID uint, taskID string) string {
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCacheTasksNamespace, taskID))
}

// MakePersistentCacheTasksInScheduler make persistent cache tasks in scheduler.
func MakePersistentCacheTasksInScheduler(schedulerClusterID uint) string {
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheTasksNamespace))
}

// MakePersistentCachePeerKeyInScheduler make persistent cache peer key in scheduler.
func MakePersistentCachePeerKeyInScheduler(schedulerClusterID uint, peerID string) string {
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCachePeersNamespace, peerID))
}

func MakePersistentCachePeersInScheduler(schedulerClusterID uint) string {
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCachePeersNamespace))
}

// MakePersistentCacheHostKeyInScheduler make persistent cache host key in scheduler.
func MakePersistentCacheHostKeyInScheduler(schedulerClusterID uint, hostID string) string {
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCacheHostsNamespace, hostID))
}

// MakePersistentCacheHostsInScheduler make persistent cache hosts in scheduler.
func MakePersistentCacheHostsInScheduler(schedulerClusterID uint) string {
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheHostsNamespace))
}

// MakeNetworkTopologyKeyInScheduler make network topology key in scheduler.
func MakeNetworkTopologyKeyInScheduler(srcHostID, destHostID string) string {
return MakeKeyInScheduler(NetworkTopologyNamespace, fmt.Sprintf("%s:%s", srcHostID, destHostID))
Expand Down
144 changes: 144 additions & 0 deletions scheduler/resource/persistentcache/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package persistentcache

import (
"context"
"time"

"github.com/looplab/fsm"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/digest"
)

const (
// Task has been created but did not start uploading.
TaskStatePending = "Pending"

// Task is uploading resources for p2p cluster.
TaskStateUploading = "Uploading"

// Task has been uploaded successfully.
TaskStateSucceeded = "Succeeded"

// Task has been uploaded failed.
TaskStateFailed = "Failed"
)

const (
// Task is uploading.
TaskEventUpload = "Upload"

// Task uploaded successfully.
TaskEventUploadSucceeded = "UploadSucceeded"

// Task uploaded failed.
TaskEventUploadFailed = "UploadFailed"
)

// Task contains content for persistent cache task.
type Task struct {
// ID is task id.
ID string

// Replica count of the persistent cache task. The persistent cache task will
// not be deleted when dfdamon runs garbage collection. It only be deleted
// when the task is deleted by the user.
PersistentReplicaCount uint64

// Replica count of the cache task. If cache task is not persistent,
// the persistent cache task will be deleted when dfdaemon runs garbage collection.
ReplicaCount uint64

// Digest of the persistent cache task content, for example md5:xxx or sha256:yyy.
Digest *digest.Digest

// Tag is used to distinguish different persistent cache tasks.
Tag string

// Application of persistent cache task.
Application string

// Persistet cache task piece length.
PieceLength int32

// ContentLength is persistent cache task total content length.
ContentLength int64

// TotalPieceCount is total piece count.
TotalPieceCount int32

// Persistent cache task state machine.
FSM *fsm.FSM

// TTL is persistent cache task time to live.
TTL time.Duration

// CreatedAt is persistent cache task create time.
CreatedAt time.Time

// UpdatedAt is persistent cache task update time.
UpdatedAt time.Time

// Persistent cache task log.
Log *logger.SugaredLoggerOnWith
}

// New persistent cache task instance.
func NewTask(id, tag, application, state string, persistentReplicaCount uint64, replicaCount uint64, pieceLength int32,
contentLength int64, totalPieceCount int32, digest *digest.Digest, ttl time.Duration, createdAt, updatedAt time.Time,
log *logger.SugaredLoggerOnWith) *Task {
t := &Task{
ID: id,
PersistentReplicaCount: persistentReplicaCount,
ReplicaCount: replicaCount,
Digest: digest,
Tag: tag,
Application: application,
ContentLength: contentLength,
TotalPieceCount: totalPieceCount,
TTL: time.Hour * 24,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithPersistentCacheTask(id),
}

// Initialize state machine.
t.FSM = fsm.NewFSM(
TaskStatePending,
fsm.Events{
{Name: TaskEventUpload, Src: []string{TaskStatePending, TaskStateFailed}, Dst: TaskStateUploading},
{Name: TaskEventUploadSucceeded, Src: []string{TaskStateUploading}, Dst: TaskStateSucceeded},
{Name: TaskEventUploadFailed, Src: []string{TaskStateUploading}, Dst: TaskStateFailed},
},
fsm.Callbacks{
TaskEventUpload: func(ctx context.Context, e *fsm.Event) {
t.Log.Infof("task state is %s", e.FSM.Current())
},
TaskEventUploadSucceeded: func(ctx context.Context, e *fsm.Event) {
t.Log.Infof("task state is %s", e.FSM.Current())
},
TaskEventUploadFailed: func(ctx context.Context, e *fsm.Event) {
t.Log.Infof("task state is %s", e.FSM.Current())
},
},
)
t.FSM.SetState(state)

return t
}
Loading
Loading