Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions agreement/gossip/networkFull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestMain(m *testing.M) {

logging.Base().SetLevel(logging.Debug)
// increase limit on max allowed number of sockets
err := util.RaiseRlimit(500)
err := util.SetFdSoftLimit(500)
if err != nil {
os.Exit(1)
}
Expand All @@ -50,7 +50,6 @@ func spinNetwork(t *testing.T, nodesCount int) ([]*networkImpl, []*messageCounte
cfg := config.GetDefaultLocal()
cfg.GossipFanout = nodesCount - 1
cfg.NetAddress = "127.0.0.1:0"
cfg.IncomingConnectionsLimit = -1
cfg.IncomingMessageFilterBucketCount = 5
cfg.IncomingMessageFilterBucketSize = 32
cfg.OutgoingMessageFilterBucketCount = 3
Expand Down
15 changes: 11 additions & 4 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type Local struct {
CadaverSizeTarget uint64 `version[0]:"1073741824"`

// IncomingConnectionsLimit specifies the max number of long-lived incoming
// connections. 0 means no connections allowed. -1 is unbounded.
// connections. 0 means no connections allowed. Must be non-negative.
// Estimating 5MB per incoming connection, 5MB*800 = 4GB
IncomingConnectionsLimit int `version[0]:"-1" version[1]:"10000" version[17]:"800"`

Expand All @@ -99,9 +99,9 @@ type Local struct {
PriorityPeers map[string]bool `version[4]:""`

// To make sure the algod process does not run out of FDs, algod ensures
// that RLIMIT_NOFILE exceeds the max number of incoming connections (i.e.,
// IncomingConnectionsLimit) by at least ReservedFDs. ReservedFDs are meant
// to leave room for short-lived FDs like DNS queries, SQLite files, etc.
// that RLIMIT_NOFILE >= IncomingConnectionsLimit + RestConnectionsHardLimit +
// ReservedFDs. ReservedFDs are meant to leave room for short-lived FDs like
// DNS queries, SQLite files, etc. This parameter shouldn't be changed.
ReservedFDs uint64 `version[2]:"256"`

// local server
Expand Down Expand Up @@ -423,6 +423,13 @@ type Local struct {

// ProposalAssemblyTime is the max amount of time to spend on generating a proposal block.
ProposalAssemblyTime time.Duration `version[19]:"250000000"`

// When the number of http connections to the REST layer exceeds the soft limit,
// we start returning http code 429 Too Many Requests.
RestConnectionsSoftLimit uint64 `version[20]:"1024"`
// The http server does not accept new connections as long we have this many
// (hard limit) connections already.
RestConnectionsHardLimit uint64 `version[20]:"2048"`
}

// DNSBootstrapArray returns an array of one or more DNS Bootstrap identifiers
Expand Down
2 changes: 2 additions & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ var defaultLocal = Local{
PublicAddress: "",
ReconnectTime: 60000000000,
ReservedFDs: 256,
RestConnectionsHardLimit: 2048,
RestConnectionsSoftLimit: 1024,
RestReadTimeoutSeconds: 15,
RestWriteTimeoutSeconds: 120,
RunHosted: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,35 @@
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

// +build windows
package middlewares

package network
import (
"net/http"

func (wn *WebsocketNetwork) rlimitIncomingConnections() error {
return nil
"github.com/labstack/echo/v4"
)

// MakeConnectionLimiter makes an echo middleware that limits the number of
// simultaneous connections. All connections above the limit will be returned
// the 429 Too Many Requests http error.
func MakeConnectionLimiter(limit uint64) echo.MiddlewareFunc {
sem := make(chan struct{}, limit)

return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(ctx echo.Context) error {
select {
case sem <- struct{}{}:
defer func() {
// If we fail to read from `sem`, just continue.
select {
case <-sem:
default:
}
}()
return next(ctx)
default:
return ctx.NoContent(http.StatusTooManyRequests)
}
}
}
}
100 changes: 100 additions & 0 deletions daemon/algod/api/server/lib/middlewares/connectionLimiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package middlewares_test

import (
"errors"
"net/http"
"net/http/httptest"
"testing"

"github.com/labstack/echo/v4"
"github.com/stretchr/testify/assert"

"github.com/algorand/go-algorand/daemon/algod/api/server/lib/middlewares"
"github.com/algorand/go-algorand/test/partitiontest"
)

func TestConnectionLimiterBasic(t *testing.T) {
partitiontest.PartitionTest(t)

e := echo.New()

handlerCh := make(chan struct{})
limit := 5
handler := func(c echo.Context) error {
<-handlerCh
return c.String(http.StatusOK, "test")
}
middleware := middlewares.MakeConnectionLimiter(uint64(limit))

numConnections := 13
for i := 0; i < 3; i++ {
var recorders []*httptest.ResponseRecorder
doneCh := make(chan int)
errCh := make(chan error)

for index := 0; index < numConnections; index++ {
req := httptest.NewRequest(http.MethodGet, "/", nil)
rec := httptest.NewRecorder()
ctx := e.NewContext(req, rec)

recorders = append(recorders, rec)

go func(index int) {
err := middleware(handler)(ctx)
doneCh <- index
errCh <- err
}(index)
}

// Check http 429 code.
for j := 0; j < numConnections-limit; j++ {
index := <-doneCh
assert.Equal(t, http.StatusTooManyRequests, recorders[index].Code)
}

// Let handlers finish.
for j := 0; j < limit; j++ {
handlerCh <- struct{}{}
}

// All other connections must return 200.
for j := 0; j < limit; j++ {
index := <-doneCh
assert.Equal(t, http.StatusOK, recorders[index].Code)
}

// Check that no errors were returned by the middleware.
for i := 0; i < numConnections; i++ {
assert.NoError(t, <-errCh)
}
}
}

func TestConnectionLimiterForwardsError(t *testing.T) {
partitiontest.PartitionTest(t)

handlerError := errors.New("handler error")
handler := func(c echo.Context) error {
return handlerError
}
middleware := middlewares.MakeConnectionLimiter(1)

err := middleware(handler)(nil)
assert.ErrorIs(t, err, handlerError)
}
16 changes: 9 additions & 7 deletions daemon/algod/api/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ import (

const (
apiV1Tag = "/v1"
// TokenHeader is the header where we put the token.
TokenHeader = "X-Algo-API-Token"
)

// wrapCtx passes a common context to each request without a global variable.
Expand All @@ -99,11 +101,8 @@ func registerHandlers(router *echo.Echo, prefix string, routes lib.Routes, ctx l
}
}

// TokenHeader is the header where we put the token.
const TokenHeader = "X-Algo-API-Token"

// NewRouter builds and returns a new router with our REST handlers registered.
func NewRouter(logger logging.Logger, node *node.AlgorandFullNode, shutdown <-chan struct{}, apiToken string, adminAPIToken string, listener net.Listener) *echo.Echo {
func NewRouter(logger logging.Logger, node *node.AlgorandFullNode, shutdown <-chan struct{}, apiToken string, adminAPIToken string, listener net.Listener, numConnectionsLimit uint64) *echo.Echo {
if err := tokens.ValidateAPIToken(apiToken); err != nil {
logger.Errorf("Invalid apiToken was passed to NewRouter ('%s'): %v", apiToken, err)
}
Expand All @@ -118,9 +117,12 @@ func NewRouter(logger logging.Logger, node *node.AlgorandFullNode, shutdown <-ch
e.Listener = listener
e.HideBanner = true

e.Pre(middleware.RemoveTrailingSlash())
e.Use(middlewares.MakeLogger(logger))
e.Use(middlewares.MakeCORS(TokenHeader))
e.Pre(
middlewares.MakeConnectionLimiter(numConnectionsLimit),
middleware.RemoveTrailingSlash())
e.Use(
middlewares.MakeLogger(logger),
middlewares.MakeCORS(TokenHeader))

// Request Context
ctx := lib.ReqContext{Node: node, Log: logger, Shutdown: shutdown}
Expand Down
41 changes: 37 additions & 4 deletions daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package algod

import (
"context"
"errors"
"fmt"
"io/ioutil"
"net"
Expand All @@ -35,10 +36,13 @@ import (
"github.com/algorand/go-algorand/config"
apiServer "github.com/algorand/go-algorand/daemon/algod/api/server"
"github.com/algorand/go-algorand/daemon/algod/api/server/lib"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/network/limitlistener"
"github.com/algorand/go-algorand/node"
"github.com/algorand/go-algorand/util"
"github.com/algorand/go-algorand/util/metrics"
"github.com/algorand/go-algorand/util/tokens"
)
Expand Down Expand Up @@ -84,6 +88,34 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes
s.log.SetLevel(logging.Level(cfg.BaseLoggerDebugLevel))
setupDeadlockLogger()

// Check some config parameters.
if cfg.RestConnectionsSoftLimit > cfg.RestConnectionsHardLimit {
s.log.Warnf(
"RestConnectionsSoftLimit %d exceeds RestConnectionsHardLimit %d",
cfg.RestConnectionsSoftLimit, cfg.RestConnectionsHardLimit)
cfg.RestConnectionsSoftLimit = cfg.RestConnectionsHardLimit
}
if cfg.IncomingConnectionsLimit < 0 {
return fmt.Errorf(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will break algod startup for people who have IncomingConnectionsLimit=-2 for whatever reason. This value looks allowed before the proposed change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but for security reasons we need to limit this value. If it breaks somebody's setup, I think it's ok.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it's reasonable to "break" a configuration that should not have existed to start with.

"Initialize() IncomingConnectionsLimit %d must be non-negative",
cfg.IncomingConnectionsLimit)
}

// Set large enough soft file descriptors limit.
var ot basics.OverflowTracker
fdRequired := ot.Add(
cfg.ReservedFDs,
ot.Add(uint64(cfg.IncomingConnectionsLimit), cfg.RestConnectionsHardLimit))
if ot.Overflowed {
return errors.New(
"Initialize() overflowed when adding up ReservedFDs, IncomingConnectionsLimit " +
"RestConnectionsHardLimit; decrease them")
}
err = util.SetFdSoftLimit(fdRequired)
if err != nil {
return fmt.Errorf("Initialize() err: %w", err)
}

// configure the deadlock detector library
switch {
case cfg.DeadlockDetection > 0:
Expand Down Expand Up @@ -192,11 +224,12 @@ func (s *Server) Start() {
}

listener, err := makeListener(addr)

if err != nil {
fmt.Printf("Could not start node: %v\n", err)
os.Exit(1)
}
listener = limitlistener.RejectingLimitListener(
listener, cfg.RestConnectionsHardLimit, s.log)

addr = listener.Addr().String()
server = http.Server{
Expand All @@ -205,9 +238,9 @@ func (s *Server) Start() {
WriteTimeout: time.Duration(cfg.RestWriteTimeoutSeconds) * time.Second,
}

tcpListener := listener.(*net.TCPListener)

e := apiServer.NewRouter(s.log, s.node, s.stopping, apiToken, adminAPIToken, tcpListener)
e := apiServer.NewRouter(
s.log, s.node, s.stopping, apiToken, adminAPIToken, listener,
cfg.RestConnectionsSoftLimit)

// Set up files for our PID and our listening address
// before beginning to listen to prevent 'goal node start'
Expand Down
2 changes: 2 additions & 0 deletions installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
"PublicAddress": "",
"ReconnectTime": 60000000000,
"ReservedFDs": 256,
"RestConnectionsHardLimit": 2048,
"RestConnectionsSoftLimit": 1024,
"RestReadTimeoutSeconds": 15,
"RestWriteTimeoutSeconds": 120,
"RunHosted": false,
Expand Down
12 changes: 12 additions & 0 deletions network/limitlistener/helper_stub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build !aix && !darwin && !dragonfly && !freebsd && !linux && !netbsd && !openbsd && !solaris && !windows
// +build !aix,!darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd,!solaris,!windows

package limitlistener_test

func maxOpenFiles() int {
return defaultMaxOpenFiles
}
18 changes: 18 additions & 0 deletions network/limitlistener/helper_unix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris
// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris

package limitlistener_test

import "syscall"

func maxOpenFiles() int {
var rlim syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim); err != nil {
return defaultMaxOpenFiles
}
return int(rlim.Cur)
}
9 changes: 9 additions & 0 deletions network/limitlistener/helper_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package limitlistener_test

func maxOpenFiles() int {
return 4 * defaultMaxOpenFiles /* actually it's 16581375 */
}
Loading