Последовательное чтение не дает ожидаемого результата

Я сделал считывание кода RPi по серийнику с датчика давления. Каким-то образом мой код читает необработанные данные с последовательного датчика, но мой цикл while не работает, и я ставлю на пустой экран. Что я делаю не так? Весь код ниже:

import serial
import sys
import time

gauge = serial.Serial(
    port = '/dev/serial0',
    baudrate = 9600,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout = 1
    )

while(True):
    while(True):
        input_string = list(gauge.read(2)) #watch for two magic bytes
        if (input_string[0] == 7 and input_string[1] == 5):
            input_string += list(gauge.read(9-2)) #append the rest of the data
            if ((sum(input_string[1:8]) % 256) == input_string[8]): #compute checksum
                break

    gauge_value = 10.0**((input_string[4] * 256.0 + input_string[5]) / 4000.0 - 12.5)
    if (len(sys.argv) > 2):
        print("mBar: {} Pa: {} Status: {} ({}) Error: {} ({})".format(gauge_value,gauge_value*100000.0,
                                                        input_string[2],bin(input_string[2]),
                                                        input_string[3],bin(input_string[3])))
    else:
        print("{},{}".format(time.time(), gauge_value))

Вот код, читающий необработанные данные:

import serial
import time

ser = serial.Serial(
        port='/dev/serial0',#Replace ttyUSB0 with ttyAMA0 for Pi1,Pi2,Pi0
        baudrate = 9600,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=1
)

while 1:
        x=ser.readline()
        print (x)

Необработанные данные, как показано ниже:

'\n'
b'\x8e\x07\x05@\x08\xbcI(!\xfe\x15A\x00\xec\x1c\x00\x8d\xbc\x87\x1c\x1c\xf4!\x00\n'
b'\x1c\x07\x05\x00\x00\xec\x95\x94\xb5\xf8\x144\x15\xec\x0e\x00\n'
b'\t\x07\x05 \x88n8H$\xa8\x1c\x00\xec\x03\x00\xca{\x07\x15\x90\xb0\xb5?\n'
b'\xf3P\x10\x00\xeb\xb6\x08\x95\xbc(\x1c\x1c\xb4\xfa\x00\n'

Думаю, я сделал что-то не так в цикле while. Для пояснения ниже формата сообщения из руководства к устройству:

Вероятно, вы получаете пустой экран, потому что внутренний цикл while никогда не достигает break. Какой формат сообщения указан в документации устройства?

001 07.08.2024 19:06

@001 добавил скрин из мануала.

adamssson 07.08.2024 19:34

@001 Возможно, вы правы, код читает 2 байта в поисках значений 7 и 5 и переходит к контрольной сумме и контрольной сумме. Если что-то не так с трансером, шлейф может никогда не перейти на внешний if-else. Как вы думаете, было бы лучше использовать readline вместо read. Я совсем новичок и не совсем знаю, как readline поймать бит 4 и 5, которые дают мне интересующие меня данные.

adamssson 07.08.2024 19:41

Я бы не использовал readline. При этом считываются все байты до символа новой строки - и нет никакой гарантии, что ваши данные будут иметь символ новой строки. Я читал по одному байту, чтобы найти начало сообщения. Например: прочитать 1 байт. Если это 7, прочитайте 1 байт. Если это 5, прочитайте 7 байт...

001 07.08.2024 19:47

Значения в дампе данных не соответствуют ожиданиям от документа. За каждым 9-м байтом должен следовать 0x7, но здесь это не так.

001 07.08.2024 19:48

Я обнаружил строку input_string = list(gauge.read(2)) читать только 2 значения одновременно и останавливаться. Как заставить читать код до тех пор, пока не будут обнаружены правильные значения бит 0 и 1?

adamssson 07.08.2024 20:30

С примерами данных, которые вы опубликовали в вопросе, оператор break никогда не выполняется, потому что рассчитанные контрольные суммы не соответствуют ожиданиям. Для пакетов, начинающихся с 7 и 5, тип датчика также не соответствует ожидаемому. Если вы проигнорируете эти проблемы, то одно из показаний сообщит об BA error. Я думаю, вам нужно упростить циклы while, чтобы разделить различные методы отказа.

ukBaz 10.08.2024 08:20

Да, я диагностировал, что датчик манометра поврежден и требует замены. Поверьте, это причина, по которой байт ошибки дает разные значения.

adamssson 11.08.2024 10:50
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
8
69
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Я бы читал по 1 байту за раз, пока не получите 0x7 и 0x5. Я не могу это проверить, так как у меня нет устройства, так что...

# Helper function that does not yield (return) until it has a good message
def next_msg(guage):
    while True:
        # Look for 7,5 combo
        data_len = 0
        page_number = 0
        while data_len != 7 and page_number != 5:
            data_len = page_number
            page_number = guage.read(1)[0]

        # If we are here, we have a valid 7,5 start of message
        payload = guage.read(data_len - 1)
        checksum = guage.read(1)[0]
        if (sum(payload) + page_number) % 255 == checksum:
            yield (payload[0], payload[1], payload[2] * 256 + payload[3], payload[4], payload[5])

for status,error,measurement,version,sensor_type in next_msg(guage):
    # Do something with the data...
    print(status, error, measurement, version, sensor_type)

Примечание: обратите внимание на отсутствие круглых скобок в операторах if и while, а также пробелов после ключевых слов. Это больше соответствует стилю форматирования Python.

** Отредактирован расчет контрольной суммы. Я не включил цифру «5» (номер страницы), но думаю, что она там должна быть.

** Обновлено: обновлено для цикла на основе ответа ОП.

# Ignoring status,error,version,sensor_type
for _,_,measurement,_,_ in next_msg(guage):
    measurement = 10.0**(measurement / 4000.0 - 12.5)
    print(measurement)

Спасибо @001 Я расширил код, как показано ниже:

import serial
import sys
import time

gauge = serial.Serial(
    port = '/dev/serial0',
    baudrate = 9600,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout = 1000
    )
    
    
def next_msg(gauge):
    while True:
        # Look for 7,5 combo
        msg_start = [None, None]
        while msg_start != [7, 5]:
            msg_start[0] = msg_start[1]
            msg_start[1] = gauge.read(1)[0]

        # If we are here, we have a valid 7,5 start of message
        payload = gauge.read(6)
        checksum = gauge.read(1)[0]
        if sum(payload) % 255 == checksum:
            yield (payload[0], payload[1], payload[2] * 256 + payload[3], payload[4], payload[5])


for measurement in next_msg(gauge): 
    payload = gauge.read(6)
    measurement = 10.0**((payload[4] * 256.0 + payload[5]) / 4000.0 - 12.5)
    print(measurement)

И получил результат:

3.162277660168379e-13
9.036494737223004
4.340102636447431e-10
3.7325015779571986
3.162277660168379e-13
4.487453899331332e-10
3.5686174928348136e-13
6.9742899688173485e-12
1.499684835502374e-08

Каждое число появляется с задержкой в ​​1 секунду друг от друга. Механизм — вакуумметр Inficon BPG400. Предполагается, что это значения давления в мбар, но это неверно. Сейчас работаем над атмосферным давлением, поэтому правильное значение должно быть около 1000 мбар. Если я не напутал в коде, я считаю, что это важно, или правильная настройка, или датчик могут быть повреждены.

payload = gauge.read(6) Это уже было сделано в next_msg, поэтому вам не нужно делать это снова в цикле for. Смотрите мой обновленный ответ.
001 07.08.2024 22:39

Если я сделаю это по-твоему, получу ошибку payload not defined. Дополнительная тема: иногда приходится ждать измерения до минуты. Придется отлаживать вакуумметр

adamssson 07.08.2024 23:55
Ответ принят как подходящий

Похоже, вы всегда ищете полезную нагрузку размером 9 байт, поэтому может быть полезно читать последовательное соединение по одному байту и сохранять его в деке Python .

Затем после каждого чтения проверяйте 9 байтов, хранящихся в деке, чтобы увидеть, существует ли правильная/ожидаемая последовательность байтов.

Данные, которые вы опубликовали в своем вопросе, не показались последовательными, поэтому я расширил их, используя следующий пример из руководства: код, кажется, дает правильный ответ.

У меня нет датчика, поэтому я заменил последовательный порт двоичным потоком ввода-вывода Python с данными примера. Это код, который я использовал для имитации данных, поступающих от датчика, и использования структуры для обработки входящих данных:

from collections import deque
from enum import Enum
import io


PKT_SIZE = 9

# gauge = serial.Serial(
#     port = '/dev/serial0',
#     baudrate = 9600,
#     parity=serial.PARITY_NONE,
#     stopbits=serial.STOPBITS_ONE,
#     bytesize=serial.EIGHTBITS,
#     timeout = 1000
#     )

gauge = io.BytesIO((
    b"\n\x8e"
    b"\x07\x05@\x08\xbcI(!\xfe"
    b"\x15A\x00\xec\x1c\x00\x8d\xbc\x87\x1c\x1c\xf4!\x00\n\x1c"
    b"\x07\x05\x00\x00\xec\x95\x94\xb5\xf8"
    b"\x144\x15\xec\x0e\x00\n\t"
    b"\x07\x05 \x88n8H$\xa8"
    b"\x1c\x00\xec\x03\x00\xca{"
    b"\x07\x15\x90\xb0\xb5?\n\xf3P"
    b"\x10\x00\xeb\xb6\x08\x95\xbc(\x1c\x1c\xb4\xfa\x00\n"
    b"\x07\x05\x00\x00\xf2\x30\x14\x0a\x45"
))


class Units(Enum):
    MBAR = 0
    TORR = 1
    PA = 2
    UNUSED = 3


class ErrorCodes(Enum):
    NO_ERROR = 0
    PIRANI_ADJ_ERROR = 5
    BA_ERROR = 8
    PIRANI_ERROR = 9


def checksum_ok(sensor_data):
    calc_sum = sum(list(sensor_data)[1:8]) & 0xff
    return calc_sum == sensor_data[8]


def process_read(sensor_data):
    # print(sensor_data)
    (
        data_len,
        page_num,
        status,
        error,
        hi_byte,
        lo_byte,
        version,
        sen_type,
        check_sum,
    ) = sensor_data
    if all(
        (
            data_len == 7,
            page_num == 5,
            sen_type == 10
        )
    ):
        if not checksum_ok(sensor_data):
            print("ERROR: Failed checksum")
            return None
        # print(f"{status:08b}")
        measurement = hi_byte << 8 | lo_byte
        units_status = get_units(status)
        error_status = get_error_status(error)
        if error_status != ErrorCodes.NO_ERROR:
            print(f"ERROR: {error_status}")
        gauge_value = calc_gauge_value(measurement, units_status)
        return gauge_value
    return None


def get_units(status_byte):
    return Units(status_byte >> 4 & 0x03)


def get_error_status(error_byte):
    return ErrorCodes(error_byte >> 4 & 0x0f)


def calc_gauge_value(measurement, units_status):
    if units_status == Units.MBAR:
        gauge_value = 10.0 ** (measurement / 4000.0 - 12.5)
    elif units_status == Units.TORR:
        gauge_value = 10.0 ** (measurement / 4000.0 - 12.625)
    elif units_status == Units.PA:
        gauge_value = 10.0 ** (measurement / 4000.0 - 10.5)
    else:
        print("ERROR: invalid units setting")
        gauge_value = None
    return gauge_value


def main():
    msg = deque([0] * PKT_SIZE, PKT_SIZE)
    while True:
        new_byte = gauge.read(1)
        msg.append(int.from_bytes(new_byte, "little", signed=False))
        # print(f'\t{msg}')
        reading = process_read(msg)
        if reading:
            print(f"\tSensor reading: {reading}")
        if sum(msg) == 0:
            break


if __name__ == "__main__":
    main()

Другие вопросы по теме