Skip to content

Commit

Permalink
fix(rpc): Websocket memory leak (#893)
Browse files Browse the repository at this point in the history
  • Loading branch information
zale144 authored Jun 3, 2024
1 parent 5ff49c7 commit ee249d4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

### Bug Fixes

* **bug:** memory leak in websocket handler ([#892](https://github.com/dymensionxyz/dymint/issues/892)) ([02fcbde](https://github.com/dymensionxyz/dymint/commit/48c263fbde71594ec34e0f731d9febc0702fcbde))
* **bug:** sync from da and p2p when starting a node ([#763](https://github.com/dymensionxyz/dymint/issues/763)) ([68ffd05](https://github.com/dymensionxyz/dymint/commit/68ffd05794949ddc42df1c132d1fde5f21b505f4))
* **celestia test:** fix race in test ([#755](https://github.com/dymensionxyz/dymint/issues/755)) ([0b36781](https://github.com/dymensionxyz/dymint/commit/0b367818bf6aa8da4a4fd8e4e5c78223b60b44e0))
* **celestia:** impl retry on submit ([#748](https://github.com/dymensionxyz/dymint/issues/748)) ([61630eb](https://github.com/dymensionxyz/dymint/commit/61630eb458197abe2440a81426210000dff25d40))
Expand Down
38 changes: 24 additions & 14 deletions rpc/json/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package json

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"time"

"github.com/gorilla/websocket"

"github.com/dymensionxyz/dymint/types"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/types"
)

type wsConn struct {
Expand All @@ -18,13 +20,16 @@ type wsConn struct {
logger types.Logger
}

const writeWait = 10 * time.Second

func (wsc *wsConn) sendLoop() {
for msg := range wsc.queue {
writer, err := wsc.conn.NextWriter(websocket.TextMessage)
if err != nil {
wsc.logger.Error("create writer", "error", err)
continue
}
_ = wsc.conn.SetWriteDeadline(time.Now().Add(writeWait))
_, err = writer.Write(msg)
if err != nil {
wsc.logger.Error("write message", "error", err)
Expand All @@ -50,28 +55,32 @@ func (h *handler) wsHandler(w http.ResponseWriter, r *http.Request) {
h.logger.Error("update to WebSocket connection", "error", err)
return
}
remoteAddr := wsc.RemoteAddr().String()
defer func() {
err := wsc.Close()
if err != nil {
h.logger.Error("close WebSocket connection", "err")
}
}()

ws := &wsConn{
conn: wsc,
queue: make(chan []byte),
logger: h.logger,
}

defer func() {
close(ws.queue)

if err := ws.conn.Close(); err != nil {
h.logger.Error("close WebSocket connection", "err")
}
}()

go ws.sendLoop()

remoteAddr := ws.conn.RemoteAddr().String()

for {
mt, r, err := wsc.NextReader()
mt, rdr, err := wsc.NextReader()
if err != nil {
if _, ok := err.(*websocket.CloseError); ok {
h.logger.Debug("WebSocket connection closed", "reason", err)
err := h.srv.client.EventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
err := h.srv.client.EventBus.UnsubscribeAll(r.Context(), remoteAddr)
if err != nil && !errors.Is(err, tmpubsub.ErrSubscriptionNotFound) {
h.logger.Error("unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
} else {
Expand All @@ -85,13 +94,14 @@ func (h *handler) wsHandler(w http.ResponseWriter, r *http.Request) {
h.logger.Debug("expected text message")
continue
}
req, err := http.NewRequest(http.MethodGet, "", r)
req.RemoteAddr = remoteAddr
req, err := http.NewRequest(http.MethodGet, "", rdr)
if err != nil {
h.logger.Error("create request", "error", err)
continue
}

req.RemoteAddr = remoteAddr

writer := new(bytes.Buffer)
h.serveJSONRPCforWS(newResponseWriter(writer), req, ws)
ws.queue <- writer.Bytes()
Expand Down

0 comments on commit ee249d4

Please sign in to comment.