-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtelnet_server.py
82 lines (61 loc) · 2.28 KB
/
telnet_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from byteio import *
from run import run
from cpp.um_emulator import UniversalMachine
from pathlib import Path
import sys
import io
import socketserver
import threading
import logging
# Held by Handler.handle while it appends to logs/default.out, so that
# handler threads do not write to the log file at the same time.
file_lock = threading.Lock()
class Handler(socketserver.StreamRequestHandler):
    """Pass one request through ``proxy`` and relay the result to the client.

    The socket server creates one instance per incoming connection.
    """

    def handle(self):
        """Serve a single connection.

        Reads lines up to (not including) the first blank line or EOF, hands
        them to ``proxy``, writes the result back to the client, appends the
        request and response to ``logs/default.out`` and logs a one-line
        summary of the exchange.
        """
        request_lines = []
        # readline() returns b'' at EOF, which ends the loop just like a
        # blank line does.
        for line in iter(self.rfile.readline, b''):
            if not line.strip():
                break
            request_lines.append(line)
        request = b''.join(request_lines)

        response = proxy(request, BaseWriter())
        self.wfile.write(response)

        # The log file is shared by all handler threads; hold the lock for
        # the whole append.
        with file_lock, Path('logs/default.out').open('ab') as f:
            f.write(request + response)

        # Summarise each side by its first line.  partition() is used rather
        # than find(): find() returns -1 when the marker is missing, and
        # slicing with -1 silently drops the last byte.
        request_line = request.partition(b'\n')[0].partition(b' HTTP')[0]
        status_line = response.partition(b'\n')[0]
        _, space, after_space = status_line.partition(b' ')
        status = after_space if space else status_line

        # backslashreplace: non-ASCII bytes must not raise here, the traffic
        # has already been served and written to the log file.
        logging.info(
            '%s | %s | %d bytes',
            request_line.decode('ascii', errors='backslashreplace'),
            status.decode('ascii', errors='backslashreplace'),
            len(response),
        )

        self.wfile.close()
# Shared machine instance; stays None until get_um_copy() creates it on its
# first call.
um = None
# Held by get_um_copy() while it creates `um` and while it constructs a new
# UniversalMachine from it.
um_lock = threading.Lock()
def get_um_copy():
    """Return a new ``UniversalMachine`` constructed from the shared one.

    On the first call the shared machine is created from the bytes of
    ``umix.umz`` and run once with the input ``guest``, ``mail``,
    ``telnet 127.0.0.1 80`` (one per line); its output goes to a
    ``BaseWriter``.  Later calls reuse that machine.  The whole function
    body runs under ``um_lock``.

    Returns:
        ``UniversalMachine(um)`` -- presumably an independent copy of the
        shared machine (TODO confirm against the emulator's constructor).
    """
    global um
    with um_lock:
        if um is None:
            logging.info('loading um...')
            # Boot a local object and publish it only once the boot input has
            # run.  If loading or run() raises, the global stays None and the
            # next call retries instead of copying a half-initialised machine.
            machine = UniversalMachine(Path('umix.umz').read_bytes())
            boot_input = b'guest\nmail\ntelnet 127.0.0.1 80\n'
            run(machine, umin=ByteReader(io.BytesIO(boot_input)), umout=BaseWriter())
            um = machine
            logging.info('done')
        return UniversalMachine(um)
def proxy(request, logwriter: BaseWriter):
    """Feed ``request`` to a fresh machine copy and return what it writes back.

    Args:
        request: Bytes used as the machine's input for the first run (the
            handler passes the raw request lines).
        logwriter: Writer that receives the machine's output during both
            runs; it is also handed to ``ForkReader`` for the second run.

    Returns:
        The bytes captured during the second run, without the trailing
        ``b'% '`` (presumably the machine's shell prompt -- TODO confirm).

    Raises:
        AssertionError: If the captured output does not end with ``b'% '``.
            The exception argument is the ``(request, response)`` tuple.
    """
    local_um = get_um_copy()

    # First run: the request is the input; output goes to logwriter only.
    run(local_um, umin=ByteReader(io.BytesIO(request)), umout=logwriter)

    # Second run: a single newline is the input; output goes to logwriter and
    # is also captured in memory.
    captured = io.BytesIO()
    run(
        local_um,
        umin=ForkReader(ByteReader(io.BytesIO(b'\n')), [logwriter]),
        umout=ForkWriter(logwriter, ByteWriter(captured)),
    )

    response = captured.getvalue()
    # Explicit raise instead of `assert`, so the check is not stripped under
    # `python -O` (which would let the slice below cut two arbitrary bytes).
    # Exception type and payload are the same as the assert produced.
    if not response.endswith(b'% '):
        raise AssertionError((request, response))
    return response[:-2]
if __name__ == '__main__':
    # Imported here so that hintcheck is only needed when run as a script.
    # NOTE(review): presumably enables runtime checking of type hints --
    # confirm against the hintcheck module.
    import hintcheck
    hintcheck.hintcheck_all_functions()

    logging.basicConfig(
        format='%(asctime)s : %(message)s',
        level=logging.INFO,
        datefmt='%m-%d %H:%M:%S',
    )

    HOST, PORT = 'localhost', 5017
    with socketserver.ThreadingTCPServer((HOST, PORT), Handler) as server:
        # Start every run with an empty traffic log.  Create the directory
        # first: write_bytes() raises FileNotFoundError if logs/ is missing.
        log_path = Path('logs/default.out')
        log_path.parent.mkdir(parents=True, exist_ok=True)
        log_path.write_bytes(b'')
        logging.info('listening...')
        server.serve_forever()