Skip to content

Commit 4604cce

Browse files
authored
optimistic wallet identity added as annotation to submitted bacalhau jobs (#601)
1 parent 0cc3ead commit 4604cce

File tree

3 files changed

+59
-8
lines changed

3 files changed

+59
-8
lines changed

cmd/create.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,18 @@ var createCmd = &cobra.Command{
2828
dry := true
2929
upgradePlexVersion(dry)
3030

31-
cid, err := CreateIO(toolPath, inputDir, layers)
31+
cid, userID, err := CreateIO(toolPath, inputDir, layers)
3232
if err != nil {
3333
fmt.Println("Error:", err)
3434
os.Exit(1)
3535
}
36+
37+
if autoRun && userID != "" {
38+
*annotationsForAutoRun = append(*annotationsForAutoRun, fmt.Sprintf("userId=%s", userID))
39+
40+
fmt.Printf("Annotations for PlexRun: %v\n", *annotationsForAutoRun)
41+
}
42+
3643
if autoRun {
3744
_, _, err := PlexRun(cid, outputDir, verbose, showAnimation, concurrency, *annotationsForAutoRun)
3845
if err != nil {
@@ -43,10 +50,10 @@ var createCmd = &cobra.Command{
4350
},
4451
}
4552

46-
func CreateIO(toolPath, inputDir string, layers int) (string, error) {
53+
func CreateIO(toolPath, inputDir string, layers int) (string, string, error) {
4754
tempDirPath, err := ioutil.TempDir("", uuid.New().String())
4855
if err != nil {
49-
return "", err
56+
return "", "", err
5057
}
5158

5259
fmt.Println("Temporary directory created:", tempDirPath)
@@ -57,30 +64,30 @@ func CreateIO(toolPath, inputDir string, layers int) (string, error) {
5764

5865
toolConfig, toolInfo, err := ipwl.ReadToolConfig(toolPath)
5966
if err != nil {
60-
return "", err
67+
return "", "", err
6168
}
6269

6370
fmt.Println("Creating IO entries from input directory: ", inputDir)
6471
ioEntries, err = ipwl.CreateIOJson(inputDir, toolConfig, toolInfo, layers)
6572
if err != nil {
66-
return "", err
73+
return "", "", err
6774
}
6875

6976
ioJsonPath := path.Join(tempDirPath, "io.json")
7077
err = ipwl.WriteIOList(ioJsonPath, ioEntries)
7178
if err != nil {
72-
return "", err
79+
return "", "", err
7380
}
7481
fmt.Println("Initialized IO file at: ", ioJsonPath)
7582

7683
cid, err := ipfs.PinFile(ioJsonPath)
7784
if err != nil {
78-
return "", nil
85+
return "", "", nil
7986
}
8087

8188
// The Python SDK string matches here so make sure to change in both places
8289
fmt.Println("Initial IO JSON file CID: ", cid)
83-
return cid, nil
90+
return cid, ioEntries[0].UserID, nil
8491
}
8592

8693
func init() {

cmd/run.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ func PlexRun(ioJsonCid, outputDir string, verbose, showAnimation bool, concurren
6868
}
6969
fmt.Println("Initialized IO file at: ", ioJsonPath)
7070

71+
userID, err := ipwl.ExtractUserIDFromIOJson(ioJsonPath)
72+
if err != nil {
73+
return completedIoJsonCid, ioJsonPath, err
74+
}
75+
76+
if userID != "" && !ipwl.ContainsUserIdAnnotation(annotations) {
77+
annotations = append(annotations, fmt.Sprintf("userId=%s", userID))
78+
}
79+
7180
retry := false
7281
fmt.Println("Processing IO Entries")
7382
ipwl.ProcessIOList(workDirPath, ioJsonPath, retry, verbose, showAnimation, concurrency, annotations)

internal/ipwl/io.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io/ioutil"
77
"os"
8+
"strings"
89
)
910

1011
type FileInput struct {
@@ -139,6 +140,40 @@ func WriteIOList(ioJsonPath string, ioList []IO) error {
139140
return nil
140141
}
141142

143+
func ContainsUserIdAnnotation(slice []string) bool {
144+
for _, a := range slice {
145+
if strings.HasPrefix(a, "userId=") {
146+
return true
147+
}
148+
}
149+
return false
150+
}
151+
152+
func ExtractUserIDFromIOJson(ioJsonPath string) (string, error) {
153+
file, err := os.Open(ioJsonPath)
154+
if err != nil {
155+
return "", fmt.Errorf("failed to open file: %w", err)
156+
}
157+
defer file.Close()
158+
159+
data, err := ioutil.ReadAll(file)
160+
if err != nil {
161+
return "", fmt.Errorf("failed to read file: %w", err)
162+
}
163+
164+
var ioEntries []IO
165+
err = json.Unmarshal(data, &ioEntries)
166+
if err != nil {
167+
return "", fmt.Errorf("failed to unmarshal JSON: %w", err)
168+
}
169+
170+
if len(ioEntries) == 0 {
171+
return "", fmt.Errorf("no IO entries found")
172+
}
173+
174+
return ioEntries[0].UserID, nil
175+
}
176+
142177
func PrintIOGraphStatus(ioList []IO) {
143178
stateCount := make(map[string]int)
144179

0 commit comments

Comments
 (0)