Skip to content

Commit

Permalink
feat(impl): fix shutdown (#112)
Browse files Browse the repository at this point in the history
We were closing channels on Stop, which prevents them from being restarted, and we were not handling shutdown properly in general.
Shutdown now unregisters the transport from graphsync instead.
  • Loading branch information
hannahhoward authored Nov 7, 2020
1 parent 24daab4 commit 1dcb6eb
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 56 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.0
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.7
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,6 @@ github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 h1:F9k+7
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1/go.mod h1:jvfsLIxk0fY/2BKSQ1xf2406AKA5dwMmKKv0ADcOfN8=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
Expand Down
15 changes: 1 addition & 14 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/hannahhoward/go-pubsub"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -151,19 +150,7 @@ func (m *manager) OnReady(ready datatransfer.ReadyFunc) {

// Stop terminates all data transfers and ends processing
func (m *manager) Stop(ctx context.Context) error {
openChannels, err := m.channels.InProgress()
if err != nil {
return xerrors.Errorf("error getting channels in progress: %w", err)
}

var result error
for chid := range openChannels {
if err := m.CloseDataTransferChannel(ctx, chid); err != nil {
result = multierror.Append(result, xerrors.Errorf("error closing channel with ID %v, err: %w", chid, err))
}
}

return result
return m.transport.Shutdown(ctx)
}

// RegisterVoucherType registers a validator for the given voucher type
Expand Down
21 changes: 12 additions & 9 deletions impl/restart_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func TestRestartPush(t *testing.T) {
},
restartF: func(rh *restartHarness, chId datatransfer.ChannelID, subscriber datatransfer.Subscriber) {
var err error
require.NoError(t, rh.dt1.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp1 := rh.gsData.SetupGSTransportHost1()
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.DtNet1, tp1, rh.gsData.StoredCounter1)
require.NoError(rh.t, err)
Expand All @@ -65,6 +67,8 @@ func TestRestartPush(t *testing.T) {
},
restartF: func(rh *restartHarness, chId datatransfer.ChannelID, subscriber datatransfer.Subscriber) {
var err error
require.NoError(t, rh.dt2.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp2 := rh.gsData.SetupGSTransportHost2()
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.DtNet2, tp2, rh.gsData.StoredCounter2)
require.NoError(rh.t, err)
Expand Down Expand Up @@ -119,7 +123,6 @@ func TestRestartPush(t *testing.T) {
}
}
}

if channelState.Status() == datatransfer.Completed {
finishedPeersLk.Lock()
finishedPeers = append(finishedPeers, channelState.SelfPeer())
Expand Down Expand Up @@ -173,15 +176,15 @@ func TestRestartPush(t *testing.T) {
}

// WAIT FOR TRANSFER TO COMPLETE -> THIS SHOULD NOT HAPPEN
sentI, receivedI, err := waitF(10*time.Second, 1)
sentI, receivedI, err := waitF(2*time.Second, 1)
require.EqualError(t, err, "context timed-out without completing data transfer")
require.True(t, len(receivedI) < totalIncrements)
require.NotEmpty(t, sentI)
t.Logf("not request was completed after disconnect")

// Connect the peers and restart
require.NoError(t, rh.gsData.Mn.LinkAll())
// let linking take effect
time.Sleep(1 * time.Second)
conn, err := rh.gsData.Mn.ConnectPeers(rh.peer1, rh.peer2)
require.NoError(t, err)
require.NotNil(t, conn)
Expand Down Expand Up @@ -234,6 +237,8 @@ func TestRestartPull(t *testing.T) {
},
restartF: func(rh *restartHarness, chId datatransfer.ChannelID, subscriber datatransfer.Subscriber) {
var err error
require.NoError(t, rh.dt2.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp2 := rh.gsData.SetupGSTransportHost2()
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.DtNet2, tp2, rh.gsData.StoredCounter2)
require.NoError(rh.t, err)
Expand All @@ -253,6 +258,8 @@ func TestRestartPull(t *testing.T) {
},
restartF: func(rh *restartHarness, chId datatransfer.ChannelID, subscriber datatransfer.Subscriber) {
var err error
require.NoError(t, rh.dt1.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp1 := rh.gsData.SetupGSTransportHost1()
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.DtNet1, tp1, rh.gsData.StoredCounter1)
require.NoError(rh.t, err)
Expand Down Expand Up @@ -365,17 +372,13 @@ func TestRestartPull(t *testing.T) {
}

// WAIT FOR TRANSFER TO COMPLETE -> THIS SHOULD NOT HAPPEN
sentI, receivedI, err := waitF(10*time.Second, 1)
sentI, receivedI, err := waitF(1*time.Second, 1)
require.EqualError(t, err, "context timed-out without completing data transfer")
require.True(t, len(receivedI) < totalIncrements)
require.NotEmpty(t, sentI)

// Connect the peers and restart
require.NoError(t, rh.gsData.Mn.LinkAll())
// let linking take effect
time.Sleep(500 * time.Millisecond)
// let linking take effect
time.Sleep(500 * time.Millisecond)
conn, err := rh.gsData.Mn.ConnectPeers(rh.peer1, rh.peer2)
require.NoError(t, err)
require.NotNil(t, conn)
Expand Down Expand Up @@ -433,7 +436,7 @@ type restartHarness struct {

func newRestartHarness(t *testing.T) *restartHarness {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
ctx, cancel := context.WithTimeout(ctx, 120*time.Second)

// Setup host
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
Expand Down
44 changes: 32 additions & 12 deletions testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type FakeGraphSync struct {
IncomingBlockHook graphsync.OnIncomingBlockHook
OutgoingBlockHook graphsync.OnOutgoingBlockHook
IncomingRequestHook graphsync.OnIncomingRequestHook
ResponseCompletedListener graphsync.OnResponseCompletedListener
CompletedResponseListener graphsync.OnResponseCompletedListener
RequestUpdatedHook graphsync.OnRequestUpdatedHook
IncomingResponseHook graphsync.OnIncomingResponseHook
RequestorCancelledListener graphsync.OnRequestorCancelledListener
Expand Down Expand Up @@ -280,43 +280,57 @@ func (fgs *FakeGraphSync) UnregisterPersistenceOption(name string) error {
// RegisterIncomingRequestHook adds a hook that runs when a request is received
func (fgs *FakeGraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc {
fgs.IncomingRequestHook = hook
return nil
return func() {
fgs.IncomingRequestHook = nil
}
}

// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (fgs *FakeGraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
fgs.IncomingResponseHook = hook
return nil
return func() {
fgs.IncomingResponseHook = nil
}
}

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
func (fgs *FakeGraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
fgs.OutgoingRequestHook = hook
return nil
return func() {
fgs.OutgoingRequestHook = nil
}
}

// RegisterOutgoingBlockHook adds a hook that runs every time a block is sent from a responder
func (fgs *FakeGraphSync) RegisterOutgoingBlockHook(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
fgs.OutgoingBlockHook = hook
return nil
return func() {
fgs.OutgoingBlockHook = nil
}
}

// RegisterIncomingBlockHook adds a hook that runs every time a block is received by the requestor
func (fgs *FakeGraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHook) graphsync.UnregisterHookFunc {
fgs.IncomingBlockHook = hook
return nil
return func() {
fgs.IncomingBlockHook = nil
}
}

// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
func (fgs *FakeGraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc {
fgs.RequestUpdatedHook = hook
return nil
return func() {
fgs.RequestUpdatedHook = nil
}
}

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (fgs *FakeGraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
fgs.ResponseCompletedListener = listener
return nil
fgs.CompletedResponseListener = listener
return func() {
fgs.CompletedResponseListener = nil
}
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
Expand Down Expand Up @@ -352,19 +366,25 @@ func (fgs *FakeGraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestI
// RegisterRequestorCancelledListener adds a listener on the responder for requests cancelled by the requestor
func (fgs *FakeGraphSync) RegisterRequestorCancelledListener(listener graphsync.OnRequestorCancelledListener) graphsync.UnregisterHookFunc {
fgs.RequestorCancelledListener = listener
return nil
return func() {
fgs.RequestorCancelledListener = nil
}
}

// RegisterBlockSentListener adds a listener on the responder as blocks go out
func (fgs *FakeGraphSync) RegisterBlockSentListener(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
fgs.BlockSentListener = listener
return nil
return func() {
fgs.BlockSentListener = nil
}
}

// RegisterNetworkErrorListener adds a listener for network errors
func (fgs *FakeGraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
fgs.NetworkErrorListener = listener
return nil
return func() {
fgs.NetworkErrorListener = nil
}
}

var _ graphsync.GraphExchange = &FakeGraphSync{}
Expand Down
4 changes: 4 additions & 0 deletions testutil/faketransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (ft *FakeTransport) SetEventHandler(events datatransfer.EventsHandler) erro
return ft.SetEventHandlerErr
}

// Shutdown is a no-op on the fake transport: it ignores ctx, records nothing,
// and always returns nil.
func (ft *FakeTransport) Shutdown(ctx context.Context) error {
return nil
}

// PauseChannel pauses the channel with the given channel ID
func (ft *FakeTransport) PauseChannel(ctx context.Context, chid datatransfer.ChannelID) error {
ft.PausedChannels = append(ft.PausedChannels, chid)
Expand Down
16 changes: 14 additions & 2 deletions testutil/gstestdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type GraphsyncTestingData struct {
DtNet2 network.DataTransferNetwork
AllSelector ipld.Node
OrigBytes []byte
gs1Cancel func()
gs2Cancel func()
}

// NewGraphsyncTestingData returns a new GraphsyncTestingData instance
Expand Down Expand Up @@ -163,7 +165,12 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T, host1Protocols [
// SetupGraphsyncHost1 sets up a new, real graphsync instance on top of the first host
func (gsData *GraphsyncTestingData) SetupGraphsyncHost1() graphsync.GraphExchange {
// setup graphsync
return gsimpl.New(gsData.Ctx, gsData.GsNet1, gsData.Loader1, gsData.Storer1)
if gsData.gs1Cancel != nil {
gsData.gs1Cancel()
}
gsCtx, gsCancel := context.WithCancel(gsData.Ctx)
gsData.gs1Cancel = gsCancel
return gsimpl.New(gsCtx, gsData.GsNet1, gsData.Loader1, gsData.Storer1)
}

// SetupGSTransportHost1 sets up a new graphsync transport over real graphsync on the first host
Expand All @@ -184,7 +191,12 @@ func (gsData *GraphsyncTestingData) SetupGSTransportHost1() datatransfer.Transpo
// SetupGraphsyncHost2 sets up a new, real graphsync instance on top of the second host
func (gsData *GraphsyncTestingData) SetupGraphsyncHost2() graphsync.GraphExchange {
// setup graphsync
return gsimpl.New(gsData.Ctx, gsData.GsNet2, gsData.Loader2, gsData.Storer2)
if gsData.gs2Cancel != nil {
gsData.gs2Cancel()
}
gsCtx, gsCancel := context.WithCancel(gsData.Ctx)
gsData.gs2Cancel = gsCancel
return gsimpl.New(gsCtx, gsData.GsNet2, gsData.Loader2, gsData.Storer2)
}

// SetupGSTransportHost2 sets up a new graphsync transport over real graphsync on the second host
Expand Down
1 change: 1 addition & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type Transport interface {
// CleanupChannel is called on the otherside of a cancel - removes any associated
// data for the channel
CleanupChannel(chid ChannelID)
Shutdown(ctx context.Context) error
}

// PauseableTransport is a transport that can also pause and resume channels
Expand Down
38 changes: 25 additions & 13 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Transport struct {
responseProgressMap map[datatransfer.ChannelID]*responseProgress
stores map[datatransfer.ChannelID]struct{}
supportedExtensions []graphsync.ExtensionName
unregisterFuncs []graphsync.UnregisterHookFunc
}

// NewTransport makes a new hooks manager with the given hook events interface
Expand Down Expand Up @@ -288,18 +289,30 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
return datatransfer.ErrHandlerAlreadySet
}
t.events = events
t.gs.RegisterIncomingRequestHook(t.gsReqRecdHook)
t.gs.RegisterCompletedResponseListener(t.gsCompletedResponseListener)
t.gs.RegisterIncomingBlockHook(t.gsIncomingBlockHook)
t.gs.RegisterOutgoingBlockHook(t.gsOutgoingBlockHook)

t.gs.RegisterBlockSentListener(t.gsBlockSentHook)

t.gs.RegisterOutgoingRequestHook(t.gsOutgoingRequestHook)
t.gs.RegisterIncomingResponseHook(t.gsIncomingResponseHook)
t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook)
t.gs.RegisterRequestorCancelledListener(t.gsRequestorCancelledListener)
t.gs.RegisterNetworkErrorListener(t.gsNetworkErrorListener)

t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingRequestHook(t.gsReqRecdHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterCompletedResponseListener(t.gsCompletedResponseListener))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingBlockHook(t.gsIncomingBlockHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterOutgoingBlockHook(t.gsOutgoingBlockHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterBlockSentListener(t.gsBlockSentHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterOutgoingRequestHook(t.gsOutgoingRequestHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingResponseHook(t.gsIncomingResponseHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterRequestorCancelledListener(t.gsRequestorCancelledListener))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterNetworkErrorListener(t.gsNetworkErrorListener))
return nil
}

// Shutdown detaches this transport from graphsync.
//
// It first invokes every unregister function that was collected when the
// hooks and listeners were registered, then, while holding the read side of
// dataLock, calls every cancel function stored in contextCancelMap.
// The ctx argument is not consulted and the returned error is always nil.
func (t *Transport) Shutdown(ctx context.Context) error {
	// Remove all hooks/listeners previously registered with graphsync.
	unregister := t.unregisterFuncs
	for i := range unregister {
		unregister[i]()
	}

	// Cancel every context tracked in the cancel map; the map is only read
	// here, so the read lock is sufficient.
	t.dataLock.RLock()
	for _, cancelFn := range t.contextCancelMap {
		cancelFn()
	}
	t.dataLock.RUnlock()

	return nil
}

Expand Down Expand Up @@ -425,7 +438,6 @@ func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData
// gsReqRecdHook is a graphsync.OnRequestReceivedHook hook
// if an incoming request does not match a previous push request, it returns an error.
func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {

// if this is a push request the sender is us.
msg, err := extension.GetTransferData(request)
if err != nil {
Expand Down
Loading

0 comments on commit 1dcb6eb

Please sign in to comment.