Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: dot/rpc: move websocket messages and listeners subscription package #1459

Merged
merged 10 commits into from
Mar 19, 2021
88 changes: 63 additions & 25 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package rpc

import (
"fmt"
"net"
"net/http"
"os"
"sync"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/rpc/subscription"
"github.com/ChainSafe/gossamer/lib/common"
log "github.com/ChainSafe/log15"
"github.com/go-playground/validator/v10"
Expand All @@ -36,7 +37,7 @@ type HTTPServer struct {
logger log.Logger
rpcServer *rpc.Server // Actual RPC call handler
serverConfig *HTTPServerConfig
wsConns []*WSConn
wsConns []*subscription.WSConn
}

// HTTPServerConfig configures the HTTPServer
Expand All @@ -60,21 +61,6 @@ type HTTPServerConfig struct {
Modules []string
}

// WSConn struct to hold WebSocket Connection references
type WSConn struct {
wsconn *websocket.Conn
mu sync.Mutex
blockSubChannels map[int]byte
storageSubChannels map[int]byte
qtyListeners int
subscriptions map[int]Listener
storageAPI modules.StorageAPI
blockAPI modules.BlockAPI
runtimeAPI modules.RuntimeAPI
coreAPI modules.CoreAPI
txStateAPI modules.TransactionStateAPI
}

var logger log.Logger

// NewHTTPServer creates a new http server and registers an associated rpc server
Expand Down Expand Up @@ -189,22 +175,74 @@ func (h *HTTPServer) Stop() error {
if h.serverConfig.WS {
// close all channels and websocket connections
for _, conn := range h.wsConns {
for _, sub := range conn.subscriptions {
for _, sub := range conn.Subscriptions {
switch v := sub.(type) {
case *StorageChangeListener:
h.serverConfig.StorageAPI.UnregisterStorageChangeChannel(v.chanID)
close(v.channel)
case *BlockListener:
h.serverConfig.BlockAPI.UnregisterImportedChannel(v.chanID)
close(v.channel)
case *subscription.StorageChangeListener:
h.serverConfig.StorageAPI.UnregisterStorageChangeChannel(v.ChanID)
close(v.Channel)
case *subscription.BlockListener:
h.serverConfig.BlockAPI.UnregisterImportedChannel(v.ChanID)
close(v.Channel)
}
}

err := conn.wsconn.Close()
err := conn.Wsconn.Close()
if err != nil {
h.logger.Error("error closing websocket connection", "error", err)
}
}
}
return nil
}

// ServeHTTP implemented to handle WebSocket connections
func (h *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var upg = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
if !h.serverConfig.WSExternal {
ip, _, error := net.SplitHostPort(r.RemoteAddr)
if error != nil {
logger.Error("unable to parse IP", "error")
return false
}

f := LocalhostFilter()
if allowed := f.Allowed(ip); allowed {
return true
}

logger.Debug("external websocket request refused", "error")
return false
}
return true
},
}

ws, err := upg.Upgrade(w, r, nil)
if err != nil {
h.logger.Error("websocket upgrade failed", "error", err)
return
}
// create wsConn
wsc := NewWSConn(ws, h.serverConfig)
h.wsConns = append(h.wsConns, wsc)

go wsc.HandleComm()
}

// NewWSConn to create new WebSocket Connection struct
func NewWSConn(conn *websocket.Conn, cfg *HTTPServerConfig) *subscription.WSConn {
c := &subscription.WSConn{
Wsconn: conn,
Subscriptions: make(map[int]subscription.Listener),
BlockSubChannels: make(map[int]byte),
StorageSubChannels: make(map[int]byte),
StorageAPI: cfg.StorageAPI,
BlockAPI: cfg.BlockAPI,
RuntimeAPI: cfg.RuntimeAPI,
CoreAPI: cfg.CoreAPI,
TxStateAPI: cfg.TransactionQueueAPI,
RPCHost: fmt.Sprintf("http://%s:%d/", cfg.Host, cfg.RPCPort),
}
return c
}
Loading