Я попробовал следующий код и не получаю никаких уведомлений при монтировании и размонтировании разделов. Я что-то пропустил ?
Я использую библиотеку pyinotify
для просмотра изменений в файле /proc/mounts
... но кажется, что мой обработчик никогда не вызывается...
import pyinotify, time
# Define a callback function to handle file system events
class EventHandler(pyinotify.ProcessEvent):
def process_IN_MODIFY(self, event):
print(f"Change detected in {event.pathname}!")
# Create an inotify instance
wm = pyinotify.WatchManager()
# Create an event handler instance
handler = EventHandler()
# Create an inotify notifier and pass the function as the event handler
notifier = pyinotify.ThreadedNotifier(wm, handler)
# Watch for changes to /proc/mounts
wm.add_watch("/proc/mounts", pyinotify.IN_MODIFY)
# Start the notifier loop
print("Watching for mount changes...")
notifier.start()
try:
while True:
time.sleep(1)
except: pass
notifier.stop()
print('Done!')
Я только что проверил, что dbus-monitor --system
действительно перечисляет события монтирования/размонтирования - так что это будет работать -
Мне нужно учиться dbus
так как я с этим никогда не работала...
@jsbueno, когда я собираю и размонтирую раздел, поступает много сигналов... Я не знаю, какой из них мне следует слушать, чтобы получать уведомление только при монтировании/размонтировании, а не при других системных событиях...
Вот как я решил проблему. С помощью этого кода я получаю одно уведомление каждый раз, когда блочное устройство (жесткий диск USB) подключается или отключается, а также когда раздел монтируется или отключается. Я не уверен на 100%, что обратный вызов не срабатывает и в других случаях, но если это так, то фильтр сигнала dbus можно настроить...
import time, threading
class FastTimer(threading.Thread):
def __init__(self, interval, callback, args=None, kwargs=None, shots=1):
super().__init__()
self.daemon = True
self.interval = interval
self.callback = callback
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.last_shot = shots
self.trigger = threading.Event()
self.done = threading.Event()
self.terminated = threading.Event()
self.start()
def Terminate(self):
if self.is_alive():
self.terminated.set()
self.done.set()
self.trigger.set()
self.join()
def Gooo(self):
if self.is_alive():
self.trigger.set()
def Abort(self):
if self.is_alive():
self.done.set()
self.trigger.clear()
def Reset(self):
if self.is_alive():
self.done.set()
def Mark(self):
if self.is_alive():
if self.trigger.is_set(): self.done.set()
else: self.trigger.set()
def run(self):
while not self.terminated.is_set():
if not self.trigger.is_set():
shot = 0; self.done.clear()
self.trigger.wait()
Reseted = self.done.wait(self.interval)
self.done.clear()
if self.terminated.is_set(): break
if not Reseted:
self.callback(*self.args, **self.kwargs)
if self.last_shot > 0:
shot += 1
if shot == self.last_shot:
self.trigger.clear()
print('Timer terminated.')
class BlockDevMonitor(threading.Thread):
def __init__(self, callback, args=None, kwargs=None):
import time
super().__init__()
self.loop = None
self.timer = FastTimer(1, callback, args, kwargs)
self.start()
time.sleep(0.3)
def Terminate(self):
if self.is_alive():
if self.loop != None: self.loop.quit()
self.join()
self.timer.Terminate()
def run(self):
import dbus
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
def sigUnitChg(*args):
if (len(args) >= 1) and ( args[0].startswith('blockdev@') or ('-block-' in args[0]) ): self.timer.Mark()
DBusGMainLoop(set_as_default=True)
self.loop = GLib.MainLoop()
bus = dbus.SystemBus()
bus.add_signal_receiver(sigUnitChg, dbus_interface='org.freedesktop.systemd1.Manager', signal_name='UnitNew', path='/org/freedesktop/systemd1')
bus.add_signal_receiver(sigUnitChg, dbus_interface='org.freedesktop.systemd1.Manager', signal_name='UnitRemoved', path='/org/freedesktop/systemd1')
print('DevMonitor started.')
self.loop.run()
print('DevMonitor stopped.')
#----------------------------------------------
def OnUpdate():
print('Device Manager updated !')
DevMon = BlockDevMonitor(OnUpdate)
try:
i = 0
while True:
time.sleep(1);
i += 1; print(i)
except:
pass
DevMon.Terminate()
Извините, сейчас у меня нет времени на тщательное изучение этого вопроса, но вам, скорее всего, придется использовать для этого DBUS: dbus.freedesktop.org/doc/dbus-tutorial.html