From 2be2073833e653fb52bb40fa255a75a520457d9e Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Mon, 19 Aug 2024 23:28:29 +0200 Subject: [PATCH] bp: Don't remarshal within broadcast #125 (#141) * Don't remarshal within broadcast * fix one more mock * Remove concurrency as its now net negative --- p2p/mock/peer.go | 7 ++++--- p2p/mocks/peer.go | 14 ++++++++++++++ p2p/peer.go | 19 ++++++++++++++----- p2p/peer_set_test.go | 33 +++++++++++++++++---------------- p2p/switch.go | 18 +++++++++++++++--- p2p/types.go | 6 ++++++ 6 files changed, 70 insertions(+), 27 deletions(-) diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go index b4111004c81..90acb65aa5b 100644 --- a/p2p/mock/peer.go +++ b/p2p/mock/peer.go @@ -42,9 +42,10 @@ func NewPeer(ip net.IP) *Peer { return mp } -func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error -func (mp *Peer) TrySend(_ p2p.Envelope) bool { return true } -func (mp *Peer) Send(_ p2p.Envelope) bool { return true } +func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error +func (mp *Peer) TrySend(_ p2p.Envelope) bool { return true } +func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true } +func (mp *Peer) Send(_ p2p.Envelope) bool { return true } func (mp *Peer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{ DefaultNodeID: mp.addr.ID, diff --git a/p2p/mocks/peer.go b/p2p/mocks/peer.go index 235b0e976fb..bc0ed10470b 100644 --- a/p2p/mocks/peer.go +++ b/p2p/mocks/peer.go @@ -349,6 +349,20 @@ func (_m *Peer) TrySend(_a0 p2p.Envelope) bool { return r0 } +// TrySendMarshalled provides a mock function with given fields: _a0 +func (_m *Peer) TrySendMarshalled(_a0 p2p.MarshalledEnvelope) bool { + ret := _m.Called(_a0) + + var r0 bool + if rf, ok := ret.Get(0).(func(p2p.MarshalledEnvelope) bool); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + type mockConstructorTestingTNewPeer interface { mock.TestingT Cleanup(func()) diff --git a/p2p/peer.go b/p2p/peer.go index 37e19c7fda9..fabecc3a82b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -41,6 +41,7 @@ type Peer interface { Send(Envelope) bool TrySend(Envelope) bool + TrySendMarshalled(MarshalledEnvelope) bool Set(string, interface{}) Get(string) interface{} @@ -268,12 +269,11 @@ func (p *peer) TrySend(e Envelope) bool { return p.send(e.ChannelID, e.Message, p.mconn.TrySend) } +func (p *peer) TrySendMarshalled(e MarshalledEnvelope) bool { + return p.sendMarshalled(e.ChannelID, getMsgType(e.Message), e.MarshalledMessage, p.mconn.TrySend) +} + func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bool) bool { - if !p.IsRunning() { - return false - } else if !p.hasChannel(chID) { - return false - } msgType := getMsgType(msg) if w, ok := msg.(Wrapper); ok { msg = w.Wrap() @@ -283,6 +283,15 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo p.Logger.Error("marshaling message to send", "error", err) return false } + return p.sendMarshalled(chID, msgType, msgBytes, sendFunc) +} + +func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool { + if !p.IsRunning() { + return false + } else if !p.hasChannel(chID) { + return false + } res := sendFunc(chID, msgBytes) if res { p.pendingMetrics.AddPendingSendBytes(msgType, len(msgBytes)) diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 64911ecebff..5a7701a55b2 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -18,22 +18,23 @@ type mockPeer struct { id ID } -func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error -func (mp *mockPeer) TrySend(Envelope) bool { return true } -func (mp *mockPeer) Send(Envelope) bool { return true } -func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } -func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } -func (mp *mockPeer) ID() ID { return mp.id } -func (mp *mockPeer) IsOutbound() bool { return false } -func (mp *mockPeer) IsPersistent() bool { return true } -func (mp *mockPeer) Get(s string) interface{} { return s } -func (mp *mockPeer) Set(string, interface{}) {} -func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } -func (mp *mockPeer) SocketAddr() *NetAddress { return nil } -func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } -func (mp *mockPeer) CloseConn() error { return nil } -func (mp *mockPeer) SetRemovalFailed() {} -func (mp *mockPeer) GetRemovalFailed() bool { return false } +func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error +func (mp *mockPeer) TrySend(Envelope) bool { return true } +func (mp *mockPeer) TrySendMarshalled(MarshalledEnvelope) bool { return true } +func (mp *mockPeer) Send(Envelope) bool { return true } +func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } +func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } +func (mp *mockPeer) ID() ID { return mp.id } +func (mp *mockPeer) IsOutbound() bool { return false } +func (mp *mockPeer) IsPersistent() bool { return true } +func (mp *mockPeer) Get(s string) interface{} { return s } +func (mp *mockPeer) Set(string, interface{}) {} +func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } +func (mp *mockPeer) SocketAddr() *NetAddress { return nil } +func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } +func (mp *mockPeer) CloseConn() error { return nil } +func (mp *mockPeer) SetRemovalFailed() {} +func (mp *mockPeer) GetRemovalFailed() bool { return false } // Returns a mock peer func newMockPeer(ip net.IP) *mockPeer { diff --git a/p2p/switch.go b/p2p/switch.go index 1ed1a7dca9e..e4834d6ebd2 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -281,11 +281,23 @@ func (sw *Switch) Broadcast(e Envelope) { // // NOTE: TryBroadcast uses goroutines, so order of broadcast may not be preserved. func (sw *Switch) TryBroadcast(e Envelope) { + sw.Logger.Debug("TryBroadcast", "channel", e.ChannelID) + + marshalMsg := e.Message + if wrapper, ok := e.Message.(Wrapper); ok { + marshalMsg = wrapper.Wrap() + } + marshalledMsg, err := proto.Marshal(marshalMsg) + if err != nil { + return + } + marshalledEnvelope := MarshalledEnvelope{ + Envelope: e, + MarshalledMessage: marshalledMsg, + } peers := sw.peers.List() for _, peer := range peers { - go func(p Peer) { - p.TrySend(e) - }(peer) + peer.TrySendMarshalled(marshalledEnvelope) } } diff --git a/p2p/types.go b/p2p/types.go index 48a6746ceba..51bdd6ad48b 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -17,6 +17,12 @@ type Envelope struct { ChannelID byte } +// MarshalledEnvelope contains a proto message, its marshalled message, with sender routing info. +type MarshalledEnvelope struct { + Envelope + MarshalledMessage []byte +} + // Unwrapper is a Protobuf message that can contain a variety of inner messages // (e.g. via oneof fields). If a Channel's message type implements Unwrapper, the // p2p layer will automatically unwrap inbound messages so that reactors do not have to do this themselves.