Я хочу использовать asyncua для подписки на некоторые переменные. Я использую поток, потому что мне нужно время от времени динамически обновлять список подписки. Но когда я получил уведомление об изменении данных и попытался прочитать имя просмотра узла, это задержалось примерно на 2 секунды. Как это исправить? Вот мой код.
import asyncio
from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler
import pymssql
from threading import Thread
ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'
class MyHandler(SubHandler):
n = 1
def __init__(self, variables_len=0):
self._queue = asyncio.Queue()
self.variables_len = variables_len
conn = pymssql.connect(server='(local)', user='sa',
password='sa', database='AdventureWorks2005')
self.cursor = conn.cursor()
def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
if self.n > self.variables_len:
self._queue.put_nowait([node, value, data])
print(f'Data change notification was received and queued.')
self.n = self.variables_len+1
self.n += 1
async def process(self) -> None:
try:
while True:
# Get data in a queue.
[node, value, data] = self._queue.get_nowait()
# *** Write your processing code ***
print(await node.read_browse_name()) #delay here
self.cursor.execute('select TOP(1) * from tool_list') #test code
row = self.cursor.fetchall()
print(row)
except asyncio.QueueEmpty:
pass
async def main() -> None:
async with Client(url=ENDPOINT) as client:
# Get a variable node.
idx = await client.get_namespace_index(NAMESPACE)
obj = await client.get_objects_node().get_child([f'{idx}:MyObject'])
variables = await obj.get_variables()
# Subscribe data change.
handler = MyHandler(variables_len=len(variables))
subscription = await client.create_subscription(period=0, handler=handler)
handle = await subscription.subscribe_data_change(variables)
# Process data change every 100ms
async def insert_to_sql():
while True:
await handler.process()
#service_level = await client.get_node('i=2267').get_value()
await asyncio.sleep(0.1)
def between_callback():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(some_callback())
loop.close()
_thread = Thread(target=between_callback)
_thread.daemon = True
_thread.start()
while True:
# Get a variable node.
idx = await client.get_namespace_index(NAMESPACE)
obj = await client.get_objects_node().get_child([f'{idx}:MyObject'])
variables_new = await obj.get_variables()
if variables != variables_new:
handler.variables_len = len(variables_new)
handler.n = 1
await subscription.unsubscribe(handle)
handle = await subscription.subscribe_data_change(variables_new)
variables = variables_new
print('update done')
await asyncio.sleep(100)
if __name__ == '__main__':
asyncio.run(main())
Я попытался изменить **print(await node.read_browse_name()) ** на функцию datachange_notification, но получил тот же результат.
Запустить собственную тему сложно. Я бы попробовал использовать вместо своей темы to_thread