Skip to content

Commit

Permalink
remove region from peer struct
Browse files Browse the repository at this point in the history
  • Loading branch information
czarcas7ic committed Jun 23, 2024
1 parent 376c6c4 commit ef28c37
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 30 deletions.
39 changes: 19 additions & 20 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Peer interface {
SetRemovalFailed()
GetRemovalFailed() bool

GetRegion() string
// GetRegion() string
}

//----------------------------------------------------------
Expand Down Expand Up @@ -129,7 +129,7 @@ type peer struct {
// When removal of a peer fails, we set this flag
removalAttemptFailed bool

region string
// region string
}

type PeerOption func(*peer)
Expand All @@ -143,22 +143,21 @@ func newPeer(
chDescs []*cmtconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
mlc *metricsLabelCache,
sameRegion bool,
options ...PeerOption,
) *peer {
var peerRegion string
if sameRegion {
// Note if the new peer is in the same region as us
peerIp, err := nodeInfo.NetAddress()
if err != nil {
fmt.Print("Failed to get IP from nodeInfo", "err", err)
}
peerRegionInternal, err := getRegionFromIP(peerIp.IP.String())
if err != nil {
fmt.Print("Failed to get region from IP", "err", err)
}
peerRegion = peerRegionInternal
}
// var peerRegion string
// if sameRegion {
// // Note if the new peer is in the same region as us
// peerIp, err := nodeInfo.NetAddress()
// if err != nil {
// fmt.Print("Failed to get IP from nodeInfo", "err", err)
// }
// peerRegionInternal, err := getRegionFromIP(peerIp.IP.String())
// if err != nil {
// fmt.Print("Failed to get region from IP", "err", err)
// }
// peerRegion = peerRegionInternal
// }

p := &peer{
peerConn: pc,
Expand All @@ -168,7 +167,7 @@ func newPeer(
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
mlc: mlc,
region: peerRegion,
// region: peerRegion,
}

p.mconn = createMConnection(
Expand Down Expand Up @@ -457,6 +456,6 @@ func createMConnection(
)
}

func (p *peer) GetRegion() string {
return p.region
}
// func (p *peer) GetRegion() string {
// return p.region
// }
2 changes: 1 addition & 1 deletion p2p/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func createOutboundPeerAndPerformHandshake(
return nil, err
}

p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {}, newMetricsLabelCache(), false)
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {}, newMetricsLabelCache())
p.SetLogger(log.TestingLogger().With("peer", addr))
return p, nil
}
Expand Down
18 changes: 17 additions & 1 deletion p2p/pex/addrbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ type AddrBook interface {

// Add and remove an address
AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error
PickAddressNotInRegion(biasTowardsNewAddrs int, region string) *p2p.NetAddress
RemoveAddress(*p2p.NetAddress)

// Check if the address is in the book
HasAddress(*p2p.NetAddress) bool
GetAddressRegion(addr *p2p.NetAddress) string

// Do we need more peers?
NeedMoreAddrs() bool
Expand All @@ -65,6 +65,7 @@ type AddrBook interface {
// Pick an address to dial
PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress
PickAddressWithRegion(biasTowardsNewAddrs int, region string) *p2p.NetAddress
PickAddressNotInRegion(biasTowardsNewAddrs int, region string) *p2p.NetAddress

// Mark address
MarkGood(p2p.ID)
Expand Down Expand Up @@ -271,6 +272,17 @@ func (a *addrBook) HasAddress(addr *p2p.NetAddress) bool {
return ka != nil
}

func (a *addrBook) GetAddressRegion(addr *p2p.NetAddress) string {
a.mtx.Lock()
defer a.mtx.Unlock()

ka, exists := a.addrLookup[addr.ID]
if !exists {
return ""
}
return ka.Region
}

// NeedMoreAddrs implements AddrBook - returns true if there are not have enough addresses in the book.
func (a *addrBook) NeedMoreAddrs() bool {
return a.Size() < needAddressThreshold
Expand Down Expand Up @@ -340,6 +352,8 @@ func (a *addrBook) PickAddressWithRegion(biasTowardsNewAddrs int, region string)
}
ka.Region = region
a.addrLookup[ka.ID()] = ka
} else {
fmt.Println("PickAddressWithRegion Region already set", ka.Addr, "region", ka.Region)
}
if ka.Region == region {
fmt.Println("PickAddressWithRegion Adding address", ka.Addr, "region", ka.Region)
Expand Down Expand Up @@ -424,6 +438,8 @@ func (a *addrBook) PickAddressNotInRegion(biasTowardsNewAddrs int, region string
}
ka.Region = region
a.addrLookup[ka.ID()] = ka
} else {
fmt.Println("PickAddressNotInRegion Region already set", ka.Addr, "region", ka.Region)
}
if ka.Region != region {
fmt.Println("PickAddressNotInRegion Adding address", ka.Addr, "region", ka.Region)
Expand Down
13 changes: 7 additions & 6 deletions p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type AddrBook interface {
MarkGood(ID)
RemoveAddress(*NetAddress)
HasAddress(*NetAddress) bool
GetAddressRegion(*NetAddress) string
Save()
}

Expand Down Expand Up @@ -414,13 +415,13 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
sw.metrics.Peers.Add(float64(-1))
if peer.IsOutbound() {
if sw.config.SameRegion {
if peer.GetRegion() != sw.config.MyRegion {
if sw.addrBook.GetAddressRegion(peer.SocketAddr()) != sw.config.MyRegion {
sw.config.CurrentNumOutboundPeersInOtherRegion--
}
}
} else {
if sw.config.SameRegion {
if peer.GetRegion() != sw.config.MyRegion {
if sw.addrBook.GetAddressRegion(peer.SocketAddr()) != sw.config.MyRegion {
sw.config.CurrentNumInboundPeersInOtherRegion--
}
}
Expand Down Expand Up @@ -754,8 +755,8 @@ func (sw *Switch) acceptRoutine() {

if sw.config.SameRegion {
// Note if the new peer is in the same region as us
fmt.Println("Checking if peer is same region. My region: ", sw.config.MyRegion, " Peer region: ", p.GetRegion())
isSameRegion := p.GetRegion() == sw.config.MyRegion
fmt.Println("Checking if peer is same region. My region: ", sw.config.MyRegion, " Peer region: ", sw.addrBook.GetAddressRegion(p.SocketAddr()))
isSameRegion := sw.addrBook.GetAddressRegion(p.SocketAddr()) == sw.config.MyRegion

if !isSameRegion {
// If this peer is not in our same region and we have no room to dial peers outside of our region, return error
Expand Down Expand Up @@ -981,9 +982,9 @@ func (sw *Switch) addPeer(p Peer) error {
}
sw.metrics.Peers.Add(float64(1))
if sw.config.SameRegion {
if p.IsOutbound() && p.GetRegion() != sw.config.MyRegion {
if p.IsOutbound() && sw.addrBook.GetAddressRegion(p.SocketAddr()) != sw.config.MyRegion {
sw.config.CurrentNumOutboundPeersInOtherRegion++
} else if !p.IsOutbound() && p.GetRegion() != sw.config.MyRegion {
} else if !p.IsOutbound() && sw.addrBook.GetAddressRegion(p.SocketAddr()) != sw.config.MyRegion {
sw.config.CurrentNumInboundPeersInOtherRegion++
}
}
Expand Down
4 changes: 3 additions & 1 deletion p2p/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
sw.chDescs,
sw.StopPeerForError,
sw.mlc,
sw.config.SameRegion,
)

if err = sw.addPeer(p); err != nil {
Expand Down Expand Up @@ -306,6 +305,9 @@ func (book *AddrBookMock) HasAddress(addr *NetAddress) bool {
_, ok := book.Addrs[addr.String()]
return ok
}
func (book *AddrBookMock) GetAddressRegion(addr *NetAddress) string {
return ""
}
func (book *AddrBookMock) RemoveAddress(addr *NetAddress) {
delete(book.Addrs, addr.String())
}
Expand Down
1 change: 0 additions & 1 deletion p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,6 @@ func (mt *MultiplexTransport) wrapPeer(
cfg.chDescs,
cfg.onPeerError,
cfg.mlc,
cfg.sameRegion,
PeerMetrics(cfg.metrics),
)

Expand Down

0 comments on commit ef28c37

Please sign in to comment.