У меня есть специальное сетевое устройство с двумя интерфейсами Ethernet. Он может получать сообщения UDP на любом интерфейсе, а также специальный экспериментальный протокол на обоих интерфейсах. Как только он получает сообщение, он обрабатывает его и пересылает cmd/results на другой интерфейс (в моем случае это похоже на брандмауэр, но я хочу продолжить обсуждение с общими сетевыми устройствами).
Я хотел измерить пропускную способность и задержку устройства, поэтому подключил два кабеля Ethernet от устройства к системе Linux RHEL 8. Я создал приложение Python 3, работающее в системе Linux, которое имеет 3 процесса: 1) 1 для отправки данных на один интерфейс, 2) 1 для получения обработанных данных на другом интерфейсе и 3) 1 для отправки фиктивных данных в противоположном направлении. так что измерения можно проводить под нагрузкой данных, идущих в обоих направлениях
Я использую библиотеку многопроцессорной обработки Python и сокеты Linux udp из библиотеки сокетов, а также необработанные сокеты для пользовательского протокола (iperf 3 не работает в моем случае из-за пользовательского протокола). Чтобы измерить пропускную способность и задержку, я создаю группу тестовых пакетов и отмечаю время каждого из них после его отправки из процесса 1). Пока процесс 1) отправляет пакеты, процесс 2) принимает пакеты и присваивает им временные метки на другой стороне. После получения всех объемов данных он вычисляет разницу между соседними полученными пакетами, чтобы получить измерение пропускной способности, и вычисляет разницу между отправкой и получением, чтобы получить измерения задержки.
Проблема связана с взаимодействием многопроцессорной обработки и сокетов. На машине, когда я вызываю sock.sendto(data, (dest, port)) из процесса 1 и вызываю sock2.receivefrom(data) из процесса 2, похоже, они не работают одновременно. Кажется, что процесс 1) отправляет все свои пакеты (я пробую это с пакетами размером 1000 байт, 300 пакетов), а затем процесс 2) начинает получать. Из трассировок Wireshark я знаю, что полученные пакеты фактически достигают системы Linux еще до того, как она начнет их обрабатывать. Даже более странный процесс 3) отправляет все свои пакеты раньше, чем процесс 2) отправляет даже один пакет. Что еще хуже, процесс 1) иногда пропускает пакеты, если общее количество байтов пакета превышает 200 000.
Ниже приведен псевдокод того, что я делаю.
import multiprocessing
import time
import socket
def sender(socket, queue, data_list, dest):
timestamps = []
for data in data_list:
socket.sendto(data, dest)
timestamps.append(time.perf_counter())
queue.put(timestamps)
def sender_dummy(socket, queue, data_list, dest):
for data in data_list:
socket.sendto(data, dest)
def main():
## Setup the sockets with appropriate timeouts and interfaces and protocols
....
## Setup the test data
....
## Begin multi-processing
q = multiprocessing.Queue()
task_main = multiprocessing.Process(target=sender, args=(sender_sock, q, test_data, (dest_addr, dest_port)))
task_dummy = multiprocessing.Process(target=sender_dummy, args=(sender_dummy_sock, q, dummy_data, (dest_addr, dest_port)))
timestamps_recv = []
task_dummy.start()
task_main.start()
for i in range(len(test_data)):
recv_sock.receivefrom(TEST_DATA_SIZE)
timestamps_recv.append(time.perf_counter())
task_dummy.join()
task_main.join()
timestamps_send = q.get()
# Calculate latency and throughput
...
Любая помощь в решении проблемы с отбрасыванием пакетов и обеспечении истинного одновременного выполнения отправителей и получателя приветствуется.
Вот несколько вещей, которые вы можете попробовать:
Убедитесь, что сокеты не являются общими для разных процессов. Совместное использование может привести к непредсказуемому поведению. Создайте отдельные экземпляры сокетов внутри каждого процесса после его разветвления.
Процессы могут блокировать друг друга из-за проблем с синхронизацией. Попробуйте использовать отдельные очереди для каждого процесса-отправителя. Кроме того, убедитесь, что процесс-получатель активно прослушивает сразу после запуска, а не запускает его синхронно после отправителей.
Попробуйте настроить размеры буфера сокетов, используя socket.setsockopt()
Перепроверьте временные метки с внешними измерениями (например, с помощью Wireshark), чтобы убедиться, что они точно отражают сетевые события.
GIL может влиять на производительность. Каждый процесс выполняется со своим собственным GIL, что приводит к накладным расходам при обработке операций ввода-вывода.
Я бы рассмотрел следующие общие изменения:
Используйте потоки для задач, связанных с вводом-выводом, поскольку потоки используют одно и то же пространство памяти и имеют меньший вес по сравнению с процессами.
Обратите внимание на библиотеку asyncio
, которая отлично подходит для обработки операций ввода-вывода и может обеспечить более эффективный способ управления параллельными операциями.
Вот настройка асинхронного начала приема:
def receiver(recv_sock):
while True:
data, addr = recv_sock.recvfrom(TEST_DATA_SIZE)
if not data:
break
timestamps_recv.append(time.perf_counter())
task_recv = multiprocessing.Process(target=receiver, args=(recv_sock,))
task_recv.start()
Надеюсь это поможет.
Контент, созданный искусственным интеллектом, в настоящее время запрещен на Stack Overflow. Пожалуйста, прочтите Политику AI для получения дополнительной информации.
Я обнаружил, что отбрасывание пакетов на самом деле было вызвано тем, что устройство отбрасывало пакеты, когда внутренние буферы устройства были превышены из-за слишком высокой скорости. Это было исправлено с помощью некоторых удачно расположенных спящих режимов для ограничения скорости передачи данных. Единственная проблема заключается в том, что все отправляется до того, как что-либо будет получено.