This repository has been archived by the owner on Jul 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathrecommendations.go
118 lines (108 loc) · 2.51 KB
/
recommendations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package viewer
import (
"context"
"encoding/base64"
"time"
"github.com/ava-labs/indexvm/examples/nftdisco/thegraph"
"github.com/ava-labs/indexvm/examples/shared/consts"
"github.com/ava-labs/indexvm/examples/shared/gorse"
"github.com/ava-labs/indexvm/examples/shared/ipfs"
"github.com/ava-labs/indexvm/examples/shared/servicer"
"golang.org/x/sync/errgroup"
)
type rec struct {
reply *servicer.GetRecommendationReply
nft *thegraph.NFT
img []byte
}
func (v *Viewer) provideRecommendations(ctx context.Context) error {
g, gctx := errgroup.WithContext(ctx)
// Feed processors
w := make(chan *servicer.GetRecommendationReply)
g.Go(func() error {
// Start with serving unrated items
unrated, err := v.cli.GetUnrated()
if err != nil {
return err
}
for _, un := range unrated.Unrated {
select {
case w <- un:
case <-gctx.Done():
return nil
}
}
// Fetch new recommendations
for gctx.Err() == nil {
rec, err := v.cli.GetRecommendation(consts.NFTDataSchemaID)
if err != nil {
return err
}
v.seenL.Lock()
if _, ok := v.seenM[rec.ID]; ok {
v.seenL.Unlock()
time.Sleep(2 * time.Second)
continue
}
v.seenM[rec.ID] = struct{}{}
v.seenL.Unlock()
select {
case w <- rec:
case <-gctx.Done():
return nil
}
}
return nil
})
// Start processors
for i := 0; i < v.pending; i++ {
g.Go(func() error {
for {
select {
case <-gctx.Done():
return nil
case work := <-w:
// Decode raw data
parsedContent, err := base64.StdEncoding.DecodeString(work.Content)
if err != nil {
_ = v.submitRating(gctx, work, gorse.Junk)
continue
}
// Parse as NFT
nft, err := thegraph.Unpack(parsedContent)
if err != nil {
_ = v.submitRating(gctx, work, gorse.Junk)
continue
}
// Attempt to skip if filtering
act := v.processRec(nft)
if len(act) > 0 {
_ = v.submitRating(gctx, work, act)
continue
}
// Load image
tctx, cancel := context.WithTimeout(gctx, 1*time.Minute)
defer cancel()
content, err := ipfs.FetchContent(tctx, v.ipfsAPI, nft.Image)
if err != nil {
_ = v.submitRating(gctx, work, gorse.Inaccessible)
continue
}
// Send to CLI
select {
case v.recs <- &rec{
reply: work,
nft: nft,
img: content,
}:
case <-gctx.Done():
return nil
}
}
}
})
}
return g.Wait()
}