ThreadPoolExecutor плохо масштабируется между 6->62 потоками

У меня есть приложение, которому необходимо определить и выполнить около 3,9 миллиардов единиц работы. Он выполняет это небольшими партиями, от 1000 до 100 000 единиц работы (в основном на меньшем конце этого масштаба). Мне приходится группировать их, потому что каждая партия должна быть оценена, и эта оценка влияет на исходные данные для следующей партии.

Проблема в том, что я пытаюсь выполнить работу по оценке каждой единицы работы в многопоточном коде по соображениям пропускной способности/производительности. Вначале он хорошо масштабируется, но на более крупных серверах масштабируется отрицательно. Подробности ниже.

У меня есть ThreadPoolExecutor, который я создаю следующим образом:

    // Leave 2 CPUs unused for system processing, minimum of 1.
    numOfCores = Math.max(1, numOfCores - 2);
    this.backgroundExecutor = new ThreadPoolExecutor(
            /* coreSize */ numOfCores, 
            /* maximumPoolSize */ numOfCores, 
            /* keepAliveTime */ 10, TimeUnit.SECONDS,
            /* workQueue */ new LinkedBlockingQueue<Runnable>(), 
            threadFactory);

Основной поток приложения помещает работу в очередь небольшими порциями, возможно, от 1000 до 100 000 запросов, используя этот метод:

public void addRequest(R taskRequest) {
    this.taskList.add(() -> {
        // This is a callable - so it's code that will run in the background thread.
        final ThreadedBuilderThread<T> currentThread = (ThreadedBuilderThread<T>) Thread.currentThread();
        T newObject = taskRequest.build();
        if (taskRequest.isBestOnlyMode() && newObject.getValue() > Double.NEGATIVE_INFINITY) {
            T bestObject = currentThread.bestObjectReference.getPlain();
            if (bestObject == null || newObject.getValue() > bestObject.getValue()
                    || (newObject.getValue() == bestObject.getValue() && newObject.getId() < bestObject.getId())) {
                currentThread.bestObjectReference.setPlain(newObject);
            }
            // Don't return any object at this point, because we won't know which one is
            // best until all objects are built.
            return null;
        } else {
            // Not in "best only" mode, so return the object we just built.
            return newObject;
        }
    });
}

После добавления каждого пакета запросов основной поток ожидает результатов следующим образом:

public Collection<T> waitAndGetResults(boolean bestOnlyMode) {

    Collection<Future<T>> resultsFutureList;
    try {
        // NOTE: .invokeAll() waits for all the tasks to finish.
        resultsFutureList = this.backgroundExecutor.invokeAll(taskList); // , 10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

    final Collection<T> allResultList = new ArrayList<>();

    for (final Future<T> f : resultsFutureList) {
        final T t;
        try {
            t = f.get(10, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            throw new RuntimeException("Background Thread was interrupted!", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Background Thread has encounted an uncaught exception!", e);
        } catch (TimeoutException e) {
            throw new RuntimeException("Background Thread has timed out!", e);
        }
        if (!bestOnlyMode) {
            allResultList.add(t);
        }
    }
    // Verify we got all of our work done (because we might have timed-out).
    if (!bestOnlyMode && allResultList.size() != taskList.size()) {
        throw new AssertionError(
                "Requested " + taskList.size() + " buildable objects, but only got " + allResultList.size() + ".");
    }

    // Clear the task list for the next group of tasks to be added later
    this.taskList.clear();

    if (!bestOnlyMode) {
        // We are not in best only mode, so return the full list of results
        for (ThreadedBuilderThread<T> th : threadList) {
            th.bestObjectReference.setRelease(null);
        }
        return allResultList;
    }

    // Otherwise, we are in best only mode
    double overallBestValue = Double.NEGATIVE_INFINITY;
    T overallBestObject = null;

    // Iterate through the builder threads and get the best object built for each
    // one.
    for (final ThreadedBuilderThread<T> th : threadList) {
        final T threadBestObject = th.bestObjectReference.get();
        if (threadBestObject != null && threadBestObject.getValue() > overallBestValue) {
            // This thread had the best object (so far)
            overallBestValue = threadBestObject.getValue();
            overallBestObject = threadBestObject;
        }
    }

    final Collection<T> bestResultList = new ArrayList<>();
    if (overallBestObject != null) {
        bestResultList.add(overallBestObject);
    }

    // Clear the thread map for the next batch of tasks
    for (ThreadedBuilderThread<T> th : threadList) {
        th.bestObjectReference.setRelease(null);
    }

    return bestResultList;
}

ПРОБЛЕМА

Я запускаю этот код на серверах, развернутых в облаке Amazon AWS. Когда я работаю на экземпляре с 8 виртуальными ЦП, я получаю лучшее время работы по настенным часам, чем когда я работаю на экземпляре с 4 виртуальными ЦП. Но когда я пытаюсь масштабировать до 64 виртуальных ЦП, время работы настенных часов составляет почти 200% от времени работы 8 виртуальных ЦП.

Я рассмотрел свою реализацию и удалил все источники конфликтов потоков, которые только мог придумать, и получил скромные улучшения, но общая проблема отрицательного масштабирования все еще сохраняется.

Есть ли у кого-нибудь мысли о том, как я могу это отладить или, что еще лучше, какой возможный источник разногласий может быть?

Сколько времени занимает каждая единица работы? То, что вы отправляете десятки тысяч за раз, говорит о том, что осталось недолго. Если она достаточно короткая, то борьба за очередь задач может стоить вам больше, чем выигрыш от параллельного выполнения. Эффекты конкуренции за очередь будут масштабироваться в зависимости от количества потоков в пуле.

John Bollinger 12.08.2024 21:36

Я не засек время, затраченное на единицу работы, но, используя совокупную статистику недавнего запуска, среднее значение составляет порядка 4 микросекунд. Проблема в том, что у меня их 3,9 миллиарда.

Matthew McPeak 12.08.2024 21:57

Если вы правы, объяснит ли это положительное масштабирование от 4 до 8 виртуальных ЦП и отрицательное масштабирование от 8 до 64? Есть ли лучший способ параллельного выполнения такой рабочей нагрузки?

Matthew McPeak 12.08.2024 21:58

Я мог бы попытаться объединить waitAndGetResults в более крупные задачи, каждая из которых обрабатывает, скажем, 100 единиц работы. Как вы думаете, это покажет улучшения?

Matthew McPeak 12.08.2024 22:04

4 микросекунды очень мало для времени выполнения потока. 3,9 миллиарда — это смехотворное количество дискретных задач, которые можно выполнить за любой разумный промежуток времени. «Если вы правы, объяснит ли это положительное масштабирование от 4 до 8 виртуальных ЦП и отрицательное масштабирование от 8 до 64?» Вполне возможно. При наличии 8 виртуальных ЦП вы можете эффективно обслуживать все потоки, если удаление задачи из очереди занимает менее 500 наносекунд. При 64 виртуальных ЦП это нужно сделать за 62 нс. Вполне вероятно, что лучшее, что вы можете сделать, находится где-то посередине.

John Bollinger 12.08.2024 22:06

И да, объединение задач в более крупные блоки, скорее всего, улучшит производительность для любого количества виртуальных ЦП > 1, а также уменьшит конфликты за очередь задач. Я бы рассмотрел возможность группировки их в единицы по 250 или более, чтобы попасть хотя бы в миллисекундный диапазон. И стоило бы протестировать и более крупные агрегаты.

John Bollinger 12.08.2024 22:09

Если вы подозреваете, что проблема в очереди за пулом потоков, взгляните на это lmax-exchange.github.io/disruptor. Я не являюсь аффилированным лицом, но использовал его в своем проекте для очень масштабируемой системы на основе сообщений, где конфликты за блокировку потоков были проблемой.

AndrewL 12.08.2024 22:34

Какая работа ведется по вашим задачам? Работа ограничена процессором? Или это включает в себя такие блокировки, как ведение журнала, доступ к базе данных, файловый ввод-вывод, доступ к сети? Хотя я не изучал ваш код внимательно, на первый взгляд он кажется перегруженным и может быть заменен некоторыми более простыми вызовами службы-исполнителя с использованием виртуальных потоков в синтаксисе try-with-resources.

Basil Bourque 12.08.2024 22:48

@BasilBourque это все процессор и память. Нет журналирования (за исключением исключений), нет базы данных.

Matthew McPeak 12.08.2024 23:00

@JohnBollinger Пакетирование запроса и запуск на компьютере с 64 виртуальными ЦП теперь выполняется в 20% случаев как непакетированная версия. Я еще не проводил тестирование на машинах разных размеров, чтобы увидеть, насколько хорошо оно масштабируется, как это теоретически возможно, но теперь стало НАМНОГО лучше, благодаря вашему руководству! Если вы хотите опубликовать ответ, я проголосую за! Спасибо!!!

Matthew McPeak 13.08.2024 05:25
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
10
50
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Исключить задачу из очереди задач пула потоков невозможно, особенно если за это борются несколько потоков. 3,9 миллиарда дискретных задач требуют затрат 3,9 миллиарда раз, что окажет заметное влияние на любое количество потоков и (v)ЦП, если время выхода из очереди составляет значительную часть собственного вычислительного времени задачи.

Более того, удаление задач из очереди должно быть сериализовано, поэтому время удаления из очереди ограничивает количество потоков, которые могут исключить задачу из очереди в единицу времени. Между тем, количество потоков, которые хотят исключить задачи из очереди в единицу времени, обратно пропорционально времени выполнения каждой задачи. Если время выполнения каждой задачи очень мало, как в вашем случае, то наступит момент, когда количество потоков, требующих выполнения задачи в единицу времени, превысит скорость, с которой они могут удалять задачи из очереди. Дополнительные виртуальные ЦП/потоки не могут обеспечить какого-либо положительного ускорения после этой точки, но они все равно несут накладные расходы, поэтому ваша работа в целом может замедлиться. Если вы углубитесь в эту сферу, то процесс может сильно замедлиться. Кажется, это, по крайней мере, часть того, что вы наблюдали.

Рассмотрим некоторые цифры:

  • вы подсчитали, что каждая из ваших задач занимает 4 микросекунды.

  • для эффективного обслуживания 8 одновременных потоков требуется, чтобы удаление из очереди занимало менее 4 мкс / 8 = 500 нс каждый.

  • для эффективного обслуживания 64 одновременных потоков требуется, чтобы удаление из очереди занимало менее 63 нс каждый.

  • если ваши виртуальные ЦП работают, скажем, на частоте 3 ГГц, то 63 нс — это менее 200 циклов ЦП, что определенно достаточно мало, чтобы вызывать беспокойство.

Вполне вероятно, что минимальное время выхода из очереди составляет где-то около 500 нс, так что при попытке масштабирования за пределы 8 виртуальных ЦП вы несете больше затрат, чем получаете выгоды.

Как правило, производительность многопоточности повышается за счет относительно крупномасштабных рабочих нагрузок. Разумным подходом к этому в вашем случае было бы группирование этих задач по 4 мкс в (гораздо) более крупные группы. Я бы рекомендовал стрелять по достаточно большим группам, чтобы каждая из них попадала по крайней мере в миллисекундный временной диапазон, но это вам предстоит протестировать и настроить.

Кроме того, вы вполне можете обнаружить, что это ограничивает количество виртуальных ЦП, которые вы можете эффективно использовать. Не упускайте из виду возможность того, что вам будет лучше просто использовать меньше 64. Если стоимость выполнения ваших вычислений также является для вас фактором, то это еще более вероятно, потому что вам нужно ожидать, что ускорение на дополнительный виртуальный ЦП будет сублинейным.

Другие вопросы по теме