Skip to content

Commit

Permalink
clean up the code, add jussi/appbase detection, plus test if the node…
Browse files Browse the repository at this point in the history
… is working before getting block/ver
  • Loading branch information
Someguy123 committed Sep 27, 2018
1 parent 9679da6 commit 5f5c68f
Showing 1 changed file with 139 additions and 32 deletions.
171 changes: 139 additions & 32 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

verbose = args.verbose
if verbose:
Logger.setLevel('debug')
logging.basicConfig(level=logging.DEBUG)
# s = requests.Session()
s = AsyncSession(n=20)

RPC_TIMEOUT = 10
RPC_TIMEOUT = 5
MAX_TRIES = 5
# nodes to be specified line by line. format: http://gtg.steem.house:8090
NODE_LIST_FILE = "nodes.txt"
Expand All @@ -39,9 +39,7 @@ class ServerDead(BaseException):
class NodePlug:
@defer.inlineCallbacks
def tryNode(self, reactor, host, method, params=[]):
# self.dfr = defer.Deferred()
self.reactor = reactor
# self._tryNode(host, method, params)
try:
tn = yield self._tryNode(host, method, params)
return tn
Expand All @@ -51,13 +49,10 @@ def tryNode(self, reactor, host, method, params=[]):

@defer.inlineCallbacks
def _tryNode(self, host, method, params=[], tries=0):
# dfr = self.dfr
if tries >= MAX_TRIES:
# dfr.errback(ServerDead('{} did not respond properly after {} tries'.format(host, tries)))
# return dfr
logging.debug('SERVER IS DEAD')
raise ServerDead('{} did not respond properly after {} tries'.format(host, tries))
# yield dfr

try:
logging.info('{} {} attempt {}'.format(host, method, tries))
start = time.time()
Expand All @@ -70,26 +65,56 @@ def _tryNode(self, host, method, params=[], tries=0):
# if we made it this far, we're fine :)
success = True
results = [res, runtime, tries]
# dfr.callback(tuple(results))
logging.debug(Fore.GREEN + '[{}] Successful request for {}'.format(host, method) + Fore.RESET)
# print('RETURNING RESULTS FOR {}: {} (len {})'.format(host, method, len(results)))
# return tuple(results)
# yield dfr
# dfr.callback(tuple(results))
return tuple(results)
except Exception as e:
if 'HTTPError' in str(type(e)) and '426 Client Error' in str(e):
raise ServerDead('Server {} only supports websockets'.format(host))
logging.info('{red}{} attempt {} failed. Message:'.format(host, tries, red=Fore.RED), type(e), str(e), Fore.RESET)
logging.info('%s [%s] %s attempt %d failed. Message: %s %s %s', Fore.RED, method, host, tries, type(e), str(e), Fore.RESET)
dl = yield deferLater(self.reactor, 10, self._tryNode, host, method, params, tries)
return dl
# flow.Cooperate()
# don't try again on this same node for a few seconds
# rec = yield self.reactor.callLater(3, self._tryNode, host, method, params, tries)
# print("Reactor yielded:", rec)
# return rec
# yield dfr
# return dfr

@defer.inlineCallbacks
def identJussi(self, reactor, host):
self.reactor = reactor
try:
tn = yield self._identJussi(host)
return tn
except Exception as e:
logging.debug('caught in identJussi and raised')
raise e

@defer.inlineCallbacks
def _identJussi(self, host, tries=0):
if tries >= MAX_TRIES:
logging.debug('[identJussi] SERVER IS DEAD')
raise ServerDead('{} did not respond properly after {} tries'.format(host, tries))
try:
logging.info('{} identJussi attempt {}'.format(host, tries))
start = time.time()
tries += 1
res = yield s.get(host)
j = res.json()

end = time.time()
runtime = end - start
srvtype = 'err'
if 'jussi_num' in j:
srvtype = 'jussi'
elif 'error' in j and 'message' in j['error']:
if j['error']['message'] == 'End Of File:stringstream':
srvtype = 'appbase'
# if we made it this far, we're fine :)
success = True
results = [srvtype, runtime, tries]
logging.debug(Fore.GREEN + '[{}] Successful request for identJussi'.format(host) + Fore.RESET)
return tuple(results)
except Exception as e:
if 'HTTPError' in str(type(e)) and '426 Client Error' in str(e):
raise ServerDead('Server {} only supports websockets'.format(host))
logging.info('%s [identJussi] %s attempt %d failed. Message: %s %s %s', Fore.RED, host, tries, type(e), str(e), Fore.RESET)
dl = yield deferLater(self.reactor, 10, self._identJussi, host, tries)
return dl



Expand All @@ -116,6 +141,27 @@ def rpc(reactor, host, method, params=[]):
# return tuple(results)
return d

@inlineCallbacks
def identifyNode(reactor, host):
"""
Detects a server type
:returns: tuple (servtype, time_taken_sec, tries)
:raises: ServerDead - tried too many times and failed
"""
# tries = 0
logging.info(Fore.BLUE + 'Attempting method identifyNode on server {host}. Will try {tries} times'.format(
tries=MAX_TRIES, host=host
) + Fore.RESET)
# d = defer.Deferred()
np = NodePlug()
try:
d = yield np.identJussi(reactor, host)
except ServerDead as e:
logging.debug('caught in identifyNode and raised')
raise e

# return tuple(results)
return d


@inlineCallbacks
Expand Down Expand Up @@ -145,25 +191,74 @@ def _rpc(host, method, params=[]):

@inlineCallbacks
def scan_nodes(reactor):
ident_nodes = []
conf_nodes = []
prop_nodes = []
up_nodes = []
nodes = NODE_LIST
print('Scanning nodes... Please wait...')
for node in nodes:
node_status[node] = dict(
raw={}, timing={}, tries={},
current_block='error', block_time='error', version='error'
current_block='error', block_time='error', version='error',
srvtype='err'
)
logging.info('Scanning node ', node)
conf_nodes.append((node, rpc(reactor, node, 'get_config')))
prop_nodes.append((node, rpc(reactor, node, 'get_dynamic_global_properties')))
req_success = 0
ident_nodes.append((node, identifyNode(reactor, node)))
logging.info('Identifying ', node)
req_success = 0
print('{}[Stage 1 / 4] Identifying node types (jussi/appbase){}'.format(Fore.GREEN, Fore.RESET))

for host, id_data in ident_nodes:
ns = node_status[host]
try:
c = yield id_data
ident, ident_time, ident_tries = c
logging.info(Fore.GREEN + 'Successfully obtained config' + Fore.RESET)

ns['srvtype'] = ident
ns['timing']['ident'] = ident_time
ns['tries']['ident'] = ident_tries
if ns['srvtype'] == 'jussi':
logging.warning('Server {} is JUSSI'.format(host))
up_nodes.append((host, ns['srvtype'], rpc(reactor, host, 'get_dynamic_global_properties')))
if ns['srvtype'] == 'appbase':
logging.warning('Server {} is APPBASE (no jussi)'.format(host))
up_nodes.append((host, ns['srvtype'], rpc(reactor, host, 'condenser_api.get_dynamic_global_properties')))
req_success += 1
except ServerDead as e:
logging.error(Fore.RED + '[load config]' + str(e) + Fore.RESET)
if "only supports websockets" in str(e):
ns['err_reason'] = 'WS Only'
except Exception as e:
logging.warning(Fore.RED + 'Unknown error occurred (conf)...' + Fore.RESET)
logging.warning('[%s] %s', type(e), str(e))

print('{}[Stage 2 / 4] Filtering out bad nodes{}'.format(Fore.GREEN, Fore.RESET))
for host, srvtype, blkdata in up_nodes:
try:
c = yield blkdata
# if it didn't except, then we're probably fine. we don't care about the block data
# because it will be outdated due to bad nodes. will get it later
if srvtype == 'jussi':
conf_nodes.append((host, rpc(reactor, host, 'get_config')))
prop_nodes.append((host, rpc(reactor, host, 'get_dynamic_global_properties')))
if srvtype == 'appbase':
conf_nodes.append((host, rpc(reactor, host, 'condenser_api.get_config')))
prop_nodes.append((host, rpc(reactor, host, 'condenser_api.get_dynamic_global_properties')))
except ServerDead as e:
logging.error(Fore.RED + '[badnodefilter]' + str(e) + Fore.RESET)
if "only supports websockets" in str(e):
ns['err_reason'] = 'WS Only'
except Exception as e:
logging.warning(Fore.RED + 'Unknown error occurred (badnodefilter)...' + Fore.RESET)
logging.warning('[%s] %s', type(e), str(e))

print('{}[Stage 3 / 4] Obtaining steemd versions {}'.format(Fore.GREEN, Fore.RESET))
for host, cfdata in conf_nodes:
ns = node_status[host]
try:
# config, config_time, config_tries = rpc(node, 'get_config')
c = yield cfdata
logging.debug(Fore.LIGHTMAGENTA_EX + 'cfdata length:', len(c), Fore.RESET)
config, config_time, config_tries = c
logging.info(Fore.GREEN + 'Successfully obtained config' + Fore.RESET)

Expand All @@ -178,7 +273,9 @@ def scan_nodes(reactor):
ns['err_reason'] = 'WS Only'
except Exception as e:
logging.warning(Fore.RED + 'Unknown error occurred (conf)...' + Fore.RESET)
logging.warning(type(e), str(e))
logging.warning('[%s] %s', type(e), str(e))

print('{}[Stage 4 / 4] Checking current block / block time{}'.format(Fore.GREEN, Fore.RESET))
for host, prdata in prop_nodes:
ns = node_status[host]
try:
Expand All @@ -199,14 +296,15 @@ def scan_nodes(reactor):
if "only supports websockets" in str(e):
ns['err_reason'] = 'WS Only'
except Exception as e:
logging.error(Fore.RED + 'Unknown error occurred (prop)...' + Fore.RESET)
logging.error(type(e), str(e))
logging.warning(Fore.RED + 'Unknown error occurred (prop)...' + Fore.RESET)
logging.warning('[%s] %s', type(e), str(e))

print_nodes(node_status)

def print_nodes(list_nodes):
print(Fore.BLUE, '(S) - SSL, (H) - HTTP : (A) - normal appbase (J) - jussi', Fore.RESET)
print(Fore.BLUE, end='', sep='')
print('{:<40}{:<10}{:<15}{:<25}{:<15}{:<10}{}'.format('Server', 'Status', 'Head Block', 'Block Time', 'Version', 'Res Time', 'Avg Retries'))
print('{:<45}{:<10}{:<15}{:<25}{:<15}{:<10}{}'.format('Server', 'Status', 'Head Block', 'Block Time', 'Version', 'Res Time', 'Avg Retries'))
print(Fore.RESET, end='', sep='')
for host, data in list_nodes.items():
statuses = {
Expand All @@ -232,7 +330,16 @@ def print_nodes(list_nodes):
avg_tries = '{:.2f}'.format(avg_tries)
if 'err_reason' in data:
status = Fore.YELLOW + data['err_reason']
print('{:<40}{:<15}{:<15}{:<25}{:<15}{:<10}{}'.format(
host = host.replace('https://', '(S)')
host = host.replace('http://', '(H)')
if data['srvtype'] == 'jussi':
host = "{}(J){} {}".format(Fore.GREEN, Fore.RESET, host)
elif data['srvtype'] == 'appbase':
host = "{}(A){} {}".format(Fore.BLUE, Fore.RESET, host)
else:
host = "{}(?){} {}".format(Fore.RED, Fore.RESET, host)

print('{:<55}{:<15}{:<15}{:<25}{:<15}{:<10}{}'.format(
host,
status,
data['current_block'],
Expand Down

0 comments on commit 5f5c68f

Please sign in to comment.