Skip to content

Commit

Permalink
Merge branch 'development' into fix-upgrade-runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
arijitAD authored Jul 8, 2021
2 parents 4849787 + d36c3b7 commit cffd64e
Show file tree
Hide file tree
Showing 15 changed files with 619 additions and 476 deletions.
4 changes: 1 addition & 3 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
return nil, err
}

if cfg.Global.NoTelemetry {
return node, nil
}
telemetry.GetInstance().Initialise(!cfg.Global.NoTelemetry)

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
genesisHash := stateSrvc.Block.GenesisHash()
Expand Down
4 changes: 4 additions & 0 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"net/http"
"os"
"time"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/rpc/subscription"
Expand Down Expand Up @@ -240,6 +241,9 @@ func NewWSConn(conn *websocket.Conn, cfg *HTTPServerConfig) *subscription.WSConn
CoreAPI: cfg.CoreAPI,
TxStateAPI: cfg.TransactionQueueAPI,
RPCHost: fmt.Sprintf("http://%s:%d/", cfg.Host, cfg.RPCPort),
HTTP: &http.Client{
Timeout: time.Second * 30,
},
}
return c
}
20 changes: 13 additions & 7 deletions dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/ChainSafe/gossamer/lib/runtime/wasmer"
"github.com/ChainSafe/gossamer/lib/scale"
"github.com/ChainSafe/gossamer/lib/transaction"
"github.com/ChainSafe/gossamer/lib/trie"
"github.com/ChainSafe/gossamer/pkg/scale"
log "github.com/ChainSafe/log15"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -302,13 +302,19 @@ func setupSystemModule(t *testing.T) *SystemModule {
Nonce: 3,
//RefCount: 0,
Data: struct {
Free common.Uint128
Reserved common.Uint128
MiscFrozen common.Uint128
FreeFrozen common.Uint128
}{},
Free *scale.Uint128
Reserved *scale.Uint128
MiscFrozen *scale.Uint128
FreeFrozen *scale.Uint128
}{
Free: scale.MustNewUint128(big.NewInt(0)),
Reserved: scale.MustNewUint128(big.NewInt(0)),
MiscFrozen: scale.MustNewUint128(big.NewInt(0)),
FreeFrozen: scale.MustNewUint128(big.NewInt(0)),
},
}
aliceAcctEncoded, err := scale.Encode(aliceAcctInfo)

aliceAcctEncoded, err := scale.Marshal(aliceAcctInfo)
require.NoError(t, err)
ts.Set(aliceAcctStoKey, aliceAcctEncoded)

Expand Down
161 changes: 116 additions & 45 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package subscription

import (
"context"
"fmt"
"reflect"

Expand All @@ -28,6 +29,7 @@ import (
// Listener interface for functions that define Listener related functions
type Listener interface {
Listen()
Stop()
}

// WSConnAPI interface defining methors a WSConn should have
Expand Down Expand Up @@ -85,59 +87,98 @@ func (s *StorageObserver) GetFilter() map[string][]byte {
// Listen to satisfy Listener interface (but is no longer used by StorageObserver)
func (s *StorageObserver) Listen() {}

// Stop to satisfy Listener interface (but is no longer used by StorageObserver)
func (s *StorageObserver) Stop() {}

// BlockListener to handle listening for blocks importedChan
type BlockListener struct {
Channel chan *types.Block
wsconn WSConnAPI
ChanID byte
subID uint

ctx context.Context
cancel context.CancelFunc
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *BlockListener) Listen() {
for block := range l.Channel {
if block == nil {
continue
}
head, err := modules.HeaderToJSON(*block.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
}
l.ctx, l.cancel = context.WithCancel(context.Background())
go func() {
for {
select {
case <-l.ctx.Done():
return
case block, ok := <-l.Channel:
if !ok {
return
}

res := newSubcriptionBaseResponseJSON()
res.Method = "chain_newHead"
res.Params.Result = head
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
if block == nil {
continue
}
head, err := modules.HeaderToJSON(*block.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
}

res := newSubcriptionBaseResponseJSON()
res.Method = "chain_newHead"
res.Params.Result = head
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
}
}()
}

// Stop to cancel the running goroutines to this listener
func (l *BlockListener) Stop() { l.cancel() }

// BlockFinalizedListener to handle listening for finalised blocks
type BlockFinalizedListener struct {
channel chan *types.FinalisationInfo
wsconn WSConnAPI
chanID byte
subID uint
ctx context.Context
cancel context.CancelFunc
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *BlockFinalizedListener) Listen() {
for info := range l.channel {
if info == nil || info.Header == nil {
continue
}
head, err := modules.HeaderToJSON(*info.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
l.ctx, l.cancel = context.WithCancel(context.Background())

go func() {
for {
select {
case <-l.ctx.Done():
return
case info, ok := <-l.channel:
if !ok {
return
}

if info == nil || info.Header == nil {
continue
}
head, err := modules.HeaderToJSON(*info.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
}
res := newSubcriptionBaseResponseJSON()
res.Method = "chain_finalizedHead"
res.Params.Result = head
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
}
res := newSubcriptionBaseResponseJSON()
res.Method = "chain_finalizedHead"
res.Params.Result = head
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
}()
}

// Stop to cancel the running goroutines to this listener
func (l *BlockFinalizedListener) Stop() { l.cancel() }

// ExtrinsicSubmitListener to handle listening for extrinsic events
type ExtrinsicSubmitListener struct {
wsconn WSConnAPI
Expand All @@ -149,46 +190,72 @@ type ExtrinsicSubmitListener struct {
importedHash common.Hash
finalisedChan chan *types.FinalisationInfo
finalisedChanID byte

ctx context.Context
cancel context.CancelFunc
}

// AuthorExtrinsicUpdates method name
const AuthorExtrinsicUpdates = "author_extrinsicUpdate"

// Listen implementation of Listen interface to listen for importedChan changes
func (l *ExtrinsicSubmitListener) Listen() {
l.ctx, l.cancel = context.WithCancel(context.Background())

// listen for imported blocks with extrinsic
go func() {
for block := range l.importedChan {
if block == nil {
continue
}
bodyHasExtrinsic, err := block.Body.HasExtrinsic(l.extrinsic)
if err != nil {
fmt.Printf("error %v\n", err)
}
for {
select {
case <-l.ctx.Done():
return
case block, ok := <-l.importedChan:
if !ok {
return
}

if block == nil {
continue
}
bodyHasExtrinsic, err := block.Body.HasExtrinsic(l.extrinsic)
if err != nil {
fmt.Printf("error %v\n", err)
}

if bodyHasExtrinsic {
resM := make(map[string]interface{})
resM["inBlock"] = block.Header.Hash().String()
if bodyHasExtrinsic {
resM := make(map[string]interface{})
resM["inBlock"] = block.Header.Hash().String()

l.importedHash = block.Header.Hash()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
l.importedHash = block.Header.Hash()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
}
}
}
}()

// listen for finalised headers
go func() {
for info := range l.finalisedChan {
if reflect.DeepEqual(l.importedHash, info.Header.Hash()) {
resM := make(map[string]interface{})
resM["finalised"] = info.Header.Hash().String()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
for {
select {
case <-l.ctx.Done():
return
case info, ok := <-l.finalisedChan:
if !ok {
return
}

if reflect.DeepEqual(l.importedHash, info.Header.Hash()) {
resM := make(map[string]interface{})
resM["finalised"] = info.Header.Hash().String()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
}
}
}
}()
}

// Stop to cancel the running goroutines to this listener
func (l *ExtrinsicSubmitListener) Stop() { l.cancel() }

// RuntimeVersionListener to handle listening for Runtime Version
type RuntimeVersionListener struct {
wsconn *WSConn
Expand All @@ -215,3 +282,7 @@ func (l *RuntimeVersionListener) Listen() {

l.wsconn.safeSend(newSubscriptionResponse("state_runtimeVersion", l.subID, ver))
}

// Stop to runtimeVersionListener not implemented yet because the listener
// does not need to be stoped
func (l *RuntimeVersionListener) Stop() {}
80 changes: 80 additions & 0 deletions dot/rpc/subscription/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package subscription

import (
"errors"
"fmt"
"strconv"
)

var errUknownParamSubscribeID = errors.New("invalid params format type")
var errCannotParseID = errors.New("could not parse param id")
var errCannotFindListener = errors.New("could not find listener")
var errCannotFindUnsubsriber = errors.New("could not find unsubsriber function")

type unsubListener func(reqid float64, l Listener, params interface{})
type setupListener func(reqid float64, params interface{}) (Listener, error)

func (c *WSConn) getSetupListener(method string) setupListener {
switch method {
case "chain_subscribeNewHeads", "chain_subscribeNewHead":
return c.initBlockListener
case "state_subscribeStorage":
return c.initStorageChangeListener
case "chain_subscribeFinalizedHeads":
return c.initBlockFinalizedListener
case "state_subscribeRuntimeVersion":
return c.initRuntimeVersionListener
default:
return nil
}
}

func (c *WSConn) getUnsubListener(method string, params interface{}) (unsubListener, Listener, error) {
subscribeID, err := parseSubscribeID(params)
if err != nil {
return nil, nil, err
}

listener, ok := c.Subscriptions[subscribeID]
if !ok {
return nil, nil, fmt.Errorf("subscriber id %v: %w", subscribeID, errCannotFindListener)
}

var unsub unsubListener

switch method {
case "state_unsubscribeStorage":
unsub = c.unsubscribeStorageListener
default:
return nil, nil, errCannotFindUnsubsriber
}

return unsub, listener, nil
}

func parseSubscribeID(p interface{}) (uint, error) {
switch v := p.(type) {
case []interface{}:
if len(v) == 0 {
return 0, errUknownParamSubscribeID
}
default:
return 0, errUknownParamSubscribeID
}

var id uint
switch v := p.([]interface{})[0].(type) {
case float64:
id = uint(v)
case string:
i, err := strconv.ParseUint(v, 10, 32)
if err != nil {
return 0, errCannotParseID
}
id = uint(i)
default:
return 0, errUknownParamSubscribeID
}

return id, nil
}
Loading

0 comments on commit cffd64e

Please sign in to comment.