-
Notifications
You must be signed in to change notification settings - Fork 109
P2p validate accounting #2051
P2p validate accounting #2051
Changes from all commits
4e226df
143a586
22b4f59
90ebd7e
3a62941
b4f7bd0
369cd81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,7 +71,7 @@ type Price struct { | |
| // A protocol provides the message price in absolute value | ||
| // This method then returns the correct signed amount, | ||
| // depending on who pays, which is identified by the `payer` argument: | ||
| // `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument. | ||
| // Sending will pass a `Sender` payer, receiving will pass the `Receiver` argument. | ||
| // Thus: If Sending and sender pays, amount negative, otherwise positive | ||
| // If Receiving, and receiver pays, amount negative, otherwise positive | ||
| func (p *Price) For(payer Payer, size uint32) int64 { | ||
|
|
@@ -93,6 +93,10 @@ type Balance interface { | |
| // positive amount = credit local node | ||
| // negative amount = debit local node | ||
| Add(amount int64, peer *Peer) error | ||
| // Check is a dry-run for the Add operation: | ||
| // As the accounting takes place **after** the actual send/receive operation happens, | ||
| // we want to make sure that that operation would not result in any problem | ||
| Check(amount int64, peer *Peer) error | ||
| } | ||
|
|
||
| // Accounting implements the Hook interface | ||
|
|
@@ -118,44 +122,35 @@ func SetupAccountingMetrics(reportInterval time.Duration, path string) *Accounti | |
| return NewAccountingMetrics(metrics.AccountingRegistry, reportInterval, path) | ||
| } | ||
|
|
||
| // Send takes a peer, a size and a msg and | ||
| // - calculates the cost for the local node sending a msg of size to peer querying the message for its price | ||
| // - credits/debits local node using balance interface | ||
| func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error { | ||
| // get the price for a message | ||
| var pricedMessage PricedMessage | ||
| var ok bool | ||
| // if the msg implements `Price`, it is an accounted message | ||
| if pricedMessage, ok = msg.(PricedMessage); !ok { | ||
| return nil | ||
| } | ||
| // evaluate the price for sending messages | ||
| costToLocalNode := pricedMessage.Price().For(Sender, size) | ||
| // Apply takes a peer, the signed cost for the local node and the msg size and credits/debits local node using balance interface | ||
| func (ah *Accounting) Apply(peer *Peer, costToLocalNode int64, size uint32) error { | ||
| // do the accounting | ||
| err := ah.Add(costToLocalNode, peer) | ||
| // record metrics: just increase counters for user-facing metrics | ||
| ah.doMetrics(costToLocalNode, size, err) | ||
| return err | ||
| } | ||
|
|
||
| // Receive takes a peer, a size and a msg and | ||
| // - calculates the cost for the local node receiving a msg of size from peer querying the message for its price | ||
| // - credits/debits local node using balance interface | ||
| func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error { | ||
| // Validate calculates the cost for the local node sending or receiving a msg to/from a peer querying the message for its price. | ||
| // It returns either the signed cost for the local node as int64 or an error, signaling that the accounting operation would fail | ||
| // (no change has been applied at this point) | ||
| func (ah *Accounting) Validate(peer *Peer, size uint32, msg interface{}, payer Payer) (int64, error) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. conceptually, i don't understand the difference between this function and this one isn't mentioned in the PR description but it seems to be new
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well
mortelli marked this conversation as resolved.
|
||
| // get the price for a message (by querying the message type via the PricedMessage interface) | ||
| var pricedMessage PricedMessage | ||
| var ok bool | ||
| // if the msg implements `Price`, it is an accounted message | ||
| if pricedMessage, ok = msg.(PricedMessage); !ok { | ||
| return nil | ||
| return 0, nil | ||
| } | ||
| // evaluate the price for receiving messages | ||
| costToLocalNode := pricedMessage.Price().For(Receiver, size) | ||
| // do the accounting | ||
| err := ah.Add(costToLocalNode, peer) | ||
| // record metrics: just increase counters for user-facing metrics | ||
| ah.doMetrics(costToLocalNode, size, err) | ||
| return err | ||
| costToLocalNode := pricedMessage.Price().For(payer, size) | ||
| // check that the operation would perform correctly | ||
| err := ah.Check(costToLocalNode, peer) | ||
| if err != nil { | ||
| // signal to caller that the operation would fail | ||
| return 0, err | ||
| } | ||
| return costToLocalNode, nil | ||
| } | ||
|
|
||
| // record some metrics | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -115,10 +115,10 @@ func errorf(code int, format string, params ...interface{}) *Error { | |
| //To access this functionality, we provide a Hook interface which will call accounting methods | ||
| //NOTE: there could be more such (horizontal) hooks in the future | ||
| type Hook interface { | ||
| //A hook for sending messages | ||
| Send(peer *Peer, size uint32, msg interface{}) error | ||
| //A hook for receiving messages | ||
| Receive(peer *Peer, size uint32, msg interface{}) error | ||
| // A hook for applying accounting | ||
| Apply(peer *Peer, costToLocalNode int64, size uint32) error | ||
| // Run some validation before applying accounting | ||
| Validate(peer *Peer, size uint32, msg interface{}, payer Payer) (int64, error) | ||
| } | ||
|
|
||
| // Spec is a protocol specification including its name and version as well as | ||
|
|
@@ -203,6 +203,7 @@ type Peer struct { | |
| spec *Spec | ||
| encode func(context.Context, interface{}) (interface{}, int, error) | ||
| decode func(p2p.Msg) (context.Context, []byte, error) | ||
| lock sync.Mutex | ||
| } | ||
|
|
||
| // NewPeer constructs a new peer | ||
|
|
@@ -277,15 +278,31 @@ func (p *Peer) Send(ctx context.Context, msg interface{}) error { | |
| } | ||
| size = len(r) | ||
| } | ||
| // if the accounting hook is set, call it | ||
|
|
||
| // if the accounting hook is set, do accounting logic | ||
| if p.spec.Hook != nil { | ||
| err = p.spec.Hook.Send(p, uint32(size), msg) | ||
|
|
||
| // let's lock, we want to avoid that after validating, a separate call might interfere | ||
| p.lock.Lock() | ||
| defer p.lock.Unlock() | ||
| // validate that this operation would succeed... | ||
| costToLocalNode, err := p.spec.Hook.Validate(p, uint32(size), wmsg, Sender) | ||
|
holisticode marked this conversation as resolved.
|
||
| if err != nil { | ||
| // ...because if it would fail, we return and don't send the message | ||
| return err | ||
| } | ||
| // seems like accounting would succeed, thus send the message first... | ||
| err = p2p.Send(p.rw, code, wmsg) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // ...and finally apply (write) the accounting change | ||
| err = p.spec.Hook.Apply(p, costToLocalNode, uint32(size)) | ||
| } else { | ||
| err = p2p.Send(p.rw, code, wmsg) | ||
| } | ||
|
|
||
| return p2p.Send(p.rw, code, wmsg) | ||
| return err | ||
| } | ||
|
|
||
| // handleIncoming(code) | ||
|
|
@@ -322,23 +339,40 @@ func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{}) | |
| return errorf(ErrDecode, "<= %v: %v", msg, err) | ||
| } | ||
|
|
||
| // if the accounting hook is set, call it | ||
| // if the accounting hook is set, do accounting logic | ||
| if p.spec.Hook != nil { | ||
| err := p.spec.Hook.Receive(p, uint32(len(msgBytes)), val) | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No problem, thanks for the heads-up!
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the heads up |
||
| p.lock.Lock() | ||
| defer p.lock.Unlock() | ||
|
|
||
| size := uint32(len(msgBytes)) | ||
|
|
||
| // validate that the accounting call would succeed... | ||
| costToLocalNode, err := p.spec.Hook.Validate(p, size, val, Receiver) | ||
| if err != nil { | ||
| // ...because if it would fail, we return and don't handle the message | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| // call the registered handler callbacks | ||
| // a registered callback take the decoded message as argument as an interface | ||
| // which the handler is supposed to cast to the appropriate type | ||
| // it is entirely safe not to check the cast in the handler since the handler is | ||
| // chosen based on the proper type in the first place | ||
| if err := handle(ctx, val); err != nil { | ||
| return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err) | ||
| // seems like accounting would be fine, so handle the message | ||
| if err := handle(ctx, val); err != nil { | ||
|
ralph-pichler marked this conversation as resolved.
|
||
| return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err) | ||
| } | ||
|
|
||
| // handling succeeded, finally apply accounting | ||
| err = p.spec.Hook.Apply(p, costToLocalNode, size) | ||
| } else { | ||
| // call the registered handler callbacks | ||
| // a registered callback take the decoded message as argument as an interface | ||
| // which the handler is supposed to cast to the appropriate type | ||
| // it is entirely safe not to check the cast in the handler since the handler is | ||
| // chosen based on the proper type in the first place | ||
| if err := handle(ctx, val); err != nil { | ||
| return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err) | ||
| } | ||
| } | ||
| return nil | ||
|
|
||
| return err | ||
| } | ||
|
|
||
| // Handshake negotiates a handshake on the peer connection | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -304,7 +304,7 @@ func keyToID(key string, prefix string) enode.ID { | |
| return enode.HexID(key[len(prefix):]) | ||
| } | ||
|
|
||
| // createOwner assings keys and addresses | ||
| // createOwner assigns keys and addresses | ||
| func createOwner(prvkey *ecdsa.PrivateKey) *Owner { | ||
| pubkey := &prvkey.PublicKey | ||
| return &Owner{ | ||
|
|
@@ -314,20 +314,45 @@ func createOwner(prvkey *ecdsa.PrivateKey) *Owner { | |
| } | ||
| } | ||
|
|
||
| // modifyBalanceOk checks that the amount would not result in crossing the disconnection threshold | ||
| func (s *Swap) modifyBalanceOk(amount int64, swapPeer *Peer) (err error) { | ||
| // check if balance with peer is over the disconnect threshold and if the message would increase the existing debt | ||
| balance := swapPeer.getBalance() | ||
| if balance >= s.params.DisconnectThreshold && amount > 0 { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know you did not introduce this line, but it seems to me that here, we would need to test whether But I think we should, in this case, we should disconnect whenever we receive a single message with a positive amount.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be sure, it would be great to see where and by whom this line was introduced.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change was introduced by @mortelli with the idea to always allow debt-reducing messages. Let him comment on this; in any case, if this would have to change, it is an important change and then it should be done in a separate PR
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I vaguely remember this... This comment is not a blocker for me, but would like to get reassurance from @mortelli :)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indeed, i introduced this change through PR #1922. the only difference in checking in the first case, one message will go over the threshold and then stop the flow; in the second, the threshold will never be reached or surpassed, but there might be a bigger difference between the threshold and the final balance before subsequent messages are stopped. while correct, the situation you mentioned is unlikely to come up: if a message is priced at assuming then, that |
||
| return fmt.Errorf("balance for peer %s is over the disconnect threshold %d and cannot incur more debt, disconnecting", swapPeer.ID().String(), s.params.DisconnectThreshold) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Check is called as a *dry run* before applying the actual accounting to an operation. | ||
| // It only checks that performing a given accounting operation would not incur in an error. | ||
| // If it returns no error, this signals to the caller that the operation is safe | ||
| func (s *Swap) Check(amount int64, peer *protocols.Peer) (err error) { | ||
| swapPeer := s.getPeer(peer.ID()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You do
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's because
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand then why you would call it at this place at all. Seemingly all you are doing is verifying whether the peer is a swapPeer, but this is also the first thing you are doing in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried to explain before but I'll try again, a bit more explicit. There are two things here:
The other thing is that we are locking the Your optimization suggestions would be more stringent if we can remove the lock from
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. However your second suggestion is good, I changed |
||
| if swapPeer == nil { | ||
| return fmt.Errorf("peer %s not a swap enabled peer", peer.ID().String()) | ||
| } | ||
|
|
||
| swapPeer.lock.Lock() | ||
| defer swapPeer.lock.Unlock() | ||
| // currently this is the only real check needed: | ||
| return s.modifyBalanceOk(amount, swapPeer) | ||
| } | ||
|
|
||
| // Add is the (sole) accounting function | ||
| // Swap implements the protocols.Balance interface | ||
| func (s *Swap) Add(amount int64, peer *protocols.Peer) (err error) { | ||
| swapPeer := s.getPeer(peer.ID()) | ||
| if swapPeer == nil { | ||
| return fmt.Errorf("peer %s not a swap enabled peer", peer.ID().String()) | ||
| } | ||
|
|
||
| swapPeer.lock.Lock() | ||
| defer swapPeer.lock.Unlock() | ||
|
|
||
| // check if balance with peer is over the disconnect threshold and if the message would increase the existing debt | ||
| balance := swapPeer.getBalance() | ||
| if balance >= s.params.DisconnectThreshold && amount > 0 { | ||
| return fmt.Errorf("balance for peer %s is over the disconnect threshold %d and cannot incur more debt, disconnecting", peer.ID().String(), s.params.DisconnectThreshold) | ||
| // we should probably check here again: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? With the peer under lock the balance couldn't have changed since the call to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is correct. However, just the nature of the two being separate functions could lead to the situation that Even if the probability is very low, it is correct to check again. However, I am fine removing the check if a majority thinks it is superfluous.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. I don't really like this double-check approach. From just looking at it, it looks as if you are doing something redundantly and it is probably also not water-tight. A different solution could be that the Check function would leave a trace in Swap that a certain amount has been checked and Add only executes on a validated amount. For example, we could create a mapping where each amount maps to a bool (Checked)), to be set to true during the function Might be too much engineering though. Let me know your thoughts!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing the additional check, and adding a comment that it is the responsibility of the calling function to check the balance would be more than enough, I think though! |
||
| if err = s.modifyBalanceOk(amount, swapPeer); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err = swapPeer.updateBalance(amount); err != nil { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.