Skip to content

Commit

Permalink
bp: Don't remarshal within broadcast #125 (#141)
Browse files Browse the repository at this point in the history
* Don't remarshal within broadcast

* fix one more mock

* Remove concurrency as its now net negative
  • Loading branch information
ValarDragon authored Aug 19, 2024
1 parent 0e0ff5f commit 2be2073
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 27 deletions.
7 changes: 4 additions & 3 deletions p2p/mock/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func NewPeer(ip net.IP) *Peer {
return mp
}

func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(_ p2p.Envelope) bool { return true }
func (mp *Peer) Send(_ p2p.Envelope) bool { return true }
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(_ p2p.Envelope) bool { return true }
func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true }
func (mp *Peer) Send(_ p2p.Envelope) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
DefaultNodeID: mp.addr.ID,
Expand Down
14 changes: 14 additions & 0 deletions p2p/mocks/peer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 14 additions & 5 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Peer interface {

Send(Envelope) bool
TrySend(Envelope) bool
TrySendMarshalled(MarshalledEnvelope) bool

Set(string, interface{})
Get(string) interface{}
Expand Down Expand Up @@ -268,12 +269,11 @@ func (p *peer) TrySend(e Envelope) bool {
return p.send(e.ChannelID, e.Message, p.mconn.TrySend)
}

func (p *peer) TrySendMarshalled(e MarshalledEnvelope) bool {
return p.sendMarshalled(e.ChannelID, getMsgType(e.Message), e.MarshalledMessage, p.mconn.TrySend)
}

func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
msgType := getMsgType(msg)
if w, ok := msg.(Wrapper); ok {
msg = w.Wrap()
Expand All @@ -283,6 +283,15 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
p.Logger.Error("marshaling message to send", "error", err)
return false
}
return p.sendMarshalled(chID, msgType, msgBytes, sendFunc)
}

func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
res := sendFunc(chID, msgBytes)
if res {
p.pendingMetrics.AddPendingSendBytes(msgType, len(msgBytes))
Expand Down
33 changes: 17 additions & 16 deletions p2p/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@ type mockPeer struct {
id ID
}

func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) TrySend(Envelope) bool { return true }
func (mp *mockPeer) Send(Envelope) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }
func (mp *mockPeer) IsOutbound() bool { return false }
func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
func (mp *mockPeer) SetRemovalFailed() {}
func (mp *mockPeer) GetRemovalFailed() bool { return false }
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) TrySend(Envelope) bool { return true }
func (mp *mockPeer) TrySendMarshalled(MarshalledEnvelope) bool { return true }
func (mp *mockPeer) Send(Envelope) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }
func (mp *mockPeer) IsOutbound() bool { return false }
func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
func (mp *mockPeer) SetRemovalFailed() {}
func (mp *mockPeer) GetRemovalFailed() bool { return false }

// Returns a mock peer
func newMockPeer(ip net.IP) *mockPeer {
Expand Down
18 changes: 15 additions & 3 deletions p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,23 @@ func (sw *Switch) Broadcast(e Envelope) {
//
// NOTE: TryBroadcast uses goroutines, so order of broadcast may not be preserved.
func (sw *Switch) TryBroadcast(e Envelope) {
sw.Logger.Debug("TryBroadcast", "channel", e.ChannelID)

marshalMsg := e.Message
if wrapper, ok := e.Message.(Wrapper); ok {
marshalMsg = wrapper.Wrap()
}
marshalledMsg, err := proto.Marshal(marshalMsg)
if err != nil {
return
}
marshalledEnvelope := MarshalledEnvelope{
Envelope: e,
MarshalledMessage: marshalledMsg,
}
peers := sw.peers.List()
for _, peer := range peers {
go func(p Peer) {
p.TrySend(e)
}(peer)
peer.TrySendMarshalled(marshalledEnvelope)
}

}
Expand Down
6 changes: 6 additions & 0 deletions p2p/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ type Envelope struct {
ChannelID byte
}

// MarshalledEnvelope contains a proto message, its marshalled message, with sender routing info.
type MarshalledEnvelope struct {
Envelope
MarshalledMessage []byte
}

// Unwrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a Channel's message type implements Unwrapper, the
// p2p layer will automatically unwrap inbound messages so that reactors do not have to do this themselves.
Expand Down

0 comments on commit 2be2073

Please sign in to comment.