Increase in memory usage when compression is on #203

Open
joshdvir opened this issue Jan 11, 2017 · 54 comments

@joshdvir

Hi,

I started using Centrifugo in the past week.
I'm using the raw WebSocket endpoint, which uses this library under the hood.

I'm experiencing a situation where, with per-message-deflate enabled, there is massive memory growth, up to the point where the Docker container crashes for using too much memory.

I'm running inside a Docker container with 150K-200K concurrent users on average; my average message rate is between 30K and 50K messages per second, with an average message size of 600 bytes.

Without per-message-deflate there is no memory growth at all and performance is awesome, but the data transfer is very high.

Can anyone help me investigate this?

Thank you.

@kisielk
Contributor

kisielk commented Jan 11, 2017

First step would probably be to get a heap and allocation profile of your application using pprof.
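For anyone following along, here is a minimal sketch of one common way to collect such profiles with the standard library; the side port and wiring are assumptions, not how Centrifugo necessarily exposes it:

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
)

func main() {
	// Serve the profiling endpoints on a side port.
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// ... run the application as usual ...
	select {}
}

Heap and allocation profiles can then be pulled with something like go tool pprof http://localhost:6060/debug/pprof/heap (adding -alloc_space to look at total allocations rather than live memory).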

@garyburd
Contributor

Please generate the profiles with the latest version of the package. The recent change to pool flate readers and writers should help.

@joshdvir
Author

Thanks guys,

I'm waiting for the next version of Centrifugo, which includes your latest version; then I will profile the application and upload the results here.

You can close this issue until then, but I hope to have the new version today or tomorrow at the latest, so it's up to you.

@joshdvir
Author

joshdvir commented Jan 11, 2017

Here is the pprof dump

This is the heap profile

@kisielk @garyburd I've updated the gist, please check now

https://gist.github.com/joshdvir/091229e3d3e4ade8d73b8cffe86c602b

@FZambia
Contributor

FZambia commented Jan 13, 2017

I asked @joshdvir to send me CPU and memory profiles from a production node; here is what we have:

CPU:

(pprof) top 20 --cum
28.99s of 62.03s total (46.74%)
Dropped 523 nodes (cum <= 0.31s)
Showing top 20 nodes out of 155 (cum >= 8.07s)
      flat  flat%   sum%        cum   cum%
         0     0%     0%     58.68s 94.60%  runtime.goexit
     0.05s 0.081% 0.081%     45.44s 73.25%  github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
     0.16s  0.26%  0.34%     44.23s 71.30%  github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
     0.16s  0.26%   0.6%     44.07s 71.05%  github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
     0.05s 0.081%  0.68%     43.82s 70.64%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
     0.01s 0.016%  0.69%     21.67s 34.93%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*flateWriteWrapper).Close
     0.03s 0.048%  0.74%     20.19s 32.55%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter
     0.07s  0.11%  0.85%     19.79s 31.90%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover
    17.56s 28.31% 29.16%     17.56s 28.31%  runtime.memclr
     0.04s 0.064% 29.23%     15.46s 24.92%  compress/flate.(*Writer).Reset
     0.03s 0.048% 29.28%     15.42s 24.86%  compress/flate.(*compressor).reset
         0     0% 29.28%     14.40s 23.21%  compress/flate.(*Writer).Flush
         0     0% 29.28%     14.40s 23.21%  compress/flate.(*compressor).syncFlush
     2.62s  4.22% 33.50%     14.01s 22.59%  compress/flate.(*compressor).deflate
     0.01s 0.016% 33.52%     11.05s 17.81%  compress/flate.(*compressor).writeBlock
     0.15s  0.24% 33.76%     11.04s 17.80%  compress/flate.(*huffmanBitWriter).writeBlock
     0.21s  0.34% 34.10%      9.05s 14.59%  runtime.systemstack
     0.06s 0.097% 34.19%      8.87s 14.30%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*messageWriter).flushFrame
     0.07s  0.11% 34.31%      8.81s 14.20%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).write

Most of the CPU time is spent in WriteMessage:

(pprof) list WriteMessage
Total: 1.03mins
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
      50ms     43.82s (flat, cum) 70.64% of Total
         .          .    659:
         .          .    660:// WriteMessage is a helper method for getting a writer using NextWriter,
         .          .    661:// writing the message and closing the writer.
         .          .    662:func (c *Conn) WriteMessage(messageType int, data []byte) error {
         .          .    663:
      50ms       50ms    664:	if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {
         .          .    665:
         .          .    666:		// Fast path with no allocations and single frame.
         .          .    667:
         .       20ms    668:		if err := c.prepWrite(messageType); err != nil {
         .          .    669:			return err
         .          .    670:		}
         .          .    671:		mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
         .       10ms    672:		n := copy(c.writeBuf[mw.pos:], data)
         .          .    673:		mw.pos += n
         .          .    674:		data = data[n:]
         .      1.69s    675:		return mw.flushFrame(true, data)
         .          .    676:	}
         .          .    677:
         .     20.19s    678:	w, err := c.NextWriter(messageType)
         .          .    679:	if err != nil {
         .          .    680:		return err
         .          .    681:	}
         .      190ms    682:	if _, err = w.Write(data); err != nil {
         .          .    683:		return err
         .          .    684:	}
         .     21.67s    685:	return w.Close()
         .          .    686:}

NextWriter:

(pprof) list NextWriter
Total: 1.03mins
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
      30ms     20.19s (flat, cum) 32.55% of Total
         .          .    437:// method flushes the complete message to the network.
         .          .    438://
         .          .    439:// There can be at most one open writer on a connection. NextWriter closes the
         .          .    440:// previous writer if the application has not already done so.
         .          .    441:func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
         .       90ms    442:	if err := c.prepWrite(messageType); err != nil {
         .          .    443:		return nil, err
         .          .    444:	}
         .          .    445:
         .          .    446:	mw := &messageWriter{
         .          .    447:		c:         c,
         .          .    448:		frameType: messageType,
      10ms      280ms    449:		pos:       maxFrameHeaderSize,
         .          .    450:	}
         .       10ms    451:	c.writer = mw
         .          .    452:	if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
      10ms     19.80s    453:		w := c.newCompressionWriter(c.writer)
         .          .    454:		mw.compress = true
      10ms       10ms    455:		c.writer = w
         .          .    456:	}
         .          .    457:	return c.writer, nil
         .          .    458:}
         .          .    459:
         .          .    460:type messageWriter struct {

compressNoContextTakeover:

(pprof) list compressNoContextTakeover
Total: 1.03mins
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/compression.go
      70ms     19.79s (flat, cum) 31.90% of Total
         .          .     33:	fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
         .          .     34:	return &flateReadWrapper{fr}
         .          .     35:}
         .          .     36:
         .          .     37:func compressNoContextTakeover(w io.WriteCloser) io.WriteCloser {
         .      130ms     38:	tw := &truncWriter{w: w}
      40ms      3.93s     39:	fw, _ := flateWriterPool.Get().(*flate.Writer)
      10ms     15.47s     40:	fw.Reset(tw)
      20ms      260ms     41:	return &flateWriteWrapper{fw: fw, tw: tw}
         .          .     42:}

And now the heap profile:

(pprof) top 30 --cum
4794.23MB of 5414.45MB total (88.55%)
Dropped 238 nodes (cum <= 27.07MB)
Showing top 30 nodes out of 46 (cum >= 113.64MB)
      flat  flat%   sum%        cum   cum%
         0     0%     0%  5385.39MB 99.46%  runtime.goexit
         0     0%     0%  4277.82MB 79.01%  sync.(*Pool).Get
         0     0%     0%  4277.82MB 79.01%  sync.(*Pool).getSlow
         0     0%     0%  4182.80MB 77.25%  github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
         0     0%     0%  4181.80MB 77.23%  github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
         0     0%     0%  4181.80MB 77.23%  github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
         0     0%     0%  4181.80MB 77.23%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
       8MB  0.15%  0.15%  4168.27MB 76.98%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter
      12MB  0.22%  0.37%  4160.27MB 76.84%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover
 3792.80MB 70.05% 70.42%  4148.27MB 76.61%  compress/flate.NewWriter
         0     0% 70.42%  4148.27MB 76.61%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.glob..func1
    0.50MB 0.0092% 70.43%  1156.29MB 21.36%  net/http.(*conn).serve
         0     0% 70.43%   873.42MB 16.13%  github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).Logged.func1
         0     0% 70.43%   873.42MB 16.13%  net/http.HandlerFunc.ServeHTTP
         0     0% 70.43%   872.92MB 16.12%  github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).WrapShutdown.func1
         0     0% 70.43%   872.92MB 16.12%  net/http.(*ServeMux).ServeHTTP
         0     0% 70.43%   872.92MB 16.12%  net/http.serverHandler.ServeHTTP
         0     0% 70.43%   866.91MB 16.01%  github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).RawWebsocketHandler
         0     0% 70.43%   866.91MB 16.01%  github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).RawWebsocketHandler-fm
         0     0% 70.43%   404.78MB  7.48%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).ReadMessage
  355.47MB  6.57% 76.99%   355.47MB  6.57%  compress/flate.(*compressor).init
         0     0% 76.99%   320.19MB  5.91%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Upgrader).Upgrade
    0.50MB 0.0092% 77.00%   292.64MB  5.40%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextReader
    1.50MB 0.028% 77.03%   291.64MB  5.39%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.decompressNoContextTakeover
  215.85MB  3.99% 81.02%   216.35MB  4.00%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.newConn
  159.10MB  2.94% 83.96%   159.10MB  2.94%  compress/flate.(*decompressor).Reset
  129.04MB  2.38% 86.34%   129.04MB  2.38%  compress/flate.NewReader
         0     0% 86.34%   129.04MB  2.38%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.glob..func2
  119.46MB  2.21% 88.55%   119.46MB  2.21%  net/http.newBufioWriterSize
         0     0% 88.55%   113.64MB  2.10%  io/ioutil.ReadAll

WriteMessage:

(pprof) list WriteMessage
Total: 5.29GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
         0     4.08GB (flat, cum) 77.23% of Total
         .          .    673:		mw.pos += n
         .          .    674:		data = data[n:]
         .          .    675:		return mw.flushFrame(true, data)
         .          .    676:	}
         .          .    677:
         .     4.07GB    678:	w, err := c.NextWriter(messageType)
         .          .    679:	if err != nil {
         .          .    680:		return err
         .          .    681:	}
         .          .    682:	if _, err = w.Write(data); err != nil {
         .          .    683:		return err
         .          .    684:	}
         .    13.53MB    685:	return w.Close()
         .          .    686:}

compressNoContextTakeover:

(pprof) list compressNoContextTakeover
Total: 5.29GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/compression.go
      12MB     4.06GB (flat, cum) 76.84% of Total
         .          .     33:	fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
         .          .     34:	return &flateReadWrapper{fr}
         .          .     35:}
         .          .     36:
         .          .     37:func compressNoContextTakeover(w io.WriteCloser) io.WriteCloser {
      10MB       10MB     38:	tw := &truncWriter{w: w}
         .     4.05GB     39:	fw, _ := flateWriterPool.Get().(*flate.Writer)
         .          .     40:	fw.Reset(tw)
       2MB        2MB     41:	return &flateWriteWrapper{fw: fw, tw: tw}
         .          .     42:}

@garyburd
Contributor

@FZambia Thank you for posting the profile information.

Counters should be added here and here to determine how effective the pool is. Perhaps there's a code path where the flate writer is not returned to the pool.
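A self-contained sketch of the kind of counting being suggested; the counter names and placement are illustrative assumptions, not the package's actual code:

package main

import (
	"compress/flate"
	"fmt"
	"io/ioutil"
	"sync"
	"sync/atomic"
)

// Hypothetical counters: one for every writer taken from the pool, one for pool misses.
var flateWriterFromPool, newFlateWriter int64

var flateWriterPool = sync.Pool{New: func() interface{} {
	atomic.AddInt64(&newFlateWriter, 1) // pool miss: a fresh writer is allocated
	w, _ := flate.NewWriter(nil, flate.BestSpeed)
	return w
}}

func getFlateWriter() *flate.Writer {
	atomic.AddInt64(&flateWriterFromPool, 1) // counts every Get, hit or miss
	return flateWriterPool.Get().(*flate.Writer)
}

func main() {
	fw := getFlateWriter()
	fw.Reset(ioutil.Discard)
	fw.Write([]byte("hello"))
	fw.Flush()
	flateWriterPool.Put(fw)
	fmt.Println("from pool:", flateWriterFromPool, "new:", newFlateWriter)
}

If the "new" counter stays small relative to "from pool", the pool is doing its job; if it keeps growing, some code path is not returning writers.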

@y3llowcake
Contributor

Possibly related: golang/go#18625

@FZambia
Contributor

FZambia commented Jan 14, 2017

@y3llowcake thanks for pointing to this issue.

I've written a test case for Gorilla WebSocket:

type testConn struct {
	conn     *Conn
	messages chan []byte
}

func newTestConn(c *Conn, bufferSize int) *testConn {
	return &testConn{
		conn:     c,
		messages: make(chan []byte, bufferSize),
	}
}

func printss() {
	m := runtime.MemStats{}
	runtime.ReadMemStats(&m)
	fmt.Printf("inuse: %d sys: %d\n", m.StackInuse, m.StackSys)
}

func TestWriteWithCompression(t *testing.T) {
	w := ioutil.Discard
	done := make(chan struct{})
	numConns := 1000
	numMessages := 1000
	conns := make([]*testConn, numConns)
	var wg sync.WaitGroup
	for i := 0; i < numConns; i++ {
		c := newConn(fakeNetConn{Reader: nil, Writer: w}, false, 1024, 1024)
		c.enableWriteCompression = true
		c.newCompressionWriter = compressNoContextTakeover
		conns[i] = newTestConn(c, 256)
		wg.Add(1)
		go func(c *testConn) {
			defer wg.Done()
			i := 0
			for i < numMessages {
				select {
				case <-done:
					return
				case msg := <-c.messages:
					c.conn.WriteMessage(TextMessage, msg)
					i++
				}
			}
		}(conns[i])
	}
	messages := textMessages(100)
	for i := 0; i < numMessages; i++ {
		if i%100 == 0 {
			printss()
		}
		msg := messages[i%len(messages)]
		for _, c := range conns {
			c.messages <- msg
		}
	}
	wg.Wait()
}

func textMessages(num int) [][]byte {
	messages := make([][]byte, num)
	for i := 0; i < num; i++ {
		msg := fmt.Sprintf("planet: %d, country: %d, city: %d, street: %d", i, i, i, i)
		messages[i] = []byte(msg)
	}
	return messages
}

It creates 1000 connections with compression enabled, each with a buffered message channel. Then in a loop we write a message into each connection.

Here is how it behaves with go1.7.4

fz@websocket: go test -test.run=TestWriteWithCompression
inuse: 4259840 sys: 4259840
inuse: 27394048 sys: 27394048
inuse: 246251520 sys: 246251520
inuse: 1048510464 sys: 1048510464
inuse: 1048510464 sys: 1048510464
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
PASS
ok  	github.com/gorilla/websocket	11.053s

Using Go with commit golang/go@9c3630f

fz@websocket: go1.8 test -test.run=TestWriteWithCompression
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
PASS
ok  	github.com/gorilla/websocket	12.023s

Though it's hard to say at the moment whether this fix will solve the original problem in this issue or not.

I also tried the same with the flate implementation from https://github.com/klauspost/compress by @klauspost, which already contains that array copy fix in master:

fz@websocket: go test -test.run=TestWriteWithCompression
inuse: 4358144 sys: 4358144
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
PASS
ok  	github.com/gorilla/websocket	3.426s

But actually, even without that fix, the https://github.com/klauspost/compress library behaves without memory growth... I can't explain this.

Also, here is a benchmark result using the https://github.com/klauspost/compress library:

BenchmarkWriteWithCompression-4   	  200000	      5676 ns/op	     149 B/op	       3 allocs/op

It's a 4x speedup compared to the standard library compress/flate results:

BenchmarkWriteWithCompression-4   	   50000	     25362 ns/op	     128 B/op	       3 allocs/op

@garyburd I understand that having a non-standard-library package in core could be a wrong step, but maybe we can consider a mechanism to let it be plugged in somehow by user code?

@klauspost

even without that fix [it] behaves without memory growth... I can't explain this.

AFAICT, this package uses "level 3" compression (which is a good choice). In my package, levels 1-4 are specialized and do not use the "generic" code, which has the issue.

In Go 1.7, level 1 (Best Speed) has a similar specialized function. I would think that if you use that, you will not experience the issue. That might be a solution you can use, so you do not have to import a specialized package (even if I wouldn't mind giving users the option). Performance for level 1 should be very close to my package.

@FZambia
Contributor

FZambia commented Jan 14, 2017

@klauspost thanks for explaining. I just tried what you said - yes, with compression level 1 the performance is comparable to your library and there are no memory problems on go1.7 (in the test case above).

@FZambia
Contributor

FZambia commented Jan 17, 2017

@garyburd what do you think about this? I see two solutions that can help us - make the compression level an exported variable or allow plugging in a custom flate implementation. Of course we can also wait for go1.8, but a way to improve compression performance is still very important. Do you want us to try creating a custom build with the advice you gave and the array copy bug fixed, and see how it behaves in production?

@garyburd
Contributor

@FZambia How about setting the compression level to one for now? It's probably the best option for most applications at this time and avoids exposing more API surface area.

@joshdvir
Author

@garyburd I agree with @FZambia, having the option to set the compression level would help greatly. For my specific use case I have a fan-out of 80K messages per second to 200K users, which causes a lot of outbound traffic; if I were able to set the compression level I could find the sweet spot between server count and outbound traffic.
Usually servers cost much less than traffic when the traffic is so high, so having this option configurable would be awesome.
Thanks

@garyburd
Contributor

OK, let's add the following:

type CompressionOptions struct {
    // Level specifies the compression level for the flate compressor.  Valid levels range from
    // -2 to 9.  Level -2 will use Huffman compression only. Level -1 uses the default compression 
    // level.  Level 0 does not attempt any compression.  Levels 1 through 9 range from best
    // speed to best compression. 
    // 
    // Applications should set this field.  The default value of 0 does not attempt any compression.
    Level int
}

type Dialer struct {
   // CompressionOptions specifies options the client should use for 
   // per message compression (RFC 7692).  If CompressionOptions is nil and
   // EnableCompression is false, then the client does not attempt to negotiate
   // compression with the server.
   CompressionOptions *CompressionOptions
    
  // EnableCompression specifies if the client should attempt to negotiate
  // per message compression (RFC 7692). Setting this value to true does not
  // guarantee that compression will be supported. Currently only "no context
  // takeover" modes are supported.
  //
  // Deprecated: Set CompressionOptions to non-nil value to enable compression
  // negotiation.
  EnableCompression bool
}

Modify Upgrader to match Dialer.

@FZambia
Contributor

FZambia commented Jan 17, 2017

@garyburd I agree that level 1 is a better default value at the moment, because it fixes the memory growth on Go1.7 and compression is so costly. But it looks like at fan-out scale, in apps as big as @joshdvir's, saving bandwidth saves a lot of money, so having the compression level configurable makes sense.

We made a custom build with compression level 1 and the counters you suggested and put it into production. The counter values are:

Node 1:
"gorilla_websocket_flate_writer_from_pool": 1453147,
"gorilla_websocket_new_flate_writer": 6702

Node 2:
"gorilla_websocket_flate_writer_from_pool": 1820919,
"gorilla_websocket_new_flate_writer": 3676,

Node 3:
"gorilla_websocket_flate_writer_from_pool": 574187,
"gorilla_websocket_new_flate_writer": 321

...

It's an aggregation over 1 minute. So the pool looks pretty effective, but...

...compression is still the leader in the alloc and CPU profiles, and getting from sync.Pool is the most allocation-expensive operation for some reason.

Here is the CPU profile now:

(pprof) top 30 --cum
27.28s of 52.42s total (52.04%)
Dropped 414 nodes (cum <= 0.26s)
Showing top 30 nodes out of 137 (cum >= 1.89s)
      flat  flat%   sum%        cum   cum%
         0     0%     0%     50.21s 95.78%  runtime.goexit
     0.16s  0.31%  0.31%     43.93s 83.80%  github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
     0.21s   0.4%  0.71%     42.52s 81.11%  github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
     0.19s  0.36%  1.07%     42.31s 80.71%  github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
     0.21s   0.4%  1.47%     41.87s 79.87%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
     0.01s 0.019%  1.49%     35.43s 67.59%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*flateWriteWrapper).Close
     0.03s 0.057%  1.55%     24.69s 47.10%  compress/flate.(*Writer).Flush
         0     0%  1.55%     24.66s 47.04%  compress/flate.(*compressor).syncFlush
     0.04s 0.076%  1.62%     24.03s 45.84%  compress/flate.(*compressor).encSpeed
     0.08s  0.15%  1.77%     18.16s 34.64%  compress/flate.(*huffmanBitWriter).writeBlockDynamic
     0.12s  0.23%  2.00%     15.03s 28.67%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*messageWriter).flushFrame
     0.11s  0.21%  2.21%     14.90s 28.42%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).write
     0.91s  1.74%  3.95%     13.21s 25.20%  compress/flate.(*huffmanEncoder).generate
         0     0%  3.95%     12.72s 24.27%  net.(*conn).Write
     0.06s  0.11%  4.06%     12.72s 24.27%  net.(*netFD).Write
    11.78s 22.47% 26.54%     12.16s 23.20%  syscall.Syscall
     0.05s 0.095% 26.63%     12.09s 23.06%  syscall.Write
     0.02s 0.038% 26.67%     12.04s 22.97%  syscall.write
     0.61s  1.16% 27.83%     11.98s 22.85%  compress/flate.(*huffmanBitWriter).indexTokens
         0     0% 27.83%     10.61s 20.24%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*messageWriter).Close
     5.20s  9.92% 37.75%      6.62s 12.63%  compress/flate.(*huffmanEncoder).bitCounts
     4.77s  9.10% 46.85%      5.44s 10.38%  compress/flate.encodeBestSpeed

list WriteMessage

(pprof) list WriteMessage
Total: 52.42s
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
     210ms     41.87s (flat, cum) 79.87% of Total
         .          .    659:
         .          .    660:// WriteMessage is a helper method for getting a writer using NextWriter,
         .          .    661:// writing the message and closing the writer.
         .          .    662:func (c *Conn) WriteMessage(messageType int, data []byte) error {
         .          .    663:
     160ms      160ms    664:	if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {
         .          .    665:
         .          .    666:		// Fast path with no allocations and single frame.
         .          .    667:
      10ms       40ms    668:		if err := c.prepWrite(messageType); err != nil {
         .          .    669:			return err
         .          .    670:		}
         .          .    671:		mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
         .       80ms    672:		n := copy(c.writeBuf[mw.pos:], data)
         .          .    673:		mw.pos += n
         .          .    674:		data = data[n:]
      10ms      4.43s    675:		return mw.flushFrame(true, data)
         .          .    676:	}
         .          .    677:
         .      1.63s    678:	w, err := c.NextWriter(messageType)
         .          .    679:	if err != nil {
         .          .    680:		return err
         .          .    681:	}
      30ms      100ms    682:	if _, err = w.Write(data); err != nil {
         .          .    683:		return err
         .          .    684:	}
         .     35.43s    685:	return w.Close()

list Close

         .          .    104:func (w *flateWriteWrapper) Close() error {
         .          .    105:	if w.fw == nil {
         .          .    106:		return errWriteClosed
         .          .    107:	}
         .     24.69s    108:	err1 := w.fw.Flush()
      10ms      130ms    109:	flateWriterPool.Put(w.fw)
         .          .    110:	w.fw = nil
         .          .    111:	if w.tw.p != [4]byte{0, 0, 0xff, 0xff} {
         .          .    112:		return errors.New("websocket: internal error, unexpected bytes at end of flate stream")
         .          .    113:	}
         .     10.61s    114:	err2 := w.tw.w.Close()
         .          .    115:	if err1 != nil {
         .          .    116:		return err1
         .          .    117:	}
         .          .    118:	return err2
         .          .    119:}

list Flush

         .          .    711:// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.
         .          .    712:func (w *Writer) Flush() error {
         .          .    713:	// For more about flushing:
         .          .    714:	// http://www.bolet.org/~pornin/deflate-flush.html
      30ms     24.69s    715:	return w.d.syncFlush()
         .          .    716:}
         .          .    717:
         .          .    718:// Close flushes and closes the writer.
         .          .    719:func (w *Writer) Close() error {
         .          .    720:	return w.d.close()
ROUTINE ======================== compress/flate.(*compressor).syncFlush in /Users/fz/go1.7/src/compress/flate/deflate.go
         0     24.66s (flat, cum) 47.04% of Total
         .          .    555:func (d *compressor) syncFlush() error {
         .          .    556:	if d.err != nil {
         .          .    557:		return d.err
         .          .    558:	}
         .          .    559:	d.sync = true
         .     24.03s    560:	d.step(d)
         .          .    561:	if d.err == nil {
         .      490ms    562:		d.w.writeStoredHeader(0, false)
         .      140ms    563:		d.w.flush()
         .          .    564:		d.err = d.w.err
         .          .    565:	}
         .          .    566:	d.sync = false
         .          .    567:	return d.err
         .          .    568:}

Memory usage is much better now but it is still very high; compression allocates a lot:

fz@centrifugo: go tool pprof --alloc_space centrifugo heap_profile_extra 
Entering interactive mode (type "help" for commands)
(pprof) top 30 --cum
518.97GB of 541.65GB total (95.81%)
Dropped 314 nodes (cum <= 2.71GB)
Showing top 30 nodes out of 35 (cum >= 3.33GB)
      flat  flat%   sum%        cum   cum%
         0     0%     0%   541.53GB   100%  runtime.goexit
         0     0%     0%   505.54GB 93.33%  github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
         0     0%     0%   504.45GB 93.13%  github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
         0     0%     0%   504.45GB 93.13%  github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
         0     0%     0%   504.45GB 93.13%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
    6.63GB  1.22%  1.22%   501.75GB 92.63%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter
    6.56GB  1.21%  2.44%   495.11GB 91.41%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover
         0     0%  2.44%   491.89GB 90.81%  sync.(*Pool).Get
         0     0%  2.44%   491.89GB 90.81%  sync.(*Pool).getSlow
  359.74GB 66.42% 68.85%   488.55GB 90.20%  compress/flate.NewWriter
         0     0% 68.85%   488.55GB 90.20%  github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.glob..func1
  128.81GB 23.78% 92.63%   128.81GB 23.78%  compress/flate.(*compressor).init
    0.01GB 0.0019% 92.64%    28.97GB  5.35%  net/http.(*conn).serve
         0     0% 92.64%    25.18GB  4.65%  github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).Logged.func1
         0     0% 92.64%    25.18GB  4.65%  net/http.(*ServeMux).ServeHTTP
         0     0% 92.64%    25.18GB  4.65%  net/http.HandlerFunc.ServeHTTP
         0     0% 92.64%    25.18GB  4.65%  net/http.serverHandler.ServeHTTP

list NextWriter

(pprof) list NextWriter
Total: 541.65GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
    6.63GB   501.75GB (flat, cum) 92.63% of Total
         .          .    444:	}
         .          .    445:
         .          .    446:	mw := &messageWriter{
         .          .    447:		c:         c,
         .          .    448:		frameType: messageType,
    6.63GB     6.63GB    449:		pos:       maxFrameHeaderSize,
         .          .    450:	}
         .          .    451:	c.writer = mw
         .          .    452:	if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
         .   495.11GB    453:		w := c.newCompressionWriter(c.writer)
         .          .    454:		mw.compress = true
         .          .    455:		c.writer = w
         .          .    456:	}
         .          .    457:	return c.writer, nil
         .          .    458:}

list compressNoContextTakeover

(pprof) list compressNoContextTakeover
Total: 541.65GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/compression.go
    6.56GB   495.11GB (flat, cum) 91.41% of Total
         .          .     44:	fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
         .          .     45:	return &flateReadWrapper{fr}
         .          .     46:}
         .          .     47:
         .          .     48:func compressNoContextTakeover(w io.WriteCloser) io.WriteCloser {
    4.41GB     4.41GB     49:	tw := &truncWriter{w: w}
         .   488.55GB     50:	fw, _ := flateWriterPool.Get().(*flate.Writer)
         .          .     51:	plugin.Metrics.Counters.Inc("gorilla_websocket_flate_writer_from_pool")
         .          .     52:	fw.Reset(tw)
    2.15GB     2.15GB     53:	return &flateWriteWrapper{fw: fw, tw: tw}
         .          .     54:}

--inuse_space shows a similar picture, just with values two orders of magnitude lower (2.69GB for the flateWriterPool.Get().(*flate.Writer) line, which is the leader).

It's hard to say what we can do with such a big compression overhead...

@garyburd you suggest adding CompressionOptions - I can do a pull request with it, but maybe just a global exported variable like DefaultFlateCompressionLevel that we can set on application start, or a setter method, would do the job? We don't need a per-connection compression level - and we can eventually do what you suggested if there is a need later. And no deprecation will be required this way for now.

@garyburd
Contributor

@FZambia Thank you for testing the pool effectiveness and reporting the profiles.

I don't want to add an API now that might be replaced with another API later.

As I think about it more, it's better to add one method:

// SetCompressionLevel sets the flate compression level for the next message.
// Valid levels range from -2 to 9.  Level -2 will use Huffman compression only. 
// Level -1 uses the default compression  level.  Level 0 does not attempt any
// compression.  Levels 1 through 9 range from best speed to best compression. 
func (c *Conn) SetCompressionLevel(n int) error {
}

This is more flexible than the other options. The implementation will replace flateWriterPool with flateWriterPools[12]. The flateWriteWrapper will need to store the level so the writer can be returned to the correct pool.
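A rough sketch of the pool-per-level idea under discussion; the helper names are illustrative, not the eventual implementation:

package main

import (
	"compress/flate"
	"io"
	"io/ioutil"
	"sync"
)

// One pool per compression level; levels -2..9 map to indexes 0..11.
var flateWriterPools [12]sync.Pool

func getFlateWriter(w io.Writer, level int) *flate.Writer {
	p := &flateWriterPools[level+2]
	if fw, ok := p.Get().(*flate.Writer); ok {
		fw.Reset(w) // reuse a pooled writer for this level
		return fw
	}
	fw, _ := flate.NewWriter(w, level)
	return fw
}

// The wrapper would remember its level so Close can return the writer to the right pool.
func putFlateWriter(fw *flate.Writer, level int) {
	flateWriterPools[level+2].Put(fw)
}

func main() {
	fw := getFlateWriter(ioutil.Discard, flate.BestSpeed)
	fw.Write([]byte("hello"))
	fw.Flush()
	putFlateWriter(fw, flate.BestSpeed)
}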

@FZambia
Contributor

FZambia commented Jan 18, 2017

I think that having one method that changes the default compression level still makes sense:

var defaultCompressionLevel int = 1

// SetDefaultCompressionLevel sets the flate compression level which will be used by
// default to compress messages when compression is negotiated. This function must be
// called once before the application starts.
//
// Valid levels range from -2 to 9.  Level -2 will use Huffman compression only.
// Level -1 uses the default compression level.  Level 0 does not attempt any
// compression.  Levels 1 through 9 range from best speed to best compression.
func SetDefaultCompressionLevel(n int) error {
    defaultCompressionLevel = n
    return nil
}

In most situations I suppose users that need custom compression only need to set this default value once.

Then, if someone needs a per-connection/per-message compression level, we can add SetCompressionLevel and a [12]flateWriterPool:

// SetCompressionLevel sets the flate compression level for the next message.
// Valid levels range from -2 to 9.  Level -2 will use Huffman compression only. 
// Level -1 uses the default compression  level.  Level 0 does not attempt any
// compression.  Levels 1 through 9 range from best speed to best compression.
// If not set, the default compression level will be used.
func (c *Conn) SetCompressionLevel(n int) error {
}

If it's not called but compression is negotiated, defaultCompressionLevel will be used. The only caveat I see is that SetDefaultCompressionLevel should be called once before the application starts in the current implementation, which seems pretty opaque.

@FZambia
Contributor

FZambia commented Jan 18, 2017

Just looked at the size of each flate.Writer instance:

package main

import "unsafe"
import "compress/flate"

func main() {
    var w flate.Writer
    println(unsafe.Sizeof(w))
}

600KB! This size is surprising to me - I would never have assumed it was such a big thing :)

@klauspost

klauspost commented Jan 18, 2017

Yeah - quite a number of tables are needed to maintain state, produce Huffman tables and buffer output. There isn't much that can be done about it, except for levels -2 (HuffmanOnly), 0 (No Compression) and 1 (Best Speed) in the standard library.

For my own library I have been looking at reducing the memory requirements with encoding options that do not require as many buffers; see klauspost/compress#70 (it still has issues, as can be seen by the crash I logged in the issue).

@garyburd
Contributor

@FZambia I do not want to use package-level variables for settings. The setting should be included with the other settings in Dialer and Upgrader, or it should be part of the connection so it can be changed from message to message.

@garyburd
Contributor

See b0dc455.
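For reference, here is a brief server-side usage sketch of the per-connection API from that commit, as I understand it (level 1 is flate.BestSpeed; the handler wiring is just an example, and github.com/gorilla/websocket is the canonical import path):

package main

import (
	"log"
	"net/http"

	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
	EnableCompression: true, // allow negotiating permessage-deflate
}

func wsHandler(w http.ResponseWriter, r *http.Request) {
	c, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	defer c.Close()

	// Trade some compression ratio for much lower CPU and memory.
	if err := c.SetCompressionLevel(1); err != nil {
		log.Println(err)
		return
	}
	if err := c.WriteMessage(websocket.TextMessage, []byte("hello")); err != nil {
		log.Println(err)
	}
}

func main() {
	http.HandleFunc("/ws", wsHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}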

@FZambia
Contributor

FZambia commented Jan 19, 2017

@garyburd many thanks for your time helping us with this. Looking at all these profiles, do you see any way to reduce compression memory usage and allocations?

@garyburd
Contributor

If the application sends the same message to multiple clients, then #182 is the next thing to try.
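#182 eventually landed as the PreparedMessage API; a minimal fan-out sketch assuming that API, where the framing and (if negotiated) compression work is done once per message rather than once per recipient:

package main

import (
	"log"

	"github.com/gorilla/websocket"
)

// broadcast sends one payload to many connections. The message is prepared
// once and the prepared form is reused for every peer.
func broadcast(conns []*websocket.Conn, payload []byte) {
	pm, err := websocket.NewPreparedMessage(websocket.TextMessage, payload)
	if err != nil {
		log.Println("prepare:", err)
		return
	}
	for _, c := range conns {
		if err := c.WritePreparedMessage(pm); err != nil {
			log.Println("write:", err) // in a real server: drop or close this connection
		}
	}
}

func main() {
	broadcast(nil, []byte(`{"channel":"news","text":"hello"}`)) // no-op example call
}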

@joshdvir
Author

@garyburd #182 could do magic in my use case, where my fan-out is between 50K and 90K messages per second and all messages are the same.

Thank you for your help.

@garyburd
Contributor

garyburd commented Mar 1, 2017

Is this issue fixed by b0dc455 and 804cb60?

@FZambia
Contributor

FZambia commented Mar 6, 2017

@garyburd heap before changes:
[heap profile screenshot]
After changes (compression level 1 and using PreparedMessage):
[heap profile screenshot]

The same picture for CPU. So we no longer see compression in first place in the heap and CPU profiles.

Though memory usage is reduced, it's still pretty high when using compression - but this could be an application problem; we will try to investigate.

@FZambia
Contributor

FZambia commented Mar 14, 2017

@garyburd I just analyzed the inuse_space profile @joshdvir gave me.

The leaders are gorilla/websocket.newConnBRW (36%), http.newBufioWriterSize (17%), http.newBufioReader (16%). I thought we used the recent improvement you added in #223, but looking at the profiles I noticed that the read and write buffers from the hijack are not reused for some reason. Then I found a bug in the Centrifugo code - ReadBufferSize and WriteBufferSize were set to the SockJS-go default values (4096) when those sizes were set to 0 in the configuration. We will build with the fix and come back with results.

There are some other things in the profiles that might interest you - I'll try to upload SVG graph visualizations from the build with the fix.

@FZambia
Contributor

FZambia commented Mar 31, 2017

Here are some graphs using Centrifugo with the latest Gorilla WebSocket. This is from a Centrifugo node run by @joshdvir with many connections (about 100k) and WebSocket compression enabled (level 1), go1.7.5.

CPU:

[CPU profile graph]

Here we see that most of the CPU is spent on syscalls - read, write - I don't think we can do a lot here, because we send a lot of fan-out messages to different sockets. So this looks normal.

Alloc space:

[alloc_space profile graph]

Here we see a big impact from compress/flate.(*decompressor).Reset - I thought a bit about how to improve this, but have not found a way...

Inuse space:

[inuse_space profile graph]

@garyburd do you see a way to improve things further? If not then we can live with this I suppose.

P.S. I am going to create many WebSocket connections on a local machine with compression enabled/disabled and compare profiles.

@klauspost

klauspost commented Aug 13, 2018

Maybe if you wrote a fairly standalone benchmark - meaning one using only the stdlib and deflate - that represented real-world scenarios, I could use it for testing whether a leaner encoder would actually help.

The PR itself would have to be rewritten, but having a good benchmark would be a starting point.

We could also add a []byte -> []byte API where the complete frame is supplied; that would eliminate the need to have an internal buffer (and the penalty of copying to it).
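One possible shape for such a benchmark, using only the standard library; the message content and size are made up to roughly match the ~600 byte fan-out messages discussed above:

package flatebench

import (
	"bytes"
	"compress/flate"
	"io/ioutil"
	"testing"
)

// Models the no-context-takeover write path: each message gets its own
// Reset + Write + Flush on a reused writer.
func BenchmarkNoContextTakeover(b *testing.B) {
	msg := bytes.Repeat([]byte(`{"channel":"news","text":"hello world"}`), 16) // ~600 bytes
	fw, err := flate.NewWriter(ioutil.Discard, flate.BestSpeed)
	if err != nil {
		b.Fatal(err)
	}
	b.ReportAllocs()
	b.SetBytes(int64(len(msg)))
	for i := 0; i < b.N; i++ {
		fw.Reset(ioutil.Discard)
		if _, err := fw.Write(msg); err != nil {
			b.Fatal(err)
		}
		if err := fw.Flush(); err != nil {
			b.Fatal(err)
		}
	}
}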

@nhooyr

nhooyr commented Sep 29, 2018

Given these performance issues, is compression worth enabling?

We could also add a []byte -> []byte API where the complete frame is supplied; that would eliminate the need to have an internal buffer (and the penalty of copying to it).

How would this API work? Would we allow writing directly to the net.Conn?

@ghost

ghost commented Sep 29, 2018

On the server without compression, the WriteMessage API mostly copies the data directly from the application-supplied []byte to the underlying network connection. The prepared message API also copies a []byte directly to the underlying network connection.

@nhooyr

nhooyr commented May 31, 2019

I believe this issue is due to compress/flate not exposing a way to adjust the sliding window. See https://golang.org/issue/3155

Thus, every flate writer allocates a lot of memory, and even though they are reused via a pool, this ends up causing the large memory usage spike with compression on.

@nhooyr

nhooyr commented May 31, 2019

Doesn't seem like there is anything this library can do further until that issue is closed.

@nhooyr

nhooyr commented May 31, 2019

In fact, according to this benchmark

func BenchmarkFlate(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		flate.NewWriter(nil, flate.BestSpeed)
	}
}
$ go test -bench=BenchmarkFlate -run=^$
goos: darwin
goarch: amd64
pkg: scratch
BenchmarkFlate-8   	   10000	    131504 ns/op	 1200008 B/op	      15 allocs/op
PASS
ok  	scratch	1.345s

flate.NewWriter allocates 1.2 MB! That's a lot of memory to allocate for compression when you're writing 600 byte WebSocket messages.

@nhooyr

nhooyr commented May 31, 2019

With BestCompression or DefaultCompression, the allocation drops to about 800 KB, but that's still massive.

# DefaultCompression
$ go test -bench=BenchmarkFlate -run=^$
goos: darwin
goarch: amd64
pkg: nhooyr.io/websocket
BenchmarkFlate-8   	   20000	     93332 ns/op	  806784 B/op	      13 allocs/op
PASS
ok  	nhooyr.io/websocket	2.848s
# BestCompression
$ go test -bench=BenchmarkFlate -run=^$
goos: darwin
goarch: amd64
pkg: nhooyr.io/websocket
BenchmarkFlate-8   	   20000	     95197 ns/op	  806784 B/op	      13 allocs/op
PASS
ok  	nhooyr.io/websocket	2.879s

@nhooyr

nhooyr commented May 31, 2019

I've filed golang/go#32371

@klauspost

Have a look at klauspost/compress#176

@nhooyr

nhooyr commented Nov 9, 2019

@klauspost that's really cool. What sort of memory overhead would be involved for each Write?

@klauspost

It will require a few hundred kilobytes, but contrary to the other alternatives, once the data is compressed the memory will be freed.

For WebSockets, that is what you need - meaning no live allocations for inactive sockets. Just be sure everything is written in a single write.

@FZambia
Contributor

FZambia commented Nov 10, 2019

@klauspost thanks, I'll try to check it out as soon as I have time. I have a gist that allows using a custom flate implementation with the Gorilla WebSocket library - I need to revisit it and experiment with your stateless compression branch.

@klauspost

klauspost/compress#185 brings down heap allocation to a few hundred bytes during a Write by having an internal sync.Pool.

@nhooyr

nhooyr commented Feb 16, 2020

My testing of @klauspost's library with stateless compression has shown great results.

See klauspost/compress#216 (comment)

About 9.5 KB allocated per message written for the simple 512 byte message []byte(strings.Repeat("1234", 128)).

edit: It's now available for testing in master on my library.

@nhooyr

nhooyr commented Feb 16, 2020

To clarify, that's with a dictionary in context takeover mode; it's 50 B allocated per message if there is no dictionary.

@lxzan

lxzan commented Sep 28, 2023

Compressor and decompressor instances are very memory intensive, and gorilla uses sync.Pool to reuse them. For best reuse, messages should be processed in a non-blocking manner, with a goroutine opened immediately after decoding.

@hulkingshtick

If it is acceptable to add a dependency, fix by changing the package to use https://github.com/klauspost/compress instead of the standard library.

@jaitaiwan
Member

It's acceptable to add a dependency, but it's not clear to me how using this would improve the situation.

@klauspost

klauspost commented Sep 4, 2024

@jaitaiwan gzip (deflate) compression mostly relies on referencing previous data. Besides the window (which is typically extended, so it doesn't have to copy too much), it also keeps temporary buffers, so it doesn't have to allocate for every block, as well as an index of the window (typically the most memory-intensive part).

For reference, here is the approximate "idle" usage of each encoder level - that is, the allocs made to create an instance and the memory used after a Write call, before the Writer is closed:

Usage/stateless: Static Mem: 0KB, 7 allocs
Usage/level--2: Static Mem: 272KB, 7 allocs
Usage/level--1: Static Mem: 1016KB, 9 allocs
Usage/level-0: Static Mem: 304KB, 7 allocs
Usage/level-1: Static Mem: 760KB, 6 allocs
Usage/level-2: Static Mem: 1144KB, 6 allocs
Usage/level-3: Static Mem: 1144KB, 6 allocs
Usage/level-4: Static Mem: 888KB, 6 allocs
Usage/level-5: Static Mem: 1016KB, 6 allocs
Usage/level-6: Static Mem: 1016KB, 6 allocs
Usage/level-7: Static Mem: 952KB, 5 allocs
Usage/level-8: Static Mem: 952KB, 5 allocs
Usage/level-9: Static Mem: 1080KB, 7 allocs

So there are 3 variations:

A) Use a stateless Writer (see the sketch after this list). This works as a normal compressor but de-allocates everything between each Write call. This means there are 0 "idle" allocations, but there is a significant compression loss, and each Write will cause temporary allocations.

B) Do your own buffering of the last data written, and use StatelessDeflate to provide the history. This will result in less compression loss, but still allocates on each write.

C) Do your own buffering of the last data written, and use ResetDict before each write and flush+stash the encoders afterwards.
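A minimal sketch of option A, assuming the NewStatelessWriter helper from github.com/klauspost/compress/flate; each message must go out in a single Write, and nothing stays allocated while a connection is idle:

package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/compress/flate"
)

func main() {
	var buf bytes.Buffer

	// A stateless writer keeps no state between Writes, so it holds no
	// compressor memory for an idle connection.
	w := flate.NewStatelessWriter(&buf)
	if _, err := w.Write([]byte(`{"channel":"news","text":"hello world"}`)); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	fmt.Println("compressed size:", buf.Len())
}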
