Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: move all connection post processing to worker loop #2462

Closed
wants to merge 4 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 115 additions & 132 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,14 @@ type dialWorker struct {
trackedDials map[string]*addrDial
// resch is used to receive response for dials to the peers addresses.
resch chan dialResult

connected bool // true when a connection has been successfully established
// connected is true when a connection has been successfully established
connected bool
// dq is used to pace dials to different addresses of the peer
dq *dialQueue
// dialsInFlight are the addresses with dials pending completion.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not addresses, it's just a counter?

dialsInFlight int
// totalDials is used to track number of dials made by this worker for metrics
totalDials int

// for testing
wg sync.WaitGroup
Expand Down Expand Up @@ -111,12 +117,9 @@ func (w *dialWorker) loop() {
w.wg.Add(1)
defer w.wg.Done()
defer w.s.limiter.clearAllPeerDials(w.peer)
defer w.cleanup()

// dq is used to pace dials to different addresses of the peer
dq := newDialQueue()
// dialsInFlight is the number of dials in flight.
dialsInFlight := 0

w.dq = newDialQueue()
startTime := w.cl.Now()
// dialTimer is the dialTimer used to trigger dials
dialTimer := w.cl.InstantTimer(startTime.Add(math.MaxInt64))
Expand All @@ -127,24 +130,22 @@ func (w *dialWorker) loop() {
<-dialTimer.Ch()
}
timerRunning = false
if dq.Len() > 0 {
if dialsInFlight == 0 && !w.connected {
if w.dq.Len() > 0 {
if w.dialsInFlight == 0 && !w.connected {
// if there are no dials in flight, trigger the next dials immediately
dialTimer.Reset(startTime)
} else {
dialTimer.Reset(startTime.Add(dq.top().Delay))
dialTimer.Reset(startTime.Add(w.dq.top().Delay))
}
timerRunning = true
}
}

// totalDials is used to track number of dials made by this worker for metrics
totalDials := 0
loop:
for {
// The loop has three parts
// 1. Input requests are received on w.reqch. If a suitable connection is not available we create
// a pendRequest object to track the dialRequest and add the addresses to dq.
// a pendRequest object to track the dialRequest and add the addresses to w.dq.
// 2. Addresses from the dialQueue are dialed at appropriate time intervals depending on delay logic.
// We are notified of the completion of these dials on w.resch.
// 3. Responses for dials are received on w.resch. On receiving a response, we updated the pendRequests
Expand All @@ -153,20 +154,6 @@ loop:
select {
case req, ok := <-w.reqch:
if !ok {
if w.s.metricsTracer != nil {
w.s.metricsTracer.DialCompleted(w.connected, totalDials)
}
for dialsInFlight > 0 {
res := <-w.resch
// We're recording any error as a failure here.
// Notably, this also applies to cancelations (i.e. if another dial attempt was faster).
// This is ok since the black hole detector uses a very low threshold (5%).
w.s.bhd.RecordResult(res.Addr, res.Err == nil)
if res.Conn != nil {
res.Conn.Close()
}
dialsInFlight--
}
return
}
// We have received a new request. If we do not have a suitable connection,
Expand All @@ -182,104 +169,10 @@ loop:

addrs, addrErrs, err := w.s.addrsForDial(req.ctx, w.peer)
if err != nil {
req.resch <- dialResponse{
err: &DialError{
Peer: w.peer,
DialErrors: addrErrs,
Cause: err,
}}
continue loop
}

// get the delays to dial these addrs from the swarms dialRanker
simConnect, _, _ := network.GetSimultaneousConnect(req.ctx)
addrRanking := w.rankAddrs(addrs, simConnect)
addrDelay := make(map[string]time.Duration, len(addrRanking))

// create the pending request object
pr := &pendRequest{
req: req,
addrs: make(map[string]struct{}, len(addrRanking)),
err: &DialError{Peer: w.peer, DialErrors: addrErrs},
}
for _, adelay := range addrRanking {
pr.addrs[string(adelay.Addr.Bytes())] = struct{}{}
addrDelay[string(adelay.Addr.Bytes())] = adelay.Delay
}

// Check if dials to any of the addrs have completed already
// If they have errored, record the error in pr. If they have succeeded,
// respond with the connection.
// If they are pending, add them to tojoin.
// If we haven't seen any of the addresses before, add them to todial.
var todial []ma.Multiaddr
var tojoin []*addrDial

for _, adelay := range addrRanking {
ad, ok := w.trackedDials[string(adelay.Addr.Bytes())]
if !ok {
todial = append(todial, adelay.Addr)
continue
}

if ad.conn != nil {
// dial to this addr was successful, complete the request
req.resch <- dialResponse{conn: ad.conn}
continue loop
}

if ad.err != nil {
// dial to this addr errored, accumulate the error
pr.err.recordErr(ad.addr, ad.err)
delete(pr.addrs, string(ad.addr.Bytes()))
continue
}

// dial is still pending, add to the join list
tojoin = append(tojoin, ad)
}

if len(todial) == 0 && len(tojoin) == 0 {
// all request applicable addrs have been dialed, we must have errored
pr.err.Cause = ErrAllDialsFailed
req.resch <- dialResponse{err: pr.err}
req.resch <- dialResponse{err: &DialError{Peer: w.peer, DialErrors: addrErrs, Cause: err}}
continue loop
}

// The request has some pending or new dials
w.pendingRequests[pr] = struct{}{}

for _, ad := range tojoin {
if !ad.dialed {
// we haven't dialed this address. update the ad.ctx to have simultaneous connect values
// set correctly
if simConnect, isClient, reason := network.GetSimultaneousConnect(req.ctx); simConnect {
if simConnect, _, _ := network.GetSimultaneousConnect(ad.ctx); !simConnect {
ad.ctx = network.WithSimultaneousConnect(ad.ctx, isClient, reason)
// update the element in dq to use the simultaneous connect delay.
dq.Add(network.AddrDelay{
Addr: ad.addr,
Delay: addrDelay[string(ad.addr.Bytes())],
})
}
}
}
// add the request to the addrDial
}

if len(todial) > 0 {
now := time.Now()
// these are new addresses, track them and add them to dq
for _, a := range todial {
w.trackedDials[string(a.Bytes())] = &addrDial{
addr: a,
ctx: req.ctx,
createdAt: now,
}
dq.Add(network.AddrDelay{Addr: a, Delay: addrDelay[string(a.Bytes())]})
}
}
// setup dialTimer for updates to dq
w.addNewRequest(req, addrs, addrErrs)
scheduleNextDial()

case <-dialTimer.Ch():
Expand All @@ -289,7 +182,7 @@ loop:
// the inflight dials have errored and we should dial the next batch of
// addresses
now := time.Now()
for _, adelay := range dq.NextBatch() {
for _, adelay := range w.dq.NextBatch() {
// spawn the dial
ad, ok := w.trackedDials[string(adelay.Addr.Bytes())]
if !ok {
Expand All @@ -300,13 +193,11 @@ loop:
ad.dialRankingDelay = now.Sub(ad.createdAt)
err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch)
if err != nil {
// Errored without attempting a dial. This happens in case of
// backoff or black hole.
// Errored without attempting a dial. This happens in case of backoff.
w.dispatchError(ad, err)
} else {
// the dial was successful. update inflight dials
dialsInFlight++
totalDials++
w.dialsInFlight++
w.totalDials++
}
}
timerRunning = false
Expand All @@ -326,7 +217,7 @@ loop:
}
// It is better to decrement the dials in flight and schedule one extra dial
// than risking not closing the worker loop on cleanup
dialsInFlight--
w.dialsInFlight--
continue
}

Expand All @@ -336,7 +227,7 @@ loop:
continue
}

dialsInFlight--
w.dialsInFlight--
// We're recording any error as a failure here.
// Notably, this also applies to cancelations (i.e. if another dial attempt was faster).
// This is ok since the black hole detector uses a very low threshold (5%).
Expand All @@ -352,6 +243,80 @@ loop:
}
}

// addNewRequest adds a new dial request to the worker loop. If the request has no pending dials, a response
// is sent immediately otherwise it is tracked in pendingRequests
func (w *dialWorker) addNewRequest(req dialRequest, addrs []ma.Multiaddr, addrErrs []TransportError) {
// check if a dial to any of the addrs has succeeded already
for _, addr := range addrs {
if ad, ok := w.trackedDials[string(addr.Bytes())]; ok {
if ad.conn != nil {
// dial to this addr was successful, complete the request
req.resch <- dialResponse{conn: ad.conn}
}
}
}

// get the delays to dial these addrs from the swarms dialRanker
simConnect, _, _ := network.GetSimultaneousConnect(req.ctx)
addrRanking := w.rankAddrs(addrs, simConnect)

// create the pending request object
pr := &pendRequest{
req: req,
err: &DialError{Peer: w.peer, DialErrors: addrErrs},
addrs: make(map[string]struct{}, len(addrRanking)),
}
for _, adelay := range addrRanking {
pr.addrs[string(adelay.Addr.Bytes())] = struct{}{}
}

for _, adelay := range addrRanking {
ad, ok := w.trackedDials[string(adelay.Addr.Bytes())]
if !ok {
// new address, track and enqueue
now := time.Now()
w.trackedDials[string(adelay.Addr.Bytes())] = &addrDial{
addr: adelay.Addr,
ctx: req.ctx,
createdAt: now,
}
w.dq.Add(network.AddrDelay{Addr: adelay.Addr, Delay: adelay.Delay})
continue
}

if ad.err != nil {
// dial to this addr errored, accumulate the error
pr.err.recordErr(ad.addr, ad.err)
delete(pr.addrs, string(ad.addr.Bytes()))
continue
}

if !ad.dialed {
// we haven't dialed this address. update the ad.ctx to have simultaneous connect values
// set correctly
if isSimConnect, isClient, reason := network.GetSimultaneousConnect(req.ctx); isSimConnect {
if wasSimConnect, _, _ := network.GetSimultaneousConnect(ad.ctx); !wasSimConnect {
ad.ctx = network.WithSimultaneousConnect(ad.ctx, isClient, reason)
// update the element in dq to use the simultaneous connect delay.
w.dq.Add(network.AddrDelay{
Addr: ad.addr,
Delay: adelay.Delay,
})
}
}
}
}

if len(pr.addrs) == 0 {
// all request applicable addrs have been dialed, we must have errored
pr.err.Cause = ErrAllDialsFailed
req.resch <- dialResponse{err: pr.err}
} else {
// The request has some pending or new dials
w.pendingRequests[pr] = struct{}{}
}
}

func (w *dialWorker) handleSuccess(ad *addrDial, res dialResult) {
// Ensure we connected to the correct peer.
// This was most likely already checked by the security protocol, but it doesn't hurt do it again here.
Expand Down Expand Up @@ -400,7 +365,7 @@ func (w *dialWorker) handleError(ad *addrDial, res dialResult) {
if res.Err != nil && w.s.metricsTracer != nil {
w.s.metricsTracer.FailedDialing(res.Addr, res.Err, context.Cause(ad.ctx))
}
// it must be an error -- add backoff if applicable and dispatch
// add backoff if applicable and dispatch
// ErrDialRefusedBlackHole shouldn't end up here, just a safety check
if res.Err != ErrDialRefusedBlackHole && res.Err != context.Canceled && !w.connected {
// we only add backoff if there has not been a successful connection
Expand Down Expand Up @@ -455,6 +420,24 @@ func (w *dialWorker) rankAddrs(addrs []ma.Multiaddr, isSimConnect bool) []networ
return w.s.dialRanker(addrs)
}

// cleanup is called on workerloop close
func (w *dialWorker) cleanup() {
if w.s.metricsTracer != nil {
w.s.metricsTracer.DialCompleted(w.connected, w.totalDials)
}
for w.dialsInFlight > 0 {
res := <-w.resch
// We're recording any error as a failure here.
// Notably, this also applies to cancelations (i.e. if another dial attempt was faster).
// This is ok since the black hole detector uses a very low threshold (5%).
w.s.bhd.RecordResult(res.Addr, res.Err == nil)
if res.Conn != nil {
res.Conn.Close()
}
w.dialsInFlight--
}
}

// dialQueue is a priority queue used to schedule dials
type dialQueue struct {
// q contains dials ordered by delay
Expand Down