Как получить уведомление, когда раздел смонтирован или размонтирован в ОС RPi и Python?

Я попробовал следующий код и не получаю никаких уведомлений при монтировании и размонтировании разделов. Я что-то пропустил ?

Я использую библиотеку pyinotify для просмотра изменений в файле /proc/mounts... но кажется, что мой обработчик никогда не вызывается...

import pyinotify, time

# Define a callback function to handle file system events
class EventHandler(pyinotify.ProcessEvent):
    def process_IN_MODIFY(self, event):
        print(f"Change detected in {event.pathname}!")

# Create an inotify instance
wm = pyinotify.WatchManager()

# Create an event handler instance
handler = EventHandler()

# Create an inotify notifier and pass the function as the event handler
notifier = pyinotify.ThreadedNotifier(wm, handler)

# Watch for changes to /proc/mounts
wm.add_watch("/proc/mounts", pyinotify.IN_MODIFY)

# Start the notifier loop
print("Watching for mount changes...")
notifier.start()

try:
  while True:
    time.sleep(1)
except: pass

notifier.stop()
print('Done!')

Извините, сейчас у меня нет времени на тщательное изучение этого вопроса, но вам, скорее всего, придется использовать для этого DBUS: dbus.freedesktop.org/doc/dbus-tutorial.html

jsbueno 04.05.2024 20:13

Я только что проверил, что dbus-monitor --system действительно перечисляет события монтирования/размонтирования - так что это будет работать -

jsbueno 04.05.2024 20:25

Мне нужно учиться dbus так как я с этим никогда не работала...

Marus Gradinaru 04.05.2024 21:23

@jsbueno, когда я собираю и размонтирую раздел, поступает много сигналов... Я не знаю, какой из них мне следует слушать, чтобы получать уведомление только при монтировании/размонтировании, а не при других системных событиях...

Marus Gradinaru 06.05.2024 10:15
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
4
60
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вот как я решил проблему. С помощью этого кода я получаю одно уведомление каждый раз, когда блочное устройство (жесткий диск USB) подключается или отключается, а также когда раздел монтируется или отключается. Я не уверен на 100%, что обратный вызов не срабатывает и в других случаях, но если это так, то фильтр сигнала dbus можно настроить...

import time, threading

class FastTimer(threading.Thread):
  def __init__(self, interval, callback, args=None, kwargs=None, shots=1):
    super().__init__()
    self.daemon = True
    self.interval = interval
    self.callback = callback
    self.args = args if args is not None else []
    self.kwargs = kwargs if kwargs is not None else {}
    self.last_shot = shots
    self.trigger = threading.Event()
    self.done = threading.Event()
    self.terminated = threading.Event()
    self.start()

  def Terminate(self):
    if self.is_alive():
      self.terminated.set()
      self.done.set()
      self.trigger.set()
      self.join()

  def Gooo(self):
    if self.is_alive():
      self.trigger.set()

  def Abort(self):
    if self.is_alive():
      self.done.set()
      self.trigger.clear()

  def Reset(self):
    if self.is_alive():
      self.done.set()

  def Mark(self):
    if self.is_alive():
      if self.trigger.is_set(): self.done.set()
      else: self.trigger.set()

  def run(self):
    while not self.terminated.is_set():
      if not self.trigger.is_set():
        shot = 0; self.done.clear()
      self.trigger.wait()
      Reseted = self.done.wait(self.interval)
      self.done.clear()
      if self.terminated.is_set(): break
      if not Reseted:
        self.callback(*self.args, **self.kwargs)
        if self.last_shot > 0:
          shot += 1
          if shot == self.last_shot:
            self.trigger.clear()
    print('Timer terminated.')


class BlockDevMonitor(threading.Thread):
  def __init__(self, callback, args=None, kwargs=None):
    import time
    super().__init__()
    self.loop = None
    self.timer = FastTimer(1, callback, args, kwargs)
    self.start()
    time.sleep(0.3)

  def Terminate(self):
    if self.is_alive():
      if self.loop != None: self.loop.quit()
      self.join()
    self.timer.Terminate()

  def run(self):
    import dbus
    from dbus.mainloop.glib import DBusGMainLoop
    from gi.repository import GLib

    def sigUnitChg(*args):
      if (len(args) >= 1) and ( args[0].startswith('blockdev@') or ('-block-' in args[0]) ): self.timer.Mark()

    DBusGMainLoop(set_as_default=True)
    self.loop = GLib.MainLoop()
    bus = dbus.SystemBus()
    bus.add_signal_receiver(sigUnitChg, dbus_interface='org.freedesktop.systemd1.Manager', signal_name='UnitNew', path='/org/freedesktop/systemd1')
    bus.add_signal_receiver(sigUnitChg, dbus_interface='org.freedesktop.systemd1.Manager', signal_name='UnitRemoved', path='/org/freedesktop/systemd1')

    print('DevMonitor started.')
    self.loop.run()
    print('DevMonitor stopped.')


#----------------------------------------------

def OnUpdate():
  print('Device Manager updated !')

DevMon = BlockDevMonitor(OnUpdate)

try:
  i = 0
  while True:
    time.sleep(1);
    i += 1; print(i)
except:
  pass

DevMon.Terminate()

Другие вопросы по теме