Skip to content

Commit

Permalink
refactor: dot/rpc: move websocket messages and listeners subscription…
Browse files Browse the repository at this point in the history
… package (#1459)
  • Loading branch information
edwardmack authored Mar 19, 2021
1 parent aad1df3 commit dad3f3d
Show file tree
Hide file tree
Showing 5 changed files with 373 additions and 324 deletions.
88 changes: 63 additions & 25 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package rpc

import (
"fmt"
"net"
"net/http"
"os"
"sync"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/rpc/subscription"
"github.com/ChainSafe/gossamer/lib/common"
log "github.com/ChainSafe/log15"
"github.com/go-playground/validator/v10"
Expand All @@ -36,7 +37,7 @@ type HTTPServer struct {
logger log.Logger
rpcServer *rpc.Server // Actual RPC call handler
serverConfig *HTTPServerConfig
wsConns []*WSConn
wsConns []*subscription.WSConn
}

// HTTPServerConfig configures the HTTPServer
Expand All @@ -60,21 +61,6 @@ type HTTPServerConfig struct {
Modules []string
}

// WSConn struct to hold WebSocket Connection references
type WSConn struct {
wsconn *websocket.Conn
mu sync.Mutex
blockSubChannels map[int]byte
storageSubChannels map[int]byte
qtyListeners int
subscriptions map[int]Listener
storageAPI modules.StorageAPI
blockAPI modules.BlockAPI
runtimeAPI modules.RuntimeAPI
coreAPI modules.CoreAPI
txStateAPI modules.TransactionStateAPI
}

var logger log.Logger

// NewHTTPServer creates a new http server and registers an associated rpc server
Expand Down Expand Up @@ -189,22 +175,74 @@ func (h *HTTPServer) Stop() error {
if h.serverConfig.WS {
// close all channels and websocket connections
for _, conn := range h.wsConns {
for _, sub := range conn.subscriptions {
for _, sub := range conn.Subscriptions {
switch v := sub.(type) {
case *StorageChangeListener:
h.serverConfig.StorageAPI.UnregisterStorageChangeChannel(v.chanID)
close(v.channel)
case *BlockListener:
h.serverConfig.BlockAPI.UnregisterImportedChannel(v.chanID)
close(v.channel)
case *subscription.StorageChangeListener:
h.serverConfig.StorageAPI.UnregisterStorageChangeChannel(v.ChanID)
close(v.Channel)
case *subscription.BlockListener:
h.serverConfig.BlockAPI.UnregisterImportedChannel(v.ChanID)
close(v.Channel)
}
}

err := conn.wsconn.Close()
err := conn.Wsconn.Close()
if err != nil {
h.logger.Error("error closing websocket connection", "error", err)
}
}
}
return nil
}

// ServeHTTP implemented to handle WebSocket connections
func (h *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var upg = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
if !h.serverConfig.WSExternal {
ip, _, error := net.SplitHostPort(r.RemoteAddr)
if error != nil {
logger.Error("unable to parse IP", "error")
return false
}

f := LocalhostFilter()
if allowed := f.Allowed(ip); allowed {
return true
}

logger.Debug("external websocket request refused", "error")
return false
}
return true
},
}

ws, err := upg.Upgrade(w, r, nil)
if err != nil {
h.logger.Error("websocket upgrade failed", "error", err)
return
}
// create wsConn
wsc := NewWSConn(ws, h.serverConfig)
h.wsConns = append(h.wsConns, wsc)

go wsc.HandleComm()
}

// NewWSConn to create new WebSocket Connection struct
func NewWSConn(conn *websocket.Conn, cfg *HTTPServerConfig) *subscription.WSConn {
c := &subscription.WSConn{
Wsconn: conn,
Subscriptions: make(map[int]subscription.Listener),
BlockSubChannels: make(map[int]byte),
StorageSubChannels: make(map[int]byte),
StorageAPI: cfg.StorageAPI,
BlockAPI: cfg.BlockAPI,
RuntimeAPI: cfg.RuntimeAPI,
CoreAPI: cfg.CoreAPI,
TxStateAPI: cfg.TransactionQueueAPI,
RPCHost: fmt.Sprintf("http://%s:%d/", cfg.Host, cfg.RPCPort),
}
return c
}
Loading

0 comments on commit dad3f3d

Please sign in to comment.