package batcher

import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/rollup"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)
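
// ErrAlreadyStopped is returned when Stop is called on an already-stopped service.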
var ErrAlreadyStopped = errors.New("already stopped")
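
// BatcherConfig holds the core runtime settings of the batcher, populated from the CLI config.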
type BatcherConfig struct {
NetworkTimeout time.Duration
PollInterval time.Duration
MaxPendingTransactions uint64
// UseBlobs is true if the batcher should use blobs instead of calldata for posting batch data
UseBlobs bool
// UsePlasma is true if the rollup config has a DA challenge address, in which case the batcher
// posts the inputs to the Plasma DA server and posts only the commitments on-chain as blobs or calldata.
UsePlasma bool
WaitNodeSync bool
CheckRecentTxsDepth int
}

// BatcherService represents a full batch-submitter instance and its resources,
// and conforms to the op-service CLI Lifecycle interface.
type BatcherService struct {
Log log.Logger
Metrics metrics.Metricer
L1Client *ethclient.Client
EndpointProvider dial.L2EndpointProvider
TxManager *txmgr.SimpleTxManager
PlasmaDA *plasma.DAClient
BatcherConfig
RollupConfig *rollup.Config
// Channel builder parameters
ChannelConfig ChannelConfig
driver *BatchSubmitter
Version string
pprofService *oppprof.Service
metricsSrv *httputil.HTTPServer
rpcServer *oprpc.Server
balanceMetricer io.Closer
stopped atomic.Bool
NotSubmittingOnStart bool
}

// BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig.
// The service components are fully started, except for the driver,
// which will not start submitting batches (even if configured to) until the Start part of the lifecycle.
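//
// A minimal usage sketch (illustrative only; ctx, version, cfg and logger are assumed to be
// provided by the caller, and the op-service cliapp lifecycle normally drives these calls):
//
// svc, err := BatcherServiceFromCLIConfig(ctx, version, cfg, logger)
// if err != nil {
// return err
// }
// if err := svc.Start(ctx); err != nil {
// return err
// }
// defer svc.Stop(ctx)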
func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) (*BatcherService, error) {
var bs BatcherService
if err := bs.initFromCLIConfig(ctx, version, cfg, log); err != nil {
return nil, errors.Join(err, bs.Stop(ctx)) // try to clean up our failed initialization attempt
}
return &bs, nil
}
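
// initFromCLIConfig initializes all service resources from the CLI config: RPC clients,
// rollup config, DA and channel configuration, the tx manager, monitoring services, the
// driver, and the RPC server. It does not start batch submission; see Start.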
func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) error {
bs.Version = version
bs.Log = log
bs.NotSubmittingOnStart = cfg.Stopped
bs.initMetrics(cfg)
bs.PollInterval = cfg.PollInterval
bs.MaxPendingTransactions = cfg.MaxPendingTransactions
bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout
bs.CheckRecentTxsDepth = cfg.CheckRecentTxsDepth
bs.WaitNodeSync = cfg.WaitNodeSync
if err := bs.initRPCClients(ctx, cfg); err != nil {
return err
}
if err := bs.initRollupConfig(ctx); err != nil {
return fmt.Errorf("failed to load rollup config: %w", err)
}
// Plasma DA must be initialized before the channel config, which reads bs.UsePlasma.
if err := bs.initPlasmaDA(cfg); err != nil {
return fmt.Errorf("failed to init plasma DA: %w", err)
}
if err := bs.initChannelConfig(cfg); err != nil {
return fmt.Errorf("failed to init channel config: %w", err)
}
if err := bs.initTxManager(cfg); err != nil {
return fmt.Errorf("failed to init Tx manager: %w", err)
}
bs.initBalanceMonitor(cfg)
if err := bs.initMetricsServer(cfg); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
if err := bs.initPProf(cfg); err != nil {
return fmt.Errorf("failed to init profiling: %w", err)
}
bs.initDriver()
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
bs.Metrics.RecordInfo(bs.Version)
bs.Metrics.RecordUp()
return nil
}
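
// initRPCClients dials the L1 execution client and constructs the L2 endpoint provider
// used to reach the rollup node and the L2 execution engine.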
func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) error {
l1Client, err := dial.DialEthClientWithTimeout(ctx, dial.DefaultDialTimeout, bs.Log, cfg.L1EthRpc)
if err != nil {
return fmt.Errorf("failed to dial L1 RPC: %w", err)
}
bs.L1Client = l1Client
var endpointProvider dial.L2EndpointProvider
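// Comma-separated RPC URL lists are treated as a redundant sequencer set handled by the
// active-sequencer provider; otherwise a single static endpoint pair is used.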
if strings.Contains(cfg.RollupRpc, ",") && strings.Contains(cfg.L2EthRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
ethUrls := strings.Split(cfg.L2EthRpc, ",")
endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, cfg.ActiveSequencerCheckDuration, dial.DefaultDialTimeout, bs.Log)
} else {
endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
}
if err != nil {
return fmt.Errorf("failed to build L2 endpoint provider: %w", err)
}
bs.EndpointProvider = endpointProvider
return nil
}
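
// initMetrics selects the real metrics implementation when metrics are enabled,
// and the no-op metricer otherwise.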
func (bs *BatcherService) initMetrics(cfg *CLIConfig) {
if cfg.MetricsConfig.Enabled {
procName := "default"
bs.Metrics = metrics.NewMetrics(procName)
} else {
bs.Metrics = metrics.NoopMetrics
}
}

// initBalanceMonitor depends on Metrics, L1Client and TxManager to start background-monitoring of the batcher balance.
func (bs *BatcherService) initBalanceMonitor(cfg *CLIConfig) {
if cfg.MetricsConfig.Enabled {
bs.balanceMetricer = bs.Metrics.StartBalanceMetrics(bs.Log, bs.L1Client, bs.TxManager.From())
}
}
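
// initRollupConfig fetches the rollup config from the rollup node, validates it,
// and logs a description of the chain.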
func (bs *BatcherService) initRollupConfig(ctx context.Context) error {
rollupNode, err := bs.EndpointProvider.RollupClient(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve rollup client: %w", err)
}
rollupConfig, err := rollupNode.RollupConfig(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve rollup config: %w", err)
}
bs.RollupConfig = rollupConfig
if err := bs.RollupConfig.Check(); err != nil {
return fmt.Errorf("invalid rollup config: %w", err)
}
bs.RollupConfig.LogDescription(bs.Log, chaincfg.L2ChainIDToNetworkDisplayName)
return nil
}
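
// initChannelConfig derives the channel-builder configuration from the CLI flags, the rollup
// config, and the selected data-availability type, then validates it.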
func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
cc := ChannelConfig{
SeqWindowSize: bs.RollupConfig.SeqWindowSize,
ChannelTimeout: bs.RollupConfig.ChannelTimeout,
MaxChannelDuration: cfg.MaxChannelDuration,
MaxFrameSize: cfg.MaxL1TxSize - 1, // account for version byte prefix; reset for blobs
TargetNumFrames: cfg.TargetNumFrames,
SubSafetyMargin: cfg.SubSafetyMargin,
BatchType: cfg.BatchType,
}
switch cfg.DataAvailabilityType {
case flags.BlobsType:
if !cfg.TestUseMaxTxSizeForBlobs {
// account for version byte prefix
cc.MaxFrameSize = eth.MaxBlobDataSize - 1
}
cc.MultiFrameTxs = true
bs.UseBlobs = true
case flags.CalldataType:
bs.UseBlobs = false
default:
return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
}
if bs.UsePlasma && cc.MaxFrameSize > plasma.MaxInputSize {
return fmt.Errorf("max frame size %d exceeds plasma max input size %d", cc.MaxFrameSize, plasma.MaxInputSize)
}
cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo)
if bs.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
bs.Log.Error("Cannot use Blob data before Ecotone!") // log only, the batcher may not be actively running.
}
if !bs.UseBlobs && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!")
}
// Brotli compression is only allowed after the Fjord hardfork.
// Note: check the local cc, since bs.ChannelConfig is not assigned until the end of this function.
if cc.CompressorConfig.CompressionAlgo.IsBrotli() && !bs.RollupConfig.IsFjord(uint64(time.Now().Unix())) {
return fmt.Errorf("cannot use brotli compression before Fjord")
}
if err := cc.Check(); err != nil {
return fmt.Errorf("invalid channel configuration: %w", err)
}
bs.Log.Info("Initialized channel-config",
"use_blobs", bs.UseBlobs,
"use_plasma", bs.UsePlasma,
"max_frame_size", cc.MaxFrameSize,
"target_num_frames", cc.TargetNumFrames,
"compressor", cc.CompressorConfig.Kind,
"compression_algo", cc.CompressorConfig.CompressionAlgo,
"batch_type", cc.BatchType,
"max_channel_duration", cc.MaxChannelDuration,
"channel_timeout", cc.ChannelTimeout,
"sub_safety_margin", cc.SubSafetyMargin)
if bs.UsePlasma {
bs.Log.Warn("Alt-DA Mode is a Beta feature of the MIT licensed OP Stack. While it has received initial review from core contributors, it is still undergoing testing, and may have bugs or other issues.")
}
bs.ChannelConfig = cc
return nil
}
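
// initTxManager constructs the transaction manager used to submit batch transactions to L1.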
func (bs *BatcherService) initTxManager(cfg *CLIConfig) error {
txManager, err := txmgr.NewSimpleTxManager("batcher", bs.Log, bs.Metrics, cfg.TxMgrConfig)
if err != nil {
return err
}
bs.TxManager = txManager
return nil
}
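
// initPProf configures and starts the pprof service according to the CLI pprof settings.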
func (bs *BatcherService) initPProf(cfg *CLIConfig) error {
bs.pprofService = oppprof.New(
cfg.PprofConfig.ListenEnabled,
cfg.PprofConfig.ListenAddr,
cfg.PprofConfig.ListenPort,
cfg.PprofConfig.ProfileType,
cfg.PprofConfig.ProfileDir,
cfg.PprofConfig.ProfileFilename,
)
if err := bs.pprofService.Start(); err != nil {
return fmt.Errorf("failed to start pprof service: %w", err)
}
return nil
}
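
// initMetricsServer starts the metrics server when metrics are enabled; the configured
// metricer must expose its registry for the server to serve it.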
func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
if !cfg.MetricsConfig.Enabled {
bs.Log.Info("Metrics disabled")
return nil
}
m, ok := bs.Metrics.(opmetrics.RegistryMetricer)
if !ok {
return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", bs.Metrics)
}
bs.Log.Debug("Starting metrics server", "addr", cfg.MetricsConfig.ListenAddr, "port", cfg.MetricsConfig.ListenPort)
metricsSrv, err := opmetrics.StartServer(m.Registry(), cfg.MetricsConfig.ListenAddr, cfg.MetricsConfig.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
bs.Log.Info("Started metrics server", "addr", metricsSrv.Addr())
bs.metricsSrv = metricsSrv
return nil
}
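
// initDriver constructs the batch-submitter driver from the initialized service resources.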
func (bs *BatcherService) initDriver() {
bs.driver = NewBatchSubmitter(DriverSetup{
Log: bs.Log,
Metr: bs.Metrics,
RollupConfig: bs.RollupConfig,
Config: bs.BatcherConfig,
Txmgr: bs.TxManager,
L1Client: bs.L1Client,
EndpointProvider: bs.EndpointProvider,
ChannelConfig: bs.ChannelConfig,
PlasmaDA: bs.PlasmaDA,
})
}
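
// initRPCServer starts the JSON-RPC server, optionally registering the admin API.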
func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error {
server := oprpc.NewServer(
cfg.RPC.ListenAddr,
cfg.RPC.ListenPort,
bs.Version,
oprpc.WithLogger(bs.Log),
)
if cfg.RPC.EnableAdmin {
adminAPI := rpc.NewAdminAPI(bs.driver, bs.Metrics, bs.Log)
server.AddAPI(rpc.GetAdminAPI(adminAPI))
bs.Log.Info("Admin RPC enabled")
}
bs.Log.Info("Starting JSON-RPC server")
if err := server.Start(); err != nil {
return fmt.Errorf("unable to start RPC server: %w", err)
}
bs.rpcServer = server
return nil
}
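
// initPlasmaDA validates the Plasma DA config, constructs the DA client,
// and records whether Plasma mode is enabled.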
func (bs *BatcherService) initPlasmaDA(cfg *CLIConfig) error {
config := cfg.PlasmaDA
if err := config.Check(); err != nil {
return err
}
bs.PlasmaDA = config.NewDAClient()
bs.UsePlasma = config.Enabled
return nil
}

// Start runs once upon start of the batcher lifecycle,
// and begins batch-submission work if the batcher is configured to submit data on startup.
func (bs *BatcherService) Start(_ context.Context) error {
bs.driver.Log.Info("Starting batcher", "notSubmittingOnStart", bs.NotSubmittingOnStart)
if !bs.NotSubmittingOnStart {
return bs.driver.StartBatchSubmitting()
}
return nil
}

// Stopped returns whether the service as a whole has been stopped.
func (bs *BatcherService) Stopped() bool {
return bs.stopped.Load()
}

// Kill is a convenience method to forcefully, non-gracefully, stop the BatcherService.
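// It does so by calling Stop with an already-cancelled context, which forces non-graceful shutdown.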
func (bs *BatcherService) Kill() error {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return bs.Stop(ctx)
}

// Stop fully stops the batch-submitter and all its resources gracefully. After stopping, it cannot be restarted.
// See driver.StopBatchSubmitting to temporarily stop the batch submitter.
// If the provided ctx is cancelled, the stopping is forced, i.e. the batching work is killed non-gracefully.
func (bs *BatcherService) Stop(ctx context.Context) error {
if bs.stopped.Load() {
return ErrAlreadyStopped
}
bs.Log.Info("Stopping batcher")
// Close the TxManager first, so that new work is denied and in-flight work is cancelled as early as possible
// (transactions that are expected to be confirmed are still waited for).
if bs.TxManager != nil {
bs.TxManager.Close()
}
var result error
if bs.driver != nil {
if err := bs.driver.StopBatchSubmittingIfRunning(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop batch submitting: %w", err))
}
}
if bs.rpcServer != nil {
// TODO(7685): the op-service RPC server is not built on top of op-service httputil Server, and has poor shutdown
if err := bs.rpcServer.Stop(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop RPC server: %w", err))
}
}
if bs.pprofService != nil {
if err := bs.pprofService.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop PProf server: %w", err))
}
}
if bs.balanceMetricer != nil {
if err := bs.balanceMetricer.Close(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close balance metricer: %w", err))
}
}
if bs.metricsSrv != nil {
if err := bs.metricsSrv.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
}
}
if bs.L1Client != nil {
bs.L1Client.Close()
}
if bs.EndpointProvider != nil {
bs.EndpointProvider.Close()
}
if result == nil {
bs.stopped.Store(true)
bs.Log.Info("Batch Submitter stopped")
}
return result
}
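
// Compile-time check that BatcherService implements the cliapp Lifecycle interface.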
var _ cliapp.Lifecycle = (*BatcherService)(nil)

// TestDriver returns a handler for the batch-submitter driver element, to start/stop/restart the
// batch-submission work, for use only in testing.
func (bs *BatcherService) TestDriver() *TestBatchSubmitter {
return &TestBatchSubmitter{
BatchSubmitter: bs.driver,
}
}