Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RegisterNetworkErrorListener should fire when there's an error connecting to the peer #127

Merged
merged 4 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/listeners"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/messagequeue"
gsnet "github.com/ipfs/go-graphsync/network"
Expand Down Expand Up @@ -46,10 +47,10 @@ type GraphSync struct {
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
requestorCancelledListeners *responderhooks.RequestorCancelledListeners
blockSentListeners *responderhooks.BlockSentListeners
networkErrorListeners *responderhooks.NetworkErrorListeners
completedResponseListeners *listeners.CompletedResponseListeners
requestorCancelledListeners *listeners.RequestorCancelledListeners
blockSentListeners *listeners.BlockSentListeners
networkErrorListeners *listeners.NetworkErrorListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
Expand Down Expand Up @@ -113,17 +114,17 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingResponseHooks := requestorhooks.NewResponseHooks()
outgoingRequestHooks := requestorhooks.NewRequestHooks()
incomingBlockHooks := requestorhooks.NewBlockHooks()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks)
networkErrorListeners := listeners.NewNetworkErrorListeners()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
peerTaskQueue := peertaskqueue.New()

persistenceOptions := persistenceoptions.New()
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
blockSentListeners := responderhooks.NewBlockSentListeners()
networkErrorListeners := responderhooks.NewNetworkErrorListeners()
completedResponseListeners := listeners.NewCompletedResponseListeners()
requestorCancelledListeners := listeners.NewRequestorCancelledListeners()
blockSentListeners := listeners.NewBlockSentListeners()
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand Down
35 changes: 35 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,41 @@ func TestNetworkDisconnect(t *testing.T) {
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
}

func TestConnectFail(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

requestCtx, requestCancel := context.WithTimeout(ctx, 1*time.Second)
defer requestCancel()

// unlink peers so they cannot communicate
td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID())
td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID())

reqNetworkError := make(chan error, 1)
requestor.RegisterNetworkErrorListener(func(p peer.ID, request graphsync.RequestData, err error) {
select {
case reqNetworkError <- err:
default:
}
})
_, errChan := requestor.Request(requestCtx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

var err error
testutil.AssertReceive(ctx, t, reqNetworkError, &err, "should receive network error")
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package hooks
package listeners

import (
"github.com/hannahhoward/go-pubsub"
Expand Down
3 changes: 1 addition & 2 deletions requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/requestmanager/types"
)
Expand All @@ -28,7 +27,7 @@ type AsyncLoadFn func(graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResu
// ExecutionEnv are request parameters that last between requests
type ExecutionEnv struct {
Ctx context.Context
SendRequest func(peer.ID, gsmsg.GraphSyncRequest, ...notifications.Notifee)
SendRequest func(peer.ID, gsmsg.GraphSyncRequest)
RunBlockHooks func(p peer.ID, response graphsync.ResponseData, blk graphsync.BlockData) error
TerminateRequest func(graphsync.RequestID)
WaitForMessages func(ctx context.Context, resumeMessages chan graphsync.ExtensionData) ([]graphsync.ExtensionData, error)
Expand Down
3 changes: 1 addition & 2 deletions requestmanager/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/requestmanager/executor"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/requestmanager/testloader"
Expand Down Expand Up @@ -378,7 +377,7 @@ func (ree *requestExecutionEnv) waitForResume() ([]graphsync.ExtensionData, erro
return extensions, nil
}

func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) {
func (ree *requestExecutionEnv) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
ree.requestsSent = append(ree.requestsSent, requestSent{p, request})
if ree.currentWaitForResumeResult < len(ree.loaderRanges) && !request.IsCancel() {
ree.configureLoader(ree.p, ree.request.ID(), ree.tbc, ree.fal, ree.loaderRanges[ree.currentWaitForResumeResult])
Expand Down
49 changes: 42 additions & 7 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"fmt"
"sync/atomic"

"github.com/ipfs/go-graphsync/listeners"
"github.com/ipfs/go-graphsync/messagequeue"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -74,6 +77,7 @@ type RequestManager struct {
requestHooks RequestHooks
responseHooks ResponseHooks
blockHooks BlockHooks
networkErrorListeners *listeners.NetworkErrorListeners
}

type requestManagerMessage interface {
Expand All @@ -100,7 +104,9 @@ func New(ctx context.Context,
asyncLoader AsyncLoader,
requestHooks RequestHooks,
responseHooks ResponseHooks,
blockHooks BlockHooks) *RequestManager {
blockHooks BlockHooks,
networkErrorListeners *listeners.NetworkErrorListeners,
) *RequestManager {
ctx, cancel := context.WithCancel(ctx)
return &RequestManager{
ctx: ctx,
Expand All @@ -112,6 +118,7 @@ func New(ctx context.Context,
requestHooks: requestHooks,
responseHooks: responseHooks,
blockHooks: blockHooks,
networkErrorListeners: networkErrorListeners,
}
}

Expand Down Expand Up @@ -333,7 +340,7 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re
rm.inProgressRequestStatuses[request.ID()] = requestStatus
incoming, incomingError := executor.ExecutionEnv{
Ctx: rm.ctx,
SendRequest: rm.peerHandler.SendRequest,
SendRequest: rm.sendRequest,
TerminateRequest: rm.terminateRequest,
RunBlockHooks: rm.processBlockHooks,
Loader: rm.asyncLoader.AsyncLoad,
Expand Down Expand Up @@ -375,7 +382,7 @@ func (crm *cancelRequestMessage) handle(rm *RequestManager) {
return
}

rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
if crm.isPause {
inProgressRequestStatus.paused = true
} else {
Expand Down Expand Up @@ -425,7 +432,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg
result := rm.responseHooks.ProcessResponseHooks(p, response)
if len(result.Extensions) > 0 {
updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
rm.peerHandler.SendRequest(p, updateRequest)
rm.sendRequest(p, updateRequest)
}
if result.Err != nil {
requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
Expand All @@ -437,7 +444,7 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg
case requestStatus.networkError <- responseError:
case <-requestStatus.ctx.Done():
}
rm.peerHandler.SendRequest(p, gsmsg.CancelRequest(response.RequestID()))
rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID()))
requestStatus.cancelFn()
return false
}
Expand Down Expand Up @@ -482,7 +489,7 @@ func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.Respon
result := rm.blockHooks.ProcessBlockHooks(p, response, block)
if len(result.Extensions) > 0 {
updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
rm.peerHandler.SendRequest(p, updateRequest)
rm.sendRequest(p, updateRequest)
}
if result.Err != nil {
_, isPause := result.Err.(hooks.ErrPaused)
Expand Down Expand Up @@ -535,6 +542,34 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.
return request, hooksResult, nil
}

type reqSubscriber struct {
p peer.ID
request gsmsg.GraphSyncRequest
networkErrorListeners *listeners.NetworkErrorListeners
}

func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) {
mqEvt, isMQEvt := event.(messagequeue.Event)
if !isMQEvt || mqEvt.Name != messagequeue.Error {
return
}

r.networkErrorListeners.NotifyNetworkErrorListeners(r.p, r.request, mqEvt.Err)
//r.re.networkError <- mqEvt.Err
//r.re.terminateRequest()
}

func (r reqSubscriber) OnClose(topic notifications.Topic) {
}

const requestNetworkError = "request_network_error"

func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners})
failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub}
rm.peerHandler.SendRequest(p, request, failNotifee)
}

func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error {
inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id]
if !ok {
Expand All @@ -546,7 +581,7 @@ func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error {
inProgressRequestStatus.paused = false
select {
case <-inProgressRequestStatus.pauseMessages:
rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...))
rm.sendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...))
return nil
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand Down
40 changes: 22 additions & 18 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/ipfs/go-graphsync/listeners"

blocks "github.com/ipfs/go-block-format"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand Down Expand Up @@ -858,23 +860,24 @@ func TestPauseResumeExternal(t *testing.T) {
}

type testData struct {
requestRecordChan chan requestRecord
fph *fakePeerHandler
fal *testloader.FakeAsyncLoader
requestHooks *hooks.OutgoingRequestHooks
responseHooks *hooks.IncomingResponseHooks
blockHooks *hooks.IncomingBlockHooks
requestManager *RequestManager
blockStore map[ipld.Link][]byte
loader ipld.Loader
storer ipld.Storer
blockChain *testutil.TestBlockChain
extensionName1 graphsync.ExtensionName
extensionData1 []byte
extension1 graphsync.ExtensionData
extensionName2 graphsync.ExtensionName
extensionData2 []byte
extension2 graphsync.ExtensionData
requestRecordChan chan requestRecord
fph *fakePeerHandler
fal *testloader.FakeAsyncLoader
requestHooks *hooks.OutgoingRequestHooks
responseHooks *hooks.IncomingResponseHooks
blockHooks *hooks.IncomingBlockHooks
requestManager *RequestManager
blockStore map[ipld.Link][]byte
loader ipld.Loader
storer ipld.Storer
blockChain *testutil.TestBlockChain
extensionName1 graphsync.ExtensionName
extensionData1 []byte
extension1 graphsync.ExtensionData
extensionName2 graphsync.ExtensionName
extensionData2 []byte
extension2 graphsync.ExtensionData
networkErrorListeners *listeners.NetworkErrorListeners
}

func newTestData(ctx context.Context, t *testing.T) *testData {
Expand All @@ -885,7 +888,8 @@ func newTestData(ctx context.Context, t *testing.T) *testData {
td.requestHooks = hooks.NewRequestHooks()
td.responseHooks = hooks.NewResponseHooks()
td.blockHooks = hooks.NewBlockHooks()
td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks)
td.networkErrorListeners = listeners.NewNetworkErrorListeners()
td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks, td.networkErrorListeners)
td.requestManager.SetDelegate(td.fph)
td.requestManager.Startup()
td.blockStore = make(map[ipld.Link][]byte)
Expand Down
17 changes: 9 additions & 8 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/listeners"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
Expand Down Expand Up @@ -886,10 +887,10 @@ type testData struct {
requestHooks *hooks.IncomingRequestHooks
blockHooks *hooks.OutgoingBlockHooks
updateHooks *hooks.RequestUpdatedHooks
completedListeners *hooks.CompletedResponseListeners
cancelledListeners *hooks.RequestorCancelledListeners
blockSentListeners *hooks.BlockSentListeners
networkErrorListeners *hooks.NetworkErrorListeners
completedListeners *listeners.CompletedResponseListeners
cancelledListeners *listeners.RequestorCancelledListeners
blockSentListeners *listeners.BlockSentListeners
networkErrorListeners *listeners.NetworkErrorListeners
notifeePublisher *testutil.MockPublisher
blockSends chan graphsync.BlockData
completedResponseStatuses chan graphsync.ResponseStatusCode
Expand Down Expand Up @@ -960,10 +961,10 @@ func newTestData(t *testing.T) testData {
td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions)
td.blockHooks = hooks.NewBlockHooks()
td.updateHooks = hooks.NewUpdateHooks()
td.completedListeners = hooks.NewCompletedResponseListeners()
td.cancelledListeners = hooks.NewRequestorCancelledListeners()
td.blockSentListeners = hooks.NewBlockSentListeners()
td.networkErrorListeners = hooks.NewNetworkErrorListeners()
td.completedListeners = listeners.NewCompletedResponseListeners()
td.cancelledListeners = listeners.NewRequestorCancelledListeners()
td.blockSentListeners = listeners.NewBlockSentListeners()
td.networkErrorListeners = listeners.NewNetworkErrorListeners()
td.completedListeners.Register(func(p peer.ID, requestID graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case td.completedResponseStatuses <- status:
Expand Down