Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepared message concept #207

Closed
wants to merge 2 commits into from
Closed

Prepared message concept #207

wants to merge 2 commits into from

Conversation

FZambia
Copy link
Contributor

@FZambia FZambia commented Jan 22, 2017

This is a concept of PreparedMessage seen in #182 to improve memory usage with enabled compression mentioned in #203

I don't pretend that this pull request will be merged in a form as it's now. It's just an attempt to prove that preparing message can help a lot to reduce CPU and memory usage in PUB/SUB scenarios with many active subscribers (broadcast) especially when permessage compression is enabled.

I wrote several benchmarks inside - create many (10k) connections and send the same message to them:

  • without compression
  • with enabled compression
  • without compression preparing message before broadcasting
  • with compression enabled preparing message before broadcasting

It's possible to tweak some constants in benchmarks to modify behaviour a bit (num of conns, num of messages on every iteration).

Here results from my machine:

fz@websocket: go test -test.run=XXX -bench=BenchmarkBroadcast
BenchmarkBroadcastNoCompression-4             	     200	   8363120 ns/op	       9 B/op	       0 allocs/op
BenchmarkBroadcastWithCompression-4           	      50	  84743048 ns/op	101941565 B/op	   31711 allocs/op
BenchmarkBroadcastNoCompressionPrepared-4     	     200	   8315049 ns/op	     321 B/op	       2 allocs/op
BenchmarkBroadcastWithCompressionPrepared-4   	     200	   8447478 ns/op	   20435 B/op	       5 allocs/op

The difference is huge! This makes me feel that I did something wrong:) But at least I wrote tests to compare connection streams when using PreparedMessage and not. Also tried on simple chat demo - it works.

@garyburd as you can see I make PreparedMessage using fake connection writers - it was the simplest solution I could imagine that does not require massive refactoring and a bigger chance for me to make a mistake.

@garyburd
Copy link
Contributor

I took a quick look at this. I might be mistaken, but it looks like PreparedMessage payload fields share memory with pooled connections.

The fake connection is clever. I don't think the a massive refactoring is required, but I would need to spend some time on it to know for sure.

The NextWriter/Write/Close sequence can be replaced with a call to WriteMessage.

Some type assertions can be eliminated by declaring netConn as

type netConn struct {
    bytes.Buffer
}
func (netConn) Read(p []byte) (int, error) { return 0, nil }

@garyburd
Copy link
Contributor

garyburd commented Jan 24, 2017

I noticed that the client case is not handled. Window sizes will not be handled when and if that feature is added. To handle all of the possible message variants, I suggest creating the variants lazily from an uncompressed server message.

 type preparedKey struct {
     isServer bool
     compressed bool
}

type preparedFrame struct {
    once sync.Once
    data []byte
}

type PreparedMessage struct {
    frameType int
    data []byte // slice of frames[preparedKey{isServer: true, compressed: false}].data
    mu sync.Mutex
    frames map[frameKey]*preparedFrame
}

func (pm *PreparedMessage) frameData(key preparedKey) (int, []byte) {
   pm.mu.Lock()
   frame := pm.frames[key]
   if frame == nil {
       frame = &preparedFrame{}
       pm.frames[key] = frame
   } 
   pm.mu.Unlock()
   frame.once.Do(func() {
         // create frame.data using key and pm.data
   })
   return pm.frameType, frame.data
}

func (c *Conn) WritePreparedMessage(pm *PreparedMessage) error {
   frameType, frameData := pm.frame(preparedKey{
          isServer: c.IsServer, 
          compressed: c.newCompressionWriter != nil && c.enableWriteCompression,
   })
   if c.isWriting {
      panic("concurrent write to websocket connection")
   }
   c.isWriting = true
   err := c.write(frameType, c.writeDeadline, frameData, nil)
   if !c.isWriting {
          panic("concurrent write to websocket connection")
   }
   c.isWriting = false
   return err
}

The options for NewPreparedMessage should be passed as a struct to make it possible to add more options later:

func NewPreparedMessage(messageType int, data []byte, options *PreparedMessageOptions) *PreparedMessage {
}

type PreparedMessageOptions struct {
   compressionLevel int // zero means use default, not no compression
}

@FZambia
Copy link
Contributor Author

FZambia commented Jan 24, 2017

Thanks for looking!

I might be mistaken, but it looks like PreparedMessage payload fields share memory with pooled connections.

Exactly, need a copy

To handle all of the possible message variants, I suggest creating the variants lazily from an uncompressed server message.

Looks like preparedKey should also include compression level as it can be set per connection right?

@garyburd
Copy link
Contributor

Looks like preparedKey should also include compression level as it can be set per connection right?

I specified the compression level in the prepared message options because I think that applications will select the compression level based on the size and type of the message data. I don't think it's something that applications will want to set per connection.

The connection API has SetCompressionLevel to match SetWriteDeadline and EnableCompression. If I could design the API from scratch, I'd put the options in a struct:

 type WriteOptions {
      messageType int
      timeout time.Duration
      disableCompression bool
      compressionLevel int // 0 uses default level
}

 func (c *Conn) NextWriter(options *WriteOptions) (io.WriteCloser, error) {
 }

func NewPreparedMessage(data []byte, options *WriteOptions) *PreparedMessage {
}

@garyburd
Copy link
Contributor

As I think about it more, I don't like that prepared messages in my proposal use SetWriteDeadline and ignores EnableCompression and SetCompressionLevel. These are details that the developer should not need to think about. I think it's simpler to say that the prepared message is sent using the connection's current settings. In that case, compression level should be part of the key.

I'd like to find a better name than "prepared" for the feature because the feature is really a cache. Perhaps something like this:

// MultiMessage caches on the wire representations of a message payload.
// Use MultiMessage to efficiently send a message payload to multiple
// connections. MultiMessage is especially useful when compression 
/ is used because the CPU and memory expensive compression operation
// can be executed once for a given set of compression options. 
type MultiMessage struct {
}

func NewMultiMessage(messageType int, data []byte) *MultiMessage {
}

I am not happy with this name either, but I think it's better than PreparedMessage.

@FZambia
Copy link
Contributor Author

FZambia commented Jan 24, 2017

@garyburd I'll try to implement everything you said. What's good in PreparedMessage name is that it's already used in websocket libraries for other programming languages - so it could be convenient for some developers. If you don't mind I'll leave it unchanged until your next pr revision and then will change on final variant.

@garyburd
Copy link
Contributor

Sounds good. Do we agree on the following API?

type PreparedMessag struct { /* unexported fields */ }
 func NewPreparedMessage(messageType int, data []byte) *PreparedMessage
 func (c *Conn) WritePreparedMessage(*PreparedMessage) error

If you would like, I'll take on refactoring the code to eliminate the need for fake conn. I can do this after you are done with everything else.

@FZambia
Copy link
Contributor Author

FZambia commented Jan 24, 2017

Yes, let me then push everything we discussed and then you can take over on this.

@FZambia
Copy link
Contributor Author

FZambia commented Jan 24, 2017

@garyburd just pushed all changes - also added test that requires fix in #208 to pass.

@FZambia
Copy link
Contributor Author

FZambia commented Jan 24, 2017

Oh, well, this won't pass also on old go versions because I use -2 compression level in TestPreparedMessageBytesStreamCompressed test. But as you will anyway refactoring/renaming things please change this as you like. I won't touch this pr - looking forward to your changes, if you want you can even close this and start from scratch.

c.newCompressionWriter = compressNoContextTakeover
c.SetCompressionLevel(key.compressionLevel)
}
writeErr := c.WriteMessage(pm.frameType, pm.data)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shadowing here, need writeErr = c.WriteMessage(pm.frameType, pm.data)

@FZambia
Copy link
Contributor Author

FZambia commented Feb 2, 2017

@garyburd did you have a chance to turn to this since then? Maybe I can help with sth - at least fix tests and shadowing?

@garyburd
Copy link
Contributor

garyburd commented Feb 6, 2017

Let's continue the discussion at #211

@garyburd garyburd closed this Feb 6, 2017
@gorilla gorilla locked and limited conversation to collaborators Feb 14, 2018
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants