DHT Request pipelining #92
Conversation
Summoning @whyrusleeping @Stebalien
Force-pushed from c0e98e4 to c4f4864
Note that there is some subtlety in the fallback to the single-request-per-stream protocol: the single-message counter is only incremented on a successful request. If pipelined requests fail in the read, some of them (if there are more than two) may also fail in their retries against the old protocol while we are still collecting enough samples to fall back.
Force-pushed from eb0cd02 to a4ee75c
Rebased on master for #93
Force-pushed from 1f9fe55 to c98efab
dht_net.go
Outdated
	select {
	case res = <-resch:

	case <-t.C:
I'd just add a deadline to the context.
good point, will do.
we do lose the ability to distinguish ErrReadTimeout however.
You should be able to learn why this failed from ctx.Err().
right, that's what I return now -- it's just not ErrReadTimeout anymore but rather "context deadline exceeded".
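For illustration, a minimal sketch of the deadline-based variant being discussed here (the names are placeholders, not the actual dht_net.go code); the caller can still tell a timeout apart from a cancellation via the error returned from rctx.Err():

```go
import (
	"context"
	"time"
)

// result stands in for whatever the response channel carries.
type result struct{ err error }

// readWithDeadline waits for a response or for the context deadline,
// instead of racing against a separate time.Timer.
func readWithDeadline(ctx context.Context, resch <-chan result, timeout time.Duration) (result, error) {
	rctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	select {
	case res := <-resch:
		return res, nil
	case <-rctx.Done():
		// context.DeadlineExceeded on timeout, context.Canceled if the
		// caller gave up first -- no dedicated ErrReadTimeout needed.
		return result{}, rctx.Err()
	}
}
```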
dht_net.go
Outdated
	defer s.Close()

	w := ggio.NewDelimitedWriter(s)
	return w.WriteMsg(pmes)
This should probably reset the stream on error instead of closing it (probably not that important but generally a good idea).
Note: It's safe to close the stream after resetting it (so you can leave the defer s.Close()).
sure, small thing to fix.
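A rough sketch of that fix, based on the snippet above (a sketch only, not the final patch): keep the deferred Close, and Reset the stream when the write fails.

```go
import (
	ggio "github.com/gogo/protobuf/io"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	inet "github.com/libp2p/go-libp2p-net"
)

// writeMsg writes a single delimited message; on error it resets the
// stream so the peer does not mistake the failure for a clean close.
// Closing after Reset is safe, so the defer can stay.
func writeMsg(s inet.Stream, pmes *pb.Message) error {
	defer s.Close()

	w := ggio.NewDelimitedWriter(s)
	if err := w.WriteMsg(pmes); err != nil {
		s.Reset()
		return err
	}
	return nil
}
```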
dht_net.go
Outdated
	case <-t.C:
-		return ErrReadTimeout
+		return nil, ErrReadTimeout
Again, I'd switch to a context deadline (not your code but easy cleanup).
dht_net.go
Outdated
-	ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
+	r := ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
+	rch := make(chan chan requestResult, requestResultBuffer)
+	go messageReceiver(ms.dht.ctx, rch, r)
We could save on long-running go routines by starting these only as needed and shutting them down when we have no more outstanding replies (i.e., by having some form of outstanding reply counter). If we don't keep these around, we could also afford to spin up a second goroutine to manage the outstanding requests.
In general, I'm not a fan of pipelining but out-of-order replies would require a protocol change.
Lazily spinning up the goroutine is perhaps not so hard to implement, but shutting it down gets tricky -- not sure it's worth the complexity. Let me think about it.
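For comparison, a rough sketch of the lazy variant (all names hypothetical, not this PR's code): a FIFO of outstanding reply channels guarded by the sender lock, with the reader goroutine started on demand and exiting once nothing is outstanding.

```go
import (
	"sync"

	ggio "github.com/gogo/protobuf/io"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
)

// requestResult is the hypothetical shape of what a waiter receives.
type requestResult struct {
	mes *pb.Message
	err error
}

type lazySender struct {
	lk      sync.Mutex
	r       ggio.Reader
	waiting []chan requestResult // outstanding replies, oldest first
	running bool
}

// expectReply registers interest in the next in-order reply and starts the
// receiver goroutine only if it is not already running.
func (ls *lazySender) expectReply() chan requestResult {
	ch := make(chan requestResult, 1)
	ls.lk.Lock()
	ls.waiting = append(ls.waiting, ch)
	if !ls.running {
		ls.running = true
		go ls.receive()
	}
	ls.lk.Unlock()
	return ch
}

// receive hands replies to waiters in FIFO order and shuts itself down as
// soon as there are no outstanding replies left.
func (ls *lazySender) receive() {
	for {
		ls.lk.Lock()
		if len(ls.waiting) == 0 {
			ls.running = false
			ls.lk.Unlock()
			return
		}
		ch := ls.waiting[0]
		ls.waiting = ls.waiting[1:]
		ls.lk.Unlock()

		mes := new(pb.Message)
		err := ls.r.ReadMsg(mes)
		ch <- requestResult{mes: mes, err: err}
	}
}
```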
dht_net.go
Outdated
		}

		return nil
	}
}

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
	ms.lk.Lock()
	defer ms.lk.Unlock()
	defer log.EventBegin(ctx, "dhtSendRequest", ms.dht.self, ms.p, pmes).Done()
don't technically need to prefix the event type with "dht"; the log object is created as a logger for the dht subsystem
ipfs log only gives you the event name in the event field, so that's kind of necessary to disambiguate events.
I mean for practical purposes with grep; otherwise the system field does it just fine.
I will remove the prefix; the canonical way to process the event log is jq, and grep becomes only slightly more complicated: it can be a double grep for dht and then SendRequest.
dht_net.go
Outdated
	case res = <-resch:

	case <-rctx.Done():
		return nil, rctx.Err()
If the context is cancelled, do we want to kill off the stream? Or is that handled elsewhere?
This is the specific request context, so it's inappropriate to kill the entire stream for it.
Thinking a bit more about this, I think we do want to kill the whole stream after all.
The issue is that if we start getting slow responses (over 1m), the pipeline fills up, everything times out, and the stream stays unusable until those responses are actually received (regardless of whether we have stopped waiting for them).
Will implement with a "reset" directive to the message receiver.
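A hypothetical sketch of what the request side of that could look like (reusing the requestResult shape from the sketch above; the reset channel and names are made up): when the per-request context expires, the waiter nudges the receiver, which can then reset the stream.

```go
import (
	"context"

	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
)

// sendRequest waits for its own reply; on context expiry it asks the
// receiver goroutine (non-blocking, so duplicate resets are dropped) to
// reset the underlying stream so slow replies cannot clog the pipeline.
func sendRequest(rctx context.Context, resch <-chan requestResult, resetch chan<- struct{}) (*pb.Message, error) {
	select {
	case res := <-resch:
		return res.mes, res.err
	case <-rctx.Done():
		select {
		case resetch <- struct{}{}:
		default:
		}
		return nil, rctx.Err()
	}
}
```

On the receiving side, the goroutine would select on the reset channel alongside its normal work, reset the stream, and error out the remaining waiters.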
dht_net.go
Outdated
	case next, ok = <-rch:
		if !ok {
			return
		}
This might be a little cleaner if we move the logic at the bottom of the loop into this case. It would save us from having to pre-declare those variables.
ok, was just trying to avoid excessive indentation.
This tenatively looks good to me, I'd like to see some tests added that exercise some of the different scenarios (single vs pipelined, slow handlers). How complicated this is getting makes me think we should make a new DHT protocol handler (version bump) that has message IDs in the protobuf (which should simplify this significantly). Then in the future when we have a 1.0 release, we can drop the old code. Really, this should have been just using a message based interface from the get-go. cc @Stebalien |
I will add some more tests, as these are important cases we want to make sure we handle right. I am testing with a live node for now. Re: message ids: yes, that would make the pump goroutine completely unnecessary and let us handle it with just two locks (read and write) and a …
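To make the message-ID idea concrete, here is a rough sketch of how request/response matching could look with an ID field in the protobuf (the MessageId field and every name below are hypothetical; the current protocol has no such field):

```go
import (
	"sync"

	ggio "github.com/gogo/protobuf/io"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
)

// idSender matches replies to requests by ID, so replies may arrive in any
// order and no per-stream ordering pump is needed.
type idSender struct {
	wlk     sync.Mutex // serializes writes to the stream
	plk     sync.Mutex // guards nextID and pending
	w       ggio.Writer
	nextID  uint64
	pending map[uint64]chan *pb.Message
}

// request assigns an ID, registers a reply channel, and writes the message
// while holding only the write lock.
func (s *idSender) request(pmes *pb.Message) (<-chan *pb.Message, error) {
	ch := make(chan *pb.Message, 1)

	s.plk.Lock()
	s.nextID++
	id := s.nextID
	s.pending[id] = ch
	s.plk.Unlock()

	// pmes.MessageId = id // hypothetical protobuf field

	s.wlk.Lock()
	err := s.w.WriteMsg(pmes)
	s.wlk.Unlock()
	if err != nil {
		s.plk.Lock()
		delete(s.pending, id)
		s.plk.Unlock()
		return nil, err
	}
	return ch, nil
}

// dispatch is called by whoever reads the stream; it routes a reply to the
// waiter registered under its ID.
func (s *idSender) dispatch(id uint64, mes *pb.Message) {
	s.plk.Lock()
	ch, ok := s.pending[id]
	delete(s.pending, id)
	s.plk.Unlock()
	if ok {
		ch <- mes
	}
}
```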
Some analysis of performance in concurrent requests:
@vyzo update here?
will get back to it soon.
whoops, sorry.
rebase artifact...
So I have rebased this, but I don't know what this business with the delayed tests is.
Force-pushed from e174e62 to 4737c8a
@vyzo yeah, that's the same situation I've run into. Anything I can assist with?
@bigs can you repatch in the buffered writer stuff? Seems like this is important.
@laser was trying to test network latency. The first attempt, using delayed blockstores, wasn't really sufficient. Later tests used the mocknet, but I don't think they ever really removed the delayed datastore stuff (we can probably get rid of that).
@anacrolix, were you intending to review this PR? We need to merge master into it. The conflicts don't look too bad.
I'll try to get on to this now.
I've been poking around this code in master. Why don't we just send a single request per stream?
@anacrolix because opening a new stream is expensive right now. Once we get multistream 2, we can just do that.
See: #167
I am experimenting with an alternative to this that optimistically reuses streams and creates new ones if they're blocked. I believe it won't suffer from pipelining issues, cross-polluting timeouts, etc.
The DHT code has moved on and this would need to be re-implemented.
The big lock, shared by SendMessage and SendRequest, restricts our ability to issue pipelined requests, with potentially significant performance impact -- #88
This patch implements a goroutine pump that serializes message reads out of line. This allows us to lock only for the duration of the write, which pipelines concurrent requests.
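A condensed sketch of that structure (simplified; error handling, the old-protocol fallback, and timeouts are omitted, and the field names are only approximate): the pump owns all reads, and each request enqueues its reply channel under the same lock that covers the write, so replies are matched to requests in FIFO order while the lock itself is held only briefly.

```go
import (
	"context"
	"sync"

	ggio "github.com/gogo/protobuf/io"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
)

type requestResult struct {
	mes *pb.Message
	err error
}

type messageSender struct {
	lk  sync.Mutex
	w   ggio.Writer
	rch chan chan requestResult
}

// messageReceiver is the pump: it serializes reads off the stream and
// delivers each message to the next queued reply channel.
func messageReceiver(ctx context.Context, rch <-chan chan requestResult, r ggio.Reader) {
	for {
		select {
		case next, ok := <-rch:
			if !ok {
				return
			}
			mes := new(pb.Message)
			err := r.ReadMsg(mes)
			next <- requestResult{mes: mes, err: err}
		case <-ctx.Done():
			return
		}
	}
}

// SendRequest holds the lock only for the write plus enqueueing its reply
// channel, then waits for the pump outside the lock, so concurrent
// requests are pipelined on the same stream.
func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
	resch := make(chan requestResult, 1)

	ms.lk.Lock()
	if err := ms.w.WriteMsg(pmes); err != nil {
		ms.lk.Unlock()
		return nil, err
	}
	ms.rch <- resch
	ms.lk.Unlock()

	select {
	case res := <-resch:
		return res.mes, res.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
```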