p2p, network, bzzeth: p2p protocol handlers to async #2018
|
|
||
| // expect peer disconnection | ||
| err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("subprotocol error")}) | ||
| err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("Message handler error: (msg code 0): unsolicited chunk delivery from peer: cannot find ruid")}) |
There was a problem hiding this comment.
This is an example of difference in disconnect errors now that I mentioned in 3. in the description.
There was a problem hiding this comment.
@zelig It appears you were right. The disconnect will eventually happen with subprotocol error anyway. But if we return the specific error from the run loop instead of doing a peer.Stop(), the actual error will be broadcast via peer broadcast (which is how it is checked in tester.TestDisconnected). I am still not sure if it is useful to us, but it is probably cleaner.
janos
left a comment
There was a problem hiding this comment.
I agree on the approach and would like this PR polished.
- Write more isolated tests to test and document the behavior of the Run event loop.
That would be great.
- Rethink stream graceful shutdown a bit.
👍
- This PR does not cover logging errors and making the actual error nicer. If we agree on this proposal, we can continue with making the errors nicer and refactoring the code to look a bit better (e.g. use context cancellation instead of the running bool + mutex in protocol.Run, etc.). Also, if we don't think that there is any benefit from 3., I can reduce the code so that it does not support this.
It would be great to address errors as well, but in a follow-up PR, to keep the PRs smaller and leave this one with only async protocols. I think that there is a lot of room for error improvements and that it may make the changes in this PR harder to review.
- Investigate handleMsgPauser in stream, which is used for tests, and see how it fits this kind of loop.
A pauser would be useful if it were available at the protocols level for testing purposes.
- It appears that the serverCollectBatch function in stream produces some leaking goroutines; I should investigate it a bit. It looks like it is happening on the master branch as well, but it might happen more now.
We should investigate this, as serverCollectBatch produces long running goroutines when they are waiting for chunk descriptions from the subscription. They can leak if peer or registry are not closed.
LMKWYT :)
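The leak described for serverCollectBatch follows a common shape: a goroutine blocks on a subscription channel and nothing wakes it up when the peer or registry goes away. A minimal sketch of one way to bound such a goroutine, assuming a hypothetical `collectBatch` function, a descriptor channel and a `quit` channel (none of these names come from the stream package):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errQuit is a hypothetical sentinel returned when the peer or registry shuts down.
var errQuit = errors.New("collect aborted: peer or registry closed")

// collectBatch is a hypothetical stand-in for a batch collector. It reads up to
// max descriptors from sub and returns early when quit is closed, so the
// goroutine running it cannot block forever on an idle subscription.
func collectBatch(sub <-chan string, quit <-chan struct{}, max int) ([]string, error) {
	batch := make([]string, 0, max)
	for len(batch) < max {
		select {
		case d, ok := <-sub:
			if !ok {
				return batch, nil // subscription ended
			}
			batch = append(batch, d)
		case <-quit:
			return batch, errQuit
		}
	}
	return batch, nil
}

func main() {
	sub := make(chan string) // never written to: simulates an idle subscription
	quit := make(chan struct{})
	done := make(chan error, 1)

	go func() {
		_, err := collectBatch(sub, quit, 4)
		done <- err
	}()

	close(quit) // peer dropped or registry closed

	select {
	case err := <-done:
		fmt.Println("collector exited:", err)
	case <-time.After(time.Second):
		fmt.Println("collector leaked")
	}
}
```

Whether the real fix should use a quit channel, a context, or closing the subscription itself is left open here; the sketch only shows that every blocking receive needs a second way out.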
| spec *Spec | ||
| encode func(context.Context, interface{}) (interface{}, int, error) | ||
| decode func(p2p.Msg) (context.Context, []byte, error) | ||
| eg *errgroup.Group // error group used for executing handlers asynchronously |
There was a problem hiding this comment.
I think that eg can be a value, since Peer is using pointer semantics.
| p.running = false | ||
| p.mtx.Unlock() | ||
|
|
||
| p.eg.Wait() |
There was a problem hiding this comment.
The comment on error handling inside errgroup with stop should justify not handling the error on Wait.
But, if it is not needed to handle the error here, could errgroup be replaced with WaitGroup?
There was a problem hiding this comment.
Yup, nice catch! I left it there in case we decide to do the shutdown with the commented-out code. I am not sure if we are going to need that; it depends on the graceful shutdown. I will switch to a wait group for now and bring back eg if needed.
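To illustrate the two suggestions above (a value field instead of a pointer, and a plain WaitGroup when the error from Wait is not used), here is a minimal sketch. The `peer` type and its methods are hypothetical and much smaller than the real protocols.Peer:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// peer is a hypothetical, stripped-down stand-in for a protocol peer.
// wg is a value field: peer is only ever used through a pointer, so the
// WaitGroup is never copied.
type peer struct {
	wg      sync.WaitGroup
	handled int64
}

// dispatch runs handle in its own goroutine and tracks it in the WaitGroup.
func (p *peer) dispatch(handle func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		handle()
		atomic.AddInt64(&p.handled, 1)
	}()
}

// stop blocks until every dispatched handler has returned.
func (p *peer) stop() {
	p.wg.Wait()
}

// count returns the number of handlers that have finished.
func (p *peer) count() int64 {
	return atomic.LoadInt64(&p.handled)
}

func main() {
	p := &peer{}
	for i := 0; i < 10; i++ {
		p.dispatch(func() {})
	}
	p.stop()
	fmt.Println("handlers finished:", p.count())
}
```

In this sketch all Add calls happen before Wait. A real run loop that keeps dispatching while shutting down would need to make sure no new handler is added once Wait has started.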
zelig
left a comment
There was a problem hiding this comment.
Brilliant PR thanks
- Many places log an error and return the same error with less context. Consider keeping the richer context of the log in the error, but only return the error.
- The stream package needs serious cleanup. I put a lot of comments, but feel free to keep changes to the minimum and defer sorting it out to subsequent PRs by whoever takes it up.
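The first point can be sketched as follows: instead of logging a detailed message and returning a shorter error, the handler wraps the cause with the same context and leaves logging to the caller. The handler name, the `known` map and the sentinel are hypothetical and only serve the illustration:

```go
package main

import (
	"errors"
	"fmt"
)

// errNoRequest is a hypothetical sentinel for an unknown request id.
var errNoRequest = errors.New("nonexisting request id")

// handleHeaders is a hypothetical handler. It does not log; it returns an
// error that carries the context a log line would otherwise have had.
func handleHeaders(rid uint32, known map[uint32]bool) error {
	if !known[rid] {
		return fmt.Errorf("handleHeaders: request id %d: %w", rid, errNoRequest)
	}
	return nil
}

func main() {
	err := handleHeaders(7, map[uint32]bool{1: true})
	fmt.Println(err)                          // handleHeaders: request id 7: nonexisting request id
	fmt.Println(errors.Is(err, errNoRequest)) // true
}
```

Because the cause is wrapped with `%w`, the caller can still match it with `errors.Is` and log the full message exactly once.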
| deliveredCnt++ | ||
| p.logger.Trace("bzzeth.handleNewBlockHeaders", "hash", ch.Address().Hex(), "delivered", deliveredCnt) | ||
|
|
||
| req.lock.RLock() |
There was a problem hiding this comment.
oops, was this not async before, or did you find a bug?
There was a problem hiding this comment.
It was async before as well. I am not sure whether it is really a bug, but it is potentially dangerous. Basically, if I am right, there is a lock in req that is meant to guard updates to the hashes map. This lock is used for updates but not for reads, which might not be fatal, but is still not quite correct :)
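A minimal sketch of the locking discipline being described, assuming a hypothetical `request` type with a `hashes` map (the field and method names are illustrative, not the bzzeth ones): writes take the write lock and reads take the read lock, so concurrent handlers never touch the map unguarded.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical stand-in for a pending header request.
type request struct {
	lock   sync.RWMutex
	hashes map[string]bool // hash -> delivered
}

// markDelivered updates the map under the write lock.
func (r *request) markDelivered(hash string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.hashes[hash] = true
}

// delivered reads the map under the read lock, so it is safe to call while
// other goroutines are calling markDelivered.
func (r *request) delivered(hash string) bool {
	r.lock.RLock()
	defer r.lock.RUnlock()
	return r.hashes[hash]
}

func main() {
	r := &request{hashes: map[string]bool{"a": false, "b": false}}
	var wg sync.WaitGroup
	for _, h := range []string{"a", "b"} {
		wg.Add(1)
		go func(h string) {
			defer wg.Done()
			r.markDelivered(h)
			_ = r.delivered(h)
		}(h)
	}
	wg.Wait()
	fmt.Println(r.delivered("a"), r.delivered("b")) // true true
}
```

An unguarded read that races with a map write is a data race in Go, which is why the read side needs at least RLock.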
| p.logger.Warn("bzzeth.handleBlockHeaders: nonexisting request id", "id", msg.Rid) | ||
| p.Drop("nonexisting request id") | ||
| return | ||
| p.logger.Warn("", "id", msg.Rid) |
There was a problem hiding this comment.
log not needed, and certainly not with empty message
| if err != nil { | ||
| p.logger.Warn("bzzeth.handleBlockHeaders: fatal dropping peer", "id", msg.Rid, "err", err) | ||
| p.Drop("error on deliverAndStoreAll") | ||
| return fmt.Errorf("bzzeth.handleBlockHeaders: fatal dropping peer, id: %d err: %w", msg.Rid, err) |
There was a problem hiding this comment.
how about just return b.deliverAndStoreAll(ctx, req, headers) on line 246
| // wait for all validations to get over and close the channels | ||
| err := wg.Wait() | ||
|
|
||
| // finish storage is used mostly in testing |
| func (h *Hive) handleSubPeersMsg(ctx context.Context, d *Peer, msg *subPeersMsg) error { | ||
| d.setDepth(msg.Depth) | ||
| // only send peers after the initial subPeersMsg | ||
| h.lock.Lock() |
There was a problem hiding this comment.
Correct, it was not; it used to be sync. Basically, the check for the sentPeers boolean value should pass only once, and then all other calls should skip this part. Does it make sense?
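The "pass only once" check can be sketched like this. Once handlers run concurrently, the test and the update of the flag have to happen under the same lock. The `peerState` type and `firstSubPeersMsg` method are hypothetical; in the diff above the lock lives on the Hive instead:

```go
package main

import (
	"fmt"
	"sync"
)

// peerState is a hypothetical stand-in for the per-peer state touched by the handler.
type peerState struct {
	mu        sync.Mutex
	sentPeers bool
}

// firstSubPeersMsg reports true for exactly one caller, no matter how many
// handlers run concurrently, because check and set share one critical section.
func (s *peerState) firstSubPeersMsg() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.sentPeers {
		return false
	}
	s.sentPeers = true
	return true
}

// countFirst calls firstSubPeersMsg from n goroutines and returns how many
// of them were treated as the initial message.
func countFirst(s *peerState, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	first := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.firstSubPeersMsg() {
				mu.Lock()
				first++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return first
}

func main() {
	fmt.Println(countFirst(&peerState{}, 100)) // 1
}
```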
| // * handles decoding with reflection, | ||
| // * call handlers as callbacks | ||
| func (p *Peer) handleMsg(msg *p2p.Msg, handle func(ctx context.Context, msg interface{}) error) error { | ||
| // make sure that the payload has been fully consume |
| if err != nil { | ||
| if err != io.EOF { | ||
| metrics.GetOrRegisterCounter("peer.handleincoming.error", nil).Inc(1) | ||
| log.Error("peer.handleIncoming", "err", err) |
There was a problem hiding this comment.
change metrics name and log message according to rename if needed
| // * handles decoding with reflection, | ||
| // * call handlers as callbacks | ||
| func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{}) error) error { | ||
| // Receive(code) is a sync call that handles incoming message with provided message handler |
There was a problem hiding this comment.
Some copy-paste mistake 🙈
| // * call handlers as callbacks | ||
| func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{}) error) error { | ||
| // Receive(code) is a sync call that handles incoming message with provided message handler | ||
| func (p *Peer) Receive(handler func(ctx context.Context, msg interface{}) error) error { |
There was a problem hiding this comment.
Nope, good catch. We can export it if needed, later
|
|
||
| if msg.Size > p.spec.MaxMsgSize { | ||
| return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize) | ||
| err := errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize) |
There was a problem hiding this comment.
Oh, sorry, it's probably just my debug leftover 🙈
Thanks @zelig! I agree with both points. I left them for the next PR, as @janos and I wanted to go through errors, especially in stream, after this. I will fix all other suggested changes, and leave the error-related ones as reference for the next PR to avoid bloating this one too much. I will also add a couple of unit tests in this one. Sounds good?
@janos, @zelig I believe I fixed what was suggested in the comments. Some stuff related to the stream, errors and pause is left for the next error-related PRs, which should come very soon. Also, during the error pruning, I will investigate a bit more whether we need the correct error in the node events. If we don't need it (no need to propagate errors other than readMsg errors in the run method), I will switch back to peer drop. Basically, it will just make our run function less complicated, but we will still log correct errors.
Smoke tests seem to pass. @janos thanks for help!
janos
left a comment
There was a problem hiding this comment.
@pradovic if you could address these two minor comments https://github.com/ethersphere/swarm/pull/2018/files#r357484127 https://github.com/ethersphere/swarm/pull/2018/files#r357485691, otherwise, LGTM.
| }() | ||
|
|
||
| return func() error { | ||
| return <-errc |
There was a problem hiding this comment.
goroutine leak? write into the channel a nil value in the main goroutine when the function returns
There was a problem hiding this comment.
Hmmm, the only place where this function is called is in the run method, which is waiting on this channel in order to return. How can it leak? I am not sure wdym 🙈 Maybe it could leak if there were multiple callers waiting, but this is not the use case for now.
There was a problem hiding this comment.
well, if you'll look again you'll see that errc is only written to in case of an error, but the channel is never closed otherwise. for cleanliness' sake i'd be happy if there is a defer close(errc) in the main goroutine that is on line 249
There was a problem hiding this comment.
Hmmm, it makes sense in general, but then the "lingering" goroutines cannot send to it. It can be solved in the implementation, but since we abandoned this approach in the error-related PR I will not update it now, and will keep this in mind if we switch back. Nice catch, thanks!
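For reference, one way to reconcile both concerns (the channel gets closed, and lingering goroutines never send on a closed channel) is to close errc only after every handler has returned. This is a sketch under the assumption of a fixed set of handlers and a hypothetical `runHandlers` helper, not the run loop from this PR:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errHandler is a hypothetical handler failure used in the example.
var errHandler = errors.New("handler failed")

// runHandlers starts every handler in its own goroutine and returns a wait
// function. wait returns the first handler error, or nil once all handlers
// have finished without error.
func runHandlers(handlers []func() error) (wait func() error) {
	errc := make(chan error, 1) // buffered: the first error never blocks its sender
	var wg sync.WaitGroup
	for _, h := range handlers {
		wg.Add(1)
		go func(h func() error) {
			defer wg.Done()
			if err := h(); err != nil {
				select {
				case errc <- err: // first error wins
				default: // an error is already pending; drop this one
				}
			}
		}(h)
	}
	go func() {
		wg.Wait()   // no handler can send after this point
		close(errc) // so closing is safe and unblocks wait on success
	}()
	return func() error {
		return <-errc // nil when the channel was closed without an error
	}
}

func main() {
	ok := func() error { return nil }
	bad := func() error { return errHandler }
	fmt.Println(runHandlers([]func() error{ok, ok})())  // <nil>
	fmt.Println(runHandlers([]func() error{ok, bad})()) // handler failed
}
```

The non-blocking send means later errors are dropped rather than leaving their goroutines stuck, which is the trade-off this sketch makes.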
| p.logger.Error("retrieval.handleRetrieveRequest - peer delivery failed", "ref", msg.Addr, "err", err) | ||
| osp.LogFields(olog.Bool("delivered", false)) | ||
| return | ||
| // continue in event loop |
There was a problem hiding this comment.
@zelig if a chunk is not found in this case an error will be returned, then the peer is dropped as a result (which is incorrect if the chunk cannot be found)
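A minimal sketch of the distinction being asked for, assuming the run loop drops the peer on any returned handler error: errors that only mean "I don't have it" are classified separately from protocol violations. The sentinels and the `shouldDrop` helper are hypothetical names for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels; the real packages define their own error values.
var (
	errChunkNotFound = errors.New("chunk not found")
	errChunkInvalid  = errors.New("invalid chunk")
)

// shouldDrop reports whether a handler error is a protocol violation that
// warrants disconnecting the peer. A chunk that simply cannot be found is
// not the peer's fault, so it does not lead to a drop.
func shouldDrop(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, errChunkNotFound):
		return false
	default:
		return true
	}
}

func main() {
	notFound := fmt.Errorf("retrieve request: %w", errChunkNotFound)
	invalid := fmt.Errorf("netstore put: %w", errChunkInvalid)
	fmt.Println(shouldDrop(notFound), shouldDrop(invalid)) // false true
}
```

An alternative with the same effect is for the handler to log the not-found case and return nil, so that only errors that justify a disconnect ever reach the run loop.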
| p.logger.Error("netstore error putting chunk to localstore", "err", err) | ||
| if err == storage.ErrChunkInvalid { | ||
| p.Drop("invalid chunk in netstore put") | ||
| return fmt.Errorf("netstore error putting chunk to localstore: %s", err) |
| delete(p.openOffers, msg.Ruid) | ||
| p.mtx.Unlock() | ||
| p.Drop("error sending offered hashes") | ||
| return fmt.Errorf("error sending offered hashes: %w", err) |
There was a problem hiding this comment.
i chose to delete the entry for the sake of cleanliness, but indeed it is not necessary since the whole peer object should be disposed of when a send error occurs
| return r.requestSubsequentRange(ctx, p, provider, w, msg.LastIndex) | ||
| } | ||
|
|
||
| if errc == nil { |
There was a problem hiding this comment.
the only case where errc is nil is when the if statement in line 559 is entered. in that case, the if statement in line 576 would be entered too, and the subsequent range would be requested. as a result, it is not possible that this if block will ever be triggered so it is safe to remove it IMO
| select { | ||
| case <-done: | ||
| case <-time.After(5 * time.Second): | ||
|
|
acud
left a comment
There was a problem hiding this comment.
LGTM. Please just rebase to resolve conflicts
network, p2p: move msg pauser to protocols
This is an idea for how to run handlers async in the p2p.Peer run event loop. This is still in progress, so I have not yet commented all the code or refactored it to look better. The main idea was to try to:
peer.Drop. If I am right, this disconnects the peer with the actual produced error instead of the subprotocol with error string from the handler. I am not sure if this is important or desired. If not, we could simplify the code in the run loop a bit.
There are a couple of todo comments that I need to investigate more; I would appreciate any input about those :)
todo:
- Write more isolated tests to test and document the behavior of the Run event loop.
- Rethink stream graceful shutdown a bit.
- This PR does not cover logging errors and making the actual error nicer. If we agree on this proposal, we can continue with making the errors nicer and refactoring the code to look a bit better (e.g. use context cancellation instead of the running bool + mutex in protocol.Run, etc.). Also, if we don't think that there is any benefit from 3., I can reduce the code so that it does not support this.
- Investigate handleMsgPauser in stream, which is used for tests, and see how it fits this kind of loop.
- It appears that the serverCollectBatch function in stream produces some leaking goroutines; I should investigate it a bit. It looks like it is happening on the master branch as well, but it might happen more now.
LMKWYT :)
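The todo about replacing the running bool + mutex with context cancellation could look roughly like the sketch below. It assumes a hypothetical `runLoop` that reads from a message channel; the real Run reads from the p2p connection instead, so this only shows the shutdown mechanism:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runLoop is a hypothetical event loop. It stops when ctx is cancelled,
// when msgs is closed, or when a handler returns an error.
func runLoop(ctx context.Context, msgs <-chan string, handle func(string) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case m, ok := <-msgs:
			if !ok {
				return nil
			}
			if err := handle(m); err != nil {
				return err
			}
		}
	}
}

// cancelStopsLoop starts runLoop, delivers one message, cancels the context
// and reports whether the loop returned context.Canceled.
func cancelStopsLoop() bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	msgs := make(chan string)
	done := make(chan error, 1)
	go func() {
		done <- runLoop(ctx, msgs, func(string) error { return nil })
	}()
	msgs <- "hello" // received and handled by the loop
	cancel()        // takes the place of setting running = false under a mutex
	return errors.Is(<-done, context.Canceled)
}

func main() {
	fmt.Println(cancelStopsLoop()) // true
}
```

The same context can also be handed to the handlers, which gives them a uniform way to notice that the peer is shutting down.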