Я написал этот скрипт, который использует python API exchangengelib для входа в мою учетную запись Microsoft Exchange и загрузки отправленных сообщений, содержащих определенную тему.
Я разделил его на две функции, каждая из которых работает как отдельный процесс:
Функция A получает электронные письма из моей учетной записи и помещает те, которые соответствуют определенной теме, в очередь.
Функция B берет эти электронные письма из очереди и сохраняет их на диск.
Эти функции работают правильно при запуске в одном процессе, но когда я решил запустить функцию A в дочернем процессе, в то время как функция B выполняется в основном процессе, функция A просто зависает в строке, которая загружает электронные письма.
Мой код ниже.
from exchangelib import Credentials, Account, Mailbox, HTMLBody, Message
import multiprocessing as mp
from multiprocessing import Process, Queue
def authenticate(user_id, passw):
credentials = Credentials(user_id, passw)
exchange = Account(account_id, credentials=credentials, autodiscover=True)
return exchange
def fetch_bounces(exchange, output_q, progress):
print('fetcher process started')
while exchange.inbox.total_count != 0:
exchange.inbox.refresh()
inbox=exchange.inbox.all()
for msg in inbox:
if 'Undeliverable:' in msg.subject:
output_q.put(msg.display_to)
inbox.move_to_trash()
progress.put('done')
def save_bounced(outputfile, output_q):
bounces = []
while output_q.empty() is False:
lead = output_q.get()
print('bounce: '+lead)
bounces.append(lead)
if len(bounces)>0:
existing = []
try:
with open(outputfile, 'r+') as f:
lines = f.readlines()
existing = [line.strip().lower() for line in lines]
except:
pass
with open(outputfile, 'a+') as f:
for line in bounces:
if line.strip().lower() in existing:
continue
else:
f.write(line.strip().lower()+'\n')
if __name__ == '__main__':
#credentials
account_id =
password =
#set Queues to store bounces and progress
bounced = Queue()
progress = Queue()
#login to exhcnage
exchange = authenticate(account_id, password)
print('logged in successfully')
#spawn bounce fetching process and start it
f_process = Process(target=fetch_bounces, args=(exchange, bounced, progress))
f_process.start()
#define file path where bounces will be stored
output_f = 'bounces.txt'
#while new bounce emails are being fetch, remain in loop and also specify flag to know when all emails have been fetched
complete = False
while not complete:
save_bounced(output_f, bounced)
try:
msg = progress.get(block=False)
if msg == 'done':
complete = True
except:
pass
#terminate fetcher process
f_process.join()
print('bounces completely removed')
Когда вы используете многопроцессорность, все аргументы должны быть сериализуемыми. Но экземпляр Account
нельзя безопасно сериализовать. Вы должны создать экземпляр Account
в каждом процессе.