-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoperator.go
337 lines (309 loc) · 13.1 KB
/
operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
package operator
import (
"context"
"fmt"
"math/big"
"os"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/zees-dev/blockless-avs/aggregator"
cstaskmanager "github.com/zees-dev/blockless-avs/contracts/bindings/IncredibleSquaringTaskManager"
"github.com/zees-dev/blockless-avs/core"
"github.com/zees-dev/blockless-avs/core/chainio"
"github.com/zees-dev/blockless-avs/metrics"
"github.com/zees-dev/blockless-avs/types"
"github.com/Layr-Labs/eigensdk-go/chainio/clients"
sdkelcontracts "github.com/Layr-Labs/eigensdk-go/chainio/clients/elcontracts"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
"github.com/Layr-Labs/eigensdk-go/chainio/txmgr"
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
sdkecdsa "github.com/Layr-Labs/eigensdk-go/crypto/ecdsa"
"github.com/Layr-Labs/eigensdk-go/logging"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
sdkmetrics "github.com/Layr-Labs/eigensdk-go/metrics"
"github.com/Layr-Labs/eigensdk-go/metrics/collectors/economic"
rpccalls "github.com/Layr-Labs/eigensdk-go/metrics/collectors/rpc_calls"
"github.com/Layr-Labs/eigensdk-go/nodeapi"
"github.com/Layr-Labs/eigensdk-go/signerv2"
sdktypes "github.com/Layr-Labs/eigensdk-go/types"
)
const AVS_NAME = "incredible-squaring"
const SEM_VER = "0.0.1"
type Operator struct {
config types.NodeConfig
logger logging.Logger
ethClient eth.EthClient
// TODO(samlaf): remove both avsWriter and eigenlayerWrite from operator
// they are only used for registration, so we should make a special registration package
// this way, auditing this operator code makes it obvious that operators don't need to
// write to the chain during the course of their normal operations
// writing to the chain should be done via the cli only
metricsReg *prometheus.Registry
metrics metrics.Metrics
nodeApi *nodeapi.NodeApi
avsWriter *chainio.AvsWriter
avsReader chainio.AvsReaderer
avsSubscriber chainio.AvsSubscriberer
eigenlayerReader sdkelcontracts.ELReader
eigenlayerWriter sdkelcontracts.ELWriter
blsKeypair *bls.KeyPair
operatorId bls.OperatorId
operatorAddr common.Address
// receive new tasks in this chan (typically from listening to onchain event)
newTaskCreatedChan chan *cstaskmanager.ContractIncredibleSquaringTaskManagerNewTaskCreated
// ip address of aggregator
aggregatorServerIpPortAddr string
// rpc client to send signed task responses to aggregator
aggregatorRpcClient AggregatorRpcClienter
// needed when opting in to avs (allow this service manager contract to slash operator)
credibleSquaringServiceManagerAddr common.Address
}
// TODO(samlaf): config is a mess right now, since the chainio client constructors
//
// take the config in core (which is shared with aggregator and challenger)
func NewOperatorFromConfig(c types.NodeConfig) (*Operator, error) {
var logLevel logging.LogLevel
if c.Production {
logLevel = sdklogging.Production
} else {
logLevel = sdklogging.Development
}
logger, err := sdklogging.NewZapLogger(logLevel)
if err != nil {
return nil, err
}
reg := prometheus.NewRegistry()
eigenMetrics := sdkmetrics.NewEigenMetrics(AVS_NAME, c.EigenMetricsIpPortAddress, reg, logger)
avsAndEigenMetrics := metrics.NewAvsAndEigenMetrics(AVS_NAME, eigenMetrics, reg)
// Setup Node Api
nodeApi := nodeapi.NewNodeApi(AVS_NAME, SEM_VER, c.NodeApiIpPortAddress, logger)
var ethRpcClient, ethWsClient eth.EthClient
if c.EnableMetrics {
rpcCallsCollector := rpccalls.NewCollector(AVS_NAME, reg)
ethRpcClient, err = eth.NewInstrumentedClient(c.EthRpcUrl, rpcCallsCollector)
if err != nil {
logger.Errorf("Cannot create http ethclient", "err", err)
return nil, err
}
ethWsClient, err = eth.NewInstrumentedClient(c.EthWsUrl, rpcCallsCollector)
if err != nil {
logger.Errorf("Cannot create ws ethclient", "err", err)
return nil, err
}
} else {
ethRpcClient, err = eth.NewClient(c.EthRpcUrl)
if err != nil {
logger.Errorf("Cannot create http ethclient", "err", err)
return nil, err
}
ethWsClient, err = eth.NewClient(c.EthWsUrl)
if err != nil {
logger.Errorf("Cannot create ws ethclient", "err", err)
return nil, err
}
}
blsKeyPassword, ok := os.LookupEnv("OPERATOR_BLS_KEY_PASSWORD")
if !ok {
logger.Warnf("OPERATOR_BLS_KEY_PASSWORD env var not set. using empty string")
}
blsKeyPair, err := bls.ReadPrivateKeyFromFile(c.BlsPrivateKeyStorePath, blsKeyPassword)
if err != nil {
logger.Errorf("Cannot parse bls private key", "err", err)
return nil, err
}
// TODO(samlaf): should we add the chainId to the config instead?
// this way we can prevent creating a signer that signs on mainnet by mistake
// if the config says chainId=5, then we can only create a goerli signer
chainId, err := ethRpcClient.ChainID(context.Background())
if err != nil {
logger.Error("Cannot get chainId", "err", err)
return nil, err
}
ecdsaKeyPassword, ok := os.LookupEnv("OPERATOR_ECDSA_KEY_PASSWORD")
if !ok {
logger.Warnf("OPERATOR_ECDSA_KEY_PASSWORD env var not set. using empty string")
}
signerV2, _, err := signerv2.SignerFromConfig(signerv2.Config{
KeystorePath: c.EcdsaPrivateKeyStorePath,
Password: ecdsaKeyPassword,
}, chainId)
if err != nil {
panic(err)
}
chainioConfig := clients.BuildAllConfig{
EthHttpUrl: c.EthRpcUrl,
EthWsUrl: c.EthWsUrl,
RegistryCoordinatorAddr: c.AVSRegistryCoordinatorAddress,
OperatorStateRetrieverAddr: c.OperatorStateRetrieverAddress,
AvsName: AVS_NAME,
PromMetricsIpPortAddress: c.EigenMetricsIpPortAddress,
}
sdkClients, err := clients.BuildAll(chainioConfig, common.HexToAddress(c.OperatorAddress), signerV2, logger)
if err != nil {
panic(err)
}
txMgr := txmgr.NewSimpleTxManager(ethRpcClient, logger, signerV2, common.HexToAddress(c.OperatorAddress))
avsWriter, err := chainio.BuildAvsWriter(
txMgr, common.HexToAddress(c.AVSRegistryCoordinatorAddress),
common.HexToAddress(c.OperatorStateRetrieverAddress), ethRpcClient, logger,
)
if err != nil {
logger.Error("Cannot create AvsWriter", "err", err)
return nil, err
}
avsReader, err := chainio.BuildAvsReader(
common.HexToAddress(c.AVSRegistryCoordinatorAddress),
common.HexToAddress(c.OperatorStateRetrieverAddress),
ethRpcClient, logger)
if err != nil {
logger.Error("Cannot create AvsReader", "err", err)
return nil, err
}
avsSubscriber, err := chainio.BuildAvsSubscriber(common.HexToAddress(c.AVSRegistryCoordinatorAddress),
common.HexToAddress(c.OperatorStateRetrieverAddress), ethWsClient, logger,
)
if err != nil {
logger.Error("Cannot create AvsSubscriber", "err", err)
return nil, err
}
// We must register the economic metrics separately because they are exported metrics (from jsonrpc or subgraph calls)
// and not instrumented metrics: see https://prometheus.io/docs/instrumenting/writing_clientlibs/#overall-structure
quorumNames := map[sdktypes.QuorumNum]string{
0: "quorum0",
}
economicMetricsCollector := economic.NewCollector(
sdkClients.ElChainReader, sdkClients.AvsRegistryChainReader,
AVS_NAME, logger, common.HexToAddress(c.OperatorAddress), quorumNames)
reg.MustRegister(economicMetricsCollector)
aggregatorRpcClient, err := NewAggregatorRpcClient(c.AggregatorServerIpPortAddress, logger, avsAndEigenMetrics)
if err != nil {
logger.Error("Cannot create AggregatorRpcClient. Is aggregator running?", "err", err)
return nil, err
}
operator := &Operator{
config: c,
logger: logger,
metricsReg: reg,
metrics: avsAndEigenMetrics,
nodeApi: nodeApi,
ethClient: ethRpcClient,
avsWriter: avsWriter,
avsReader: avsReader,
avsSubscriber: avsSubscriber,
eigenlayerReader: sdkClients.ElChainReader,
eigenlayerWriter: sdkClients.ElChainWriter,
blsKeypair: blsKeyPair,
operatorAddr: common.HexToAddress(c.OperatorAddress),
aggregatorServerIpPortAddr: c.AggregatorServerIpPortAddress,
aggregatorRpcClient: aggregatorRpcClient,
newTaskCreatedChan: make(chan *cstaskmanager.ContractIncredibleSquaringTaskManagerNewTaskCreated),
credibleSquaringServiceManagerAddr: common.HexToAddress(c.AVSRegistryCoordinatorAddress),
operatorId: [32]byte{0}, // this is set below
}
if c.RegisterOperatorOnStartup {
operatorEcdsaPrivateKey, err := sdkecdsa.ReadKey(
c.EcdsaPrivateKeyStorePath,
ecdsaKeyPassword,
)
if err != nil {
return nil, err
}
operator.registerOperatorOnStartup(operatorEcdsaPrivateKey, common.HexToAddress(c.TokenStrategyAddr))
}
// OperatorId is set in contract during registration so we get it after registering operator.
operatorId, err := sdkClients.AvsRegistryChainReader.GetOperatorId(&bind.CallOpts{}, operator.operatorAddr)
if err != nil {
logger.Error("Cannot get operator id", "err", err)
return nil, err
}
operator.operatorId = operatorId
logger.Info("Operator info",
"operatorId", operatorId,
"operatorAddr", c.OperatorAddress,
"operatorG1Pubkey", operator.blsKeypair.GetPubKeyG1(),
"operatorG2Pubkey", operator.blsKeypair.GetPubKeyG2(),
)
return operator, nil
}
func (o *Operator) Start(ctx context.Context) error {
operatorIsRegistered, err := o.avsReader.IsOperatorRegistered(&bind.CallOpts{}, o.operatorAddr)
if err != nil {
o.logger.Error("Error checking if operator is registered", "err", err)
return err
}
if !operatorIsRegistered {
// We bubble the error all the way up instead of using logger.Fatal because logger.Fatal prints a huge stack trace
// that hides the actual error message. This error msg is more explicit and doesn't require showing a stack trace to the user.
return fmt.Errorf("operator is not registered. Registering operator using the operator-cli before starting operator")
}
o.logger.Infof("Starting operator.")
if o.config.EnableNodeApi {
o.nodeApi.Start()
}
var metricsErrChan <-chan error
if o.config.EnableMetrics {
metricsErrChan = o.metrics.Start(ctx, o.metricsReg)
} else {
metricsErrChan = make(chan error, 1)
}
// TODO(samlaf): wrap this call with increase in avs-node-spec metric
sub := o.avsSubscriber.SubscribeToNewTasks(o.newTaskCreatedChan)
for {
select {
case <-ctx.Done():
return nil
case err := <-metricsErrChan:
// TODO(samlaf); we should also register the service as unhealthy in the node api
// https://eigen.nethermind.io/docs/spec/api/
o.logger.Fatal("Error in metrics server", "err", err)
case err := <-sub.Err():
o.logger.Error("Error in websocket subscription", "err", err)
// TODO(samlaf): write unit tests to check if this fixed the issues we were seeing
sub.Unsubscribe()
// TODO(samlaf): wrap this call with increase in avs-node-spec metric
sub = o.avsSubscriber.SubscribeToNewTasks(o.newTaskCreatedChan)
case newTaskCreatedLog := <-o.newTaskCreatedChan:
o.metrics.IncNumTasksReceived()
taskResponse := o.ProcessNewTaskCreatedLog(newTaskCreatedLog)
signedTaskResponse, err := o.SignTaskResponse(taskResponse)
if err != nil {
continue
}
go o.aggregatorRpcClient.SendSignedTaskResponseToAggregator(signedTaskResponse)
}
}
}
// Takes a NewTaskCreatedLog struct as input and returns a TaskResponseHeader struct.
// The TaskResponseHeader struct is the struct that is signed and sent to the contract as a task response.
func (o *Operator) ProcessNewTaskCreatedLog(newTaskCreatedLog *cstaskmanager.ContractIncredibleSquaringTaskManagerNewTaskCreated) *cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse {
o.logger.Debug("Received new task", "task", newTaskCreatedLog)
o.logger.Info("Received new task",
"numberToBeSquared", newTaskCreatedLog.Task.NumberToBeSquared,
"taskIndex", newTaskCreatedLog.TaskIndex,
"taskCreatedBlock", newTaskCreatedLog.Task.TaskCreatedBlock,
"quorumNumbers", newTaskCreatedLog.Task.QuorumNumbers,
"QuorumThresholdPercentage", newTaskCreatedLog.Task.QuorumThresholdPercentage,
)
numberSquared := big.NewInt(0).Exp(newTaskCreatedLog.Task.NumberToBeSquared, big.NewInt(2), nil)
taskResponse := &cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse{
ReferenceTaskIndex: newTaskCreatedLog.TaskIndex,
NumberSquared: numberSquared,
}
return taskResponse
}
func (o *Operator) SignTaskResponse(taskResponse *cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse) (*aggregator.SignedTaskResponse, error) {
taskResponseHash, err := core.GetTaskResponseDigest(taskResponse)
if err != nil {
o.logger.Error("Error getting task response header hash. skipping task (this is not expected and should be investigated)", "err", err)
return nil, err
}
blsSignature := o.blsKeypair.SignMessage(taskResponseHash)
signedTaskResponse := &aggregator.SignedTaskResponse{
TaskResponse: *taskResponse,
BlsSignature: *blsSignature,
OperatorId: o.operatorId,
}
o.logger.Debug("Signed task response", "signedTaskResponse", signedTaskResponse)
return signedTaskResponse, nil
}