Skip to content

Commit 0f0f747

Browse files
committed
Make logs subscription channel size configurable
1 parent 341bfea commit 0f0f747

19 files changed

+81
-71
lines changed

cmd/rpcdaemon/cli/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
145145
rootCmd.PersistentFlags().IntVar(&cfg.MaxGetProofRewindBlockCount, utils.RpcMaxGetProofRewindBlockCount.Name, utils.RpcMaxGetProofRewindBlockCount.Value, utils.RpcMaxGetProofRewindBlockCount.Usage)
146146
rootCmd.PersistentFlags().Uint64Var(&cfg.OtsMaxPageSize, utils.OtsSearchMaxCapFlag.Name, utils.OtsSearchMaxCapFlag.Value, utils.OtsSearchMaxCapFlag.Usage)
147147
rootCmd.PersistentFlags().DurationVar(&cfg.RPCSlowLogThreshold, utils.RPCSlowFlag.Name, utils.RPCSlowFlag.Value, utils.RPCSlowFlag.Usage)
148+
rootCmd.PersistentFlags().IntVar(&cfg.WebsocketSubscribeLogsChannelSize, utils.WSSubscribeLogsChannelSize.Name, utils.WSSubscribeLogsChannelSize.Value, utils.WSSubscribeLogsChannelSize.Usage)
148149

149150
if err := rootCmd.MarkPersistentFlagFilename("rpc.accessList", "json"); err != nil {
150151
panic(err)

cmd/rpcdaemon/cli/httpcfg/http_cfg.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,22 @@ type HttpCfg struct {
4040
AuthRpcPort int
4141
PrivateApiAddr string
4242

43-
API []string
44-
Gascap uint64
45-
MaxTraces uint64
46-
WebsocketPort int
47-
WebsocketEnabled bool
48-
WebsocketCompression bool
49-
RpcAllowListFilePath string
50-
RpcBatchConcurrency uint
51-
RpcStreamingDisable bool
52-
DBReadConcurrency int
53-
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
54-
TxPoolApiAddr string
55-
StateCache kvcache.CoherentConfig
56-
Snap ethconfig.BlocksFreezing
57-
Sync ethconfig.Sync
43+
API []string
44+
Gascap uint64
45+
MaxTraces uint64
46+
WebsocketPort int
47+
WebsocketEnabled bool
48+
WebsocketCompression bool
49+
WebsocketSubscribeLogsChannelSize int
50+
RpcAllowListFilePath string
51+
RpcBatchConcurrency uint
52+
RpcStreamingDisable bool
53+
DBReadConcurrency int
54+
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
55+
TxPoolApiAddr string
56+
StateCache kvcache.CoherentConfig
57+
Snap ethconfig.BlocksFreezing
58+
Sync ethconfig.Sync
5859

5960
// GRPC server
6061
GRPCServerEnabled bool

cmd/utils/flags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,11 @@ var (
489489
Usage: "HTTP path prefix on which JSON-RPC is served. Use '/' to serve on all paths.",
490490
Value: "",
491491
}
492+
WSSubscribeLogsChannelSize = cli.IntFlag{
493+
Name: "ws.api.subscribelogs.channelsize",
494+
Usage: "Size of the channel used for websocket logs subscriptions",
495+
Value: 8192,
496+
}
492497
ExecFlag = cli.StringFlag{
493498
Name: "exec",
494499
Usage: "Execute JavaScript statement",

turbo/cli/flags.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -476,22 +476,23 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
476476
WriteTimeout: ctx.Duration(AuthRpcWriteTimeoutFlag.Name),
477477
IdleTimeout: ctx.Duration(HTTPIdleTimeoutFlag.Name),
478478
},
479-
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
480-
OverlayGetLogsTimeout: ctx.Duration(OverlayGetLogsFlag.Name),
481-
OverlayReplayBlockTimeout: ctx.Duration(OverlayReplayBlockFlag.Name),
482-
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
483-
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
484-
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
485-
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
486-
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
487-
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
488-
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
489-
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
490-
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
491-
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
492-
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
493-
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
494-
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
479+
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
480+
OverlayGetLogsTimeout: ctx.Duration(OverlayGetLogsFlag.Name),
481+
OverlayReplayBlockTimeout: ctx.Duration(OverlayReplayBlockFlag.Name),
482+
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
483+
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
484+
WebsocketSubscribeLogsChannelSize: ctx.Int(utils.WSSubscribeLogsChannelSize.Name),
485+
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
486+
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
487+
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
488+
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
489+
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
490+
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
491+
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
492+
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
493+
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
494+
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
495+
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
495496

496497
OtsMaxPageSize: ctx.Uint64(utils.OtsSearchMaxCapFlag.Name),
497498

turbo/engineapi/engine_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (e *EngineServer) Start(
8989
) {
9090
base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs)
9191

92-
ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, e.logger)
92+
ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, httpConfig.WebsocketSubscribeLogsChannelSize, e.logger)
9393

9494
// engineImpl := NewEngineAPI(base, db, engineBackend)
9595
// e.startEngineMessageHandler()

turbo/jsonrpc/corner_cases_support_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestNotFoundMustReturnNil(t *testing.T) {
1818
require := require.New(t)
1919
m, _, _ := rpcdaemontest.CreateTestSentry(t)
2020
api := NewEthAPI(newBaseApiForTest(m),
21-
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
21+
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
2222
ctx := context.Background()
2323

2424
a, err := api.GetTransactionByBlockNumberAndIndex(ctx, 10_000, 1)

turbo/jsonrpc/daemon.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, m
2222
logger log.Logger,
2323
) (list []rpc.API) {
2424
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs)
25-
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, logger)
25+
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, cfg.WebsocketSubscribeLogsChannelSize, logger)
2626
erigonImpl := NewErigonAPI(base, db, eth)
2727
txpoolImpl := NewTxPoolAPI(base, db, txPool)
2828
netImpl := NewNetAPIImpl(eth)

turbo/jsonrpc/debug_api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func TestTraceBlockByNumber(t *testing.T) {
5252
agg := m.HistoryV3Components()
5353
stateCache := kvcache.New(kvcache.DefaultCoherentConfig)
5454
baseApi := NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs)
55-
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
55+
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
5656
api := NewPrivateDebugAPI(baseApi, m.DB, 0)
5757
for _, tt := range debugTraceTransactionTests {
5858
var buf bytes.Buffer
@@ -97,7 +97,7 @@ func TestTraceBlockByNumber(t *testing.T) {
9797

9898
func TestTraceBlockByHash(t *testing.T) {
9999
m, _, _ := rpcdaemontest.CreateTestSentry(t)
100-
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
100+
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
101101
api := NewPrivateDebugAPI(newBaseApiForTest(m), m.DB, 0)
102102
for _, tt := range debugTraceTransactionTests {
103103
var buf bytes.Buffer

turbo/jsonrpc/erigon_receipts_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestGetLogs(t *testing.T) {
2929
assert := assert.New(t)
3030
m, _, _ := rpcdaemontest.CreateTestSentry(t)
3131
{
32-
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
32+
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
3333

3434
logs, err := ethApi.GetLogs(context.Background(), filters.FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(10)})
3535
assert.NoError(err)

turbo/jsonrpc/eth_api.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,11 +328,12 @@ type APIImpl struct {
328328
ReturnDataLimit int
329329
AllowUnprotectedTxs bool
330330
MaxGetProofRewindBlockCount int
331+
SubscribeLogsChannelSize int
331332
logger log.Logger
332333
}
333334

334335
// NewEthAPI returns APIImpl instance
335-
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, logger log.Logger) *APIImpl {
336+
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, subscribeLogsChannelSize int, logger log.Logger) *APIImpl {
336337
if gascap == 0 {
337338
gascap = uint64(math.MaxUint64 / 2)
338339
}
@@ -348,6 +349,7 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpoo
348349
AllowUnprotectedTxs: allowUnprotectedTxs,
349350
ReturnDataLimit: returnDataLimit,
350351
MaxGetProofRewindBlockCount: maxGetProofRewindBlockCount,
352+
SubscribeLogsChannelSize: subscribeLogsChannelSize,
351353
logger: logger,
352354
}
353355
}

0 commit comments

Comments
 (0)