Skip to content

Commit

Permalink
Merge PR cosmos#576: Cleanup rly start
Browse files Browse the repository at this point in the history
* add rly start back into the bash script testing environment

* remove old commented out code

* add the sleep statement back to the main StartRelayer go routine + goimports

* breakout prepending UpdateClient msgs to non-empty msg slices into its own function & parallelize

* breakout constructing msgs from sequences into its own function
  • Loading branch information
jtieri authored Feb 10, 2022
1 parent 0d314ac commit e19b88e
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 84 deletions.
3 changes: 2 additions & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
"time"

"github.com/avast/retry-go"
"github.com/spf13/viper"

"github.com/cosmos/relayer/relayer"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)

Expand Down
10 changes: 8 additions & 2 deletions dev-env
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ echo "waiting for blocks..."
sleep 3

rly tx link demo -d -o 3s
sleep 2
echo "Initial balances"
echo "balance 0 $(rly q bal ibc-0)"
echo "balance 1 $(rly q bal ibc-1)"
rly tx transfer ibc-0 ibc-1 100000samoleans "$(rly keys show ibc-1)" -d
rly tx transfer ibc-1 ibc-0 100000samoleans "$(rly keys show ibc-0)" -d
sleep 2
rly tx relay-packets demo -d
rly start demo -d
sleep 2
rly tx relay-acknowledgements demo -d
echo "Balances after packets are sent"
echo "balance 0 $(rly q bal ibc-0)"
echo "balance 1 $(rly q bal ibc-1)"
141 changes: 61 additions & 80 deletions relayer/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,51 +335,13 @@ func RelayPackets(src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength u
}

// add messages for sequences on src
for _, seq := range sp.Src {
// Query src for the sequence number to get type of packet
var recvMsg, timeoutMsg provider.RelayerMessage
if err = retry.Do(func() error {
recvMsg, timeoutMsg, err = src.ChainProvider.RelayPacketFromSequence(src.ChainProvider, dst.ChainProvider, uint64(srch), uint64(dsth), seq, dst.PathEnd.ChannelID, dst.PathEnd.PortID, src.PathEnd.ChannelID, src.PathEnd.PortID, src.PathEnd.ClientID)
return err
}, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
srch, dsth, _ = QueryLatestHeights(src, dst)
})); err != nil {
return err
}

// depending on the type of message to be relayed, we need to
// send to different chains
if recvMsg != nil {
msgs.Dst = append(msgs.Dst, recvMsg)
}

if timeoutMsg != nil {
msgs.Src = append(msgs.Src, timeoutMsg)
}
if err = AddMessagesForSequences(sp.Src, src, dst, srch, dsth, &msgs.Src, &msgs.Dst); err != nil {
return err
}

// add messages for sequences on dst
for _, seq := range sp.Dst {
// Query dst for the sequence number to get type of packet
var recvMsg, timeoutMsg provider.RelayerMessage
if err = retry.Do(func() error {
recvMsg, timeoutMsg, err = dst.ChainProvider.RelayPacketFromSequence(dst.ChainProvider, src.ChainProvider, uint64(dsth), uint64(srch), seq, src.PathEnd.ChannelID, src.PathEnd.PortID, dst.PathEnd.ChannelID, dst.PathEnd.PortID, dst.PathEnd.ClientID)
return nil
}, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
srch, dsth, _ = QueryLatestHeights(src, dst)
})); err != nil {
return err
}

// depending on the type of message to be relayed, we need to
// send to different chains
if recvMsg != nil {
msgs.Src = append(msgs.Src, recvMsg)
}

if timeoutMsg != nil {
msgs.Dst = append(msgs.Dst, timeoutMsg)
}
if err = AddMessagesForSequences(sp.Dst, dst, src, dsth, srch, &msgs.Dst, &msgs.Src); err != nil {
return err
}

if !msgs.Ready() {
Expand All @@ -389,77 +351,96 @@ func RelayPackets(src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength u
}

// Prepend non-empty msg lists with UpdateClient
if len(msgs.Dst) != 0 {
eg := new(errgroup.Group)

eg.Go(func() error {
return PrependUpdateClientMsg(&msgs.Dst, src, dst, srch)
})

eg.Go(func() error {
return PrependUpdateClientMsg(&msgs.Src, dst, src, dsth)
})

if err = eg.Wait(); err != nil {
return err
}

// send messages to their respective chains
if msgs.Send(src, dst); msgs.Success() {
if len(msgs.Dst) > 1 {
dst.logPacketsRelayed(src, len(msgs.Dst)-1)
}
if len(msgs.Src) > 1 {
src.logPacketsRelayed(dst, len(msgs.Src)-1)
}
}

return nil
}

// AddMessagesForSequences constructs RecvMsgs and TimeoutMsgs from sequence numbers on a src chain
// and adds them to the appropriate queue of msgs for both src and dst
func AddMessagesForSequences(sequences []uint64, src, dst *Chain, srch, dsth int64, srcMsgs, dstMsgs *[]provider.RelayerMessage) error {
for _, seq := range sequences {

var (
srcHeader ibcexported.Header
updateMsg provider.RelayerMessage
recvMsg, timeoutMsg provider.RelayerMessage
err error
)

// Query src for the sequence number to get type of packet
if err = retry.Do(func() error {
srcHeader, err = src.ChainProvider.GetIBCUpdateHeader(srch, dst.ChainProvider, dst.PathEnd.ClientID)
recvMsg, timeoutMsg, err = src.ChainProvider.RelayPacketFromSequence(src.ChainProvider, dst.ChainProvider, uint64(srch), uint64(dsth), seq, dst.PathEnd.ChannelID, dst.PathEnd.PortID, src.PathEnd.ChannelID, src.PathEnd.PortID, src.PathEnd.ClientID)
return err
}, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
srch, _, _ = QueryLatestHeights(src, dst)
srch, dsth, _ = QueryLatestHeights(src, dst)
})); err != nil {
return err
}

if err = retry.Do(func() error {
updateMsg, err = dst.ChainProvider.UpdateClient(dst.PathEnd.ClientID, srcHeader)
return nil
}, RtyAtt, RtyDel, RtyErr); err != nil {
return err
// Depending on the type of message to be relayed, we need to send to different chains
if recvMsg != nil {
*dstMsgs = append(*dstMsgs, recvMsg)
}

msgs.Dst = append([]provider.RelayerMessage{updateMsg}, msgs.Dst...)
if timeoutMsg != nil {
*srcMsgs = append(*srcMsgs, timeoutMsg)
}
}

if len(msgs.Src) != 0 {
//dstHeader, err := dst.ChainProvider.GetIBCUpdateHeader(dsth, src.ChainProvider, src.PathEnd.ClientID)
//if err != nil {
// return err
//}
//updateMsg, err := src.ChainProvider.UpdateClient(src.PathEnd.ClientID, dstHeader)
//if err != nil {
// return err
//}
return nil
}

// PrependUpdateClientMsg adds an UpdateClient msg to the front of non-empty msg lists
func PrependUpdateClientMsg(msgs *[]provider.RelayerMessage, src, dst *Chain, srch int64) error {
if len(*msgs) != 0 {
var (
dstHeader ibcexported.Header
srcHeader ibcexported.Header
updateMsg provider.RelayerMessage
err error
)

// Query IBC Update Header
if err = retry.Do(func() error {
dstHeader, err = dst.ChainProvider.GetIBCUpdateHeader(dsth, src.ChainProvider, src.PathEnd.ClientID)
srcHeader, err = src.ChainProvider.GetIBCUpdateHeader(srch, dst.ChainProvider, dst.PathEnd.ClientID)
return err
}, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
_, dsth, _ = QueryLatestHeights(src, dst)
srch, _, _ = QueryLatestHeights(src, dst)
})); err != nil {
return err
}

// Construct UpdateClient msg
if err = retry.Do(func() error {
updateMsg, err = src.ChainProvider.UpdateClient(src.PathEnd.ClientID, dstHeader)
updateMsg, err = dst.ChainProvider.UpdateClient(dst.PathEnd.ClientID, srcHeader)
return nil
}, RtyAtt, RtyDel, RtyErr); err != nil {
return err
}

msgs.Src = append([]provider.RelayerMessage{updateMsg}, msgs.Src...)
// Prepend UpdateClient msg to the slice of msgs
*msgs = append([]provider.RelayerMessage{updateMsg}, *msgs...)
}

// send messages to their respective chains
if msgs.Send(src, dst); msgs.Success() {
if len(msgs.Dst) > 1 {
dst.logPacketsRelayed(src, len(msgs.Dst)-1)
}
if len(msgs.Src) > 1 {
src.logPacketsRelayed(dst, len(msgs.Src)-1)
}
} else {
fmt.Println()
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion relayer/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package relayer

import (
"fmt"
"time"
)

// StartRelayer starts the main relaying loop
Expand Down Expand Up @@ -49,7 +50,7 @@ func StartRelayer(src, dst *Chain, maxTxSize, maxMsgLength uint64) (func(), erro
}
}

//time.Sleep(100 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}
}
}()
Expand Down

0 comments on commit e19b88e

Please sign in to comment.