Скрипт Python, использующий Bluetooth, работающий в Windows 11, против Raspberry Pi4

Я столкнулся с проблемой со скриптом Python, который работает по-разному при запуске в Windows 11 (через VS) и при запуске на Raspberry Pi4.

Сценарий был модифицирован на основе сценария CLi, который, как выяснилось, взаимодействует с оборудованием Victron Energy.

Благодаря @ukbaz скрипт запустился и вернул данные с подключенного по Bluetooth устройства Victron в моем фургоне. В скрипте использовалась строка:

loop.run_forever()

Большой! на обеих машинах работает отлично. Я не хочу работать вечно, просто на 5 секунд, достаточно долго, чтобы прочитать устройство BT и записать в текстовый файл для использования в будущем.

После внесения изменений:

 the_end = time.time() + 5
    while time.time() < the_end:

        loop.stop()
        loop.run_forever()

Это будет нормально работать в Windows, читать устройство BT, записывать в файл. На Rpi сценарий выполняется без ошибок, но не читает BT и не записывает в файл.

Вот полный скрипт

from __future__ import annotations

import inspect
import json
import logging
import time
from enum import Enum
from typing import Set
import asyncio
from typing import List, Tuple
from threading import Thread

from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

from victron_ble.devices import Device, DeviceData, detect_device_type
from victron_ble.exceptions import AdvertisementKeyMissingError, UnknownDeviceError

logger = logging.getLogger(__name__)


class BaseScanner:
    def __init__(self) -> None:
        """Initialize the scanner."""
        self._scanner: BleakScanner = BleakScanner(
            detection_callback=self._detection_callback
        )
        self._seen_data: Set[bytes] = set()

    def _detection_callback(self, device: BLEDevice, advertisement: AdvertisementData):
        # Filter for Victron devices and instant readout advertisements
        data = advertisement.manufacturer_data.get(0x02E1)
        if not data or not data.startswith(b"\x10") or data in self._seen_data:
            return

        # De-duplicate advertisements
        if len(self._seen_data) > 1000:
            self._seen_data = set()
        self._seen_data.add(data)

        self.callback(device, data)

    def callback(self, device: BLEDevice, data: bytes):
        raise NotImplementedError()

    async def start(self):
        await self._scanner.start()

    async def stop(self):
        await self._scanner.stop()


# An ugly hack to print a class as JSON
class DeviceDataEncoder(json.JSONEncoder):
    def default(self, obj):
        if issubclass(obj.__class__, DeviceData):
            data = {}
            for name, method in inspect.getmembers(obj, predicate=inspect.ismethod):
                if name.startswith("get_"):
                    value = method()
                    if isinstance(value, Enum):
                        value = value.name.lower()
                    if value is not None:
                        data[name[4:]] = value
            return data


class Scanner(BaseScanner):
    def __init__(self, device_keys: dict[str, str] = {}):
        super().__init__()
        self._device_keys = {k.lower(): v for k, v in device_keys.items()}
        self._known_devices: dict[str, Device] = {}

    async def start(self):
        logger.info(f"Reading data for {list(self._device_keys.keys())}")
        await super().start()

    def get_device(self, ble_device: BLEDevice, raw_data: bytes) -> Device:
        address = ble_device.address.lower()
        if address not in self._known_devices:
            advertisement_key = self.load_key(address)

            device_klass = detect_device_type(raw_data)
            if not device_klass:
                raise UnknownDeviceError(
                    f"Could not identify device type for {ble_device}"
                )

            self._known_devices[address] = device_klass(advertisement_key)
        return self._known_devices[address]

    def load_key(self, address: str) -> str:
        try:
            return self._device_keys[address]
        except KeyError:
            raise AdvertisementKeyMissingError(f"No key available for {address}")

    def callback(self, ble_device: BLEDevice, raw_data: bytes):
        logger.debug(
            f"Received data from {ble_device.address.lower()}: {raw_data.hex()}"
        )
        try:
            device = self.get_device(ble_device, raw_data)
        except AdvertisementKeyMissingError:
            return
        except UnknownDeviceError as e:
            logger.error(e)
            return
        parsed = device.parse(raw_data)

        blob = {
            "name": ble_device.name,
            "address": ble_device.address,
            "rssi": ble_device.rssi,
            "payload": parsed,
        }
        print(json.dumps(blob, cls=DeviceDataEncoder, indent=1))
        ve_string = json.dumps(blob, cls=DeviceDataEncoder, indent=1)
        print(ve_string)
        #MAC_filename = "this_device" + ".txt"
        #print(f"MAC filename: {MAC_filename}")

        this_file = open("this_device.txt", "w")
        this_file.write(ve_string)
        this_file.close()

        print("file written")
        time.sleep(3)


class DiscoveryScanner(BaseScanner):
    def __init__(self) -> None:
        super().__init__()
        self._seen_devices: Set[str] = set()

    def callback(self, device: BLEDevice, advertisement: bytes):
        if device.address not in self._seen_devices:
            logger.info(f"{device}")
            self._seen_devices.add(device.address)


class DebugScanner(BaseScanner):
    def __init__(self, address: str):
        super().__init__()
        self.address = address

    async def start(self):
        logger.info(f"Dumping advertisements from {self.address}")
        await super().start()

    def callback(self, device: BLEDevice, data: bytes):
        if device.address.lower() == self.address.lower():
            logger.info(f"{time.time():<24}: {data.hex()}")



def my_scan(device_keys: List[Tuple[str, str]]):
    
    loop = asyncio.get_event_loop()

    async def scan(keys):
        scanner = Scanner(keys)
        await scanner.start()

    asyncio.ensure_future(scan({k: v for k, v in device_keys}))

    the_end = time.time() + 5
    while time.time() < the_end:

        loop.stop()
        loop.run_forever()

if __name__ == '__main__':

    my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")])


Я пробовал переехать из

run_forever 

к

run_until_complete

Этот метод полностью провалился на Rpi :( поэтому вернулся обратно

Почему сценарий будет вести себя по-разному на двух машинах и как я могу его реплицировать на RPi? Большое спасибо за любую помощь

Это не в моей рубке, но просто хотел сказать, добро пожаловать в ТАК! Посмотрите тур . Важно написать заголовок, описывающий реальную проблему, с которой вы столкнулись, например: «Почему [библиотека] не может прочитать Bluetooth на моем RPi?»; советы по этому поводу можно найти в разделе Как спросить . Но тогда вы, вероятно, могли бы добиться некоторого прогресса в этом самостоятельно, выполнив дополнительную отладку, например добавив ведение журнала. Больше техник в ответе SlugFiller здесь . См. также минимальный воспроизводимый пример.

wjandrea 29.03.2024 18:58

Спасибо, wjandrea, все еще нахожусь на ногах, задавая вопросы, но прислушаюсь к вашему совету :)

IronDad75 30.03.2024 14:36
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
2
84
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я подозреваю, что вам нужно выйти только после того, как файл будет записан.

Может быть, вы могли бы использовать возможность событий asyncio? https://docs.python.org/3/library/asyncio-sync.html#event

Создайте событие в глобальной области, поместив следующее в начало файла после импорта.

file_written_event = asyncio.Event()

Затем добавьте file_written_event.set() после записи файла. например

        this_file = open("this_device.txt", "w")
        this_file.write(ve_string)
        this_file.close()

        print("file written")
        file_written_event.set()

Больше всего изменится нижняя часть вашего файла. Он делает в основном то же самое, но перед выходом имеет цикл while, ожидающий события.

async def my_scan(device_keys: List[Tuple[str, str]]):

    async def scan(keys):
        scanner = Scanner(keys)
        await scanner.start()

    asyncio.ensure_future(scan({k: v for k, v in device_keys}))
    while not file_written_event.is_set():
        await asyncio.sleep(0.1)


if __name__ == '__main__':
    logging.basicConfig()
    logger.setLevel(logging.DEBUG)
    asyncio.run(my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")]))

Спасибо @ukBaz. Я реализовал этот код. Возвращает ошибку «await asyncio.sleep(0.1)» как «SyntaxError: 'await' вне асинхронной функции»?

IronDad75 01.04.2024 10:59

Хм? В коде, который я разместил, sleep определенно находится внутри функции async. Вы забыли изменить функцию my_scan, чтобы сделать это async def my_scan?

ukBaz 01.04.2024 12:59

дважды проверил изменения и пропустил асинхронное определение my_scan! Спасибо :)

IronDad75 03.04.2024 19:02

Другие вопросы по теме