Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

p2p, network, bzzeth: p2p protocol handlers to async#2018

Merged
pradovic merged 43 commits into
masterfrom
protocol-async-errors
Jan 9, 2020
Merged

p2p, network, bzzeth: p2p protocol handlers to async#2018
pradovic merged 43 commits into
masterfrom
protocol-async-errors

Conversation

@pradovic
Copy link
Copy Markdown
Contributor

@pradovic pradovic commented Dec 10, 2019

This is an idea how to run handlers async in p2p.Peer run event loop. This is still in progress so I still did not comment all the code, or refactored it to look better. Main idea was to try to:

  1. Make all handlers execution async
  2. Exit run loop with appropriate error.
  3. Instead of relying on the side peer.Drop. If I am right, this disconnects the peer with the actual produced error instead of the subprotocol with error string from the handler. I am not sure if this is important or desired. If not, we could simplify the code in the run loop a bit.
  4. Separate sync execution needed for handshake and a run loop, as I feel it makes the code more simple. If there is a need for sync run loop, we can easily add this as separate function as well (reusing existing private function in the package)

There is a couple of todo comments that I need to investigate more, I would appreciate any input about those :)

todo:

  1. Write more isolated test to test and document the behavior of Run event loop.
  2. Rethink stream graceful shutdown a bit.
  3. This PR does not cover logging errors and making the actual error more nice. If we agree on this proposal, we can continue to making an error more nice and refactor a code to look a bit better (ex. use context cancellation instead of the running bool + mutex in protocol.Run, etc ...). Also, if we don't think that there is any benefit from 3., I can reduce the code not to support this.
  4. Investigate handleMsgPauser in stream, that is used for tests and see how it feels this kind of loop.
  5. It appears that serverCollectBatch function in stream produces some leaking go routines, should investigate it a bit. It looks like it is happening on a master branch as well, but still, might happen more now.

LMKWYT :)


// expect peer disconnection
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("subprotocol error")})
err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("Message handler error: (msg code 0): unsolicited chunk delivery from peer: cannot find ruid")})
Copy link
Copy Markdown
Contributor Author

@pradovic pradovic Dec 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of difference in disconnect errors now that I mentioned in 3. in the description.

Copy link
Copy Markdown
Contributor Author

@pradovic pradovic Dec 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zelig It appears you were right. The disconnect will eventually happen with subprotocol error anyway. But, if we return the specific error from the run loop, instead of doing a peer.Stop(), the actual error will be broadcasted via peer broadcast (which is how it is checked in the tester.TestDisonnected). I am still not sure if it is useful to us, but it's still probably more clean.

@pradovic pradovic requested review from janos and zelig December 10, 2019 14:07
Copy link
Copy Markdown
Member

@janos janos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree on the approach and would like this PR polished.

  1. Write more isolated test to test and document the behavior of Run event loop.

That would be great.

  1. Rethink stream graceful shutdown a bit.

👍

  1. This PR does not cover logging errors and making the actual error more nice. If we agree on this proposal, we can continue to making an error more nice and refactor a code to look a bit better (ex. use context cancellation instead of the running bool + mutex in protocol.Run, etc ...). Also, if we don't think that there is any benefit from 3., I can reduce the code not to support this.

It would be great to address errors also, if it could be in a followup PR, to make them smaller, and leave this one with only async protocols. I think that there is a lot of room for error improvements and that it may make changes in this PR harder to reviw.

  1. Investigate handleMsgPauser in stream, that is used for tests and see how it feels this kind of loop.

Pauser would be useful if it would be available on protocols level for testing purposes.

  1. It appears that serverCollectBatch function in stream produces some leaking go routines, should investigate it a bit. It looks like it is happening on a master branch as well, but still, might happen more now.

We should investigate this, as serverCollectBatch produces long running goroutines when they are waiting for chunk descriptions from the subscription. They can leak if peer or registry are not closed.

LMKWYT :)

Comment thread p2p/protocols/protocol.go Outdated
spec *Spec
encode func(context.Context, interface{}) (interface{}, int, error)
decode func(p2p.Msg) (context.Context, []byte, error)
eg *errgroup.Group // error group used for executing handlers asynchronously
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that eg can be a value, since Peer is using pointer semantics.

Comment thread p2p/protocols/protocol.go Outdated
p.running = false
p.mtx.Unlock()

p.eg.Wait()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment on error handing inside errgroup with stop should justify not handling error on Wait.

But, if it is not needed to handle the error here, could errgroup be replaced with WaitGroup?

Copy link
Copy Markdown
Contributor Author

@pradovic pradovic Dec 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, nice catch! I left it there just in case if we decide to do shutdown with commented code. I am not sure if we are going to need that, depends on the graceful shutdown. I will switch to wait group for now then and bring back eg if needed.

Copy link
Copy Markdown
Member

@zelig zelig left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Brilliant PR thanks

  • many places logging an error and returning the same error with less context.
    Consider keeping the richer context of the log in the error but only return the error
  • the stream package needs serious cleanup, I put a lot of comments but feel free to keep changes to the minimum and defer sorting it out in subsequent PRs by whoever takes it up

Comment thread bzzeth/bzzeth.go
deliveredCnt++
p.logger.Trace("bzzeth.handleNewBlockHeaders", "hash", ch.Address().Hex(), "delivered", deliveredCnt)

req.lock.RLock()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, this was not async beore or you found a bug?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was async before as well. I am not sure is it really a bug, but it might be potentially dangerous. Basically, if I am right, there is a lock in req that is to be used to update hashes map. This lock is used for update but not for read, which might not be fatal, but still a bit not correct :)

Comment thread bzzeth/bzzeth.go Outdated
p.logger.Warn("bzzeth.handleBlockHeaders: nonexisting request id", "id", msg.Rid)
p.Drop("nonexisting request id")
return
p.logger.Warn("", "id", msg.Rid)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log not needed, and certainly not with empty message

Comment thread bzzeth/bzzeth.go Outdated
if err != nil {
p.logger.Warn("bzzeth.handleBlockHeaders: fatal dropping peer", "id", msg.Rid, "err", err)
p.Drop("error on deliverAndStoreAll")
return fmt.Errorf("bzzeth.handleBlockHeaders: fatal dropping peer, id: %d err: %w", msg.Rid, err)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about just

return  b.deliverAndStoreAll(ctx, req, headers)

on line 246

Comment thread bzzeth/bzzeth.go
// wait for all validations to get over and close the channels
err := wg.Wait()

// finish storage is used mostly in testing
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so is it needed?

Comment thread network/hive.go
func (h *Hive) handleSubPeersMsg(ctx context.Context, d *Peer, msg *subPeersMsg) error {
d.setDepth(msg.Depth)
// only send peers after the initial subPeersMsg
h.lock.Lock()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this was not async before?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, it was not, it used to be sync. Basically, the check for sentPeers boolean value should pass only once, and then all other calls should skip this part. Does it makes sense?

Comment thread p2p/protocols/protocol.go Outdated
// * handles decoding with reflection,
// * call handlers as callbacks
func (p *Peer) handleMsg(msg *p2p.Msg, handle func(ctx context.Context, msg interface{}) error) error {
// make sure that the payload has been fully consume
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumed

Comment thread p2p/protocols/protocol.go Outdated
if err != nil {
if err != io.EOF {
metrics.GetOrRegisterCounter("peer.handleincoming.error", nil).Inc(1)
log.Error("peer.handleIncoming", "err", err)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change metrics name and log message according to rename if needed

Comment thread p2p/protocols/protocol.go Outdated
// * handles decoding with reflection,
// * call handlers as callbacks
func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{}) error) error {
// Receive(code) is a sync call that handles incoming message with provided message handler
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(code) ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some copy-paste mistake 🙈

Comment thread p2p/protocols/protocol.go Outdated
// * call handlers as callbacks
func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{}) error) error {
// Receive(code) is a sync call that handles incoming message with provided message handler
func (p *Peer) Receive(handler func(ctx context.Context, msg interface{}) error) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure we want this exported?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, good catch. We can export it if needed, later

Comment thread p2p/protocols/protocol.go Outdated

if msg.Size > p.spec.MaxMsgSize {
return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize)
err := errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why these changes needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, it's probably just my debug leftover 🙈

@pradovic
Copy link
Copy Markdown
Contributor Author

pradovic commented Dec 13, 2019

Thanks @zelig! I agree with both points, I left it for the next PR, as @janos and me wanted to go through errors, especially in stream, after this! I will fix all other suggested changes, and leave error related for the reference for the next PR to avoid to bloat this one too much. I will also add couple of unit tests in this one. Sounds good?

@pradovic pradovic requested review from janos and zelig December 17, 2019 12:10
@pradovic
Copy link
Copy Markdown
Contributor Author

pradovic commented Dec 17, 2019

@janos, @zelig I believe I fixed what was suggested in the comments. Some stuff related to the stream, errors and pause is left for the next error-related PRs, should come very soon. Also, during the error pruning, I will investigate a bit more if need the correct error in the node events, and if we don't need it (no need to propagate errors, other then readMsg errors in the run method), then I will switch back to peer drop. Basically, it will just make our run function less complicated, but we will still log correct errors.

@pradovic pradovic self-assigned this Dec 17, 2019
@pradovic
Copy link
Copy Markdown
Contributor Author

Smoke tests seem to pass. @janos thanks for help!

Copy link
Copy Markdown
Member

@janos janos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pradovic pradovic mentioned this pull request Dec 20, 2019
@janos janos mentioned this pull request Dec 23, 2019
@acud acud changed the title p2p, network, bzzeth: p2p protocl handlers to async p2p, network, bzzeth: p2p protocol handlers to async Jan 6, 2020
Comment thread p2p/protocols/protocol.go
Comment thread p2p/protocols/protocol.go
Comment thread p2p/protocols/protocol.go
Comment thread p2p/protocols/protocol.go
}()

return func() error {
return <-errc
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

goroutine leak? write into the channel a nil value in the main goroutine when the function returns

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, the only place where this function is called is in the run method, which is waiting for this channel in order to return. How can it leak? I am not sure wdym 🙈 Maybe it could leak if there was multiple callers waiting, but this is not the use-case for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, if you'll look again you'll see that errc is only written to in case of an error, but the channel is never closed otherwise. for cleanliness' sake i'd be happy if there is a defer close(errc) in the main goroutine that is on line 249

Copy link
Copy Markdown
Contributor Author

@pradovic pradovic Jan 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, it makes sense in general, but then the "lingering" goroutines can not send to it. It can be solved in there implementation, but since we abandoned this approach in the error related PR I will not update it now, and will keep this in mind if we switch back. Nice catch, thanks!

Comment thread p2p/protocols/protocol.go Outdated
p.logger.Error("retrieval.handleRetrieveRequest - peer delivery failed", "ref", msg.Addr, "err", err)
osp.LogFields(olog.Bool("delivered", false))
return
// continue in event loop
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zelig if a chunk is not found in this case an error will be returned, then the peer is dropped as a result (which is incorrect if the chunk cannot be found)

Comment thread network/retrieval/retrieve.go
p.logger.Error("netstore error putting chunk to localstore", "err", err)
if err == storage.ErrChunkInvalid {
p.Drop("invalid chunk in netstore put")
return fmt.Errorf("netstore error putting chunk to localstore: %s", err)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the chunk is invalid then we should definitely drop the peer, but if netstore Put fails we should definitely consider propagating the error too. @janos? @zelig?

Comment thread p2p/protocols/protocol.go Outdated
Comment thread pss/outbox/outbox_test.go
Comment thread network/stream/stream.go
delete(p.openOffers, msg.Ruid)
p.mtx.Unlock()
p.Drop("error sending offered hashes")
return fmt.Errorf("error sending offered hashes: %w", err)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i chose to delete the entry for the sake of cleanliness, but indeed it is not necessary since the whole peer object should be disposed of when a send error occurs

Comment thread network/stream/stream.go Outdated
return r.requestSubsequentRange(ctx, p, provider, w, msg.LastIndex)
}

if errc == nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only case where errc is nil is when the if statement in line 559 is entered. in that case, the if statement in line 576 would be entered too, and the subsequent range would be requested. as a result, it is not possible that this if block will ever be triggered so it is safe to remove it IMO

Comment thread network/stream/stream.go
select {
case <-done:
case <-time.After(5 * time.Second):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

@pradovic pradovic requested a review from acud January 6, 2020 12:44
Copy link
Copy Markdown
Contributor

@acud acud left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Please just rebase to resolve conflicts

@pradovic pradovic merged commit 16db47b into master Jan 9, 2020
@pradovic pradovic deleted the protocol-async-errors branch January 9, 2020 16:34
@acud acud added this to the 0.5.5 milestone Jan 21, 2020
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants