Я работаю над проектом, который включает в себя Raspberry Pi 5 с графическим интерфейсом на основе Python и RP2040 (под управлением MicroPython), подключенный к набору датчиков I2C и 32 сервоприводам, управляемым двумя ШИМ-контроллерами PCA9685.
Моя цель — собирать данные с датчиков на частоте не менее 1Гц, а лучше 10Гц+. Данные состоят примерно из 256 переменных с плавающей запятой от этих датчиков, и мне нужно проверить целостность данных с помощью контрольной суммы.
Кроме того, мне нужно обновить 32 сервопривода на частоте 1–10 Гц, передавая им 4-значные целые числа. (0-4096)
В качестве доказательства концепции я попробовал использовать последовательную связь между Raspberry Pi 5 и Raspberry Pi Pico, при этом Pico был подключен к одной коммутационной плате PCA9685 I2C. Однако я столкнулся с проблемами синхронизации, отсутствующими/поврежденными данными и проблемами с правильной работой контрольной суммы.
Мне интересно, не слишком ли я амбициозен с этим решением или есть ли лучшие альтернативы. Любой совет будет принят с благодарностью. Заранее спасибо всем, кто это прочитает.
Вот код, который я использовал:
Код малины Pi 5:
import serial
import json
import hashlib
from random import randint
def compute_checksum(data):
return hashlib.md5(data.encode()).hexdigest()
def main():
s = serial.Serial(port = "/dev/ttyACM0", baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=2) # reduced timeout
s.flush()
while True:
try:
data = [randint(0, 4096) for _ in range(16)] # replace with your actual data
json_data = json.dumps(data)
checksum = compute_checksum(json_data)
s.write((json_data + '|' + checksum + '\r').encode())
s.flush() # flush the buffer
if s.in_waiting > 0:
response = s.read_until().strip()
print('Received:', response.decode())
except Exception as e:
print(f"An error occurred: {e}")
break
if __name__ == "__main__":
main()
Код Raspberry Pi Pico:
import board
import busio
import ujson
import select
import sys
from adafruit_pca9685 import PCA9685
# Create the I2C bus interface.
i2c = busio.I2C(board.GP17, board.GP16) # Pi Pico RP2040
# Create a simple PCA9685 class instance.
pca = PCA9685(i2c)
# Set the PWM frequency to 60hz.
pca.frequency = 1000
def compute_checksum(data):
h = uhashlib.md5()
h.update(data)
return ubinascii.hexlify(h.digest()).decode()
# Set up the poll object
poll_obj = select.poll()
poll_obj.register(sys.stdin, select.POLLIN)
# Loop indefinitely
while True:
try:
# Wait for input on stdin
poll_results = poll_obj.poll(0) # the '0' is how long it will wait for message before looping again (in microseconds)
if poll_results:
# Read the data from stdin (read data coming from PC)
received_data = sys.stdin.readline().strip()
data, received_checksum = received_data.split('|')
try:
values = ujson.loads(data)
computed_checksum = compute_checksum(data)
if computed_checksum == received_checksum:
for i in range(16):
pca.channels[i].duty_cycle = values[i]
sys.stdout.write("received data: " + data + "\r")
else:
sys.stdout.write("checksum error\r")
except ValueError:
sys.stdout.write("invalid json: " + data + "\r")
except Exception as e:
print(f"An error occurred: {e}")
break
У меня есть два предложения:
Давайте рассмотрим каждую идею в отдельном разделе. Вы можете использовать одну или обе идеи, первая из которых проще.
Ваша скорость передачи данных в 9600 бит/с довольно медленная, и большинство современных устройств могут справиться с гораздо большим. Когда вы конвертируете биты в байты, это составляет всего 1200 байт/с. Учитывая, что вы потеряете пару бит для контроля четности и остановки/запуска для каждого байта, вы получите только около 1000 байт/с.
Я бы посоветовал вам попробовать:
s = serial.Serial(port = "/dev/ttyACM0", baudrate=115200 ...
В противном случае перейдите на уровень 96000, 57600 и т. д.
Во-вторых, ваша упаковка неэффективна. Когда вы конвертируете список в JSON и добавляете шестнадцатеричный дайджест, он занимает около 130 байт. Вы можете передавать это только 7 раз в секунду со скоростью 1000 байт/с. Я бы предложил гораздо более компактную рамку, а именно:
ff ff <16 off 2-byte shorts> CRC
где ff ff
— это заголовок, за которым следуют 16 значений в виде 2-байтовых беззнаковых шорт и 8-битной CRC. Это делает кадр размером 35 байт, или примерно на 75% меньше, чем у вас сейчас.
Вот код для упаковки списка значений во фрейм и распаковки фрейма в список значений с проверкой:
#!/usr/bin/env python3
import sys
import crc8
import struct
import random
import binascii
def Pack(values):
"""Packs the list of values into a packet with checksum"""
# The 32 byte payload of 16 values
payload = struct.pack('!16H',*values)
# Calculate CRC over payload
CRC = crc8.crc8()
CRC.update(payload)
# Assemble header + payload + CRC into a packet
packet = b'\xff\xff' + payload + CRC.digest()
return packet
def Unpack(packet):
"""Unpacks a packet and returns a list of values or empty list if CRC incorrect or other error"""
# Extract components of packet
hdr = packet[:2]
payload = packet[2:34]
packetCRC = packet[-1]
# Check length of packet is correct
if len(packet) != 35:
print(f'ERROR: Packet length incorrect, received {len(packet)} bytes, expected 35', file=sys.stderr)
return []
# Check header is as expected
if hdr != b'\xff\xff':
print(f'ERROR: Packet header incorrect, received {hdr}, expected <ff> <ff>', file=sys.stderr)
return []
# Check CRC
CRC = crc8.crc8()
CRC.update(payload)
if packetCRC != int.from_bytes(CRC.digest()):
print(f'ERROR: Packet CRC incorrect', file=sys.stderr)
return []
# Everything ok, unpack and return list of 16 values
values = struct.unpack('!16H',payload)
return values
if __name__ == "__main__":
# Generate list of random values
values = [ random.randint(0,65535) for i in range(16)]
print('Initial list: ', *values)
# Assemble values into a packet
packet = Pack(values)
print("<<<UNPACK>>>\n\n")
# Unpack values from packet
result = Unpack(packet)
print('Unpacked list: ', *values)
Ваш код необходимо будет изменить так, чтобы вы читали/записывали ровно 35 байт, а не искали символы новой строки и строки JSON.
Я поместил туда заголовок, чтобы вы могли повторно синхронизировать его, если возникнут какие-либо ошибки. По сути, если вы позвоните Unpack()
и получите пустой список, вы потеряли синхронизацию. В этом случае вам нужно читать до тех пор, пока вы не получите ff
дважды подряд, а затем прочитать следующие 33 байта (полезная нагрузка плюс CRC) и передать их (плюс заголовок ff ff
) в Unpack()
для повторной синхронизации.
Обратите внимание, что значения ваших данных могут быть представлены в 12 битах, а не в 16. Таким образом, вместо того, чтобы упаковывать каждую выборку в 16 бит по 2 байта/значение, вы можете упаковать 2 выборки в 24 бита по 1,5 байта/значение. Однако в настоящее время, похоже, не стоит затрачивать усилия на такое смещение битов.
Несмотря на то, что вы используете библиотеку
serial
, данные физически передаются по USB-кабелю, верно? USB имеет контрольные суммы для каждого пакета, встроенные в протокол на низком уровне, поэтому в целом он должен быть довольно надежным, даже если вы отправляете через него много данных.