Как я могу осуществлять двунаправленную связь через USB с контрольной суммой для Raspberry Pi 5 и Raspberry Pi Pico (последовательный порт, UART, другое?)

Я работаю над проектом, который включает в себя Raspberry Pi 5 с графическим интерфейсом на основе Python и RP2040 (под управлением MicroPython), подключенный к набору датчиков I2C и 32 сервоприводам, управляемым двумя ШИМ-контроллерами PCA9685.

Моя цель — собирать данные с датчиков на частоте не менее 1Гц, а лучше 10Гц+. Данные состоят примерно из 256 переменных с плавающей запятой от этих датчиков, и мне нужно проверить целостность данных с помощью контрольной суммы.

Кроме того, мне нужно обновить 32 сервопривода на частоте 1–10 Гц, передавая им 4-значные целые числа. (0-4096)

В качестве доказательства концепции я попробовал использовать последовательную связь между Raspberry Pi 5 и Raspberry Pi Pico, при этом Pico был подключен к одной коммутационной плате PCA9685 I2C. Однако я столкнулся с проблемами синхронизации, отсутствующими/поврежденными данными и проблемами с правильной работой контрольной суммы.

Мне интересно, не слишком ли я амбициозен с этим решением или есть ли лучшие альтернативы. Любой совет будет принят с благодарностью. Заранее спасибо всем, кто это прочитает.

Вот код, который я использовал:

Код малины Pi 5:

import serial
import json
import hashlib
from random import randint

def compute_checksum(data):
    return hashlib.md5(data.encode()).hexdigest()

def main():
    s = serial.Serial(port = "/dev/ttyACM0", baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=2)  # reduced timeout
    s.flush()

    while True:
        try:
            data = [randint(0, 4096) for _ in range(16)]  # replace with your actual data
            json_data = json.dumps(data)
            checksum = compute_checksum(json_data)
            s.write((json_data + '|' + checksum + '\r').encode())
            s.flush()  # flush the buffer
            if s.in_waiting > 0:
                response = s.read_until().strip()
                print('Received:', response.decode())
        except Exception as e:
            print(f"An error occurred: {e}")
            break

if __name__ == "__main__":
    main()

Код Raspberry Pi Pico:

import board
import busio
import ujson
import select
import sys
from adafruit_pca9685 import PCA9685

# Create the I2C bus interface.
i2c = busio.I2C(board.GP17, board.GP16)    # Pi Pico RP2040

# Create a simple PCA9685 class instance.
pca = PCA9685(i2c)

# Set the PWM frequency to 60hz.
pca.frequency = 1000

def compute_checksum(data):
    h = uhashlib.md5()
    h.update(data)
    return ubinascii.hexlify(h.digest()).decode()

# Set up the poll object
poll_obj = select.poll()
poll_obj.register(sys.stdin, select.POLLIN)

# Loop indefinitely
while True:
    try:
        # Wait for input on stdin
        poll_results = poll_obj.poll(0)  # the '0' is how long it will wait for message before looping again (in microseconds)
        if poll_results:
            # Read the data from stdin (read data coming from PC)
            received_data = sys.stdin.readline().strip()
            data, received_checksum = received_data.split('|')
            try:
                values = ujson.loads(data)
                computed_checksum = compute_checksum(data)
                if computed_checksum == received_checksum:
                    for i in range(16):
                        pca.channels[i].duty_cycle = values[i]
                    sys.stdout.write("received data: " + data + "\r")
                else:
                    sys.stdout.write("checksum error\r")
            except ValueError:
                sys.stdout.write("invalid json: " + data + "\r")
    except Exception as e:
        print(f"An error occurred: {e}")
        break

Несмотря на то, что вы используете библиотеку serial, данные физически передаются по USB-кабелю, верно? USB имеет контрольные суммы для каждого пакета, встроенные в протокол на низком уровне, поэтому в целом он должен быть довольно надежным, даже если вы отправляете через него много данных.

David Grayson 17.05.2024 00:42
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
1
102
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

У меня есть два предложения:

  • передавать данные быстрее, увеличивая скорость передачи данных или
  • передавать меньше данных за счет более эффективной упаковки

Давайте рассмотрим каждую идею в отдельном разделе. Вы можете использовать одну или обе идеи, первая из которых проще.


Ваша скорость передачи данных в 9600 бит/с довольно медленная, и большинство современных устройств могут справиться с гораздо большим. Когда вы конвертируете биты в байты, это составляет всего 1200 байт/с. Учитывая, что вы потеряете пару бит для контроля четности и остановки/запуска для каждого байта, вы получите только около 1000 байт/с.

Я бы посоветовал вам попробовать:

s = serial.Serial(port = "/dev/ttyACM0", baudrate=115200 ...

В противном случае перейдите на уровень 96000, 57600 и т. д.


Во-вторых, ваша упаковка неэффективна. Когда вы конвертируете список в JSON и добавляете шестнадцатеричный дайджест, он занимает около 130 байт. Вы можете передавать это только 7 раз в секунду со скоростью 1000 байт/с. Я бы предложил гораздо более компактную рамку, а именно:

ff ff <16 off 2-byte shorts> CRC

где ff ff — это заголовок, за которым следуют 16 значений в виде 2-байтовых беззнаковых шорт и 8-битной CRC. Это делает кадр размером 35 байт, или примерно на 75% меньше, чем у вас сейчас.

Вот код для упаковки списка значений во фрейм и распаковки фрейма в список значений с проверкой:

#!/usr/bin/env python3

import sys
import crc8
import struct
import random
import binascii


def Pack(values):
   """Packs the list of values into a packet with checksum"""

   # The 32 byte payload of 16 values
   payload = struct.pack('!16H',*values)

   # Calculate CRC over payload
   CRC = crc8.crc8()
   CRC.update(payload)

   # Assemble header + payload + CRC into a packet
   packet = b'\xff\xff' + payload + CRC.digest()
   return packet

def Unpack(packet):
   """Unpacks a packet and returns a list of values or empty list if CRC incorrect or other error"""

   # Extract components of packet
   hdr = packet[:2]
   payload = packet[2:34]
   packetCRC = packet[-1]

   # Check length of packet is correct
   if len(packet) != 35:
      print(f'ERROR: Packet length incorrect, received {len(packet)} bytes, expected 35', file=sys.stderr)
      return []

   # Check header is as expected
   if hdr != b'\xff\xff':
      print(f'ERROR: Packet header incorrect, received {hdr}, expected <ff> <ff>', file=sys.stderr)
      return []

   # Check CRC
   CRC = crc8.crc8()
   CRC.update(payload)
   if packetCRC != int.from_bytes(CRC.digest()):
      print(f'ERROR: Packet CRC incorrect', file=sys.stderr)
      return []

   # Everything ok, unpack and return list of 16 values
   values = struct.unpack('!16H',payload)
   return values

if __name__ == "__main__":
   # Generate list of random values
   values = [ random.randint(0,65535) for i in range(16)]
   print('Initial list: ', *values)

   # Assemble values into a packet 
   packet = Pack(values)

   print("<<<UNPACK>>>\n\n")
   # Unpack values from packet
   result = Unpack(packet)
   print('Unpacked list: ', *values)

Ваш код необходимо будет изменить так, чтобы вы читали/записывали ровно 35 байт, а не искали символы новой строки и строки JSON.

Я поместил туда заголовок, чтобы вы могли повторно синхронизировать его, если возникнут какие-либо ошибки. По сути, если вы позвоните Unpack() и получите пустой список, вы потеряли синхронизацию. В этом случае вам нужно читать до тех пор, пока вы не получите ff дважды подряд, а затем прочитать следующие 33 байта (полезная нагрузка плюс CRC) и передать их (плюс заголовок ff ff) в Unpack() для повторной синхронизации.


Обратите внимание, что значения ваших данных могут быть представлены в 12 битах, а не в 16. Таким образом, вместо того, чтобы упаковывать каждую выборку в 16 бит по 2 байта/значение, вы можете упаковать 2 выборки в 24 бита по 1,5 байта/значение. Однако в настоящее время, похоже, не стоит затрачивать усилия на такое смещение битов.

Другие вопросы по теме