-
Notifications
You must be signed in to change notification settings - Fork 0
/
imap.py
330 lines (283 loc) · 10.7 KB
/
imap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import email
import logging
import re
import string
import time
from email.header import decode_header
from imaplib import IMAP4
from itertools import batched
import backoff
import bs4
import imapclient
from imapclient.exceptions import IMAPClientError, LoginError
from settings import Settings
# Text clean-up patterns applied to message bodies by mesg_to_text().
# A run of non-word characters that has whitespace on both sides.
_RE_SYMBOL_SEQ = re.compile(r"(?<=\s)\W+(?=\s)")
# One or more consecutive whitespace characters (newlines included).
_RE_WHITESPACE = re.compile(r"\s+")
# One or more consecutive newline characters.
_RE_COMBINE_CR = re.compile(r"\n+")
# One or more ">" characters at the start of a line (MULTILINE mode).
_RE_NO_ARROWS = re.compile(r"^([>])+", re.MULTILINE)
# Maximum number of uids sent to the server in a single fetch call.
_BATCH_SIZE = 40
# Data items requested when fetching; also the keys read back from each
# fetched message in ImapHandler.parse_mesg().
HEADER_KEY = b"BODY[HEADER.FIELDS (SUBJECT FROM TO CC BCC)]"
BODY_KEY = b"BODY[]"
# Module-level logger used by the backoff retry handlers.
base_logger = logging.getLogger("imap")
def html2text(html: str) -> str:
    """Render an HTML document as plain text.

    The markup is parsed with BeautifulSoup's ``html.parser`` backend and the
    text nodes are joined with single spaces. Every character that is not in
    ``string.printable`` is then dropped, and any remaining entity-looking
    token (``&`` + 3-4 lowercase letters + ``;``) is replaced with a space.

    Args:
        html (str): the HTML source

    Returns:
        str: the extracted plain text
    """
    extracted = bs4.BeautifulSoup(html, "html.parser").get_text(separator=" ")
    printable_only = "".join(ch for ch in extracted if ch in string.printable)
    return re.sub(r"&[a-z]{3,4};", " ", printable_only)
def get_header(raw_header: bytes, key: str) -> str:
    """Extract and decode a single header from a raw header block.

    RFC 2047 encoded words are decoded with their declared charset. The
    decoded fragments are joined with single spaces and the result is
    stripped of surrounding whitespace.

    Args:
        raw_header (bytes): the raw header section of a message
        key (str): header name to look up (case-insensitive)

    Returns:
        str: the decoded header value, or "" when the header is absent
    """
    header = email.message_from_bytes(raw_header)[key]
    if header is None:
        return ""

    fragments = []
    for chunk, charset in decode_header(header):
        if isinstance(chunk, str):
            fragments.append(chunk)
            continue
        # errors="replace": a header whose bytes do not match its declared
        # charset must not abort parsing of the whole message.
        try:
            fragments.append(chunk.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # Retry with utf-8, if we didn't find the codec.
            fragments.append(chunk.decode("utf-8", errors="replace"))
    return " ".join(fragments).strip()
def mesg_to_text(mesg: email.message.Message) -> str:
    """Convert an email message to plain-text.

    Every ``text/plain`` and ``text/html`` part found while walking the
    message is decoded (HTML parts are passed through ``html2text``) and the
    results are concatenated. The combined text is then cleaned: leading
    quote markers (">") are removed from each line, free-standing symbol
    runs are dropped and all whitespace runs collapse to a single space.

    Args:
        mesg (email.message.Message): the parsed message

    Returns:
        str: the cleaned plain-text content
    """
    chunks = []
    for part in mesg.walk():
        content_type = part.get_content_type()
        if content_type not in ("text/plain", "text/html"):
            continue
        payload = part.get_payload(decode=True)
        if payload is None:
            # Nothing decodable in this part (e.g. a container part).
            continue
        charset = part.get_content_charset() or "utf-8"
        try:
            decoded = payload.decode(charset, errors="ignore")
        except LookupError:
            # Unknown or bogus charset label: fall back to utf-8 rather than
            # failing the whole message.
            decoded = payload.decode("utf-8", errors="ignore")
        chunks.append(html2text(decoded) if content_type == "text/html" else decoded)

    text = "".join(chunks)
    # Quote markers must be stripped while line breaks still exist; once
    # whitespace is collapsed the per-line anchor can only match at offset 0.
    text = _RE_NO_ARROWS.sub("", text)
    text = _RE_SYMBOL_SEQ.sub("", text)
    # Collapsing whitespace also folds every run of newlines into one space.
    text = _RE_WHITESPACE.sub(" ", text)
    return text
class ImapHandler:
    """Wrapper around an ``imapclient.IMAPClient`` connection.

    Provides connect/close handling plus folder listing, searching, batched
    fetching, moving and parsing of messages. Most server operations retry
    via ``backoff`` and attempt one reconnect on ``IMAPClientError``.
    """

    def __init__(self, settings: Settings, readonly=False) -> None:
        """Store the settings; no connection is opened until connect_imap().

        Args:
            settings (Settings): provides imap_host, username and password
            readonly (bool, optional): passed to select_folder() whenever a
                folder is selected. Defaults to False.
        """
        self.__settings = settings
        self.__imap_conn = None
        self.logger = logging.getLogger(self.__class__.__name__)
        self.__readonly = readonly
        self.__capabilities = None

    @property
    def capabilities(self) -> list | None:
        """Server capabilities as strings, or None before a successful connect."""
        return self.__capabilities

    @property
    def has_move(self) -> bool:
        """True when the server advertised the MOVE capability."""
        return self.__capabilities is not None and "MOVE" in self.__capabilities

    def get_connection(self):
        """Return the underlying client object (None when not connected)."""
        return self.__imap_conn

    @backoff.on_exception(
        backoff.expo,
        (IOError, IMAP4.error, IMAPClientError),
        max_tries=5,
        on_backoff=lambda details: base_logger.warning(
            "Backing off %0.1f seconds on %i logging in to IMAP",
            details["wait"],
            details["tries"],
        ),
    )
    def connect_imap(self) -> bool:
        """Open an SSL connection and log in with the configured credentials.

        Returns:
            bool: True on success; False when host/username are missing or
            when connecting or logging in fails.
        """
        if not self.__settings.imap_host or not self.__settings.username:
            return False
        # NOTE(review): errors raised while connecting/logging in are caught
        # below and reported as False, so the retry decorator only sees
        # failures from the capability query further down - confirm intent.
        try:
            self.__imap_conn = imapclient.IMAPClient(
                self.__settings.imap_host, ssl=True
            )
            self.__imap_conn.login(self.__settings.username, self.__settings.password)
        except LoginError:
            self.logger.error("Could not login to %s", self.__settings.imap_host)
            return False
        except Exception as e:
            self.logger.error(
                "Unknow error logging in to imap server %s", self.__settings.imap_host
            )
            self.logger.error(e, exc_info=True)
            return False
        self.__capabilities = [x.decode() for x in self.__imap_conn.capabilities()]
        self.logger.debug(self.__capabilities)
        return True

    def __del__(self):
        self.close()

    def __reconnect(self):
        """Re-establish the connection, but only if one was opened before."""
        if self.__imap_conn is not None:
            self.logger.warning("Error communicating with IMAP server. Reconnecting.")
            self.connect_imap()

    def close(self):
        """Log out (best effort) and drop the connection reference."""
        if self.__imap_conn is None:
            return
        self.logger.debug("Cleaning up imap connection")
        try:
            self.__imap_conn.logout()
        except IMAP4.error:
            self.logger.debug("Error logging out from imap.")
        except Exception as e:
            # Closing is best effort; never let cleanup raise.
            self.logger.debug("Other error %s", e)
        finally:
            self.__imap_conn = None

    @backoff.on_exception(
        backoff.expo,
        (IOError, IMAP4.error, IMAPClientError),
        max_tries=5,
        on_backoff=lambda details: base_logger.warning(
            "Backing off %0.1f seconds on %i listing folders with IMAP",
            details["wait"],
            details["tries"],
        ),
    )
    def list_folders(self) -> list[str]:
        """List folder names, reconnecting once on IMAPClientError.

        Returns:
            list[str]: the third element of every entry returned by the
            client's list_folders()
        """
        try:
            return self.__list_folders()
        except IMAPClientError as e:
            self.logger.warning(
                "Got exception on listing folders, will reconnect %s", e
            )
            self.__reconnect()
            return self.__list_folders()

    def __list_folders(self) -> list[str]:
        return [t[2] for t in self.__imap_conn.list_folders()]

    def fetch(self, uids: list) -> dict:
        """Fetch a set of emails by uid, reconnecting once on IMAPClientError.

        Lists longer than the batch size are fetched in several batches.

        Args:
            uids (list): a list of uids for emails to be fetched

        Returns:
            dict: All fetched emails
        """
        try:
            return self.__fetch(uids)
        except IMAPClientError as e:
            self.logger.warning("Got exception on fetching, will reconnect %s", e)
            self.__reconnect()
            return self.__fetch(uids)

    def __fetch(self, uids) -> dict:
        """Fetch uids in batches of _BATCH_SIZE, pausing 1s before each batch."""
        all_mails = {}
        batches = list(batched(uids, _BATCH_SIZE))
        for index, uid_batch in enumerate(batches, start=1):
            self.logger.debug("\t Batch %i/%i", index, len(batches))
            time.sleep(1)
            all_mails.update(self.__fetch_batch(uid_batch))
        return all_mails

    @backoff.on_exception(
        backoff.expo,
        (IOError, IMAP4.error, IMAPClientError),
        max_tries=4,
        # backoff invokes handlers with the details dict only; a handler that
        # also expects ``self`` would raise TypeError on the first retry.
        on_backoff=lambda details: base_logger.warning(
            "Backing off %0.1f seconds after %i tries",
            details["wait"],
            details["tries"],
        ),
    )
    def __fetch_batch(self, uid_batch: list):
        return self.__imap_conn.fetch(uid_batch, [HEADER_KEY, BODY_KEY])

    @backoff.on_exception(
        backoff.expo,
        (IOError, IMAP4.error, IMAPClientError),
        max_tries=5,
        on_backoff=lambda details: base_logger.warning(
            "Backing off %0.1f seconds on %i searching IMAP",
            details["wait"],
            details["tries"],
        ),
    )
    def search(self, folder: str, search_args=None) -> list[int]:
        """Search for messages in an imap folder.

        Reconnects once on IMAPClientError.

        Args:
            folder (str): the folder
            search_args (Any, optional): Search criteria. Defaults to ["ALL"].

        Returns:
            list: list of uids
        """
        try:
            return self.__search(folder, search_args)
        except IMAPClientError as e:
            self.logger.warning("Got exception on searching, will reconnect %s", e)
            self.__reconnect()
            return self.__search(folder, search_args)

    def __search(self, folder: str, search_args=None) -> list[int]:
        if search_args is None:
            search_args = ["ALL"]
        self.__imap_conn.select_folder(folder, self.__readonly)
        return self.__imap_conn.search(search_args)

    @backoff.on_exception(
        backoff.expo,
        (IOError, IMAP4.error, IMAPClientError),
        max_tries=2,
    )
    def move(
        self,
        folder: str,
        uids: list,
        dest_folder: str,
        flag_messages=True,
        flag_unseen=True,
    ) -> int:
        """Move messages from one folder to another.

        Args:
            folder (str): source folder
            uids (list): message uids
            dest_folder (str): destination folder
            flag_messages (bool, optional): Whether to flag messages moved.
                Defaults to True.
            flag_unseen (bool, optional): Make moved messages unread.
                Defaults to True.

        Returns:
            int: number of messages moved

        Raises:
            ValueError: if uids is not a list
        """
        if not isinstance(uids, list):
            self.logger.error(
                "Expected the uids to be a list "
                "moving from folder %s to folder %s",
                folder,
                dest_folder,
            )
            raise ValueError("Expected uids to be a list")
        if not uids:
            # Nothing to move; avoid sending an empty message set to the server.
            return 0

        self.__imap_conn.select_folder(folder, self.__readonly)
        if flag_messages:
            self.__imap_conn.add_flags(uids, [imapclient.FLAGGED])
        if flag_unseen:
            # For some reason it seems like adding a flag also put a message as "SEEN"
            # thus we need to make it "UNSEEN" again
            self.__imap_conn.remove_flags(uids, [imapclient.SEEN])

        # Move in imap is a combination of operations. Copy, delete and expunge.
        # Unless the move capability exists.
        if self.has_move:
            self.__imap_conn.move(uids, dest_folder)
        else:
            self.__imap_conn.copy(uids, dest_folder)
            self.__imap_conn.add_flags(uids, [imapclient.DELETED], silent=True)
            self.__imap_conn.uid_expunge(uids)
        self.logger.info("Moved from %s to %s: %i", folder, dest_folder, len(uids))
        return len(uids)

    def parse_mesg(self, p_mesg: dict) -> dict:
        """Parse one fetched message into its text fields.

        Args:
            p_mesg (dict): one fetched message, keyed by the requested data
                items (HEADER_KEY and BODY_KEY)

        Returns:
            dict: with keys "from" (sender), "tocc" (TO and CC recipients
            separated by a space) and "body" (subject followed by the
            plain-text body)
        """
        # Some IMAP dialects respond with the keys containing double quotes and some not.
        # Let's strip the quotes
        mesg = {k.replace(b'"', b""): v for k, v in p_mesg.items()}
        raw_header = mesg[HEADER_KEY]
        body_text = mesg_to_text(email.message_from_bytes(mesg[BODY_KEY]))

        # Join TO and CC with a separator so the last TO address and the first
        # CC address do not fuse into a single token.
        recipients = (get_header(raw_header, "TO"), get_header(raw_header, "CC"))
        tocc = " ".join(addr for addr in recipients if addr)
        subject = get_header(raw_header, "SUBJECT").removeprefix("**SPAM**").strip()
        return {
            "from": get_header(raw_header, "FROM"),
            "tocc": tocc,
            "body": f"Subject: {subject}. {body_text}",
        }