This repository has been archived by the owner on Mar 19, 2024. It is now read-only.
forked from optakt/flow-dps
-
Notifications
You must be signed in to change notification settings - Fork 4
/
invoker.go
191 lines (170 loc) · 5.12 KB
/
invoker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package invoker
import (
"context"
"fmt"
"github.com/dgraph-io/ristretto"
"github.com/rs/zerolog"
"github.com/onflow/cadence/runtime"
"github.com/onflow/flow-archive/models/archive"
"github.com/onflow/flow-archive/util"
"github.com/onflow/flow-go/engine/execution/computation"
"github.com/onflow/flow-go/engine/execution/computation/query"
"github.com/onflow/flow-go/fvm"
reusableRuntime "github.com/onflow/flow-go/fvm/runtime"
"github.com/onflow/flow-go/fvm/storage/derived"
"github.com/onflow/flow-go/fvm/storage/snapshot"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
)
// Invoker retrieves account information from and executes Cadence scripts against
// the Flow virtual machine.
type Invoker struct {
index archive.Reader
queryExecutor *query.QueryExecutor
cache Cache
*Blocks
}
// New returns a new Invoker with the given configuration.
func New(
log zerolog.Logger,
index archive.Reader,
cfg Config,
) (*Invoker, error) {
// Initialize the Ristretto cache with the size limit. Ristretto recommends
// keeping ten times as many counters as items in the cache when full.
// Assuming an average item size of 1 kilobyte, this is what we get.
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: int64(cfg.CacheSize) / 1000 * 10,
MaxCost: int64(cfg.CacheSize),
BufferItems: 64,
})
if err != nil {
return nil, fmt.Errorf("could not initialize cache: %w", err)
}
blocks := NewBlocks(index)
// This is copied code from flow-go engine/execution/computation/manager.go
// Ideally the same code would be used, but that requires flow-go changes.
// TODO: reuse code from flow-go
var vm fvm.VM
if cfg.NewCustomVirtualMachine != nil {
vm = cfg.NewCustomVirtualMachine()
} else {
vm = fvm.NewVirtualMachine()
}
chainID := cfg.ChainID
fvmOptions := []fvm.Option{
fvm.WithReusableCadenceRuntimePool(
reusableRuntime.NewReusableCadenceRuntimePool(
computation.ReusableCadenceRuntimePoolSize,
runtime.Config{
TracingEnabled: cfg.CadenceTracing,
AccountLinkingEnabled: true,
// Attachments are enabled everywhere except for Mainnet
AttachmentsEnabled: chainID != flow.Mainnet,
// Capability Controllers are enabled everywhere except for Mainnet
CapabilityControllersEnabled: chainID != flow.Mainnet,
},
),
),
fvm.WithBlocks(blocks),
fvm.WithLogger(log),
fvm.WithChain(chainID.Chain()),
}
vmCtx := fvm.NewContext(fvmOptions...)
derivedChainData, err := derived.NewDerivedChainData(cfg.DerivedDataCacheSize)
if err != nil {
return nil, fmt.Errorf("cannot create derived data cache: %w", err)
}
queryExecutor := query.NewQueryExecutor(
cfg.QueryConfig,
log,
&metrics.NoopCollector{}, // TODO: add metrics
vm,
vmCtx,
derivedChainData,
)
return &Invoker{
Blocks: blocks,
index: index,
cache: cache,
queryExecutor: queryExecutor,
}, nil
}
// Account returns the account with the given address.
func (i *Invoker) Account(
ctx context.Context,
height uint64,
address flow.Address,
) (*flow.Account, error) {
err := util.ValidateHeightDataAvailable(i.index, height)
if err != nil {
return nil, err
}
// Look up the current block and commit for the block.
header, err := i.index.Header(height)
if err != nil {
return nil, fmt.Errorf("could not get header: %w", err)
}
return i.queryExecutor.GetAccount(
ctx,
address,
header,
i.storageSnapshot(height),
)
}
// Script executes the given Cadence script and returns its result.
func (i *Invoker) Script(
ctx context.Context,
height uint64,
script []byte,
args [][]byte,
) ([]byte, error) {
err := util.ValidateHeightDataAvailable(i.index, height)
if err != nil {
return nil, err
}
// Look up the current block and commit for the block.
header, err := i.index.Header(height)
if err != nil {
return nil, fmt.Errorf("could not get header: %w", err)
}
return i.queryExecutor.ExecuteScript(
ctx,
script,
args,
header,
i.storageSnapshot(height),
)
}
func (i *Invoker) storageSnapshot(height uint64) snapshot.StorageSnapshot {
// Initialize the storage snapshot. We use a shared cache between all
// heights here. It's a smart cache, which means that items that are
// accessed often are more likely to be kept, regardless of height. This
// allows us to put an upper bound on total cache size while using it for
// all heights.
return snapshot.NewReadFuncStorageSnapshot(
readRegister(i.index, i.cache, height))
}
func readRegister(
index archive.Reader,
cache Cache,
height uint64,
) func(flow.RegisterID) (flow.RegisterValue, error) {
return func(regID flow.RegisterID) (flow.RegisterValue, error) {
cacheKey := fmt.Sprintf("%d/%s", height, regID)
cacheValue, ok := cache.Get(cacheKey)
if ok {
return cacheValue.(flow.RegisterValue), nil
}
values, err := index.Values(height, flow.RegisterIDs{regID})
if err != nil {
return nil, fmt.Errorf("could not read register: %w", err)
}
if len(values) != 1 {
return nil, fmt.Errorf("wrong number of register values: %d", len(values))
}
value := values[0]
_ = cache.Set(cacheKey, value, int64(len(value)))
return value, nil
}
}