Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor state based relaying (fixes update client bug on retries) #435

Merged
merged 15 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 140 additions & 8 deletions relayer/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,142 @@ func (c *Chain) MsgTransfer(dst *PathEnd, amount sdk.Coin, dstAddr string,
)
}

// MsgRelayTimeout
colin-axner marked this conversation as resolved.
Show resolved Hide resolved
func (c *Chain) MsgRelayTimeout(counterparty *Chain, packet *relayMsgTimeout) (msgs []sdk.Msg, err error) {
var recvRes *chantypes.QueryPacketReceiptResponse
// TODO: try commenting out retries to reduce complexity
// retry getting commit response until it succeeds
colin-axner marked this conversation as resolved.
Show resolved Hide resolved
if err = retry.Do(func() error {
// NOTE: Timeouts currently only work with ORDERED channels for nwo
// NOTE: the proof height uses - 1 due to tendermint's delayed execution model
recvRes, err = counterparty.QueryPacketReceipt(int64(counterparty.MustGetLatestLightHeight())-1, packet.seq)
if err != nil {
return err
}

if recvRes.Proof == nil {
return fmt.Errorf("timeout packet receipt proof seq(%d) is nil", packet.seq)
}

return nil
}, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
// clear messages
msgs = []sdk.Msg{}

// OnRetry we want to update the light clients and then debug log
updateMsg, err := c.UpdateClient(counterparty)
if err != nil {
return
}

msgs = append(msgs, updateMsg)

if counterparty.debug {
counterparty.Log(fmt.Sprintf("- [%s]@{%d} - try(%d/%d) query packet receipt: %s",
counterparty.ChainID, counterparty.MustGetLatestLightHeight()-1, n+1, rtyAttNum, err))
}

})); err != nil {
counterparty.Error(err)
return
}

if recvRes == nil {
return nil, fmt.Errorf("timeout packet [%s]seq{%d} has no associated proofs", c.ChainID, packet.seq)
}

version := clienttypes.ParseChainID(counterparty.ChainID)
colin-axner marked this conversation as resolved.
Show resolved Hide resolved
msg := chantypes.NewMsgTimeout(
chantypes.NewPacket(
packet.packetData,
packet.seq,
c.PathEnd.PortID,
c.PathEnd.ChannelID,
counterparty.PathEnd.PortID,
counterparty.PathEnd.ChannelID,
clienttypes.NewHeight(version, packet.timeout),
packet.timeoutStamp,
),
packet.seq,
recvRes.Proof,
recvRes.ProofHeight,
c.MustGetAddress(),
)

return append(msgs, msg), nil
}

// MsgRelayRecvPacket
colin-axner marked this conversation as resolved.
Show resolved Hide resolved
func (c *Chain) MsgRelayRecvPacket(counterparty *Chain, packet *relayMsgRecvPacket) (msgs []sdk.Msg, err error) {
var comRes *chantypes.QueryPacketCommitmentResponse
// TODO: try commenting out retries to reduce complexity
// retry getting commit response until it succeeds
if err = retry.Do(func() error {
// NOTE: the proof height uses - 1 due to tendermint's delayed execution model
comRes, err = counterparty.QueryPacketCommitment(int64(counterparty.MustGetLatestLightHeight())-1, packet.seq)
if err != nil {
return err
}

if comRes.Proof == nil || comRes.Commitment == nil {
return fmt.Errorf("recv packet commitment query seq(%d) is nil", packet.seq)
}

return nil
}, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
// clear messages
msgs = []sdk.Msg{}

// OnRetry we want to update the light clients and then debug log
updateMsg, err := c.UpdateClient(counterparty)
if err != nil {
return
}

msgs = append(msgs, updateMsg)

if counterparty.debug {
counterparty.Log(fmt.Sprintf("- [%s]@{%d} - try(%d/%d) query packet commitment: %s",
counterparty.ChainID, counterparty.MustGetLatestLightHeight()-1, n+1, rtyAttNum, err))
}

})); err != nil {
counterparty.Error(err)
return
}

if comRes == nil {
return nil, fmt.Errorf("receive packet [%s]seq{%d} has no associated proofs", c.ChainID, packet.seq)
}

version := clienttypes.ParseChainID(counterparty.ChainID)
colin-axner marked this conversation as resolved.
Show resolved Hide resolved
msg := chantypes.NewMsgRecvPacket(
chantypes.NewPacket(
packet.packetData,
packet.seq,
counterparty.PathEnd.PortID,
counterparty.PathEnd.ChannelID,
c.PathEnd.PortID,
c.PathEnd.ChannelID,
clienttypes.NewHeight(version, packet.timeout),
packet.timeoutStamp,
),
comRes.Proof,
comRes.ProofHeight,
c.MustGetAddress(),
)

return append(msgs, msg), nil
}

// MsgRelayAcknowledgement
func (c *Chain) MsgRelayAcknowledgement(counterparty *Chain, packet *relayMsgPacketAck) (msgs []sdk.Msg, err error) {

var ackRes *chantypes.QueryPacketAcknowledgementResponse
// TODO: try commenting out retries to reduce complexity
// retry getting commit response until it succeeds
if err = retry.Do(func() error {
// NOTE: the proof height uses - 1 due to tendermint's delayed execution model
ackRes, err := counterparty.QueryPacketAcknowledgement(int64(counterparty.MustGetLatestLightHeight())-1, packet.seq)
ackRes, err = counterparty.QueryPacketAcknowledgement(int64(counterparty.MustGetLatestLightHeight())-1, packet.seq)
if err != nil {
return err
}
Expand Down Expand Up @@ -363,22 +491,26 @@ func (c *Chain) MsgRelayAcknowledgement(counterparty *Chain, packet *relayMsgPac
return
}

version := clienttypes.ParseChainID(counterparty.ChainID)
if ackRes == nil {
return nil, fmt.Errorf("ack packet [%s]seq{%d} has no associated proofs", counterparty.ChainID, packet.seq)
}

version := clienttypes.ParseChainID(c.ChainID)
colin-axner marked this conversation as resolved.
Show resolved Hide resolved
msg := chantypes.NewMsgAcknowledgement(
chantypes.NewPacket(
packet.packetData,
packet.seq,
c.PathEnd.PortID,
c.PathEnd.ChannelID,
counterparty.PathEnd.PortID,
counterparty.PathEnd.ChannelID,
c.PathEnd.PortID,
c.PathEnd.ChannelID,
colin-axner marked this conversation as resolved.
Show resolved Hide resolved
clienttypes.NewHeight(version, packet.timeout),
packet.timeoutStamp,
),
packet.ack,
packet.dstComRes.Proof,
packet.dstComRes.ProofHeight,
c.MustGetAddress(),
ackRes.Proof,
ackRes.ProofHeight,
counterparty.MustGetAddress(),
)

return append(msgs, msg), nil
Expand Down
98 changes: 40 additions & 58 deletions relayer/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,27 +420,23 @@ func (nrs *NaiveStrategy) RelayAcknowledgements(src, dst *Chain, sp *RelaySequen
// add messages for sequences on src
for _, seq := range sp.Src {
// SRC wrote ack, so we query packet and send to DST
msgs, err := acknowledgementFromSequence(src, dst, seq)
relayAckMsgs, err := acknowledgementFromSequence(src, dst, seq)
if err != nil {
return err
}

msgs.Dst = append(msgs.Dst, msgs)
msgs.Dst = append(msgs.Dst, relayAckMsgs...)
}

// add messages for sequences on dst
for _, seq := range sp.Dst {
// DST wrote ack, so we query packet and send to SRC
pkt, err := acknowledgementFromSequence(dst, src, seq)
relayAckMsgs, err := acknowledgementFromSequence(dst, src, seq)
if err != nil {
return err
}

msg, err := pkt.Msg(src, dst)
if err != nil {
return err
}
msgs.Src = append(msgs.Src, msg)
msgs.Src = append(msgs.Src, relayAckMsgs...)
}

if !msgs.Ready() {
Expand Down Expand Up @@ -488,56 +484,38 @@ func (nrs *NaiveStrategy) RelayPackets(src, dst *Chain, sp *RelaySequences) erro
// add messages for sequences on src
for _, seq := range sp.Src {
// Query src for the sequence number to get type of packet
pkt, err := relayPacketFromSequence(src, dst, seq)
recvMsgs, timeoutMsgs, err := relayPacketFromSequence(src, dst, seq)
if err != nil {
return err
}

// depending on the type of message to be relayed, we need to
// send to different chains
switch pkt.(type) {
case *relayMsgRecvPacket:
msg, err := pkt.Msg(dst, src)
if err != nil {
return err
}
msgs.Dst = append(msgs.Dst, msg)
case *relayMsgTimeout:
msg, err := pkt.Msg(src, dst)
if err != nil {
return err
}
msgs.Src = append(msgs.Src, msg)
default:
return fmt.Errorf("%T packet types not supported", pkt)
if recvMsgs != nil {
msgs.Dst = append(msgs.Dst, recvMsgs...)
}

if timeoutMsgs != nil {
msgs.Src = append(msgs.Src, timeoutMsgs...)
}
}

// add messages for sequences on dst
for _, seq := range sp.Dst {
// Query dst for the sequence number to get type of packet
pkt, err := relayPacketFromSequence(dst, src, seq)
recvMsgs, timeoutMsgs, err := relayPacketFromSequence(dst, src, seq)
if err != nil {
return err
}

// depending on the type of message to be relayed, we need to
// send to different chains
switch pkt.(type) {
case *relayMsgRecvPacket:
msg, err := pkt.Msg(src, dst)
if err != nil {
return err
}
msgs.Src = append(msgs.Src, msg)
case *relayMsgTimeout:
msg, err := pkt.Msg(dst, src)
if err != nil {
return err
}
msgs.Dst = append(msgs.Dst, msg)
default:
return fmt.Errorf("%T packet types not supported", pkt)
if recvMsgs != nil {
msgs.Src = append(msgs.Src, recvMsgs...)
}

if timeoutMsgs != nil {
msgs.Dst = append(msgs.Dst, timeoutMsgs...)
}
}

Expand Down Expand Up @@ -579,51 +557,55 @@ func (nrs *NaiveStrategy) RelayPackets(src, dst *Chain, sp *RelaySequences) erro
return nil
}

// relayPacketFromSequence returns a sdk.Msg to relay a packet with a given seq on src
func relayPacketFromSequence(src, dst *Chain, seq uint64) (relayPacket, error) {
// relayPacketFromSequence relays a packet with a given seq on src and returns recvPacket msgs, timeoutPacketmsgs and error
func relayPacketFromSequence(src, dst *Chain, seq uint64) ([]sdk.Msg, []sdk.Msg, error) {
txs, err := src.QueryTxs(src.MustGetLatestLightHeight(), 1, 1000, rcvPacketQuery(src.PathEnd.ChannelID, int(seq)))
switch {
case err != nil:
return nil, err
return nil, nil, err
case len(txs) == 0:
return nil, fmt.Errorf("no transactions returned with query")
return nil, nil, fmt.Errorf("no transactions returned with query")
case len(txs) > 1:
return nil, fmt.Errorf("more than one transaction returned with query")
return nil, nil, fmt.Errorf("more than one transaction returned with query")
}

rcvPackets, timeoutPackets, err := relayPacketsFromResultTx(src, dst, txs[0])
switch {
case err != nil:
return nil, err
return nil, nil, err
case len(rcvPackets) == 0 && len(timeoutPackets) == 0:
return nil, fmt.Errorf("no relay msgs created from query response")
return nil, nil, fmt.Errorf("no relay msgs created from query response")
case len(rcvPackets)+len(timeoutPackets) > 1:
return nil, fmt.Errorf("more than one relay msg found in tx query")
return nil, nil, fmt.Errorf("more than one relay msg found in tx query")
}

if len(rcvPackets) == 1 {
pkt := rcvPackets[0]
if seq != pkt.Seq() {
return nil, fmt.Errorf("wrong sequence: expected(%d) got(%d)", seq, pkt.Seq())
return nil, nil, fmt.Errorf("wrong sequence: expected(%d) got(%d)", seq, pkt.Seq())
}
if err = pkt.FetchCommitResponse(dst, src); err != nil {
return nil, err

msgs, err := dst.MsgRelayRecvPacket(src, pkt.(*relayMsgRecvPacket))
if err != nil {
return nil, nil, err
}
return pkt, nil
return msgs, nil, nil
}

if len(timeoutPackets) == 1 {
pkt := timeoutPackets[0]
if seq != pkt.Seq() {
return nil, fmt.Errorf("wrong sequence: expected(%d) got(%d)", seq, pkt.Seq())
return nil, nil, fmt.Errorf("wrong sequence: expected(%d) got(%d)", seq, pkt.Seq())
}
if err = pkt.FetchCommitResponse(src, dst); err != nil {
return nil, err

msgs, err := src.MsgRelayTimeout(dst, pkt.(*relayMsgTimeout))
if err != nil {
return nil, nil, err
}
return pkt, nil
return nil, msgs, nil
}

return nil, fmt.Errorf("should have errored before here")
return nil, nil, fmt.Errorf("should have errored before here")
}

// source is the sending chain, destination is the receiving chain
Expand Down Expand Up @@ -653,7 +635,7 @@ func acknowledgementFromSequence(src, dst *Chain, seq uint64) ([]sdk.Msg, error)
return nil, fmt.Errorf("wrong sequence: expected(%d) got(%d)", seq, pkt.Seq())
}

msgs, err := src.MsgRelayAcknowledgement(dst, pkt)
msgs, err := dst.MsgRelayAcknowledgement(src, pkt)
colin-axner marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand Down