Skip to content

Commit

Permalink
feat(requestmanager): put network error in request manager
Browse files Browse the repository at this point in the history
Put network error in request manager to cover more cases
  • Loading branch information
hannahhoward committed Dec 10, 2020
1 parent ece60f6 commit 31e5ea6
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 49 deletions.
38 changes: 7 additions & 31 deletions requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package executor
import (
"bytes"
"context"
"github.com/ipfs/go-graphsync/messagequeue"
"github.com/ipfs/go-graphsync/responsemanager"
"strings"
"sync/atomic"

Expand All @@ -18,7 +16,6 @@ import (
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/requestmanager/types"
)
Expand All @@ -29,13 +26,12 @@ type AsyncLoadFn func(graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResu

// ExecutionEnv are request parameters that last between requests
type ExecutionEnv struct {
Ctx context.Context
SendRequest func(peer.ID, gsmsg.GraphSyncRequest, ...notifications.Notifee)
RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error
TerminateRequest func(graphsync.RequestID)
WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error)
Loader AsyncLoadFn
NetworkErrorListeners responsemanager.NetworkErrorListeners
Ctx context.Context
SendRequest func(peer.ID, gsmsg.GraphSyncRequest)
RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error
TerminateRequest func(graphsync.RequestID)
WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error)
Loader AsyncLoadFn
}

// RequestExecution are parameters for a single request execution
Expand Down Expand Up @@ -169,28 +165,8 @@ func (re *requestExecutor) run() {
close(re.inProgressErr)
}

type reqSubscriber struct {
re *requestExecutor
}

func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) {
mqEvt, isMQEvt := event.(messagequeue.Event)
if !isMQEvt || mqEvt.Name != messagequeue.Error {
return
}

r.re.env.NetworkErrorListeners.NotifyNetworkErrorListeners(r.re.p, r.re.request, mqEvt.Err)
//r.re.networkError <- mqEvt.Err
//r.re.terminateRequest()
}

func (r reqSubscriber) OnClose(topic notifications.Topic) {
}

func (re *requestExecutor) sendRequest(request gsmsg.GraphSyncRequest) {
sub := notifications.NewMappableSubscriber(&reqSubscriber{re}, notifications.IdentityTransform)
failNotifee := notifications.Notifee{Topic: messagequeue.Error, Subscriber: sub}
re.env.SendRequest(re.p, request, failNotifee)
re.env.SendRequest(re.p, request)
}

func (re *requestExecutor) terminateRequest() {
Expand Down
3 changes: 1 addition & 2 deletions requestmanager/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/requestmanager/executor"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/requestmanager/testloader"
Expand Down Expand Up @@ -378,7 +377,7 @@ func (ree *requestExecutionEnv) waitForResume() ([]graphsync.ExtensionData, erro
return extensions, nil
}

func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) {
func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
ree.requestsSent = append(ree.requestsSent, requestSent{p, request})
if ree.currentWaitForResumeResult < len(ree.loaderRanges) && !request.IsCancel() {
ree.configureLoader(ree.p, ree.request.ID(), ree.tbc, ree.fal, ree.loaderRanges[ree.currentWaitForResumeResult])
Expand Down
55 changes: 41 additions & 14 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"errors"
"fmt"
"github.com/ipfs/go-graphsync/responsemanager"
"sync/atomic"

"github.com/ipfs/go-graphsync/listeners"
"github.com/ipfs/go-graphsync/messagequeue"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -75,7 +77,7 @@ type RequestManager struct {
requestHooks RequestHooks
responseHooks ResponseHooks
blockHooks BlockHooks
networkErrorListeners responsemanager.NetworkErrorListeners
networkErrorListeners *listeners.NetworkErrorListeners
}

type requestManagerMessage interface {
Expand Down Expand Up @@ -103,7 +105,7 @@ func New(ctx context.Context,
requestHooks RequestHooks,
responseHooks ResponseHooks,
blockHooks BlockHooks,
networkErrorListeners responsemanager.NetworkErrorListeners,
networkErrorListeners *listeners.NetworkErrorListeners,
) *RequestManager {
ctx, cancel := context.WithCancel(ctx)
return &RequestManager{
Expand Down Expand Up @@ -337,12 +339,11 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re
lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
rm.inProgressRequestStatuses[request.ID()] = requestStatus
incoming, incomingError := executor.ExecutionEnv{
Ctx: rm.ctx,
SendRequest: rm.peerHandler.SendRequest,
TerminateRequest: rm.terminateRequest,
RunBlockHooks: rm.processBlockHooks,
Loader: rm.asyncLoader.AsyncLoad,
NetworkErrorListeners: rm.networkErrorListeners,
Ctx: rm.ctx,
SendRequest: rm.sendRequest,
TerminateRequest: rm.terminateRequest,
RunBlockHooks: rm.processBlockHooks,
Loader: rm.asyncLoader.AsyncLoad,
}.Start(
executor.RequestExecution{
Ctx: ctx,
Expand Down Expand Up @@ -381,7 +382,7 @@ func (crm *cancelRequestMessage) handle(rm *RequestManager) {
return
}

rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
if crm.isPause {
inProgressRequestStatus.paused = true
} else {
Expand Down Expand Up @@ -431,7 +432,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg
result := rm.responseHooks.ProcessResponseHooks(p, response)
if len(result.Extensions) > 0 {
updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
rm.peerHandler.SendRequest(p, updateRequest)
rm.sendRequest(p, updateRequest)
}
if result.Err != nil {
requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
Expand All @@ -443,7 +444,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg
case requestStatus.networkError <- responseError:
case <-requestStatus.ctx.Done():
}
rm.peerHandler.SendRequest(p, gsmsg.CancelRequest(response.RequestID()))
rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID()))
requestStatus.cancelFn()
return false
}
Expand Down Expand Up @@ -488,7 +489,7 @@ func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.Respon
result := rm.blockHooks.ProcessBlockHooks(p, response, block)
if len(result.Extensions) > 0 {
updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
rm.peerHandler.SendRequest(p, updateRequest)
rm.sendRequest(p, updateRequest)
}
if result.Err != nil {
_, isPause := result.Err.(hooks.ErrPaused)
Expand Down Expand Up @@ -541,6 +542,32 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.
return request, hooksResult, nil
}

type reqSubscriber struct {
p peer.ID
request gsmsg.GraphSyncRequest
networkErrorListeners *listeners.NetworkErrorListeners
}

func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) {
mqEvt, isMQEvt := event.(messagequeue.Event)
if !isMQEvt || mqEvt.Name != messagequeue.Error {
return
}

r.networkErrorListeners.NotifyNetworkErrorListeners(r.p, r.request, mqEvt.Err)
//r.re.networkError <- mqEvt.Err
//r.re.terminateRequest()
}

func (r reqSubscriber) OnClose(topic notifications.Topic) {
}

func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
sub := notifications.NewMappableSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners}, notifications.IdentityTransform)
failNotifee := notifications.Notifee{Topic: messagequeue.Error, Subscriber: sub}
rm.peerHandler.SendRequest(p, request, failNotifee)
}

func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error {
inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id]
if !ok {
Expand All @@ -552,7 +579,7 @@ func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error {
inProgressRequestStatus.paused = false
select {
case <-inProgressRequestStatus.pauseMessages:
rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...))
rm.sendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...))
return nil
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand Down
3 changes: 1 addition & 2 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/ipfs/go-graphsync/listeners"
"github.com/ipfs/go-graphsync/responsemanager"

blocks "github.com/ipfs/go-block-format"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -878,7 +877,7 @@ type testData struct {
extensionName2 graphsync.ExtensionName
extensionData2 []byte
extension2 graphsync.ExtensionData
networkErrorListeners responsemanager.NetworkErrorListeners
networkErrorListeners *listeners.NetworkErrorListeners
}

func newTestData(ctx context.Context, t *testing.T) *testData {
Expand Down

0 comments on commit 31e5ea6

Please sign in to comment.