Измерения пропускной способности устройства пользовательской сети с использованием Linux и Python3

У меня есть специальное сетевое устройство с двумя интерфейсами Ethernet. Он может получать сообщения UDP на любом интерфейсе, а также специальный экспериментальный протокол на обоих интерфейсах. Как только он получает сообщение, он обрабатывает его и пересылает cmd/results на другой интерфейс (в моем случае это похоже на брандмауэр, но я хочу продолжить обсуждение с общими сетевыми устройствами).

Я хотел измерить пропускную способность и задержку устройства, поэтому подключил два кабеля Ethernet от устройства к системе Linux RHEL 8. Я создал приложение Python 3, работающее в системе Linux, которое имеет 3 процесса: 1) 1 для отправки данных на один интерфейс, 2) 1 для получения обработанных данных на другом интерфейсе и 3) 1 для отправки фиктивных данных в противоположном направлении. так что измерения можно проводить под нагрузкой данных, идущих в обоих направлениях

Я использую библиотеку многопроцессорной обработки Python и сокеты Linux udp из библиотеки сокетов, а также необработанные сокеты для пользовательского протокола (iperf 3 не работает в моем случае из-за пользовательского протокола). Чтобы измерить пропускную способность и задержку, я создаю группу тестовых пакетов и отмечаю время каждого из них после его отправки из процесса 1). Пока процесс 1) отправляет пакеты, процесс 2) принимает пакеты и присваивает им временные метки на другой стороне. После получения всех объемов данных он вычисляет разницу между соседними полученными пакетами, чтобы получить измерение пропускной способности, и вычисляет разницу между отправкой и получением, чтобы получить измерения задержки.

Проблема связана с взаимодействием многопроцессорной обработки и сокетов. На машине, когда я вызываю sock.sendto(data, (dest, port)) из процесса 1 и вызываю sock2.receivefrom(data) из процесса 2, похоже, они не работают одновременно. Кажется, что процесс 1) отправляет все свои пакеты (я пробую это с пакетами размером 1000 байт, 300 пакетов), а затем процесс 2) начинает получать. Из трассировок Wireshark я знаю, что полученные пакеты фактически достигают системы Linux еще до того, как она начнет их обрабатывать. Даже более странный процесс 3) отправляет все свои пакеты раньше, чем процесс 2) отправляет даже один пакет. Что еще хуже, процесс 1) иногда пропускает пакеты, если общее количество байтов пакета превышает 200 000.

Ниже приведен псевдокод того, что я делаю.

import multiprocessing
import time
import socket

def sender(socket, queue, data_list, dest):
    timestamps = []
    for data in data_list:
        socket.sendto(data, dest)
        timestamps.append(time.perf_counter())
    queue.put(timestamps)

def sender_dummy(socket, queue, data_list, dest):
    for data in data_list:
        socket.sendto(data, dest)

def main():
    ## Setup the sockets with appropriate timeouts and interfaces and protocols
    ....
    ## Setup the test data
    ....
    ## Begin multi-processing
    q = multiprocessing.Queue()
    task_main = multiprocessing.Process(target=sender, args=(sender_sock, q, test_data, (dest_addr, dest_port)))
    task_dummy = multiprocessing.Process(target=sender_dummy, args=(sender_dummy_sock, q, dummy_data, (dest_addr, dest_port)))


    timestamps_recv = []
    task_dummy.start()
    task_main.start()

    for i in range(len(test_data)):
        recv_sock.receivefrom(TEST_DATA_SIZE)
        timestamps_recv.append(time.perf_counter())

    task_dummy.join()
    task_main.join()
    timestamps_send = q.get()

    # Calculate latency and throughput
    ...

Любая помощь в решении проблемы с отбрасыванием пакетов и обеспечении истинного одновременного выполнения отправителей и получателя приветствуется.

Я обнаружил, что отбрасывание пакетов на самом деле было вызвано тем, что устройство отбрасывало пакеты, когда внутренние буферы устройства были превышены из-за слишком высокой скорости. Это было исправлено с помощью некоторых удачно расположенных спящих режимов для ограничения скорости передачи данных. Единственная проблема заключается в том, что все отправляется до того, как что-либо будет получено.

WakkaTrout 06.05.2024 02:18
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
1
54
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вот несколько вещей, которые вы можете попробовать:

  1. Убедитесь, что сокеты не являются общими для разных процессов. Совместное использование может привести к непредсказуемому поведению. Создайте отдельные экземпляры сокетов внутри каждого процесса после его разветвления.

  2. Процессы могут блокировать друг друга из-за проблем с синхронизацией. Попробуйте использовать отдельные очереди для каждого процесса-отправителя. Кроме того, убедитесь, что процесс-получатель активно прослушивает сразу после запуска, а не запускает его синхронно после отправителей.

  3. Попробуйте настроить размеры буфера сокетов, используя socket.setsockopt()

  4. Перепроверьте временные метки с внешними измерениями (например, с помощью Wireshark), чтобы убедиться, что они точно отражают сетевые события.

  5. GIL может влиять на производительность. Каждый процесс выполняется со своим собственным GIL, что приводит к накладным расходам при обработке операций ввода-вывода.

Я бы рассмотрел следующие общие изменения:

Используйте потоки для задач, связанных с вводом-выводом, поскольку потоки используют одно и то же пространство памяти и имеют меньший вес по сравнению с процессами.

Обратите внимание на библиотеку asyncio, которая отлично подходит для обработки операций ввода-вывода и может обеспечить более эффективный способ управления параллельными операциями.

Вот настройка асинхронного начала приема:

def receiver(recv_sock):
    while True:
        data, addr = recv_sock.recvfrom(TEST_DATA_SIZE)
        if not data:
            break
        timestamps_recv.append(time.perf_counter())

task_recv = multiprocessing.Process(target=receiver, args=(recv_sock,))
task_recv.start()

Надеюсь это поможет.

Контент, созданный искусственным интеллектом, в настоящее время запрещен на Stack Overflow. Пожалуйста, прочтите Политику AI для получения дополнительной информации.

Fastnlight 19.04.2024 22:48

Другие вопросы по теме