-
Notifications
You must be signed in to change notification settings - Fork 0
/
byteio.py
126 lines (96 loc) · 3.19 KB
/
byteio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import codecs
import io
from abc import abstractmethod
from typing import List, Optional
class BaseReader:
    """Base class for byte sources; this default source is always at EOF."""

    def readbyte(self) -> Optional[int]:
        """Return the next byte of the stream as an int, or None at EOF.

        The base implementation has no underlying stream and therefore
        always reports EOF; subclasses override this method.
        """
        return None
class TextReader(BaseReader):
    """Expose a text-mode stream as a sequence of single bytes.

    Characters are read from ``stream`` one at a time and encoded with the
    configured ``encoding`` / ``errors``; the encoded bytes are then handed
    out one per ``readbyte`` call.

    ``stream`` is a file object opened for reading as text.
    """

    def __init__(self, stream, *, encoding='ascii', errors='strict'):
        """Wrap ``stream``.

        Args:
            stream: file object opened for reading as text.
            encoding: codec used to turn each character into bytes.
            errors: error handler passed to ``str.encode``.
        """
        assert stream.readable()
        self.stream = stream
        self.encoding = encoding
        self.errors = errors
        # Encoded bytes of the current character that are not yet consumed.
        self.buffer = bytearray()

    def readbyte(self) -> Optional[int]:
        """Return the next encoded byte as an int, or None at EOF.

        Raises:
            UnicodeEncodeError: if a character cannot be encoded and the
                error handler is ``'strict'``.
        """
        # Loop rather than read once: with handlers such as 'ignore' a
        # character may encode to zero bytes, in which case we move on to
        # the next character instead of indexing into an empty result.
        while not self.buffer:
            char = self.stream.read(1)
            if len(char) == 0:
                return None
            assert len(char) == 1
            encoded = char.encode(encoding=self.encoding, errors=self.errors)
            # Keep the buffer a bytearray: plain bytes has no pop(), so
            # storing the slice directly broke every multi-byte character.
            self.buffer = bytearray(encoded)
        return self.buffer.pop(0)
class ByteReader(BaseReader):
    """Byte source backed by a file object opened for reading as binary."""

    def __init__(self, stream):
        """Remember ``stream``; it must report itself as readable."""
        assert stream.readable()
        self.stream = stream

    def readbyte(self) -> Optional[int]:
        """Read one byte from the stream; return it as an int, or None at EOF."""
        chunk = self.stream.read(1)
        count = len(chunk)
        if count == 0:
            # An empty read means the stream is exhausted.
            return None
        assert count == 1
        return chunk[0]
class ForkReader(BaseReader):
    """Reader that copies every byte it reads to a collection of writers."""

    def __init__(self, reader: BaseReader, writers):
        """Store the wrapped ``reader`` and the iterable of ``writers``."""
        self.reader = reader
        self.writers = writers

    def readbyte(self) -> Optional[int]:
        """Read a byte from the wrapped reader, echoing it to each writer.

        Returns the byte, or None at EOF (nothing is written in that case).
        """
        value = self.reader.readbyte()
        if value is not None:
            for sink in self.writers:
                sink.writebyte(value)
        return value
class SequentialReader(BaseReader):
    """Reader that drains several readers one after another."""

    def __init__(self, *readers):
        """Keep the given readers in order; the first one is consumed first."""
        self.readers = list(readers)

    def readbyte(self) -> Optional[int]:
        """Return the next byte from the first non-exhausted reader.

        Readers that report EOF are dropped from the front of the list.
        Returns None once every reader has been exhausted.
        """
        while self.readers:
            value = self.readers[0].readbyte()
            if value is not None:
                return value
            # Front reader hit EOF: discard it and try the next one.
            del self.readers[0]
        return None
# ------------------------------------------------- #
class BaseWriter:
    """Base class for byte sinks; this default sink discards everything."""

    def writebyte(self, byte: int):
        """Accept one byte (as an int) and do nothing with it."""
        return None
class TextWriter(BaseWriter):
    """Byte sink that decodes incoming bytes and writes text to a stream.

    ``stream`` is a file object opened for writing as text.

    Bytes are fed through an incremental decoder so that characters whose
    encoding spans several bytes (e.g. UTF-8) are reassembled across
    successive ``writebyte`` calls instead of being decoded one isolated
    byte at a time.
    """

    def __init__(self, stream, *, encoding='ascii', errors='strict'):
        """Wrap ``stream``.

        Args:
            stream: file object opened for writing as text.
            encoding: codec used to turn the byte sequence into text.
            errors: error handler given to the decoder.
        """
        assert stream.writable()
        self.stream = stream
        self.encoding = encoding
        self.errors = errors
        # Created on first use so that an unknown encoding still surfaces
        # from writebyte(), not from the constructor.
        self._decoder = None

    def writebyte(self, byte: int):
        """Decode one byte and write any completed text to the stream.

        While a multi-byte character is still incomplete the decoder yields
        an empty string, so nothing visible is written until its last byte
        arrives. The stream is flushed after every call. There is no
        finalisation step, so a trailing incomplete sequence stays buffered
        in the decoder and is never written.

        Raises:
            ValueError: if ``byte`` is outside ``range(256)``.
            UnicodeDecodeError: on invalid input when ``errors`` is
                ``'strict'``.
            LookupError: if ``encoding`` is unknown.
        """
        if self._decoder is None:
            factory = codecs.getincrementaldecoder(self.encoding)
            self._decoder = factory(errors=self.errors)
        text = self._decoder.decode(bytes([byte]))
        self.stream.write(text)
        self.stream.flush()
class ByteWriter(BaseWriter):
    """Byte sink backed by a file object opened for writing as binary."""

    def __init__(self, stream):
        """Remember ``stream``; it must report itself as writable."""
        assert stream.writable()
        self.stream = stream

    def writebyte(self, byte: int):
        """Write a single byte (given as an int) and flush the stream."""
        payload = bytes((byte,))
        self.stream.write(payload)
        self.stream.flush()
class ForkWriter(BaseWriter):
    """Byte sink that duplicates every byte to several underlying writers."""

    def __init__(self, *writers):
        """Keep the given writers; each receives every byte, in order."""
        self.writers = writers

    def writebyte(self, byte: int):
        """Forward ``byte`` to each underlying writer in turn."""
        for sink in self.writers:
            sink.writebyte(byte)
# Names exported by ``from <module> import *``.
__all__ = [
    'BaseReader',
    'BaseWriter',
    'TextReader',
    'TextWriter',
    'ForkWriter',
    'ForkReader',
    'SequentialReader',
    'ByteReader',
    'ByteWriter',
]