Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

harmony storage #11647

Merged
merged 8 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
harmony storage
  • Loading branch information
snadrus committed Feb 22, 2024
commit 4ee99aebbe6edc1443e290d3b591cb8cbe7d36e3
10 changes: 10 additions & 0 deletions cmd/lotus-provider/deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/resources/storagemgr"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
Expand Down Expand Up @@ -102,6 +103,7 @@ type Deps struct {
Si *paths.DBIndex
LocalStore *paths.Local
ListenAddr string
StorageMgr *storagemgr.StorageMgr
}

const (
Expand Down Expand Up @@ -247,6 +249,14 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
}
}
}

if deps.StorageMgr == nil {
lo, err := deps.LocalStore.Local(ctx)
if err != nil {
return xerrors.Errorf("could not get local storage: %w", err)
}
deps.StorageMgr = storagemgr.New(lo)
}
fmt.Println("last line of populate")
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions lib/harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,13 @@ func (e *TaskEngine) ResourcesAvailable() resources.Resources {
}
return tmp
}

// GetWorkingLocation reports the storage location claimed for taskID, or ""
// when no handler currently holds a claim for that task.
// This helps <task>.Do() determine where its storage allocation is.
func (e *TaskEngine) GetWorkingLocation(taskID TaskID) string {
	for _, h := range e.handlers {
		loc, found := h.LocationMap.Load(taskID)
		if !found {
			continue
		}
		return loc.(string)
	}
	return ""
}
25 changes: 23 additions & 2 deletions lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

Expand All @@ -19,8 +20,9 @@ var log = logging.Logger("harmonytask")
// taskTypeHandler couples one task implementation (TaskInterface) and its
// declared details with the engine-side bookkeeping for that task type.
// (The diff scrape showed both the pre- and post-change field lines; this is
// the post-change struct.)
type taskTypeHandler struct {
	TaskInterface
	TaskTypeDetails
	TaskEngine  *TaskEngine  // owning engine; used for follow-up scheduling
	Count       atomic.Int32 // tasks of this type currently running on this machine
	LocationMap sync.Map     // TaskID -> storage location string claimed for that task
}

func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
Expand Down Expand Up @@ -120,6 +122,15 @@ top:
}
}

var location string
if c := h.TaskTypeDetails.Cost.Storage.Claim; c != nil {
if location, err = c(int(*tID)); err != nil {
magik6k marked this conversation as resolved.
Show resolved Hide resolved
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err)
return false
magik6k marked this conversation as resolved.
Show resolved Hide resolved
}
h.LocationMap.Store(*tID, location)
}

h.Count.Add(1)
go func() {
log.Infow("Beginning work on Task", "id", *tID, "from", from, "name", h.Name)
Expand All @@ -138,6 +149,12 @@ top:
}
h.Count.Add(-1)

if r := h.TaskTypeDetails.Cost.Storage.MarkComplete; r != nil {
h.LocationMap.Delete(*tID)
if err := r(location); err != nil {
log.Errorw("Could not release storage", "error", err)
}
}
h.recordCompletion(*tID, workStart, done, doErr)
if done {
for _, fs := range h.TaskEngine.follows[h.Name] { // Do we know of any follows for this task type?
Expand Down Expand Up @@ -250,5 +267,9 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error {
if r.Gpu-h.Cost.Gpu < 0 {
return errors.New("Did not accept " + h.Name + " task: out of available GPU")
}

if has := h.TaskTypeDetails.Cost.Storage.HasCapacity; has != nil && !has() {
return errors.New("Did not accept " + h.Name + " task: out of available Storage")
}
return nil
}
14 changes: 14 additions & 0 deletions lib/harmony/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ type Resources struct {
Gpu float64
Ram uint64
MachineID int
Storage
}

// Storage is optional storage management for a task type.
// All three funcs may be nil, in which case storage is not managed for the task.
// See storagemgr/storagemgt.go for more details.
type Storage struct {
	// HasCapacity reports whether this machine can accept another such task now.
	HasCapacity func() bool

	// Claim allows some other system to claim space for this task.
	// It returns the location reserved for the task's data.
	Claim func(taskID int) (location string, err error)

	// MarkComplete allows some other system to consider the task done.
	// It's up to the caller to remove the data, if that applies.
	MarkComplete func(location string) error
}
type Reg struct {
Resources
Expand Down
246 changes: 246 additions & 0 deletions lib/harmony/resources/storagemgr/storagemgt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// Package storagemgr is a utility for harmonytask tasks
// to manage the available space.
// Ex:
//
// TaskTypeDefinition{
// Storage: storMgr.MakeFuncs(UsageTemporary, 1<<30, func(taskID int) string {
// return fmt.Sprintf("task-%d", taskID) // or use SQL to get the sector ID
// }),
// }
//
// Also useful: s.CombineFuncs(s1, s2, s3)
package storagemgr

import (
"bytes"
"encoding/json"
"os"
"path/filepath"
"strings"
"sync"

logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

// log reuses the "harmonytask" logging subsystem name used by the task engine.
var log = logging.Logger("harmonytask")

// path is a local storage root directory, distinct from ordinary strings.
type path string

// claimsMx guards the in-memory claims map and all reads/writes of the
// per-root claims.json files that mirror it on disk.
var claimsMx sync.Mutex

// claims records, per storage root, the reservations made by this process
// via claim(). (claims.json on disk may also contain entries from other
// processes — presumably; verify against Cleanup's usage.)
var claims = map[path][]consumption{}

// purposes maps each Usage category to the storage roots that serve it.
// Populated by New(); process-global.
var purposes = map[Usage][]path{}

// Usage classifies what a storage root is used for.
type Usage int

const (
	// NOTE(review): these are untyped int constants; New() never assigns
	// UsageTemporary — confirm whether that's intended.
	UsagePermanent = iota
	UsageTemporary
	UsageCache
	UsageStaging
)

// StorageMgr manages space claims against the registered storage roots.
type StorageMgr struct { // Functions are attached here to force users to call New()
}

// New classifies each supplied storage path by Usage into the process-global
// purposes table and returns a StorageMgr handle.
//
// Note: this may be called multiple times per process; a path already
// registered under a usage is not appended again, so repeated calls no
// longer accumulate duplicates.
func New(paths []storiface.StoragePath) *StorageMgr {
	for _, p := range paths {
		// Classification precedence: a "cache"/"tmp" path wins over a
		// sealing path; everything else is treated as permanent.
		// NOTE(review): UsageTemporary is never assigned here — confirm intended.
		var t Usage = UsagePermanent
		if p.CanSeal {
			t = UsageStaging
		}
		if strings.Contains(p.LocalPath, "cache") || strings.Contains(p.LocalPath, "tmp") {
			t = UsageCache
		}
		already := false
		for _, existing := range purposes[t] {
			if existing == path(p.LocalPath) {
				already = true
				break
			}
		}
		if !already {
			purposes[t] = append(purposes[t], path(p.LocalPath))
		}
	}

	return &StorageMgr{}
}

// Cleanup removes (rm -rf) the data location of every claim recorded in each
// storage root's claims.json, then deletes the claims.json files themselves.
// For lotus-provider (but not other deps users): intended to run at start-up,
// when no recorded claim can still be live.
func (s *StorageMgr) Cleanup() {
	for _, roots := range purposes {
		for _, root := range roots {
			for _, u := range getJSON(root) {
				if err := os.RemoveAll(u.Location); err != nil {
					log.Errorf("could not remove claimed location %s: %s", u.Location, err)
				}
			}
			if err := os.Remove(filepath.Join(string(root), "claims.json")); err != nil && !os.IsNotExist(err) {
				log.Errorf("could not remove claims.json under %s: %s", root, err)
			}
		}
	}
}

// MakeFuncs builds the resources.Storage callbacks for a task type needing
// `need` bytes of `purpose` storage. namer converts a task ID into the
// directory name used for that task's claim.
func (s *StorageMgr) MakeFuncs(purpose Usage, need uint64, namer func(taskID int) string) resources.Storage {
	hasCap := func() bool {
		_, ok := s.hasCapacity(purpose, need)
		return ok
	}
	claimFn := func(id int) (string, error) {
		return s.claim(purpose, need, namer(id))
	}
	return resources.Storage{
		HasCapacity:  hasCap,
		Claim:        claimFn,
		MarkComplete: s.markComplete,
	}
}

// consumption is one recorded storage claim: where the task's data lives and
// how many bytes were reserved for it. Serialized into claims.json.
type consumption struct {
	Location string
	Need     uint64
}

func getJSON(path path) []consumption {
b, err := os.ReadFile(string(path) + "/claims.json")
magik6k marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Errorf("error reading claims: %s", err)
} else {
var consumages []consumption
err := json.NewDecoder(bytes.NewReader(b)).Decode(&consumages)
if err != nil {
log.Errorf("error decoding claims: %s", err)
} else {
return consumages
}
}
return nil
}

// hasCapacity returns a storage root serving `purpose` that can hold `need`
// more bytes after accounting for outstanding claims, or ("", false) if none.
func (s *StorageMgr) hasCapacity(purpose Usage, need uint64) (path, bool) {
	// Lock once for the whole scan. The original locked with a deferred
	// unlock *inside* the loop, which self-deadlocks on the second iteration.
	claimsMx.Lock()
	defer claimsMx.Unlock()

	for _, root := range purposes[purpose] {
		free, err := resources.DiskFree(string(root))
		if err != nil {
			log.Errorf("error checking free space: %s", err)
			continue
		}
		if free < need {
			continue
		}

		for _, c := range getJSON(root) {
			// Presume each claim's whole need will be used...
			if c.Need > free {
				free = 0 // guard against uint64 underflow
				break
			}
			free -= c.Need
			// ...then credit back what the claim has already written, since
			// those bytes are already excluded from DiskFree's figure.
			_ = filepath.Walk(c.Location, func(_ string, info os.FileInfo, err error) error {
				if err != nil {
					return nil // location may not exist yet; skip, don't abort
				}
				if !info.IsDir() {
					free += uint64(info.Size())
				}
				return nil
			})
		}
		// NOTE(review): strict > (not >=) kept from the original — confirm.
		if free > need {
			return root, true
		}
	}
	return "", false
}
func (s *StorageMgr) claim(purpose Usage, need uint64, name string) (location string, err error) {
path, ok := s.hasCapacity(purpose, need)
if !ok {
return "", xerrors.Errorf("no space for purpose %d", purpose)
}

consumption := consumption{Location: filepath.Join(string(path), name), Need: need}
claimsMx.Lock()
defer claimsMx.Unlock()

j := getJSON(path)
j = append(j, consumption)
b, err := json.Marshal(j)
if err != nil {
return "", xerrors.Errorf("error encoding claims: %w", err)
}
err = os.WriteFile(string(path)+"/claims.json", b, 0644)
magik6k marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return "", xerrors.Errorf("error writing claims: %w", err)
}

claims[path] = append(claims[path], consumption)
return location, nil
}

// markComplete drops the claim for `location` from both the in-memory claims
// map and the owning root's claims.json. It does not delete the data itself;
// that is the caller's responsibility.
func (s *StorageMgr) markComplete(location string) error {
	claimsMx.Lock()
	defer claimsMx.Unlock()

	// Clean up RAM. A location belongs to exactly one root, so stop at the
	// first match instead of scanning the remaining roots.
	foundPath := ""
ramSearch:
	for root, consumages := range claims {
		if !strings.HasPrefix(location, string(root)) {
			continue
		}
		for i, c := range consumages {
			if c.Location == location {
				claims[root] = append(consumages[:i], consumages[i+1:]...)
				foundPath = string(root)
				break ramSearch
			}
		}
	}

	// Erase the claim on disk.
	if foundPath != "" {
		j := getJSON(path(foundPath))
		for i, c := range j {
			if c.Location == location {
				j = append(j[:i], j[i+1:]...)
				break
			}
		}
		b, err := json.Marshal(j)
		if err != nil {
			return xerrors.Errorf("error encoding claims: %w", err)
		}
		if err := os.WriteFile(filepath.Join(foundPath, "claims.json"), b, 0644); err != nil {
			return xerrors.Errorf("error writing claims: %w", err)
		}
	}

	return nil
}

// CombineFuncs merges several Storage definitions into one: capacity requires
// every component to have capacity; Claim claims from each in order, rolling
// back already-made claims on failure; MarkComplete releases every location.
// The combined Claim returns the individual locations joined by commas.
func (s *StorageMgr) CombineFuncs(ss ...resources.Storage) resources.Storage {
	return resources.Storage{
		HasCapacity: func() bool {
			for _, st := range ss {
				if !st.HasCapacity() {
					return false
				}
			}
			return true
		},
		Claim: func(id int) (string, error) {
			locations := make([]string, 0, len(ss))
			for _, st := range ss {
				location, err := st.Claim(id)
				if err != nil {
					// BUG FIX: release each prior claim via the Storage that
					// granted it; the original's shadowed `s` released them
					// all through the *failing* Storage's MarkComplete.
					for i, loc := range locations {
						if mc := ss[i].MarkComplete; mc != nil {
							_ = mc(loc)
						}
					}
					return "", err
				}
				locations = append(locations, location)
			}
			// BUG FIX: the original returned xerrors.Errorf("no space") here,
			// so every fully successful combined Claim reported failure.
			return strings.Join(locations, ","), nil
		},
		MarkComplete: func(locations string) error {
			var errKept error // last failure wins, but all locations are attempted
			for _, location := range strings.Split(locations, ",") {
				if err := s.markComplete(location); err != nil {
					errKept = err
				}
			}
			return errKept
		},
	}
}