Я столкнулся с проблемой со скриптом Python, который работает по-разному при запуске в Windows 11 (через VS) и при запуске на Raspberry Pi4.
Сценарий был модифицирован на основе сценария CLi, который, как выяснилось, взаимодействует с оборудованием Victron Energy.
Благодаря @ukbaz скрипт запустился и вернул данные с подключенного по Bluetooth устройства Victron в моем фургоне. В скрипте использовалась строка:
loop.run_forever()
Большой! на обеих машинах работает отлично. Я не хочу работать вечно, просто на 5 секунд, достаточно долго, чтобы прочитать устройство BT и записать в текстовый файл для использования в будущем.
После внесения изменений:
the_end = time.time() + 5
while time.time() < the_end:
loop.stop()
loop.run_forever()
Это будет нормально работать в Windows, читать устройство BT, записывать в файл. На Rpi сценарий выполняется без ошибок, но не читает BT и не записывает в файл.
Вот полный скрипт
from __future__ import annotations
import inspect
import json
import logging
import time
from enum import Enum
from typing import Set
import asyncio
from typing import List, Tuple
from threading import Thread
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from victron_ble.devices import Device, DeviceData, detect_device_type
from victron_ble.exceptions import AdvertisementKeyMissingError, UnknownDeviceError
logger = logging.getLogger(__name__)
class BaseScanner:
def __init__(self) -> None:
"""Initialize the scanner."""
self._scanner: BleakScanner = BleakScanner(
detection_callback=self._detection_callback
)
self._seen_data: Set[bytes] = set()
def _detection_callback(self, device: BLEDevice, advertisement: AdvertisementData):
# Filter for Victron devices and instant readout advertisements
data = advertisement.manufacturer_data.get(0x02E1)
if not data or not data.startswith(b"\x10") or data in self._seen_data:
return
# De-duplicate advertisements
if len(self._seen_data) > 1000:
self._seen_data = set()
self._seen_data.add(data)
self.callback(device, data)
def callback(self, device: BLEDevice, data: bytes):
raise NotImplementedError()
async def start(self):
await self._scanner.start()
async def stop(self):
await self._scanner.stop()
# An ugly hack to print a class as JSON
class DeviceDataEncoder(json.JSONEncoder):
def default(self, obj):
if issubclass(obj.__class__, DeviceData):
data = {}
for name, method in inspect.getmembers(obj, predicate=inspect.ismethod):
if name.startswith("get_"):
value = method()
if isinstance(value, Enum):
value = value.name.lower()
if value is not None:
data[name[4:]] = value
return data
class Scanner(BaseScanner):
def __init__(self, device_keys: dict[str, str] = {}):
super().__init__()
self._device_keys = {k.lower(): v for k, v in device_keys.items()}
self._known_devices: dict[str, Device] = {}
async def start(self):
logger.info(f"Reading data for {list(self._device_keys.keys())}")
await super().start()
def get_device(self, ble_device: BLEDevice, raw_data: bytes) -> Device:
address = ble_device.address.lower()
if address not in self._known_devices:
advertisement_key = self.load_key(address)
device_klass = detect_device_type(raw_data)
if not device_klass:
raise UnknownDeviceError(
f"Could not identify device type for {ble_device}"
)
self._known_devices[address] = device_klass(advertisement_key)
return self._known_devices[address]
def load_key(self, address: str) -> str:
try:
return self._device_keys[address]
except KeyError:
raise AdvertisementKeyMissingError(f"No key available for {address}")
def callback(self, ble_device: BLEDevice, raw_data: bytes):
logger.debug(
f"Received data from {ble_device.address.lower()}: {raw_data.hex()}"
)
try:
device = self.get_device(ble_device, raw_data)
except AdvertisementKeyMissingError:
return
except UnknownDeviceError as e:
logger.error(e)
return
parsed = device.parse(raw_data)
blob = {
"name": ble_device.name,
"address": ble_device.address,
"rssi": ble_device.rssi,
"payload": parsed,
}
print(json.dumps(blob, cls=DeviceDataEncoder, indent=1))
ve_string = json.dumps(blob, cls=DeviceDataEncoder, indent=1)
print(ve_string)
#MAC_filename = "this_device" + ".txt"
#print(f"MAC filename: {MAC_filename}")
this_file = open("this_device.txt", "w")
this_file.write(ve_string)
this_file.close()
print("file written")
time.sleep(3)
class DiscoveryScanner(BaseScanner):
def __init__(self) -> None:
super().__init__()
self._seen_devices: Set[str] = set()
def callback(self, device: BLEDevice, advertisement: bytes):
if device.address not in self._seen_devices:
logger.info(f"{device}")
self._seen_devices.add(device.address)
class DebugScanner(BaseScanner):
def __init__(self, address: str):
super().__init__()
self.address = address
async def start(self):
logger.info(f"Dumping advertisements from {self.address}")
await super().start()
def callback(self, device: BLEDevice, data: bytes):
if device.address.lower() == self.address.lower():
logger.info(f"{time.time():<24}: {data.hex()}")
def my_scan(device_keys: List[Tuple[str, str]]):
loop = asyncio.get_event_loop()
async def scan(keys):
scanner = Scanner(keys)
await scanner.start()
asyncio.ensure_future(scan({k: v for k, v in device_keys}))
the_end = time.time() + 5
while time.time() < the_end:
loop.stop()
loop.run_forever()
if __name__ == '__main__':
my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")])
Я пробовал переехать из
run_forever
к
run_until_complete
Этот метод полностью провалился на Rpi :( поэтому вернулся обратно
Почему сценарий будет вести себя по-разному на двух машинах и как я могу его реплицировать на RPi? Большое спасибо за любую помощь
Спасибо, wjandrea, все еще нахожусь на ногах, задавая вопросы, но прислушаюсь к вашему совету :)






Я подозреваю, что вам нужно выйти только после того, как файл будет записан.
Может быть, вы могли бы использовать возможность событий asyncio? https://docs.python.org/3/library/asyncio-sync.html#event
Создайте событие в глобальной области, поместив следующее в начало файла после импорта.
file_written_event = asyncio.Event()
Затем добавьте file_written_event.set() после записи файла. например
this_file = open("this_device.txt", "w")
this_file.write(ve_string)
this_file.close()
print("file written")
file_written_event.set()
Больше всего изменится нижняя часть вашего файла. Он делает в основном то же самое, но перед выходом имеет цикл while, ожидающий события.
async def my_scan(device_keys: List[Tuple[str, str]]):
async def scan(keys):
scanner = Scanner(keys)
await scanner.start()
asyncio.ensure_future(scan({k: v for k, v in device_keys}))
while not file_written_event.is_set():
await asyncio.sleep(0.1)
if __name__ == '__main__':
logging.basicConfig()
logger.setLevel(logging.DEBUG)
asyncio.run(my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")]))
Спасибо @ukBaz. Я реализовал этот код. Возвращает ошибку «await asyncio.sleep(0.1)» как «SyntaxError: 'await' вне асинхронной функции»?
Хм? В коде, который я разместил, sleep определенно находится внутри функции async. Вы забыли изменить функцию my_scan, чтобы сделать это async def my_scan?
дважды проверил изменения и пропустил асинхронное определение my_scan! Спасибо :)
Это не в моей рубке, но просто хотел сказать, добро пожаловать в ТАК! Посмотрите тур . Важно написать заголовок, описывающий реальную проблему, с которой вы столкнулись, например: «Почему [библиотека] не может прочитать Bluetooth на моем RPi?»; советы по этому поводу можно найти в разделе Как спросить . Но тогда вы, вероятно, могли бы добиться некоторого прогресса в этом самостоятельно, выполнив дополнительную отладку, например добавив ведение журнала. Больше техник в ответе SlugFiller здесь . См. также минимальный воспроизводимый пример.