feat: add a configurable maximum batch size for RPC requests #2939

Merged: 16 commits, May 2, 2024
@@ -0,0 +1,2 @@
- `[rpc]` Add a configurable maximum batch size for RPC requests.
([\#2867](https://github.com/cometbft/cometbft/pull/2867)).
12 changes: 10 additions & 2 deletions config/config.go
@@ -396,6 +396,10 @@ type RPCConfig struct {
// See https://github.com/tendermint/tendermint/issues/3435
TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"`

// Maximum number of requests that can be sent in a batch
// https://www.jsonrpc.org/specification#batch
MaxRequestBatchSize int `mapstructure:"max_request_batch_size"`

// Maximum size of request body, in bytes
MaxBodyBytes int64 `mapstructure:"max_body_bytes"`

@@ -442,8 +446,9 @@ func DefaultRPCConfig() *RPCConfig {
TimeoutBroadcastTxCommit: 10 * time.Second,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,

MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
MaxRequestBatchSize: 0, // no maximum requests in a batch request
MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default

TLSCertFile: "",
TLSKeyFile: "",
@@ -482,6 +487,9 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.TimeoutBroadcastTxCommit < 0 {
return cmterrors.ErrNegativeField{Field: "timeout_broadcast_tx_commit"}
}
if cfg.MaxRequestBatchSize < 0 {
return cmterrors.ErrNegativeField{Field: "max_request_batch_size"}
}
if cfg.MaxBodyBytes < 0 {
return cmterrors.ErrNegativeField{Field: "max_body_bytes"}
}
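
For illustration, a minimal sketch of how the new field might be set and validated from application code, assuming only the `config` package API visible in this diff (the `main` wrapper is hypothetical):

```go
package main

import (
	"fmt"
	"log"

	"github.com/cometbft/cometbft/config"
)

func main() {
	cfg := config.DefaultRPCConfig()

	// Limit JSON-RPC batches to 10 requests; 0 (the default) means no limit.
	cfg.MaxRequestBatchSize = 10

	// ValidateBasic rejects negative values (ErrNegativeField).
	if err := cfg.ValidateBasic(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("max_request_batch_size =", cfg.MaxRequestBatchSize)
}
```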
1 change: 1 addition & 0 deletions config/config_test.go
@@ -81,6 +81,7 @@ func TestRPCConfigValidateBasic(t *testing.T) {
"TimeoutBroadcastTxCommit",
"MaxBodyBytes",
"MaxHeaderBytes",
"MaxRequestBatchSize",
}

for _, fieldName := range fieldsToTest {
5 changes: 5 additions & 0 deletions config/toml.go
@@ -222,6 +222,11 @@ experimental_close_on_slow_client = {{ .RPC.CloseOnSlowClient }}
# See https://github.com/tendermint/tendermint/issues/3435
timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"

# Maximum number of requests that can be sent in a batch
# If the value is set to '0' (zero-value), then no maximum batch size will be
# enforced for a JSON-RPC batch request.
max_request_batch_size = {{ .RPC.MaxRequestBatchSize }}

# Maximum size of request body, in bytes
max_body_bytes = {{ .RPC.MaxBodyBytes }}

19 changes: 19 additions & 0 deletions docs/references/config/config.toml.md
@@ -570,6 +570,25 @@ and endpoints. There is an old developer discussion about this [here](https://gi

> Note: It is generally recommended *not* to use the `broadcast_tx_commit` method in production, and instead prefer `/broadcast_tx_sync`.

### rpc.max_request_batch_size
Maximum number of requests that can be sent in a JSON-RPC batch request.
```toml
max_request_batch_size = 0
```

| Value type | integer |
|:--------------------|:--------|
| **Possible values** | &gt;= 0 |

If the number of requests sent in a JSON-RPC batch exceeds the configured maximum batch size, an error is returned.

The default value is `0`, which means no maximum batch size is enforced for JSON-RPC batch requests.

To enforce a limit, set this value to a number greater than `0` (e.g. `10`, which restricts a batch to 10 requests).

Reference: https://www.jsonrpc.org/specification#batch
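
For illustration, a sketch of the kind of raw JSON-RPC batch this limit applies to, sent with plain `net/http`. The endpoint address and the use of the `health` method are assumptions for the example, not part of this PR:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// A JSON-RPC 2.0 batch: three requests in a single HTTP body.
	// With max_request_batch_size = 2 the node would reject this batch
	// with an "exceeds maximum" error instead of answering each request.
	batch := []byte(`[
	  {"jsonrpc":"2.0","id":1,"method":"health","params":{}},
	  {"jsonrpc":"2.0","id":2,"method":"health","params":{}},
	  {"jsonrpc":"2.0","id":3,"method":"health","params":{}}
	]`)

	resp, err := http.Post("http://localhost:26657", "application/json", bytes.NewReader(batch))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(body))
}
```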

### rpc.max_body_bytes
Maximum size of request body, in bytes.
```toml
1 change: 1 addition & 0 deletions node/node.go
@@ -731,6 +731,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
}

config := rpcserver.DefaultConfig()
config.MaxRequestBatchSize = n.config.RPC.MaxRequestBatchSize
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
48 changes: 48 additions & 0 deletions rpc/client/examples_test.go
@@ -9,6 +9,7 @@ import (
"github.com/cometbft/cometbft/abci/example/kvstore"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/cometbft/cometbft/rpc/jsonrpc/types"
rpctest "github.com/cometbft/cometbft/rpc/test"
)

@@ -135,3 +136,50 @@ func ExampleHTTP_batching() {
// firstName = satoshi
// lastName = nakamoto
}

// Demonstrate the maximum batch request size middleware.
func ExampleHTTP_maxBatchSize() {
// Start a CometBFT node (and kvstore) in the background to test against
app := kvstore.NewInMemoryApplication()
node := rpctest.StartCometBFT(app, rpctest.RecreateConfig, rpctest.SuppressStdout, rpctest.MaxReqBatchSize)

// Change the max_request_batch_size
node.Config().RPC.MaxRequestBatchSize = 2

// Create our RPC client
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
c, err := rpchttp.New(rpcAddr)
if err != nil {
log.Fatal(err)
}

defer rpctest.StopCometBFT(node)

// Create a new batch
batch := c.NewBatch()

for i := 1; i <= 5; i++ {
if _, err := batch.Health(context.Background()); err != nil {
log.Fatal(err)
}
}

// Send the requests
results, err := batch.Send(context.Background())
if err != nil {
log.Fatal(err)
}

// Each result in the returned list should be the RPC error produced by
// the batch size middleware
for _, result := range results {
_, ok := result.(*types.RPCError)
if !ok {
log.Fatal("invalid result type")
}
fmt.Println("Max Request Batch Exceeded")
}

// Output:
// Max Request Batch Exceeded
}
86 changes: 57 additions & 29 deletions rpc/jsonrpc/client/decode.go
@@ -37,48 +37,76 @@ func unmarshalResponseBytes(
return result, nil
}

// The unmarshalling is split into separate helper functions to improve readability and maintainability.
func unmarshalIndividualResponse(responseBytes []byte) (types.RPCResponse, error) {
var singleResponse types.RPCResponse
err := json.Unmarshal(responseBytes, &singleResponse)
return singleResponse, err
}

func unmarshalMultipleResponses(responseBytes []byte) ([]types.RPCResponse, error) {
var responses []types.RPCResponse
err := json.Unmarshal(responseBytes, &responses)
return responses, err
}

func unmarshalResponseBytesArray(
responseBytes []byte,
expectedIDs []types.JSONRPCIntID,
results []any,
) ([]any, error) {
var responses []types.RPCResponse

if err := json.Unmarshal(responseBytes, &responses); err != nil {
return nil, fmt.Errorf("error unmarshalling: %w", err)
}

// No response error checking here as there may be a mixture of successful
// and unsuccessful responses.
// Try to unmarshal as multiple responses
responses, err := unmarshalMultipleResponses(responseBytes)
// if err == nil, the payload unmarshalled into multiple responses
if err == nil {
// No response error checking here as there may be a mixture of successful
// and unsuccessful responses.

if len(results) != len(responses) {
return nil, fmt.Errorf(
"expected %d result objects into which to inject responses, but got %d",
len(responses),
len(results),
)
}

if len(results) != len(responses) {
return nil, fmt.Errorf(
"expected %d result objects into which to inject responses, but got %d",
len(responses),
len(results),
)
}
// Intersect IDs from responses with expectedIDs.
ids := make([]types.JSONRPCIntID, len(responses))
var ok bool
for i, resp := range responses {
ids[i], ok = resp.ID.(types.JSONRPCIntID)
if !ok {
return nil, fmt.Errorf("expected JSONRPCIntID, got %T", resp.ID)
}
}
if err := validateResponseIDs(ids, expectedIDs); err != nil {
return nil, fmt.Errorf("wrong IDs: %w", err)
}

// Intersect IDs from responses with expectedIDs.
ids := make([]types.JSONRPCIntID, len(responses))
var ok bool
for i, resp := range responses {
ids[i], ok = resp.ID.(types.JSONRPCIntID)
if !ok {
return nil, fmt.Errorf("expected JSONRPCIntID, got %T", resp.ID)
for i := 0; i < len(responses); i++ {
if err := cmtjson.Unmarshal(responses[i].Result, results[i]); err != nil {
return nil, fmt.Errorf("error unmarshalling #%d result: %w", i, err)
}
}

return results, nil
}
if err := validateResponseIDs(ids, expectedIDs); err != nil {
return nil, fmt.Errorf("wrong IDs: %w", err)
// otherwise, check whether it is a single response (typically an error response)
singleResponse, err := unmarshalIndividualResponse(responseBytes)
if err != nil {
// Here, an error means that even single response unmarshalling failed,
// so return the error.
return nil, fmt.Errorf("error unmarshalling: %w", err)
}

for i := 0; i < len(responses); i++ {
if err := cmtjson.Unmarshal(responses[i].Result, results[i]); err != nil {
return nil, fmt.Errorf("error unmarshalling #%d result: %w", i, err)
}
singleResult := make([]any, 0)
if singleResponse.Error != nil {
singleResult = append(singleResult, singleResponse.Error)
} else {
singleResult = append(singleResult, singleResponse.Result)
}

return results, nil
return singleResult, nil
}

func validateResponseIDs(ids, expectedIDs []types.JSONRPCIntID) error {
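
To illustrate the two payload shapes the updated decoder has to distinguish (an array of responses for a normal batch versus a single error response for a rejected one), here is a standalone sketch using simplified stand-in types rather than the real `types.RPCResponse`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-in for types.RPCResponse, for illustration only.
type rpcResponse struct {
	ID     any             `json:"id"`
	Result json.RawMessage `json:"result,omitempty"`
	Error  *struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
	} `json:"error,omitempty"`
}

func main() {
	// A successful batch comes back as a JSON array of responses.
	batchBody := []byte(`[{"jsonrpc":"2.0","id":1,"result":{}},{"jsonrpc":"2.0","id":2,"result":{}}]`)
	// An over-limit batch comes back as a single error response object.
	errorBody := []byte(`{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"batch exceeds maximum"}}`)

	for _, body := range [][]byte{batchBody, errorBody} {
		// First try to decode an array of responses.
		var many []rpcResponse
		if err := json.Unmarshal(body, &many); err == nil {
			fmt.Println("array of", len(many), "responses")
			continue
		}
		// Fall back to a single response, mirroring unmarshalIndividualResponse.
		var one rpcResponse
		if err := json.Unmarshal(body, &one); err != nil {
			fmt.Println("unmarshal failed:", err)
			continue
		}
		fmt.Println("single response, error present:", one.Error != nil)
	}
}
```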
2 changes: 1 addition & 1 deletion rpc/jsonrpc/server/http_json_handler.go
@@ -31,7 +31,7 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
return
}

// if its an empty request (like from a browser), just display a list of
// if it's an empty request (like from a browser), just display a list of
// functions
if len(b) == 0 {
writeListOfEndpoints(w, r, funcMap)
73 changes: 61 additions & 12 deletions rpc/jsonrpc/server/http_server.go
@@ -3,9 +3,11 @@ package server

import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
@@ -35,16 +37,19 @@ type Config struct {
MaxBodyBytes int64
// mirrors http.Server#MaxHeaderBytes
MaxHeaderBytes int
// maximum number of requests in a batch request
MaxRequestBatchSize int
}

// DefaultConfig returns a default configuration.
func DefaultConfig() *Config {
return &Config{
MaxOpenConnections: 0, // unlimited
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
MaxOpenConnections: 0, // unlimited
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
MaxRequestBatchSize: 0,
}
}

@@ -56,7 +61,7 @@ func DefaultConfig() *Config {
func Serve(listener net.Listener, handler http.Handler, logger log.Logger, config *Config) error {
logger.Info("serve", "msg", log.NewLazySprintf("Starting RPC HTTP server on %s", listener.Addr()))
s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger),
Handler: PreChecksHandler(RecoverAndLogHandler(defaultHandler{h: handler}, logger), config),
ReadTimeout: config.ReadTimeout,
ReadHeaderTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
@@ -67,7 +72,7 @@ func Serve(listener net.Listener, handler http.Handler, logger log.Logger, confi
return err
}

// Serve creates a http.Server and calls ServeTLS with the given listener,
// ServeTLS creates a http.Server and calls ServeTLS with the given listener,
// certFile and keyFile. It wraps handler with RecoverAndLogHandler and a
// handler, which limits the max body size to config.MaxBodyBytes.
//
@@ -82,7 +87,7 @@ func ServeTLS(
logger.Info("serve tls", "msg", log.NewLazySprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)",
listener.Addr(), certFile, keyFile))
s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger),
Handler: PreChecksHandler(RecoverAndLogHandler(defaultHandler{h: handler}, logger), config),
ReadTimeout: config.ReadTimeout,
ReadHeaderTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
@@ -249,13 +254,11 @@ func (w *responseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return w.ResponseWriter.(http.Hijacker).Hijack()
}

type maxBytesHandler struct {
type defaultHandler struct {
h http.Handler
n int64
}

func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, h.n)
func (h defaultHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.h.ServeHTTP(w, r)
}

@@ -277,3 +280,49 @@ func Listen(addr string, maxOpenConnections int) (listener net.Listener, err err

return listener, nil
}

// Middleware

// PreChecksHandler is a middleware that checks the size of batch requests and returns an error
// if it exceeds the configured maximum. It also ensures that the request body does not exceed
// the configured maximum number of body bytes.
func PreChecksHandler(next http.Handler, config *Config) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// ensure that the request body is not larger than the configured maximum body bytes
r.Body = http.MaxBytesReader(w, r.Body, config.MaxBodyBytes)

// if MaxRequestBatchSize is 0, do not limit the number of requests per batch;
// the default value is 0, and it cannot be negative because of the config validation
if config.MaxRequestBatchSize > 0 {
var requests []types.RPCRequest
var responses []types.RPCResponse
var err error

data, err := io.ReadAll(r.Body)
if err != nil {
res := types.RPCInvalidRequestError(nil, fmt.Errorf("error reading request body: %w", err))
_ = WriteRPCResponseHTTPError(w, http.StatusBadRequest, res)
return
}

err = json.Unmarshal(data, &requests)
// if err is nil, the body is a batch of requests; check whether the number of
// requests exceeds the configured maximum batch size
if err == nil {
// if the number of requests in the batch exceeds the configured maximum, return an error
if len(requests) > config.MaxRequestBatchSize {
res := types.RPCInvalidRequestError(nil, fmt.Errorf("batch request exceeds maximum (%d) allowed number of requests", config.MaxRequestBatchSize))
responses = append(responses, res)
_ = WriteRPCResponseHTTP(w, responses...)
return
}
}

// ensure the request body can be read again by other handlers
r.Body = io.NopCloser(bytes.NewBuffer(data))
}

// next handler
next.ServeHTTP(w, r)
})
}
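
A minimal sketch of exercising the batch-size check with `net/http/httptest`, assuming the exported `PreChecksHandler` and `DefaultConfig` shown above; the inner handler is a hypothetical stub:

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"strings"

	rpcserver "github.com/cometbft/cometbft/rpc/jsonrpc/server"
)

func main() {
	cfg := rpcserver.DefaultConfig()
	cfg.MaxRequestBatchSize = 2

	// Stub standing in for the real JSON-RPC handler.
	inner := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	ts := httptest.NewServer(rpcserver.PreChecksHandler(inner, cfg))
	defer ts.Close()

	// Three requests in one batch exceed the configured maximum of two, so the
	// middleware should answer with the "exceeds maximum" error itself and
	// never reach the inner handler.
	batch := `[{"jsonrpc":"2.0","id":1,"method":"health"},
	           {"jsonrpc":"2.0","id":2,"method":"health"},
	           {"jsonrpc":"2.0","id":3,"method":"health"}]`

	resp, err := http.Post(ts.URL, "application/json", strings.NewReader(batch))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```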