Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ type testUnicastPeer struct {
version string
responseChannels map[uint64]chan *network.Response
t *testing.T
responseOverride *network.Response
}

func (p *testUnicastPeer) GetAddress() string {
Expand All @@ -254,6 +255,10 @@ func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics
require.NotNil(p.t, dispather)
dispather.Handle(network.IncomingMessage{Tag: tag, Data: topics.MarshallTopics(), Sender: p, Net: p.gn})

if p.responseOverride != nil {
return p.responseOverride, nil
}

// wait for the channel.
select {
case resp = <-responseChannel:
Expand Down Expand Up @@ -297,10 +302,15 @@ func (p *testUnicastPeer) Unicast(ctx context.Context, msg []byte, tag protocol.
}

func makeTestUnicastPeer(gn network.GossipNode, t *testing.T) network.UnicastPeer {
return makeTestUnicastPeerWithResponseOverride(gn, t, nil)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an empty line


func makeTestUnicastPeerWithResponseOverride(gn network.GossipNode, t *testing.T, responseOverride *network.Response) network.UnicastPeer {
wsp := testUnicastPeer{}
wsp.gn = gn
wsp.t = t
wsp.version = network.ProtocolVersion
wsp.responseChannels = make(map[uint64]chan *network.Response)
wsp.responseOverride = responseOverride
return &wsp
}
133 changes: 119 additions & 14 deletions catchup/universalFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,21 @@ func (uf *universalBlockFetcher) fetchBlock(ctx context.Context, round basics.Ro
return block, cert, downloadDuration, err
}

func processBlockBytes(fetchedBuf []byte, r basics.Round, debugStr string) (blk *bookkeeping.Block, cert *agreement.Certificate, err error) {
func processBlockBytes(fetchedBuf []byte, r basics.Round, peerAddr string) (blk *bookkeeping.Block, cert *agreement.Certificate, err error) {
var decodedEntry rpcs.EncodedBlockCert
err = protocol.Decode(fetchedBuf, &decodedEntry)
if err != nil {
err = fmt.Errorf("fetchBlock(%d): cannot decode block from peer %v: %v", r, debugStr, err)
err = makeErrCannotDecodeBlock(r, peerAddr, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is interesting, since error is interface, I thought one needs to return a pointer to a struct. But attaching Error() to the type, not pointer looks working, nice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like a pointer or struct can be passed for an interface both for return type, or a function parameter.

return
}

if decodedEntry.Block.Round() != r {
err = fmt.Errorf("fetchBlock(%d): got wrong block from peer %v: wanted %v, got %v", r, debugStr, r, decodedEntry.Block.Round())
err = makeErrWrongBlockFromPeer(r, decodedEntry.Block.Round(), peerAddr)
return
}

if decodedEntry.Certificate.Round != r {
err = fmt.Errorf("fetchBlock(%d): got wrong cert from peer %v: wanted %v, got %v", r, debugStr, r, decodedEntry.Certificate.Round)
err = makeErrWrongCertFromPeer(r, decodedEntry.Certificate.Round, peerAddr)
return
}
return &decodedEntry.Block, &decodedEntry.Certificate, nil
Expand Down Expand Up @@ -137,15 +137,15 @@ func (w *wsFetcherClient) getBlockBytes(ctx context.Context, r basics.Round) ([]
if err != nil {
return nil, err
}
if len(blockBytes) == 0 {
if len(blockBytes) == 0 { // This case may never happen
return nil, fmt.Errorf("wsFetcherClient(%d): empty response", r)
}
return blockBytes, nil
}

// Address implements FetcherClient
func (w *wsFetcherClient) address() string {
return fmt.Sprintf("[ws] (%v)", w.target.GetAddress())
return fmt.Sprintf("[ws] (%s)", w.target.GetAddress())
}

// requestBlock send a request for block <round> and wait until it receives a response or a context expires.
Expand All @@ -161,20 +161,20 @@ func (w *wsFetcherClient) requestBlock(ctx context.Context, round basics.Round)
}
resp, err := w.target.Request(ctx, protocol.UniEnsBlockReqTag, topics)
if err != nil {
return nil, fmt.Errorf("wsFetcherClient(%s).requestBlock(%d): Request failed, %v", w.target.GetAddress(), round, err)
return nil, makeErrWsFetcherRequestFailed(round, w.target.GetAddress(), err.Error())
}

if errMsg, found := resp.Topics.GetValue(network.ErrorKey); found {
return nil, fmt.Errorf("wsFetcherClient(%s).requestBlock(%d): Request failed, %s", w.target.GetAddress(), round, string(errMsg))
return nil, makeErrWsFetcherRequestFailed(round, w.target.GetAddress(), string(errMsg))
}

blk, found := resp.Topics.GetValue(rpcs.BlockDataKey)
if !found {
return nil, fmt.Errorf("wsFetcherClient(%s): request failed: block data not found", w.target.GetAddress())
return nil, makeErrWsFetcherRequestFailed(round, w.target.GetAddress(), "Block data not found")
}
cert, found := resp.Topics.GetValue(rpcs.CertDataKey)
if !found {
return nil, fmt.Errorf("wsFetcherClient(%s): request failed: cert data not found", w.target.GetAddress())
return nil, makeErrWsFetcherRequestFailed(round, w.target.GetAddress(), "Cert data not found")
}

blockCertBytes := protocol.EncodeReflect(rpcs.PreEncodedBlockCert{
Expand Down Expand Up @@ -236,9 +236,9 @@ func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data
bodyBytes, err := rpcs.ResponseBytes(response, hf.log, fetcherMaxBlockBytes)
hf.log.Warnf("HTTPFetcher.getBlockBytes: response status code %d from '%s'. Response body '%s' ", response.StatusCode, blockURL, string(bodyBytes))
if err == nil {
err = fmt.Errorf("getBlockBytes error response status code %d when requesting '%s'. Response body '%s'", response.StatusCode, blockURL, string(bodyBytes))
err = makeErrHTTPResponse(response.StatusCode, blockURL, fmt.Sprintf("Response body '%s'", string(bodyBytes)))
} else {
err = fmt.Errorf("getBlockBytes error response status code %d when requesting '%s'. %w", response.StatusCode, blockURL, err)
err = makeErrHTTPResponse(response.StatusCode, blockURL, err.Error())
}
return nil, err
}
Expand All @@ -247,7 +247,7 @@ func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data
// response content type is what we'd like it to be.
contentTypes := response.Header["Content-Type"]
if len(contentTypes) != 1 {
err = fmt.Errorf("http block fetcher invalid content type count %d", len(contentTypes))
err = errHTTPResponseContentType{contentTypeCount: len(contentTypes)}
hf.log.Warn(err)
response.Body.Close()
return nil, err
Expand All @@ -259,7 +259,7 @@ func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data
if contentTypes[0] != rpcs.BlockResponseContentType && contentTypes[0] != blockResponseContentTypeOld {
hf.log.Warnf("http block fetcher response has an invalid content type : %s", contentTypes[0])
response.Body.Close()
return nil, fmt.Errorf("http block fetcher invalid content type '%s'", contentTypes[0])
return nil, errHTTPResponseContentType{contentTypeCount: 1, contentType: contentTypes[0]}
}

return rpcs.ResponseBytes(response, hf.log, fetcherMaxBlockBytes)
Expand All @@ -270,3 +270,108 @@ func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data
func (hf *HTTPFetcher) address() string {
return hf.rootURL
}

type errWrongCertFromPeer struct {
round basics.Round
peer string
certRound basics.Round
}

func makeErrWrongCertFromPeer(round, certRound basics.Round, peer string) errWrongCertFromPeer {
return errWrongCertFromPeer{
round: round,
peer: peer,
certRound: certRound}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line


func (wcfpe errWrongCertFromPeer) Error() string {
return fmt.Sprintf("processBlockBytes: got wrong cert from peer %s: wanted %d, got %d",
wcfpe.peer, wcfpe.round, wcfpe.certRound)
}

type errWrongBlockFromPeer struct {
round basics.Round
peer string
certRound basics.Round
}

func makeErrWrongBlockFromPeer(round, certRound basics.Round, peer string) errWrongBlockFromPeer {
return errWrongBlockFromPeer{
round: round,
peer: peer,
certRound: certRound}
}

func (wbfpe errWrongBlockFromPeer) Error() string {
return fmt.Sprintf("processBlockBytes: got wrong block from peer %s: wanted %d, got %d",
wbfpe.peer, wbfpe.round, wbfpe.certRound)
}

type errCannotDecodeBlock struct {
round basics.Round
peer string
err error
}

func makeErrCannotDecodeBlock(round basics.Round, peer string, err error) errCannotDecodeBlock {
return errCannotDecodeBlock{
round: round,
peer: peer,
err: err}
}

func (cdbe errCannotDecodeBlock) Error() string {
return fmt.Sprintf("processBlockBytes: cannot decode block %d from peer %s: %s",
cdbe.round, cdbe.peer, cdbe.err.Error())
}

func (cdbe errCannotDecodeBlock) Unwrap() error {
return cdbe.err
}

type errWsFetcherRequestFailed struct {
round basics.Round
peer string
cause string
}

func makeErrWsFetcherRequestFailed(round basics.Round, peer, cause string) errWsFetcherRequestFailed {
return errWsFetcherRequestFailed{
round: round,
peer: peer,
cause: cause}
}

func (wrfe errWsFetcherRequestFailed)Error () string {
return fmt.Sprintf("wsFetcherClient(%s).requestBlock(%d): Request failed: %s",
wrfe.peer, wrfe.round, wrfe.cause)
}

type errHTTPResponse struct {
responseStatus int
blockURL string
cause string
}

func makeErrHTTPResponse(responseStatus int, blockURL string, cause string) errHTTPResponse {
return errHTTPResponse{
responseStatus: responseStatus,
blockURL: blockURL,
cause: cause}
}

func (hre errHTTPResponse) Error() string {
return fmt.Sprintf("HTTPFetcher.getBlockBytes: error response status code %d when requesting '%s': %s", hre.responseStatus, hre.blockURL, hre.cause)
}

type errHTTPResponseContentType struct {
contentTypeCount int
contentType string
}

func (cte errHTTPResponseContentType) Error() string {
if cte.contentTypeCount == 1 {
return fmt.Sprintf("HTTPFetcher.getBlockBytes: invalid content type: %s", cte.contentType)
}
return fmt.Sprintf("HTTPFetcher.getBlockBytes: invalid content type count: %d", cte.contentTypeCount)
}
Loading