Skip to content

Commit 18cd6cd

Browse files
authored
make sure the block service is not attempting to access the ledger after being stopped. (#3303)
## Summary The block service was attempting to serve blocks via the HTTP handler even after it had been stopped. This led to undesired downstream failures in the ledger, which had been shut down as well. ## Test Plan A unit test was added.
1 parent b2ca02f commit 18cd6cd

File tree

2 files changed

+87
-6
lines changed

2 files changed

+87
-6
lines changed

rpcs/blockService.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package rpcs
1919
import (
2020
"context"
2121
"encoding/binary"
22+
"errors"
2223
"net/http"
2324
"path"
2425
"strconv"
@@ -29,6 +30,8 @@ import (
2930

3031
"github.com/algorand/go-codec/codec"
3132

33+
"github.com/algorand/go-deadlock"
34+
3235
"github.com/algorand/go-algorand/agreement"
3336
"github.com/algorand/go-algorand/config"
3437
"github.com/algorand/go-algorand/crypto"
@@ -61,6 +64,8 @@ const (
6164
BlockAndCertValue = "blockAndCert" // block+cert request data (as the value of requestDataTypeKey)
6265
)
6366

67+
var errBlockServiceClosed = errors.New("block service is shutting down")
68+
6469
// BlockService represents the Block RPC API
6570
type BlockService struct {
6671
ledger *data.Ledger
@@ -74,6 +79,7 @@ type BlockService struct {
7479
enableArchiverFallback bool
7580
log logging.Logger
7681
closeWaitGroup sync.WaitGroup
82+
mu deadlock.Mutex
7783
}
7884

7985
// EncodedBlockCert defines how GetBlockBytes encodes a block and its certificate
@@ -118,6 +124,8 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger *data.Ledg
118124

119125
// Start listening to catchup requests over ws
120126
func (bs *BlockService) Start() {
127+
bs.mu.Lock()
128+
defer bs.mu.Unlock()
121129
if bs.enableServiceOverGossip {
122130
handlers := []network.TaggedMessageHandler{
123131
{Tag: protocol.UniCatchupReqTag, MessageHandler: network.HandlerFunc(bs.processIncomingMessage)},
@@ -133,12 +141,14 @@ func (bs *BlockService) Start() {
133141

134142
// Stop servicing catchup requests over ws
135143
func (bs *BlockService) Stop() {
144+
bs.mu.Lock()
136145
close(bs.stop)
146+
bs.mu.Unlock()
137147
bs.closeWaitGroup.Wait()
138148
}
139149

140150
// ServeHTTP returns blocks
141-
// Either /v{version}/block/{round} or ?b={round}&v={version}
151+
// Either /v{version}/{genesisID}/block/{round} or ?b={round}&v={version}
142152
// Uses gorilla/mux for path argument parsing.
143153
func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Request) {
144154
pathVars := mux.Vars(request)
@@ -200,7 +210,7 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re
200210
response.WriteHeader(http.StatusBadRequest)
201211
return
202212
}
203-
encodedBlockCert, err := RawBlockBytes(bs.ledger, basics.Round(round))
213+
encodedBlockCert, err := bs.rawBlockBytes(basics.Round(round))
204214
if err != nil {
205215
switch err.(type) {
206216
case ledgercore.ErrNoEntry:
@@ -321,7 +331,7 @@ func (bs *BlockService) redirectRequest(round uint64, response http.ResponseWrit
321331
bs.log.Debugf("redirectRequest: %s", err.Error())
322332
return false
323333
}
324-
parsedURL.Path = FormatBlockQuery(round, parsedURL.Path, bs.net)
334+
parsedURL.Path = strings.Replace(FormatBlockQuery(round, parsedURL.Path, bs.net), "{genesisID}", bs.genesisID, 1)
325335
http.Redirect(response, request, parsedURL.String(), http.StatusTemporaryRedirect)
326336
bs.log.Debugf("redirectRequest: redirected block request to %s", parsedURL.String())
327337
return true
@@ -356,6 +366,22 @@ func (bs *BlockService) getRandomArchiver() (endpointAddress string) {
356366
return
357367
}
358368

369+
// rawBlockBytes returns the block/cert for a given round, while taking the lock
370+
// to ensure the block service is currently active.
371+
func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) {
372+
bs.mu.Lock()
373+
defer bs.mu.Unlock()
374+
select {
375+
case _, ok := <-bs.stop:
376+
if !ok {
377+
// service is closed.
378+
return nil, errBlockServiceClosed
379+
}
380+
default:
381+
}
382+
return RawBlockBytes(bs.ledger, round)
383+
}
384+
359385
func topicBlockBytes(log logging.Logger, dataLedger *data.Ledger, round basics.Round, requestType string) network.Topics {
360386
blk, cert, err := dataLedger.EncodedBlockCert(round)
361387
if err != nil {

rpcs/blockService_test.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package rpcs
1919
import (
2020
"context"
2121
"fmt"
22+
"io/ioutil"
2223
"net/http"
24+
"strings"
2325
"testing"
2426
"time"
2527

@@ -118,7 +120,7 @@ func TestHandleCatchupReqNegative(t *testing.T) {
118120
require.Equal(t, roundNumberParseErrMsg, string(val))
119121
}
120122

121-
// TestRedirectBasic tests the case when the block service redirects the request to elsewhere
123+
// TestRedirectFallbackArchiver tests the case when the block service falls back to an archiver in the absence of a given block.
122124
func TestRedirectFallbackArchiver(t *testing.T) {
123125
partitiontest.PartitionTest(t)
124126

@@ -136,8 +138,8 @@ func TestRedirectFallbackArchiver(t *testing.T) {
136138
net2 := &httpTestPeerSource{}
137139

138140
config := config.GetDefaultLocal()
139-
bs1 := MakeBlockService(log, config, ledger1, net1, "{genesisID}")
140-
bs2 := MakeBlockService(log, config, ledger2, net2, "{genesisID}")
141+
bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID")
142+
bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID")
141143

142144
nodeA := &basicRPCNode{}
143145
nodeB := &basicRPCNode{}
@@ -159,6 +161,7 @@ func TestRedirectFallbackArchiver(t *testing.T) {
159161

160162
ctx := context.Background()
161163
parsedURL.Path = FormatBlockQuery(uint64(2), parsedURL.Path, net1)
164+
parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1)
162165
blockURL := parsedURL.String()
163166
request, err := http.NewRequest("GET", blockURL, nil)
164167
require.NoError(t, err)
@@ -170,6 +173,58 @@ func TestRedirectFallbackArchiver(t *testing.T) {
170173
require.NoError(t, err)
171174

172175
require.Equal(t, http.StatusOK, response.StatusCode)
176+
bodyData, err := ioutil.ReadAll(response.Body)
177+
require.NoError(t, err)
178+
require.NotEqual(t, 0, len(bodyData))
179+
}
180+
181+
// TestBlockServiceShutdown tests that the block service is shutting down correctly.
182+
func TestBlockServiceShutdown(t *testing.T) {
183+
partitiontest.PartitionTest(t)
184+
185+
log := logging.TestingLog(t)
186+
187+
ledger1 := makeLedger(t, "l1")
188+
addBlock(t, ledger1)
189+
190+
net1 := &httpTestPeerSource{}
191+
192+
config := config.GetDefaultLocal()
193+
bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID")
194+
bs1.Start()
195+
196+
nodeA := &basicRPCNode{}
197+
198+
nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
199+
nodeA.start()
200+
defer nodeA.stop()
201+
202+
parsedURL, err := network.ParseHostOrURL(nodeA.rootURL())
203+
require.NoError(t, err)
204+
205+
client := http.Client{}
206+
207+
ctx := context.Background()
208+
parsedURL.Path = FormatBlockQuery(uint64(1), parsedURL.Path, net1)
209+
parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1)
210+
blockURL := parsedURL.String()
211+
request, err := http.NewRequest("GET", blockURL, nil)
212+
require.NoError(t, err)
213+
requestCtx, requestCancel := context.WithTimeout(ctx, time.Duration(config.CatchupHTTPBlockFetchTimeoutSec)*time.Second)
214+
defer requestCancel()
215+
request = request.WithContext(requestCtx)
216+
network.SetUserAgentHeader(request.Header)
217+
218+
requestDone := make(chan struct{})
219+
go func() {
220+
defer close(requestDone)
221+
client.Do(request)
222+
}()
223+
224+
bs1.Stop()
225+
ledger1.Close()
226+
227+
<-requestDone
173228
}
174229

175230
// TestRedirectBasic tests the case when the block service redirects the request to elsewhere

0 commit comments

Comments
 (0)