Python: функция зависает в дочернем процессе

Я написал этот скрипт, который использует python API exchangengelib для входа в мою учетную запись Microsoft Exchange и загрузки отправленных сообщений, содержащих определенную тему.

Я разделил его на две функции, каждая из которых работает как отдельный процесс:

Функция A получает электронные письма из моей учетной записи и помещает те, которые соответствуют определенной теме, в очередь.

Функция B берет эти электронные письма из очереди и сохраняет их на диск.

Эти функции работают правильно при запуске в одном процессе, но когда я решил запустить функцию A в дочернем процессе, в то время как функция B выполняется в основном процессе, функция A просто зависает в строке, которая загружает электронные письма.

Мой код ниже.

from exchangelib import Credentials, Account, Mailbox, HTMLBody, Message

import multiprocessing as mp
from multiprocessing import Process, Queue


def authenticate(user_id, passw):
    credentials = Credentials(user_id, passw)
    exchange = Account(account_id, credentials=credentials, autodiscover=True)
    return exchange

def fetch_bounces(exchange, output_q, progress):
    print('fetcher process started')
    while exchange.inbox.total_count != 0:
        exchange.inbox.refresh()
        inbox=exchange.inbox.all()
        for msg in inbox:
            if 'Undeliverable:' in msg.subject:
                output_q.put(msg.display_to)
        inbox.move_to_trash()
    progress.put('done')

def save_bounced(outputfile, output_q):
    bounces = []
    while output_q.empty() is False:
        lead = output_q.get()
        print('bounce: '+lead)
        bounces.append(lead)
    if len(bounces)>0:
        existing = []
        try:
            with open(outputfile, 'r+') as f:
                lines = f.readlines()
                existing = [line.strip().lower() for line in lines]
        except:
            pass
        with open(outputfile, 'a+') as f:
            for line in bounces:
                if line.strip().lower() in existing:
                    continue
                else:
                    f.write(line.strip().lower()+'\n')

if __name__ == '__main__':
    #credentials
    account_id =
    password = 


    #set Queues to store bounces and progress
    bounced = Queue()
    progress = Queue()

    #login to exhcnage
    exchange = authenticate(account_id, password)
    print('logged in successfully')

    #spawn bounce fetching process and start it
    f_process = Process(target=fetch_bounces, args=(exchange, bounced, progress))
    f_process.start()

    #define file path where bounces will be stored
    output_f = 'bounces.txt'

    #while new bounce emails are being fetch, remain in loop and also specify flag to know when all emails have been fetched

    complete = False
    while not complete:
        save_bounced(output_f, bounced)
        try:
            msg = progress.get(block=False)
            if msg == 'done':
                complete =  True
        except:
            pass

    #terminate fetcher process

    f_process.join()

    print('bounces completely removed')
2
0
127
1

Ответы 1

Когда вы используете многопроцессорность, все аргументы должны быть сериализуемыми. Но экземпляр Account нельзя безопасно сериализовать. Вы должны создать экземпляр Account в каждом процессе.

Другие вопросы по теме