Я написал очередь блокировки на C, очень похожую на ту, которая предоставляется классом BlockingQueue в Java. Очередь в основном предоставляет следующий API:
void blocking_queue_add(Blocking_Queue* bq, void* element);
void* blocking_queue_get(Blocking_Queue* bq);
и обладает следующими свойствами:
blocking_queue_add
вызывается, когда очередь заполнена, вызывающий блокируется до тех пор, пока в очереди не останется места.blocking_queue_get
вызывается, когда очередь пуста, вызывающая сторона блокируется до тех пор, пока не появятся данные для выборки.blocking_queue_add
или blocking_queue_get
, их необходимо обслуживать по порядку (FIFO).Мне трудно реализовать очередь, которая удовлетворяет последнему пункту, сохраняя при этом приемлемую производительность. Я использую pthreads
для синхронизации и, чтобы удовлетворить требование FIFO, я полагаюсь на функции pthread_cond_wait
и pthread_cond_broadcast
. Вся идея состоит в том, что все заблокированные вызывающие абоненты будут находиться в состоянии ожидания. Как только один из них может быть обслужен, излучается широковещательный сигнал, и все они просыпаются. На этом этапе они могут легко проверить, кто выбран, на основе внутреннего состояния FIFO. Остальные добровольно блокируют себя, снова позвонив pthread_cond_wait
, и ждут своей очереди.
Эта реализация, похоже, работает нормально, и сама очередь работает. Однако у меня сильно падает производительность, когда потребителей намного больше, чем производителей, или наоборот. Проблема в том, что когда у меня есть этот дисбаланс, всегда будет много заблокированных абонентов, поэтому накладные расходы на их пробуждение каждый раз звонком pthread_cond_broadcast
становятся неприемлемыми.
Я ищу подсказки о том, как решить эту проблему. Я сравнил свою очередь с реализацией Java, и она работает быстрее при работе с небольшими рабочими нагрузками, но когда я тестирую сценарий с большой и полностью несбалансированной рабочей нагрузкой, моя реализация оказывается катастрофой, а реализация Java кажется лишь линейно медленнее. На самом деле, если я уберу из своей реализации весь код, связанный с очередями, и просто изменю обе функции blocking_queue_add
и blocking_queue_get
, чтобы иметь только логику cond/broadcast, это уже катастрофа.
Любой вклад приветствуется, спасибо.
Идея Брэндона значительно улучшила производительность. Производительность теперь кажется более чем в 2 раза лучше, чем реализация Java.
Для всех, кто интересуется будущим, я сделал исходный код открытым: https://github.com/felipeek/c-fifo-blocking-queue
Вы видели, как работает версия Java?
Мне трудно реализовать очередь, которая удовлетворяет последнему пункту, сохраняя при этом приемлемую производительность. Я использую pthreads для синхронизации и, чтобы удовлетворить требование FIFO, я полагаюсь на функции pthread_cond_wait и pthread_cond_broadcast.
В этом случае, когда поток добавляется в очередь, он выполняет pthread_cond_broadcast()
и пробуждает все потоки, которые были заблокированы в ожидании получения данных из пустой очереди; и (если есть много потоков, которые были заблокированы в ожидании), это приводит к тому, что много процессорного времени тратится впустую на переключения потоков и накладные расходы планировщика; потому что каждый ожидающий поток разблокируется, пытается получить мьютекс (и, возможно, снова блокируется и разблокируется при попытке получить мьютекс), затем проверяет, является ли он следующим, и снова блокируется, если он не следующий.
Чтобы исправить это; каждому потоку нужна своя отдельная переменная условия. Когда поток начинает ожидать данных из пустой очереди, он помещает свою условную переменную в «очередь ожидающих читателей»; и когда поток добавляет данные в очередь, он берет первую условную переменную из «очереди ожидающих читателей» и (если есть ожидающий) делает один pthread_cond_signal()
(а не широковещательный), чтобы разблокировался только один ожидающий поток.
Обратите внимание, что «очередь переменных состояния ожидающего чтения» может быть связанным списком структур «struct waiter { struct waiter * next; pthread_cond_t condVar; }
»; и эти структуры могут создаваться и инициализироваться при создании потока, а затем постоянно перезапускаться (пока поток не завершится).
Для «несколько авторов» это по сути одна и та же проблема с одним и тем же решением (и может повторно использовать один и тот же «struct waiter
», созданный при создании потока). Когда потоку нужно подождать, чтобы добавить данные в очередь, он добавляет свою условную переменную в «связанный список ожидающих модулей записи», а когда поток завершает удаление данных из очереди, он делает один pthread_cond_signal()
, чтобы разблокировать следующего ожидающего модуля записи.
Обратите внимание, что это должно значительно улучшить производительность при высокой конкуренции (много ожидающих читателей или много ожидающих писателей); но дополнительные накладные расходы на управление «очередями ожидающих» также могут снизить производительность при низкой конкуренции (худший случай — это когда регулярно есть только один ожидающий поток, что является лучшим случаем для вашего текущего подхода с использованием pthread_cond_broadcast
).
Спасибо. Работает как шарм.
Я не думаю, что мы можем многое сказать, не рассматривая ваш реальный код. Но поскольку у вас есть правильно функционирующий код (но для производительности), вам следует вместо этого обратиться за помощью в CodeReview SE. Однако, если вы пойдете туда, будьте готовы к тому, что обзоры не будут ограничиваться вашими проблемами с производительностью.