Самообновляемый объект многопроцессорной обработки Python

Я пытаюсь написать отображение по сторонам света. У меня есть интерфейс, который работает с высокой частотой обновления, и аппаратное устройство, которое обновляется всего несколько раз в секунду.

Мне нужно иметь возможность получать последние сообщаемые значения с оборудования на дисплей каждый раз, когда дисплей обновляется.

Мой текущий подход — попробовать многопроцессорность. У меня есть процесс, который получает значения и обновляет объект через каналы общего состояния с этими значениями, и у меня есть процесс, который берет значения из объекта и обновляет отображение.

К сожалению, похоже, где-то блокируется, и дисплей заблокирован на скорости обновления оборудования.

class AccMag():
    # Init manually to ensure access to and for internal functions
    def init(self):
        # This defines the hardware access
        self.i2c = busio.I2C(board.SCL, board.SDA)
        self.mag = adafruit_lis2mdl.LIS2MDL(self.i2c)
        self.accel = adafruit_lsm303_accel.LSM303_Accel(self.i2c)

        # Output values (mag#) and pipes to the other process
        self.magx = 0
        self.pipex = multiprocessing.Value('d', 0.0)
        self.magy = 0
        self.pipey = multiprocessing.Value('d', 0.0)
        self.magz = 0
        self.pipez = multiprocessing.Value('d', 0.0)

        # The process, named thread.  Yes, my cat is named Rex.
        self.thread = multiprocessing.Process(target=self.threadmagdate, args=(self.pipex, self.pipey, self.pipez))
        self.thread.start()

        # A counter I use to see how often the process has run.
        # Comparing this counter to a similar one in the display is how I know it's blocking.
        self.tc = 0

    # The function that starts the process to update the values
    def magdate(self):
        # Check the current process is done
        self.thread.join(timeout=0)
        if self.thread.is_alive() == False:
            # Pull out previous values
            self.magx = self.pipex.value
            self.magy = self.pipey.value
            self.magz = self.pipez.value
            self.tc += 1
            # Start a new process
            self.thread = multiprocessing.Process(target=self.threadmagdate, args=(self.pipex, self.pipey, self.pipez))
            self.thread.start()

    # Get the data from the hardware into the pipe
    def threadmagdate(self, magx, magy, magz,):
        magx.value = int(self.mag.magnetic[1])
        magy.value = int(self.mag.magnetic[0])
        magz.value = int(self.mag.magnetic[2])

Затем дисплей может вызвать magdate() и получить значения из значений mag#.

Я упускаю что-то очевидное, я чую это, но понятия не имею, почему оно закрывается.

Вероятно, вам не нужна многопроцессорность для распараллеливания ввода-вывода; потоков должно быть достаточно, и это будет проще, поскольку вы можете напрямую использовать общие переменные. (Вы можете использовать общую память с многопроцессорной обработкой, но это несколько сложнее)

nneonneo 20.07.2024 06:39

Спасибо за предложение, я изменил код, чтобы использовать потоки вместо процессов, но обнаружил, что он вообще не возвращает никаких значений. Я собираюсь переключиться на Rust для этого проекта, но с радостью продолжу пробовать любые другие предложения на случай, если это поможет кому-то еще в будущем, и постараюсь убедиться в чистоте извлечения значений.

Sarazil 20.07.2024 11:41
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
2
59
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

self.thread.join(timeout=0) кажется бессмысленной строкой кода, поскольку join() обычно используется для ожидания завершения процесса в течение определенного периода времени, но в этом случае он вообще не ждет.

Судя по вашему коду, я не вижу ничего, что могло бы блокировать, но на некоторых платформах запуск нового процесса может занять некоторое время (иногда более 1 секунды) в зависимости от метода запуска процесса .

Учитывая, что magdate(), скорее всего, будет запускать новый процесс при каждом вызове, я подозреваю, что именно здесь и происходит «блокировка».

Поэтому вместо того, чтобы каждый раз запускать новый процесс в magdate(), я бы предложил вместо этого использовать объект Queue.

from multiprocessing import Queue, Process, Event

class AccMag():
    # Init manually to ensure access to and for internal functions
    def init(self):
        # This defines the hardware access
        self.i2c = busio.I2C(board.SCL, board.SDA)
        self.mag = adafruit_lis2mdl.LIS2MDL(self.i2c)
        self.accel = adafruit_lsm303_accel.LSM303_Accel(self.i2c)

        # Set up queue and a termination event
        self.queue = Queue()
        self.terminate_event = Event()

        # The process, named thread.  Yes, my cat is named Rex.
        self.thread = Process(target=self.threadmagdate, args=(self.queue, self.terminate_event))
        self.thread.start()

        # A counter I use to see how often the process has run.
        # Comparing this counter to a similar one in the display is how I know it's blocking.
        self.tc = 0

    def stop(self):
        self.terminate_event.set()  ## trigger termination event
        self.thread.join()  ## wait for process to terminate

    # The function that starts the process to update the values
    def magdate(self):
        # Check if new value is available
        if not self.queue.empty():
            x, y, z = self.queue.get()

            # Pull out new values
            self.magx = int(x)
            self.magy = int(y)
            self.magz = int(z)

            self.tc += 1

    # Get the data from the hardware into the pipe
    def threadmagdate(self, queue, terminate_event):
        while not terminate_event.is_set():  ## check if it's time to stop reading values
            queue.put(self.mag.magnetic)  ## insert new reading into queue

После реализации предложенного кода я по-прежнему обнаруживаю, что частота кадров соответствует выходной скорости магнитометра. Вполне возможно, что self.mag.Magnetic — это объект, который запускает свою собственную функцию при запросе значений, возвращая данные обратно в основной процесс. Я собираюсь переключиться на Rust для этого проекта, но с радостью продолжу пробовать любые дополнительные предложения на случай, если это поможет кому-то еще в будущем, и постараюсь убедиться, что значения аккуратно извлекаются во втором процессе.

Sarazil 20.07.2024 11:40

@Sarazil проверяет реализацию класса LIS2MDL, возвращаемое magnetic значение представляет собой простой кортеж. Поэтому никакого магического поведения здесь быть не должно. Я подозреваю, что ваша ошибка кроется в другом месте.

nneonneo 20.07.2024 13:09
Ответ принят как подходящий

Что-то вроде этого должно работать с потоками:

class AccMag:
    # Init manually to ensure access to and for internal functions
    def __init__(self):
        # This defines the hardware access
        self.i2c = busio.I2C(board.SCL, board.SDA)
        self.mag = adafruit_lis2mdl.LIS2MDL(self.i2c)
        self.accel = adafruit_lsm303_accel.LSM303_Accel(self.i2c)

        self.mag_data = (0, 0, 0)
        self.update_count = 0

        self.thread = threading.Thread(target=self._mag_thread, daemon=True)
        self.thread.start()

    # Get the data from the hardware into the pipe
    def _mag_thread(self):
        while True:
            mag = tuple(map(int, self.mag.magnetic))
            self.mag_data = mag
            self.update_count += 1

С этой реализацией нет необходимости вручную вызывать magdata: просто возьмите значения x, y, z напрямую из .mag_data.

В главном приложении отображения скопируйте ссылку на кортеж mag_data в локальную переменную (например, cur_data = accmag.mag_data; x = cur_data[0]), а не обращайтесь напрямую к полям (например, x = accmag.mag_data[0]), чтобы избежать гонок с потоком обновления и просмотра частичных результатов.

Однако я подозреваю, что ваша проблема с блокировкой заключается в чем-то другом, учитывая, что другой ответ также должен показывать нулевую блокировку, но, похоже, блокируется за вас.

Спасибо за этот отзыв, это действительно доказало мне, что моя проблема заключается в другом, и что задержка возникла из-за вызова класса AccMag основным методом HUD. Значение update_count AccMag намного превышало значение основного метода.

Sarazil 22.07.2024 12:24

Другие вопросы по теме