-
Notifications
You must be signed in to change notification settings - Fork 1
/
fetch_ethercodes.py
executable file
·300 lines (257 loc) · 9.11 KB
/
fetch_ethercodes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
#! /usr/bin/env python3
"""
Synopsis:
Fetch and generate ethercodes data for arpwatch
Usage: {appname} [-hVvfkt][-T sec][-O ouifile][-o outfile][-p spec]
-h, --help this message
-V, --version print version and exit
-v, --verbose verbose mode (cumulative)
-f, --force force operation
-k, --keep keep existing {ouifile}
-t, --timestamp print timestamp
-T, --deltat sec tolerance in timestamp comparison
(default: {deltat} sec.)
-O, --ouifile file IEEE.org host
(default: {ouifile})
-o, --outfile file arpwatch ethercodes
(default: {outfile})
-p, --patch spec patch specfile with updated timestamp
Description:
Fetch current IEEE MA-L Assignments file (oui.csv) from IEEE.org,
and generate ethercodes.dat for arpwatch consumption.
Fetch oui.csv only, if the timestamp is newer (unless --force is given).
Similar, generate ethercodes.dat only, if the timestamps don't match
(again, unless --force is given). Use option --keep to (re)generate
ethercodes.dat from an existing oui.csv.
Patch replaces a pattern NNNNNNNN_NNNNNN with the current timestamp
in the given file.
Notes:
The timestamps of oui.csv fluctuate in a 2 seconds range(!). Therefore
compensate the fluctuation by taking a deltat tolerance factor into
account.
Copyright:
(c)2018 by {author} {email}
License:
{license}
"""
#
# vim:set et ts=8 sw=4:
#
# Script metadata: copied into the gpar class below, from where it is
# interpolated into the usage text and the version message.
__version__ = '0.5'
__author__ = 'Hans-Peter Jansen'
__email__ = '<[email protected]>'
__license__ = 'MIT'
import os
import re
import csv
import sys
import time
import getopt
import traceback
import email.utils
import urllib.error
import urllib.parse
import urllib.request
# Avoid encoding issues with print() in locale-less environments.
# PYTHONIOENCODING is evaluated at interpreter startup only: setting it here
# is kept for the benefit of child processes, but the standard streams of
# this process exist already and have to be switched to UTF-8 explicitly.
os.environ["PYTHONIOENCODING"] = "utf-8"
for _stream in (sys.stdout, sys.stderr):
    try:
        _stream.reconfigure(encoding = 'utf-8')
    except (AttributeError, ValueError):
        # best effort: the stream is missing, has been replaced by an
        # object without reconfigure(), or doesn't permit the change
        pass
del _stream
class gpar:
    """ global parameter container

        The class is used as a plain namespace: its attributes hold the
        process wide settings, and vout()/exit() expand {name} placeholders
        from them. Don't introduce helper names here, they would show up
        in that namespace as well.
    """
    # location and name, this script has been started with
    appname = os.path.basename(sys.argv[0])
    appdir = os.path.dirname(sys.argv[0])
    appdir = os.getcwd() if appdir == '.' else appdir
    pid = os.getpid()

    # script metadata
    version = __version__
    author = __author__
    email = __email__
    license = __license__

    # command line controlled settings and their defaults
    loglevel = 0
    force = False
    keep = False
    timestamp = False
    # tolerance (seconds) for timestamp comparisons, see cmp_ts()
    deltat = 2.5
    ouifile = 'http://standards-oui.ieee.org/oui/oui.csv'
    outfile = 'ethercodes.dat'
    patchfile = None
def vout(lvl, msg):
    """ print msg on stdout, if lvl doesn't exceed the global loglevel

        lvl: verbosity level of this message
        msg: message text; {name} placeholders are expanded from the
             gpar attributes

        Many callers hand over text, that has been formatted already and
        may carry arbitrary data (CSV rows, HTTP headers). If such a
        message can't be expanded (stray braces, unknown names), it is
        printed verbatim instead of raising.
    """
    if lvl > gpar.loglevel:
        return
    try:
        text = msg.format(**gpar.__dict__)
    except (LookupError, ValueError, AttributeError, TypeError):
        # not a valid template: treat the message as literal text
        text = msg
    print(text, file = sys.stdout, flush = True)
def stderr(*msg):
    """ print all arguments (space separated) on stderr and flush """
    print(*msg, file = sys.stderr, flush = True)
def exit(ret = 0, msg = None, usage = False):
    """ leave the process with exit code ret

        ret:   exit code
        msg:   if truthy, written to stderr, prefixed with the
               application name
        usage: if truthy, the expanded module docstring is written to
               stderr (after msg)
    """
    if msg:
        stderr('{}: {}'.format(gpar.appname, msg))
    if usage:
        # the module docstring doubles as usage text
        stderr(__doc__.format(**vars(gpar)))
    raise SystemExit(ret)
def cmp_ts(t1, t2):
    """ tell, whether two timestamps are considered equal

        t1 and t2 match, if they differ by less than the global
        tolerance gpar.deltat
        return True on match, False otherwise
    """
    delta = t1 - t2
    return -gpar.deltat < delta < gpar.deltat
def hex(x):
    """ return the int value of the hexadecimal string x
        (intentionally keeps the name, although it hides the builtin)
    """
    return int(x, 16)


def hexstr(h):
    """ return int h as lower case hexadecimal string without prefix """
    return format(h, 'x')
def code_key(val):
    """ return the colon formatted code key, if val is an exact 24 bit
        (six digit) hexadecimal string, and None otherwise

        Each octet is written in lower case without leading zero:
        000000 -> 0:0:0
        ABCDEF -> ab:cd:ef
    """
    # int(x, 16) on its own raises on junk and is too lenient otherwise
    # (accepts signs, blanks and underscores): validate strictly up front
    if not re.fullmatch('[0-9A-Fa-f]{6}', val):
        return None
    return ':'.join(format(int(val[i:i + 2], 16), 'x') for i in (0, 2, 4))
def fetch_infile(infile):
    """ download gpar.ouifile to infile, if the local copy is outdated

        infile: path of the local copy

        The download takes place, if gpar.force is set, if infile doesn't
        exist, or if its size or modification time differ from the remote
        Content-Length / Last-Modified values. The modification time of a
        downloaded file is set to the remote time.

        return the parsed Last-Modified date as time tuple
        raise ValueError, if the server doesn't supply a usable
        Last-Modified header
    """
    # check oui.csv parameter
    vout(1, 'check {ouifile}')
    # the context manager closes the connection on every exit path
    with urllib.request.urlopen(gpar.ouifile) as req:
        header = req.info()
        vout(3, 'header info: {}'.format(header))
        # Content-Length is optional (e.g. chunked transfer): without it,
        # the size comparison is skipped
        ouisize = header['Content-Length']
        if ouisize is not None:
            ouisize = int(ouisize)
        vout(1, 'oui file size: {}'.format(ouisize))
        lastmod = header['Last-Modified']
        vout(1, 'oui file date: {}'.format(lastmod))
        ouidate = email.utils.parsedate(lastmod)
        if ouidate is None:
            raise ValueError('no usable Last-Modified header from {}: {!r}'
                             .format(gpar.ouifile, lastmod))
        # NOTE(review): parsedate() drops the time zone and mktime() takes
        # the tuple as local time; fetch_ethercodes() reverses this with
        # localtime() in keep mode, so the conversion is kept as is
        ouitime = time.mktime(ouidate)
        vout(3, 'parsed oui file date: {} ({})'.format(
            time.asctime(ouidate), ouitime))

        # check, if local oui.csv is outdated
        fetchoui = False
        if gpar.force:
            fetchoui = True
        elif not os.path.exists(infile):
            vout(1, 'no local file {} found'.format(infile))
            fetchoui = True
        else:
            st = os.stat(infile)
            if ouisize is not None and st.st_size != ouisize:
                vout(1, 'local file size differs: {} vs. {} remote'.format(
                    st.st_size, ouisize))
                fetchoui = True
            elif not cmp_ts(st.st_mtime, ouitime):
                vout(3, str(st.st_mtime))
                vout(3, str(ouitime))
                mtime = time.localtime(st.st_mtime)
                otime = time.localtime(ouitime)
                vout(1, 'local file date differs: {} vs. {} remote'.format(
                    time.asctime(mtime), time.asctime(otime)))
                fetchoui = True

        # fetch oui.csv
        if fetchoui:
            vout(1, 'fetch {ouifile}')
            # close (and thereby flush) the file before adjusting its times
            with open(infile, 'wb') as f:
                f.write(req.read())
            os.utime(infile, (ouitime, ouitime))
    return ouidate
def parse_csv(infile):
    """ read the oui csv file and collect the ethercodes

        infile: path of the (utf-8 encoded) csv file

        The second column of a row is taken as code, the third one as
        manufacturer name. Rows without a valid code (code_key() returns
        None, e.g. for the header line) and rows with less than three
        columns are ignored. If a code shows up again with a different
        name, the first name is kept, and the conflict is reported at
        verbosity level 1.

        return a dict, mapping code keys to manufacturer names
    """
    def literal(text):
        # vout() expands its message as format template: protect braces
        # in external data from being taken as placeholders
        return text.replace('{', '{{').replace('}', '}}')

    vout(1, 'parse {}'.format(infile))
    codes = {}
    with open(infile, newline = '', encoding = 'utf-8') as f:
        for row in csv.reader(f):
            vout(3, literal(str(row)))
            # blank or truncated lines don't provide both columns
            if len(row) < 3:
                continue
            # use ethercode and manufacturer name
            code, value = row[1:3]
            key = code_key(code)
            if not key:
                continue
            value = value.strip()
            # codes is indexed by key, hence the lookup has to use key
            # as well (looking up code never matched)
            if key in codes and codes[key] != value:
                vout(1, literal('value {} exists already: "{}", "{}"'.format(
                    code, codes[key], value)))
            else:
                codes[key] = value
    return codes
def patch_file(patchfile, timestamp):
    """ replace all NNNNNNNN_NNNNNN patterns (N: digit) in patchfile
        with timestamp

        patchfile: path of the (utf-8 encoded) file, patched in place
        timestamp: replacement text, inserted literally

        The file is written only, if its content has changed.
    """
    vout(1, 'patch {}'.format(patchfile))
    # newline = '': keep the line endings of the file untouched
    with open(patchfile, 'r+', newline = '', encoding = 'utf-8') as f:
        content = f.read()
        # raw string: '\d' is an invalid escape sequence in plain literals
        # callable: the replacement isn't interpreted as regex template
        patched, count = re.subn(r'\d{8}_\d{6}', lambda m: timestamp, content)
        if count and content != patched:
            f.seek(0)
            f.write(patched)
            # drop stale trailing data, should the content have shrunk
            f.truncate()
            vout(1, '{} occurances replaced in {}'.format(count, patchfile))
        else:
            vout(1, '{} unchanged'.format(patchfile))
def fetch_ethercodes():
    """ fetch the oui file and generate the ethercodes file from it

        The local oui file name is derived from the URL in gpar.ouifile
        and stored in gpar.infile. With gpar.keep, an existing local oui
        file is used (and generation is forced), otherwise it is fetched
        via fetch_infile(). The ethercodes file gpar.outfile is written,
        if gpar.force is set, if it doesn't exist, or if its modification
        time doesn't match the oui file time; its modification time is
        set to the oui file time afterwards.
        Finally, the timestamp is printed (gpar.timestamp) and patched
        into gpar.patchfile, if requested.

        return 0; terminates the process via exit(1, ...), if the oui
        file name can't be derived, or keep is selected without a local
        oui file
    """
    # extract file argument from URL
    gpar.infile = os.path.basename(urllib.parse.urlparse(gpar.ouifile).path)
    # default: oui.csv
    infile = gpar.infile
    # default: ethercodes.dat
    outfile = gpar.outfile
    if not infile:
        # e.g. URL with a trailing slash: no local file name available
        exit(1, 'cannot derive a file name from {}'.format(gpar.ouifile))

    if gpar.keep:
        if not os.path.exists(infile):
            # exit() doesn't expand placeholders: format explicitly
            exit(1, 'keep option selected, but no {}'.format(infile))
        # force generation of ethercodes.dat from existing oui.csv
        gpar.force = True
        ouidate = time.localtime(os.stat(infile).st_mtime)
    else:
        ouidate = fetch_infile(infile)
    ouitime = time.mktime(ouidate)

    # check, if ethercodes.dat is outdated
    gencodes = False
    if gpar.force:
        gencodes = True
    elif not os.path.exists(outfile):
        vout(1, 'no local file {} found'.format(outfile))
        gencodes = True
    elif not cmp_ts(os.stat(outfile).st_mtime, ouitime):
        vout(3, str(os.stat(outfile).st_mtime))
        vout(3, str(ouitime))
        mtime = time.localtime(os.stat(outfile).st_mtime)
        otime = time.localtime(ouitime)
        vout(2, 'local file date differs: {} vs. {} remote'.format(
            time.asctime(mtime), time.asctime(otime)))
        gencodes = True

    # generate ethercodes.dat
    if gencodes:
        codes = parse_csv(infile)
        gpar.nrcodes = len(codes)
        vout(1, 'generate {outfile} with {nrcodes} entries')
        with open(outfile, 'w', newline = '', encoding = 'utf-8') as f:
            f.writelines('%s\t%s\n' % (key, codes[key])
                         for key in sorted(codes))
        # mark the generated file with the time of its source
        os.utime(outfile, (ouitime, ouitime))
        vout(1, 'successful')
    else:
        vout(1, 'code file {outfile} up to date already')

    timestamp = time.strftime('%Y%m%d_%H%M%S', ouidate)
    if gpar.timestamp:
        vout(0, 'timestamp: {}'.format(timestamp))
    if gpar.patchfile is not None:
        patch_file(gpar.patchfile, timestamp)
    return 0
def main(argv = None):
    """ command line entry point

        argv: list of arguments without the program name
              (default: sys.argv[1:])

        return the result of fetch_ethercodes()
        Usage errors terminate the process with exit code 1, unexpected
        exceptions with exit code 2 (both via exit()).
    """
    if argv is None:
        argv = sys.argv[1:]

    # yeah, oldschool, I know...
    try:
        # long options, that take an argument, need a trailing '='
        optlist, args = getopt.getopt(argv, 'hVvfktT:O:o:p:',
            ('help', 'version', 'verbose', 'force', 'keep', 'timestamp',
             'deltat=', 'ouifile=', 'outfile=', 'patch=')
        )
    except getopt.error as msg:
        exit(1, msg, True)

    for opt, par in optlist:
        if opt in ('-h', '--help'):
            exit(usage = True)
        elif opt in ('-V', '--version'):
            exit(msg = 'version %s' % gpar.version)
        elif opt in ('-v', '--verbose'):
            gpar.loglevel += 1
        elif opt in ('-f', '--force'):
            gpar.force = True
        elif opt in ('-k', '--keep'):
            gpar.keep = True
        elif opt in ('-t', '--timestamp'):
            gpar.timestamp = True
        elif opt in ('-T', '--deltat'):
            # cmp_ts() compares numerically: a plain string would raise
            try:
                gpar.deltat = float(par)
            except ValueError:
                exit(1, 'invalid deltat value: {}'.format(par), True)
        elif opt in ('-O', '--ouifile'):
            gpar.ouifile = par
        elif opt in ('-o', '--outfile'):
            gpar.outfile = par
        elif opt in ('-p', '--patch'):
            gpar.patchfile = par

    try:
        return fetch_ethercodes()
    except Exception:
        # top level boundary: report the traceback and fail
        exit(2, 'unexpected exception occurred:\n%s' % traceback.format_exc())
if __name__ == '__main__':
    # run as script: main()'s result becomes the process exit status
    raise SystemExit(main())