Я сделал считывание кода RPi по серийнику с датчика давления. Каким-то образом мой код читает необработанные данные с последовательного датчика, но мой цикл while
не работает, и я ставлю на пустой экран.
Что я делаю не так? Весь код ниже:
import serial
import sys
import time
gauge = serial.Serial(
port = '/dev/serial0',
baudrate = 9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout = 1
)
while(True):
while(True):
input_string = list(gauge.read(2)) #watch for two magic bytes
if (input_string[0] == 7 and input_string[1] == 5):
input_string += list(gauge.read(9-2)) #append the rest of the data
if ((sum(input_string[1:8]) % 256) == input_string[8]): #compute checksum
break
gauge_value = 10.0**((input_string[4] * 256.0 + input_string[5]) / 4000.0 - 12.5)
if (len(sys.argv) > 2):
print("mBar: {} Pa: {} Status: {} ({}) Error: {} ({})".format(gauge_value,gauge_value*100000.0,
input_string[2],bin(input_string[2]),
input_string[3],bin(input_string[3])))
else:
print("{},{}".format(time.time(), gauge_value))
Вот код, читающий необработанные данные:
import serial
import time
ser = serial.Serial(
port='/dev/serial0',#Replace ttyUSB0 with ttyAMA0 for Pi1,Pi2,Pi0
baudrate = 9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=1
)
while 1:
x=ser.readline()
print (x)
Необработанные данные, как показано ниже:
'\n'
b'\x8e\x07\x05@\x08\xbcI(!\xfe\x15A\x00\xec\x1c\x00\x8d\xbc\x87\x1c\x1c\xf4!\x00\n'
b'\x1c\x07\x05\x00\x00\xec\x95\x94\xb5\xf8\x144\x15\xec\x0e\x00\n'
b'\t\x07\x05 \x88n8H$\xa8\x1c\x00\xec\x03\x00\xca{\x07\x15\x90\xb0\xb5?\n'
b'\xf3P\x10\x00\xeb\xb6\x08\x95\xbc(\x1c\x1c\xb4\xfa\x00\n'
Думаю, я сделал что-то не так в цикле while
.
Для пояснения ниже формата сообщения из руководства к устройству:
@001 добавил скрин из мануала.
@001 Возможно, вы правы, код читает 2 байта в поисках значений 7 и 5 и переходит к контрольной сумме и контрольной сумме. Если что-то не так с трансером, шлейф может никогда не перейти на внешний if-else
. Как вы думаете, было бы лучше использовать readline
вместо read
. Я совсем новичок и не совсем знаю, как readline
поймать бит 4 и 5, которые дают мне интересующие меня данные.
Я бы не использовал readline
. При этом считываются все байты до символа новой строки - и нет никакой гарантии, что ваши данные будут иметь символ новой строки. Я читал по одному байту, чтобы найти начало сообщения. Например: прочитать 1 байт. Если это 7, прочитайте 1 байт. Если это 5, прочитайте 7 байт...
Значения в дампе данных не соответствуют ожиданиям от документа. За каждым 9-м байтом должен следовать 0x7
, но здесь это не так.
Я обнаружил строку input_string = list(gauge.read(2))
читать только 2 значения одновременно и останавливаться. Как заставить читать код до тех пор, пока не будут обнаружены правильные значения бит 0 и 1?
С примерами данных, которые вы опубликовали в вопросе, оператор break
никогда не выполняется, потому что рассчитанные контрольные суммы не соответствуют ожиданиям. Для пакетов, начинающихся с 7
и 5
, тип датчика также не соответствует ожидаемому. Если вы проигнорируете эти проблемы, то одно из показаний сообщит об BA error
. Я думаю, вам нужно упростить циклы while
, чтобы разделить различные методы отказа.
Да, я диагностировал, что датчик манометра поврежден и требует замены. Поверьте, это причина, по которой байт ошибки дает разные значения.
Я бы читал по 1 байту за раз, пока не получите 0x7
и 0x5
. Я не могу это проверить, так как у меня нет устройства, так что...
# Helper function that does not yield (return) until it has a good message
def next_msg(guage):
while True:
# Look for 7,5 combo
data_len = 0
page_number = 0
while data_len != 7 and page_number != 5:
data_len = page_number
page_number = guage.read(1)[0]
# If we are here, we have a valid 7,5 start of message
payload = guage.read(data_len - 1)
checksum = guage.read(1)[0]
if (sum(payload) + page_number) % 255 == checksum:
yield (payload[0], payload[1], payload[2] * 256 + payload[3], payload[4], payload[5])
for status,error,measurement,version,sensor_type in next_msg(guage):
# Do something with the data...
print(status, error, measurement, version, sensor_type)
Примечание: обратите внимание на отсутствие круглых скобок в операторах if
и while
, а также пробелов после ключевых слов. Это больше соответствует стилю форматирования Python.
** Отредактирован расчет контрольной суммы. Я не включил цифру «5» (номер страницы), но думаю, что она там должна быть.
** Обновлено: обновлено для цикла на основе ответа ОП.
# Ignoring status,error,version,sensor_type
for _,_,measurement,_,_ in next_msg(guage):
measurement = 10.0**(measurement / 4000.0 - 12.5)
print(measurement)
Спасибо @001 Я расширил код, как показано ниже:
import serial
import sys
import time
gauge = serial.Serial(
port = '/dev/serial0',
baudrate = 9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout = 1000
)
def next_msg(gauge):
while True:
# Look for 7,5 combo
msg_start = [None, None]
while msg_start != [7, 5]:
msg_start[0] = msg_start[1]
msg_start[1] = gauge.read(1)[0]
# If we are here, we have a valid 7,5 start of message
payload = gauge.read(6)
checksum = gauge.read(1)[0]
if sum(payload) % 255 == checksum:
yield (payload[0], payload[1], payload[2] * 256 + payload[3], payload[4], payload[5])
for measurement in next_msg(gauge):
payload = gauge.read(6)
measurement = 10.0**((payload[4] * 256.0 + payload[5]) / 4000.0 - 12.5)
print(measurement)
И получил результат:
3.162277660168379e-13
9.036494737223004
4.340102636447431e-10
3.7325015779571986
3.162277660168379e-13
4.487453899331332e-10
3.5686174928348136e-13
6.9742899688173485e-12
1.499684835502374e-08
Каждое число появляется с задержкой в 1 секунду друг от друга. Механизм — вакуумметр Inficon BPG400. Предполагается, что это значения давления в мбар, но это неверно. Сейчас работаем над атмосферным давлением, поэтому правильное значение должно быть около 1000 мбар. Если я не напутал в коде, я считаю, что это важно, или правильная настройка, или датчик могут быть повреждены.
payload = gauge.read(6)
Это уже было сделано в next_msg
, поэтому вам не нужно делать это снова в цикле for
. Смотрите мой обновленный ответ.
Если я сделаю это по-твоему, получу ошибку payload not defined
. Дополнительная тема: иногда приходится ждать измерения до минуты. Придется отлаживать вакуумметр
Похоже, вы всегда ищете полезную нагрузку размером 9 байт, поэтому может быть полезно читать последовательное соединение по одному байту и сохранять его в деке Python .
Затем после каждого чтения проверяйте 9 байтов, хранящихся в деке, чтобы увидеть, существует ли правильная/ожидаемая последовательность байтов.
Данные, которые вы опубликовали в своем вопросе, не показались последовательными, поэтому я расширил их, используя следующий пример из руководства: код, кажется, дает правильный ответ.
У меня нет датчика, поэтому я заменил последовательный порт двоичным потоком ввода-вывода Python с данными примера. Это код, который я использовал для имитации данных, поступающих от датчика, и использования структуры для обработки входящих данных:
from collections import deque
from enum import Enum
import io
PKT_SIZE = 9
# gauge = serial.Serial(
# port = '/dev/serial0',
# baudrate = 9600,
# parity=serial.PARITY_NONE,
# stopbits=serial.STOPBITS_ONE,
# bytesize=serial.EIGHTBITS,
# timeout = 1000
# )
gauge = io.BytesIO((
b"\n\x8e"
b"\x07\x05@\x08\xbcI(!\xfe"
b"\x15A\x00\xec\x1c\x00\x8d\xbc\x87\x1c\x1c\xf4!\x00\n\x1c"
b"\x07\x05\x00\x00\xec\x95\x94\xb5\xf8"
b"\x144\x15\xec\x0e\x00\n\t"
b"\x07\x05 \x88n8H$\xa8"
b"\x1c\x00\xec\x03\x00\xca{"
b"\x07\x15\x90\xb0\xb5?\n\xf3P"
b"\x10\x00\xeb\xb6\x08\x95\xbc(\x1c\x1c\xb4\xfa\x00\n"
b"\x07\x05\x00\x00\xf2\x30\x14\x0a\x45"
))
class Units(Enum):
MBAR = 0
TORR = 1
PA = 2
UNUSED = 3
class ErrorCodes(Enum):
NO_ERROR = 0
PIRANI_ADJ_ERROR = 5
BA_ERROR = 8
PIRANI_ERROR = 9
def checksum_ok(sensor_data):
calc_sum = sum(list(sensor_data)[1:8]) & 0xff
return calc_sum == sensor_data[8]
def process_read(sensor_data):
# print(sensor_data)
(
data_len,
page_num,
status,
error,
hi_byte,
lo_byte,
version,
sen_type,
check_sum,
) = sensor_data
if all(
(
data_len == 7,
page_num == 5,
sen_type == 10
)
):
if not checksum_ok(sensor_data):
print("ERROR: Failed checksum")
return None
# print(f"{status:08b}")
measurement = hi_byte << 8 | lo_byte
units_status = get_units(status)
error_status = get_error_status(error)
if error_status != ErrorCodes.NO_ERROR:
print(f"ERROR: {error_status}")
gauge_value = calc_gauge_value(measurement, units_status)
return gauge_value
return None
def get_units(status_byte):
return Units(status_byte >> 4 & 0x03)
def get_error_status(error_byte):
return ErrorCodes(error_byte >> 4 & 0x0f)
def calc_gauge_value(measurement, units_status):
if units_status == Units.MBAR:
gauge_value = 10.0 ** (measurement / 4000.0 - 12.5)
elif units_status == Units.TORR:
gauge_value = 10.0 ** (measurement / 4000.0 - 12.625)
elif units_status == Units.PA:
gauge_value = 10.0 ** (measurement / 4000.0 - 10.5)
else:
print("ERROR: invalid units setting")
gauge_value = None
return gauge_value
def main():
msg = deque([0] * PKT_SIZE, PKT_SIZE)
while True:
new_byte = gauge.read(1)
msg.append(int.from_bytes(new_byte, "little", signed=False))
# print(f'\t{msg}')
reading = process_read(msg)
if reading:
print(f"\tSensor reading: {reading}")
if sum(msg) == 0:
break
if __name__ == "__main__":
main()
Вероятно, вы получаете пустой экран, потому что внутренний цикл
while
никогда не достигаетbreak
. Какой формат сообщения указан в документации устройства?