Skip to content

Commit

Permalink
Merge pull request #13 from mailgun/maxim/develop
Browse files Browse the repository at this point in the history
Add clone methods
  • Loading branch information
smira authored Nov 16, 2018
2 parents cc7284a + bc2fd9a commit 133607a
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 186 deletions.
26 changes: 13 additions & 13 deletions buffers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,35 @@ import "sync/atomic"
// checkBuf checks current buffer for overflow, and flushes buffer up to lastLen bytes on overflow
//
// overflow part is preserved in flushBuf
func (c *Client) checkBuf(lastLen int) {
if len(c.buf) > c.options.MaxPacketSize {
c.flushBuf(lastLen)
func (t *transport) checkBuf(lastLen int) {
if len(t.buf) > t.maxPacketSize {
t.flushBuf(lastLen)
}
}

// flushBuf sends buffer to the queue and initializes new buffer
func (c *Client) flushBuf(length int) {
sendBuf := c.buf[0:length]
tail := c.buf[length:len(c.buf)]
func (t *transport) flushBuf(length int) {
sendBuf := t.buf[0:length]
tail := t.buf[length:len(t.buf)]

// get new buffer
select {
case c.buf = <-c.bufPool:
c.buf = c.buf[0:0]
case t.buf = <-t.bufPool:
t.buf = t.buf[0:0]
default:
c.buf = make([]byte, 0, c.bufSize)
t.buf = make([]byte, 0, t.bufSize)
}

// copy tail to the new buffer
c.buf = append(c.buf, tail...)
t.buf = append(t.buf, tail...)

// flush current buffer
select {
case c.sendQueue <- sendBuf:
case t.sendQueue <- sendBuf:
default:
// flush failed, we lost some data
atomic.AddInt64(&c.lostPacketsPeriod, 1)
atomic.AddInt64(&c.lostPacketsOverall, 1)
atomic.AddInt64(&t.lostPacketsPeriod, 1)
atomic.AddInt64(&t.lostPacketsOverall, 1)
}

}
Loading

0 comments on commit 133607a

Please sign in to comment.