-
Notifications
You must be signed in to change notification settings - Fork 3
/
catchpoint_opentsdb_bridge.py
executable file
·170 lines (138 loc) · 6.93 KB
/
catchpoint_opentsdb_bridge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#!/usr/bin/env python
import argparse
from node_list import catchpoint_nodes
from cyclone import web
import sys
import logging
from twisted.internet import epollreactor
if 'twisted.internet.reactor' not in sys.modules:
epollreactor.install()
from twisted.internet import reactor, defer
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.python import log as twisted_log
import time
import json
import os
from distutils.dir_util import mkpath
import traceback
# Root logger: emit INFO and above.
loglevel = 'INFO'
l = logging.getLogger()
l.setLevel(loglevel)

# Record layout: "[cp_pushd:<module>:<logger name>:<line>] <LEVEL>: <message>",
# prefixed with the timestamp when written to the terminal.
format_str = (
    '[cp_pushd'
    ':%(module)s:%(name)s:%(lineno)d] '
    '%(levelname)s: %(message)s'
)
term_format = logging.Formatter(fmt='%(asctime)s ' + format_str)

# Log records go to stdout.
h = logging.StreamHandler(sys.stdout)
h.setFormatter(term_format)
l.addHandler(h)

# Ensure <script dir>/logs/ exists and derive the data-dump path from the
# script name: the last three characters (normally ".py") are replaced by
# ".log".  Nothing is written to this file here.
current_dir = os.path.dirname(os.path.realpath(__file__))
log_dir = current_dir + '/logs/'
mkpath(log_dir)
log_file = log_dir + sys.argv[0].rsplit('/', 1)[-1][:-3] + '.log'

# Forward Twisted's log events to the standard logging module.
obs = twisted_log.PythonLoggingObserver()
obs.start()
class OpenTSDBProtocol(Protocol):
    """Twisted protocol that writes data points to OpenTSDB as ``put`` lines."""

    @defer.inlineCallbacks
    def put(self, metric_base, D, valkey, tagkeys=()):
        """Write one data point taken from ``D`` to the transport.

        The line has the form ``put <metric> <timestamp> <value> <tags>`` where
        the metric is ``<metric_base>.<D['testid']>``, the timestamp is
        ``D['timestamp']`` and the value is ``float(D[valkey])``.

        :param metric_base: metric name prefix; the test id is appended to it.
        :param D: dict holding ``testid``, ``timestamp``, ``valkey`` and every
            key named in ``tagkeys``.
        :param valkey: key in ``D`` whose value is sent as the data point.
        :param tagkeys: keys in ``D`` emitted as ``key=value`` tags.
        :returns: a Deferred (from ``inlineCallbacks``) that fires with
            ``None`` after the line was passed to ``self.transport.write``,
            or immediately when the point is skipped.
        """
        value = D[valkey]
        if value is None:
            # float(None) would raise TypeError and leave a failed Deferred
            # behind; a missing measurement is simply not reported.
            logging.getLogger(__name__).debug(
                'skipping %s: no value for %r', metric_base, valkey)
            return

        metric = '%s.%s' % (metric_base, D['testid'])
        # NOTE(review): a None tag value is rendered as "key=" (empty value);
        # confirm that OpenTSDB accepts this, as well as values with spaces.
        tags = ' '.join(
            '%s=%s' % (k, str(D[k]) if D[k] is not None else '')
            for k in tagkeys
        )
        putline = 'put %s %d %f %s\n' % (
            metric, D['timestamp'], float(value), tags)

        # Twisted transports only accept byte strings: text (str on Python 3,
        # unicode on Python 2) has to be encoded before it is written.
        if not isinstance(putline, bytes):
            putline = putline.encode('utf-8')
        yield self.transport.write(putline)
class OpenTSDBFactory(ReconnectingClientFactory):
    """Reconnecting client factory that publishes its protocol as ``myproto``.

    ``myproto`` starts out as ``None``, is populated by ``buildProtocol`` and
    is cleared again whenever the connection is lost or fails, so other code
    can test it to find out whether a protocol is currently available.
    """

    # Protocol instance handed out by buildProtocol, or None.
    myproto = None

    def buildProtocol(self, addr):
        """Reset the reconnect delay and return the (lazily created) protocol."""
        self.resetDelay()
        proto = self.myproto
        if proto is None:
            proto = self.myproto = OpenTSDBProtocol()
        return proto

    def clientConnectionLost(self, connector, reason):
        """Forget the protocol, then let the base class handle the loss."""
        self.myproto = None
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

    def clientConnectionFailed(self, connector, reason):
        """Forget the protocol, then let the base class handle the failure."""
        self.myproto = None
        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
class cp_push_request(web.RequestHandler):
    """HTTP handler for Catchpoint push payloads.

    Each POSTed JSON document is flattened into a dict, enriched with the
    data of the reporting node and either written to OpenTSDB (when a
    protocol is available on the factory) or appended to ``log_file``.
    """

    # Breakdowns written for every result, in emission order.  'node' is
    # tagged with the node id only; every other breakdown adds its own tag.
    _BREAKDOWNS = ('node', 'asn', 'continent', 'city', 'country', 'region', 'isp')

    # OpenTSDBFactory passed in by the application, or None in file mode.
    _tsdb = None

    def initialize(self, tsdb):
        """Store the OpenTSDB factory supplied through the URL spec."""
        self._tsdb = tsdb

    # {"Version":3,"V":3,"TestDetail":{"Name":"Test Name","TypeId":5,"MonitorTypeId":13},"TestId":12345,"ReportWindow":"201602260100","NodeId":312,"NodeName":"Jinan, CN - Unicom","Asn":4837,"DivisionId":1033,"ClientId":1234,"Summary":{"V":1,"Timestamp":"20160226010449609","Timing":{"Total":158},"Address":"1.2.3.4","Request":"www.example.com"}}
    def post(self):
        """Handle one push payload.

        Raises ``web.HTTPError(400)`` when the body cannot be parsed or the
        node id cannot be resolved through the module-level ``nodes`` object
        (created in the ``__main__`` block).
        """
        try:
            body = json.loads(self.request.body)
            nodeid = body.get('NodeId', None)
            summary = body.get('Summary', {})
            D = {
                'testid': body['TestId'],
                'timestamp': summary.get('Timestamp', None),
                'rtt': summary.get('Timing', {}).get('Total', None),
                'error': summary.get('Error', {}).get('Code', None),
            }
        except Exception:
            raise web.HTTPError(400, 'bad body')

        try:
            node = nodes.get_node(int(nodeid))
        except Exception:
            raise web.HTTPError(400, 'bad node id')

        try:
            # The last three digits of the timestamp are dropped before
            # parsing (milliseconds in the sample payload).
            # NOTE(review): mktime() interprets the value as local time;
            # confirm this matches the timezone Catchpoint reports in.
            ts = time.mktime(time.strptime(D['timestamp'][:-3], '%Y%m%d%H%M%S'))
            D['timestamp'] = int(ts)
        except Exception:
            # Missing or malformed timestamp: fall back to the arrival time.
            D['timestamp'] = int(time.time())

        D['counter'] = 1  # silly hack, will assign either 'error' or 'ok' as status
        D['status'] = 'ok' if D['error'] is None else 'error'
        D.update(node)

        proto = self._tsdb.myproto if self._tsdb is not None else None
        if proto is None:
            # not connected to tsdb, dump data to file
            self._dump_to_file(D)
            return
        self._write_metrics(proto, D)

    def _write_metrics(self, proto, D):
        """Put the rtt/status point of every breakdown, then the error point."""
        # put each of these in tsdb, and tag the by_xxx, and nodeid so
        # we can agg across all nodes
        for name in self._BREAKDOWNS:
            tags = ['nodeid'] if name == 'node' else ['nodeid', name]
            # Without a timing there is no rtt value to report.
            if D['rtt'] is not None:
                self._tsdb_put(proto, 'catchpoint.rtt.by_' + name, D, 'rtt', tags)
            self._tsdb_put(proto, 'catchpoint.status.by_' + name, D, 'counter',
                           tags + ['status'])
        self._tsdb_put(proto, 'catchpoint.error.by_node', D, 'counter',
                       ['nodeid', 'error'])

    def _tsdb_put(self, proto, metric_base, D, valkey, tagkeys):
        """Issue one put and log its failure instead of leaving it unhandled."""
        d = proto.put(metric_base, D, valkey, tagkeys)
        d.addErrback(twisted_log.err, 'OpenTSDB put failed for %s' % metric_base)

    def _dump_to_file(self, D):
        """Append ``D`` as one JSON line to ``log_file``; log I/O errors."""
        try:
            with open(log_file, 'a') as outfile:
                json.dump(D, outfile)
                outfile.write('\n')
        except IOError:
            l.warning(traceback.format_exc())
if __name__ == '__main__':
    # Command-line interface.
    parser = argparse.ArgumentParser(
        description='Catchpoint Push API -> OpenTSDB bridge')
    parser.add_argument('-l', '--listen', type=int, default=8080,
                        help='Listen port')
    parser.add_argument('-t', '--tsdb-host', default='localhost',
                        help='OpenTSDB host')
    parser.add_argument('-p', '--tsdb-port', type=int, default=4242,
                        help='OpenTSDB port')
    parser.add_argument('-k', '--key', required=True,
                        help='Catchpoint Push API key')
    parser.add_argument('-s', '--secret', required=True,
                        help='Catchpoint Push API secret')
    parser.add_argument('-f', '--file-mode', action='store_true',
                        help='Dump Catchpoint data to the local filesystem and do not connect to OpenTSDB')
    args = parser.parse_args()

    # Module-level node list; cp_push_request.post() resolves node ids
    # through it.
    nodes = catchpoint_nodes(args.key, args.secret)
    nodes.update_node_list()

    if not args.file_mode:
        l.info('Connecting to OpenTSDB')
        tsdb = OpenTSDBFactory()
        reactor.connectTCP(args.tsdb_host, args.tsdb_port, tsdb)
    else:
        l.info('File mode detected. Not connecting to OpenTSDB')
        l.info("Printing Catchpoint data to local log file {log_file}".format(log_file=log_file))
        # Without a factory the handler always writes to the log file.
        tsdb = None

    # Single route: Catchpoint posts its results to "/".
    routes = [(r'/', cp_push_request, {'tsdb': tsdb})]
    app = web.Application(routes, xheaders=True, debug=True)
    reactor.listenTCP(port=args.listen, factory=app)
    reactor.run()