Skip to content

Commit

Permalink
Version 1.4 - Major refactoring, plus new --plugins switch, for thoro…
Browse files Browse the repository at this point in the history
…ugh node scanning
  • Loading branch information
Someguy123 committed Jul 3, 2019
1 parent ffa3771 commit 8a5b931
Show file tree
Hide file tree
Showing 9 changed files with 627 additions and 350 deletions.
378 changes: 33 additions & 345 deletions app.py

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
attrs==18.2.0
Automat==0.7.0
certifi==2018.8.24
certifi>=2018.8.24
chardet==3.0.4
colorama==0.3.9
constantly==15.1.0
hyperlink==18.0.0
idna==2.7
incremental==17.5.0
PyHamcrest==1.9.0
requests==2.19.1
requests-threads==0.1.1
requests>=2.19.1
requests-threads>=0.1.1
six==1.11.0
Twisted==18.7.0
Twisted>=18.7.0
urllib3==1.23
zope.interface==4.5.0
zope.interface>=4.5.0
privex-loghelper
privex-helpers>=1.1.0
112 changes: 112 additions & 0 deletions rpcscanner/MethodTests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import twisted.internet.reactor
from twisted.internet.defer import inlineCallbacks

from rpcscanner.core import PUB_PREFIX
from rpcscanner.rpc import rpc
from rpcscanner.exceptions import ValidationError
from rpcscanner import settings
import logging

log = logging.getLogger(__name__)


class MethodTests:
"""
Thorough plugin test functions for RPC nodes, to verify functionality.
Basic usage:
>>> mt = MethodTests('https://steemd.privex.io', reactor)
>>> try:
... res, time_taken, total_retries = yield mt.test('account_history_api.get_account_history')
>>> except Exception:
... log.exception('Account history test failed for steemd.privex.io')
"""

def __init__(self, host: str, reactor: twisted.internet.reactor):
self.host, self.reactor = host, reactor
self.test_acc = settings.test_account.lower().strip()
self.METHOD_MAP = {
'account_history_api.get_account_history': self.test_account_history,
'condenser_api.get_account_history': self.test_condenser_history,
'condenser_api.get_accounts': self.test_condenser_account,
'condenser_api.get_witness_by_account': self.test_condenser_witness,
}

@inlineCallbacks
def test(self, api_name):
"""Call a test method by the API method name"""
log.debug(f'MethodTests.test now calling API {api_name}')
res = yield self.METHOD_MAP[api_name]()
# log.debug(f'MethodTest.test got result for {api_name}: {res}')
return res

# @retry_on_err(max_retries=MAX_TRIES)
@inlineCallbacks
def test_account_history(self):
"""Test a node for functioning account_history_api account history"""
mtd = 'account_history_api.get_account_history'
params = dict(account=self.test_acc, start=-1, limit=100)
res, tt, tr = yield rpc(self.reactor, self.host, mtd, params)

log.debug(f'History check if result from {self.host} has history key')
if 'history' not in res:
raise ValidationError(f"JSON key 'history' not found in RPC query for node {self.host}")

self._check_hist(res['history'])
return res, tt, tr

# @retry_on_err(max_retries=MAX_TRIES)
@inlineCallbacks
def test_condenser_history(self):
"""Test a node for functioning condenser_api account history"""
mtd = 'condenser_api.get_account_history'
params = [self.test_acc, -100, 100]
res, tt, tr = yield rpc(self.reactor, self.host, mtd, params)

self._check_hist(res)
return res, tt, tr

# @retry_on_err(max_retries=MAX_TRIES)
@inlineCallbacks
def test_condenser_account(self):
"""Test a node for functioning condenser_api get_accounts query"""
mtd, params = 'condenser_api.get_accounts', [ [self.test_acc], ]
res, tt, tr = yield rpc(self.reactor, self.host, mtd, params)

# Normal python exceptions such as IndexError should be thrown if the data isn't formatted correctly
acc = res[0]
log.debug(f'Checking if result from {self.host} has user {self.test_acc}')
if acc['name'].lower().strip() != self.test_acc:
raise ValidationError(f"Account {acc['name']} was returned, but expected {self.test_acc} for node {self.host}")
log.debug(f'Success - result from {self.host} has user {self.test_acc}')
return res, tt, tr

# @retry_on_err(max_retries=MAX_TRIES)
@inlineCallbacks
def test_condenser_witness(self):
"""Test a node for functioning witness lookup (get_witness_by_account)"""
mtd, params = 'condenser_api.get_witness_by_account', [self.test_acc]
res, tt, tr = yield rpc(self.reactor, self.host, mtd, params)
if res['owner'].lower().strip() != self.test_acc:
raise ValidationError(f"Witness {res['owner']} was returned, but expected {self.test_acc} for node {self.host}")
prf = res['signing_key'][0:3]
if prf != PUB_PREFIX:
raise ValidationError(f"Signing key prefix was {prf} but expected {PUB_PREFIX} for node {self.host}")

return res, tt, tr

def _check_hist(self, response: dict):
"""Small helper function to verify an RPC response contains valid account history records"""
res = response

# Get the first item from the history
hist = res[0]
if type(hist[0]) != int or type(hist[1]) != dict:
raise ValidationError(f"History data is malformed in RPC query for node {self.host}")

log.debug(f'Length check if result from {self.host} has at least 5 results')
hist_len = len(res)
if hist_len < 5:
raise ValidationError(f"Too little history. Only {hist_len} history results (<5) for {self.host}")
249 changes: 249 additions & 0 deletions rpcscanner/RPCScanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
from os.path import join

import twisted.internet.reactor
import logging
from colorama import Fore
from twisted.internet.defer import inlineCallbacks
from rpcscanner.MethodTests import MethodTests
from rpcscanner.core import TEST_PLUGINS_LIST, BASE_DIR
from rpcscanner.rpc import rpc, identify_node
from rpcscanner.exceptions import ServerDead
from rpcscanner import settings

log = logging.getLogger(__name__)


class RPCScanner:

def __init__(self, reactor: twisted.internet.reactor):
self.conf_nodes = []
self.prop_nodes = []
self.reactor = reactor
self.node_status = {}
self.ident_nodes = []
self.up_nodes = []
node_list = open(join(BASE_DIR, settings.node_file), 'r').readlines()
# nodes to be specified line by line. format: http://gtg.steem.house:8090
# NODE_LIST_FILE = "nodes.txt"
node_list = [n.strip() for n in node_list]
# Allow nodes to be commented out with # symbol
node_list = [n for n in node_list if n[0] != '#']
self.nodes = node_list
self.req_success = 0

@inlineCallbacks
def scan_nodes(self):
reactor = self.reactor
print('Scanning nodes... Please wait...')
print('{}[Stage 1 / 4] Identifying node types (jussi/appbase){}'.format(Fore.GREEN, Fore.RESET))
for node in self.nodes:
self.node_status[node] = dict(
raw={}, timing={}, tries={}, plugins=[],
current_block='error', block_time='error', version='error',
srvtype='err'
)
self.ident_nodes.append((node, identify_node(reactor, node)))

yield from self.identify_nodes()

print('{}[Stage 2 / 4] Filtering out bad nodes{}'.format(Fore.GREEN, Fore.RESET))
yield from self.filter_badnodes()

print('{}[Stage 3 / 4] Obtaining steemd versions {}'.format(Fore.GREEN, Fore.RESET))
yield from self.scan_versions()

print('{}[Stage 4 / 4] Checking current block / block time{}'.format(Fore.GREEN, Fore.RESET))
yield from self.scan_block_info()

if settings.plugins:
print('{}[Thorough Plugin Check] User specified --plugins. Now running thorough '
'plugin tests for alive nodes.{}'.format(Fore.GREEN, Fore.RESET))
for host, data in self.node_status.items():
status = len(data['raw'])
if status == 0:
log.info(f'Skipping node {host} as it appears to be dead.')
continue
log.info(f'{Fore.BLUE} > Running plugin tests for node {host} ...{Fore.RESET}')
mt = MethodTests(host, reactor)
for plugin in TEST_PLUGINS_LIST:
pt = yield self.plugin_test(host, plugin, mt)
log.info(f'{Fore.GREEN} (+) Finished plugin tests for node {host} ... {Fore.RESET}')

self.print_nodes()

@inlineCallbacks
def plugin_test(self, host: str, plugin_name: str, mt: MethodTests):
ns = self.node_status[host]
try:
log.info(f' >>> Testing {plugin_name} for node {host} ...')
res = yield mt.test(plugin_name)
ns['plugins'].append(plugin_name)
log.info(f'{Fore.GREEN} +++ The API {plugin_name} is functioning for node {host}{Fore.RESET}')
return res
except Exception as e:
log.error(
f'{Fore.RED} !!! The API {plugin_name} test failed for node {host}: {type(e)} {str(e)} {Fore.RESET}')

@inlineCallbacks
def identify_nodes(self):
reactor = self.reactor
for host, id_data in self.ident_nodes:
ns = self.node_status[host]
try:
c = yield id_data
ident, ident_time, ident_tries = c
log.info(Fore.GREEN + 'Successfully obtained server type for node %s' + Fore.RESET, host)

ns['srvtype'] = ident
ns['timing']['ident'] = ident_time
ns['tries']['ident'] = ident_tries
if ns['srvtype'] == 'jussi':
log.info('Server {} is JUSSI'.format(host))
self.up_nodes.append((host, ns['srvtype'], rpc(reactor, host, 'get_dynamic_global_properties')))
if ns['srvtype'] == 'appbase':
log.info('Server {} is APPBASE (no jussi)'.format(host))
self.up_nodes.append(
(host, ns['srvtype'], rpc(reactor, host, 'condenser_api.get_dynamic_global_properties')))
self.req_success += 1
except ServerDead as e:
log.error(Fore.RED + '[ident jussi]' + str(e) + Fore.RESET)
if "only supports websockets" in str(e):
ns['err_reason'] = 'WS Only'
except Exception as e:
log.warning(Fore.RED + 'Unknown error occurred (ident jussi)...' + Fore.RESET)
log.warning('[%s] %s', type(e), str(e))

@inlineCallbacks
def filter_badnodes(self):
prop_nodes = self.prop_nodes
conf_nodes = self.conf_nodes
reactor = self.reactor
for host, srvtype, blkdata in self.up_nodes:
ns = self.node_status[host]
try:
c = yield blkdata
# if it didn't except, then we're probably fine. we don't care about the block data
# because it will be outdated due to bad nodes. will get it later
if srvtype == 'jussi':
conf_nodes.append((host, rpc(reactor, host, 'get_config')))
prop_nodes.append((host, rpc(reactor, host, 'get_dynamic_global_properties')))
if srvtype == 'appbase':
conf_nodes.append((host, rpc(reactor, host, 'condenser_api.get_config')))
prop_nodes.append((host, rpc(reactor, host, 'condenser_api.get_dynamic_global_properties')))
log.info(Fore.GREEN + 'Node %s seems fine' + Fore.RESET, host)
except ServerDead as e:
log.error(Fore.RED + '[badnodefilter]' + str(e) + Fore.RESET)
if "only supports websockets" in str(e):
ns['err_reason'] = 'WS Only'
except Exception as e:
log.warning(Fore.RED + 'Unknown error occurred (badnodefilter)...' + Fore.RESET)
log.warning('[%s] %s', type(e), str(e))
return prop_nodes, conf_nodes

@inlineCallbacks
def scan_block_info(self):
for host, prdata in self.prop_nodes:
ns = self.node_status[host]
try:
# head_block_number
# time (UTC)
props, props_time, props_tries = yield prdata
log.debug(Fore.GREEN + 'Successfully obtained props' + Fore.RESET)
ns['raw']['props'] = props
ns['timing']['props'] = props_time
ns['tries']['props'] = props_tries
ns['current_block'] = props.get('head_block_number', 'Unknown')
ns['block_time'] = props.get('time', 'Unknown')
self.req_success += 1

except ServerDead as e:
log.error(Fore.RED + '[load props]' + str(e) + Fore.RESET)
# log.error(str(e))
if "only supports websockets" in str(e):
ns['err_reason'] = 'WS Only'
except Exception as e:
log.warning(Fore.RED + 'Unknown error occurred (prop)...' + Fore.RESET)
log.warning('[%s] %s', type(e), str(e))

@inlineCallbacks
def scan_versions(self):
for host, cfdata in self.conf_nodes:
ns = self.node_status[host]
try:
# config, config_time, config_tries = rpc(node, 'get_config')
c = yield cfdata
config, config_time, config_tries = c
log.info(Fore.GREEN + 'Successfully obtained config for node %s' + Fore.RESET, host)

ns['raw']['config'] = config
ns['timing']['config'] = config_time
ns['tries']['config'] = config_tries
ns['version'] = config.get('STEEM_BLOCKCHAIN_VERSION', config.get('STEEMIT_BLOCKCHAIN_VERSION', 'Unknown'))
self.req_success += 1
except ServerDead as e:
log.error(Fore.RED + '[load config]' + str(e) + Fore.RESET)
if "only supports websockets" in str(e):
ns['err_reason'] = 'WS Only'
except Exception as e:
log.warning(Fore.RED + 'Unknown error occurred (conf)...' + Fore.RESET)
log.warning('[%s] %s', type(e), str(e))

def print_nodes(self):
list_nodes = self.node_status
print(Fore.BLUE, '(S) - SSL, (H) - HTTP : (A) - normal appbase (J) - jussi', Fore.RESET)
print(Fore.BLUE, end='', sep='')
fmt_params = ['Server', 'Status', 'Head Block', 'Block Time', 'Version', 'Res Time', 'Avg Retries']
fmt_str = '{:<45}{:<10}{:<15}{:<25}{:<15}{:<10}{:<15}'
if settings.plugins:
fmt_str += '{:<15}'
fmt_params.append('Plugin Tests')
print(fmt_str.format(*fmt_params))
print(Fore.RESET, end='', sep='')
for host, data in list_nodes.items():
statuses = {
0: Fore.RED + "DEAD",
1: Fore.YELLOW + "UNSTABLE",
2: Fore.GREEN + "Online",
}
status = statuses[len(data['raw'])]
avg_res = 'error'
if len(data['timing']) > 0:
time_total = 0.0
for time_type, time in data['timing'].items():
time_total += time
avg_res = time_total / len(data['timing'])
avg_res = '{:.2f}'.format(avg_res)

avg_tries = 'error'
if len(data['tries']) > 0:
tries_total = 0
for tries_type, tries in data['tries'].items():
tries_total += tries
avg_tries = tries_total / len(data['tries'])
avg_tries = '{:.2f}'.format(avg_tries)
if 'err_reason' in data:
status = Fore.YELLOW + data['err_reason']
host = host.replace('https://', '(S)')
host = host.replace('http://', '(H)')
if data['srvtype'] == 'jussi':
host = "{}(J){} {}".format(Fore.GREEN, Fore.RESET, host)
elif data['srvtype'] == 'appbase':
host = "{}(A){} {}".format(Fore.BLUE, Fore.RESET, host)
else:
host = "{}(?){} {}".format(Fore.RED, Fore.RESET, host)
fmt_params = [
host, status, data['current_block'], data['block_time'],
data['version'], avg_res, avg_tries
]
fmt_str = '{:<55}{:<15}{:<15}{:<25}{:<15}{:<10}{:<15}'
if settings.plugins:
fmt_str += '{:<15}'
plg, ttl_plg = len(data['plugins']), len(TEST_PLUGINS_LIST)

f_plugins = f'{plg} / {ttl_plg}'
if plg < (ttl_plg // 2): f_plugins = f'{Fore.RED}{f_plugins}'
elif plg < ttl_plg: f_plugins = f'{Fore.YELLOW}{f_plugins}'
elif plg == ttl_plg: f_plugins = f'{Fore.GREEN}{f_plugins}'

fmt_params.append(f'{f_plugins}{Fore.RESET}')
print(fmt_str.format(*fmt_params), Fore.RESET)
5 changes: 5 additions & 0 deletions rpcscanner/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from rpcscanner.core import *
from rpcscanner.rpc import NodePlug
from rpcscanner.MethodTests import MethodTests
from rpcscanner.exceptions import *
from rpcscanner.RPCScanner import RPCScanner
Loading

0 comments on commit 8a5b931

Please sign in to comment.