У меня есть приложение, которому необходимо определить и выполнить около 3,9 миллиардов единиц работы. Он выполняет это небольшими партиями, от 1000 до 100 000 единиц работы (в основном на меньшем конце этого масштаба). Мне приходится группировать их, потому что каждая партия должна быть оценена, и эта оценка влияет на исходные данные для следующей партии.
Проблема в том, что я пытаюсь выполнить работу по оценке каждой единицы работы в многопоточном коде по соображениям пропускной способности/производительности. Вначале он хорошо масштабируется, но на более крупных серверах масштабируется отрицательно. Подробности ниже.
У меня есть ThreadPoolExecutor
, который я создаю следующим образом:
// Leave 2 CPUs unused for system processing, minimum of 1.
numOfCores = Math.max(1, numOfCores - 2);
this.backgroundExecutor = new ThreadPoolExecutor(
/* coreSize */ numOfCores,
/* maximumPoolSize */ numOfCores,
/* keepAliveTime */ 10, TimeUnit.SECONDS,
/* workQueue */ new LinkedBlockingQueue<Runnable>(),
threadFactory);
Основной поток приложения помещает работу в очередь небольшими порциями, возможно, от 1000 до 100 000 запросов, используя этот метод:
public void addRequest(R taskRequest) {
this.taskList.add(() -> {
// This is a callable - so it's code that will run in the background thread.
final ThreadedBuilderThread<T> currentThread = (ThreadedBuilderThread<T>) Thread.currentThread();
T newObject = taskRequest.build();
if (taskRequest.isBestOnlyMode() && newObject.getValue() > Double.NEGATIVE_INFINITY) {
T bestObject = currentThread.bestObjectReference.getPlain();
if (bestObject == null || newObject.getValue() > bestObject.getValue()
|| (newObject.getValue() == bestObject.getValue() && newObject.getId() < bestObject.getId())) {
currentThread.bestObjectReference.setPlain(newObject);
}
// Don't return any object at this point, because we won't know which one is
// best until all objects are built.
return null;
} else {
// Not in "best only" mode, so return the object we just built.
return newObject;
}
});
}
После добавления каждого пакета запросов основной поток ожидает результатов следующим образом:
public Collection<T> waitAndGetResults(boolean bestOnlyMode) {
Collection<Future<T>> resultsFutureList;
try {
// NOTE: .invokeAll() waits for all the tasks to finish.
resultsFutureList = this.backgroundExecutor.invokeAll(taskList); // , 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
final Collection<T> allResultList = new ArrayList<>();
for (final Future<T> f : resultsFutureList) {
final T t;
try {
t = f.get(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException("Background Thread was interrupted!", e);
} catch (ExecutionException e) {
throw new RuntimeException("Background Thread has encounted an uncaught exception!", e);
} catch (TimeoutException e) {
throw new RuntimeException("Background Thread has timed out!", e);
}
if (!bestOnlyMode) {
allResultList.add(t);
}
}
// Verify we got all of our work done (because we might have timed-out).
if (!bestOnlyMode && allResultList.size() != taskList.size()) {
throw new AssertionError(
"Requested " + taskList.size() + " buildable objects, but only got " + allResultList.size() + ".");
}
// Clear the task list for the next group of tasks to be added later
this.taskList.clear();
if (!bestOnlyMode) {
// We are not in best only mode, so return the full list of results
for (ThreadedBuilderThread<T> th : threadList) {
th.bestObjectReference.setRelease(null);
}
return allResultList;
}
// Otherwise, we are in best only mode
double overallBestValue = Double.NEGATIVE_INFINITY;
T overallBestObject = null;
// Iterate through the builder threads and get the best object built for each
// one.
for (final ThreadedBuilderThread<T> th : threadList) {
final T threadBestObject = th.bestObjectReference.get();
if (threadBestObject != null && threadBestObject.getValue() > overallBestValue) {
// This thread had the best object (so far)
overallBestValue = threadBestObject.getValue();
overallBestObject = threadBestObject;
}
}
final Collection<T> bestResultList = new ArrayList<>();
if (overallBestObject != null) {
bestResultList.add(overallBestObject);
}
// Clear the thread map for the next batch of tasks
for (ThreadedBuilderThread<T> th : threadList) {
th.bestObjectReference.setRelease(null);
}
return bestResultList;
}
ПРОБЛЕМА
Я запускаю этот код на серверах, развернутых в облаке Amazon AWS. Когда я работаю на экземпляре с 8 виртуальными ЦП, я получаю лучшее время работы по настенным часам, чем когда я работаю на экземпляре с 4 виртуальными ЦП. Но когда я пытаюсь масштабировать до 64 виртуальных ЦП, время работы настенных часов составляет почти 200% от времени работы 8 виртуальных ЦП.
Я рассмотрел свою реализацию и удалил все источники конфликтов потоков, которые только мог придумать, и получил скромные улучшения, но общая проблема отрицательного масштабирования все еще сохраняется.
Есть ли у кого-нибудь мысли о том, как я могу это отладить или, что еще лучше, какой возможный источник разногласий может быть?
Я не засек время, затраченное на единицу работы, но, используя совокупную статистику недавнего запуска, среднее значение составляет порядка 4 микросекунд. Проблема в том, что у меня их 3,9 миллиарда.
Если вы правы, объяснит ли это положительное масштабирование от 4 до 8 виртуальных ЦП и отрицательное масштабирование от 8 до 64? Есть ли лучший способ параллельного выполнения такой рабочей нагрузки?
Я мог бы попытаться объединить waitAndGetResults
в более крупные задачи, каждая из которых обрабатывает, скажем, 100 единиц работы. Как вы думаете, это покажет улучшения?
4 микросекунды очень мало для времени выполнения потока. 3,9 миллиарда — это смехотворное количество дискретных задач, которые можно выполнить за любой разумный промежуток времени. «Если вы правы, объяснит ли это положительное масштабирование от 4 до 8 виртуальных ЦП и отрицательное масштабирование от 8 до 64?» Вполне возможно. При наличии 8 виртуальных ЦП вы можете эффективно обслуживать все потоки, если удаление задачи из очереди занимает менее 500 наносекунд. При 64 виртуальных ЦП это нужно сделать за 62 нс. Вполне вероятно, что лучшее, что вы можете сделать, находится где-то посередине.
И да, объединение задач в более крупные блоки, скорее всего, улучшит производительность для любого количества виртуальных ЦП > 1, а также уменьшит конфликты за очередь задач. Я бы рассмотрел возможность группировки их в единицы по 250 или более, чтобы попасть хотя бы в миллисекундный диапазон. И стоило бы протестировать и более крупные агрегаты.
Если вы подозреваете, что проблема в очереди за пулом потоков, взгляните на это lmax-exchange.github.io/disruptor. Я не являюсь аффилированным лицом, но использовал его в своем проекте для очень масштабируемой системы на основе сообщений, где конфликты за блокировку потоков были проблемой.
Какая работа ведется по вашим задачам? Работа ограничена процессором? Или это включает в себя такие блокировки, как ведение журнала, доступ к базе данных, файловый ввод-вывод, доступ к сети? Хотя я не изучал ваш код внимательно, на первый взгляд он кажется перегруженным и может быть заменен некоторыми более простыми вызовами службы-исполнителя с использованием виртуальных потоков в синтаксисе try-with-resources.
@BasilBourque это все процессор и память. Нет журналирования (за исключением исключений), нет базы данных.
@JohnBollinger Пакетирование запроса и запуск на компьютере с 64 виртуальными ЦП теперь выполняется в 20% случаев как непакетированная версия. Я еще не проводил тестирование на машинах разных размеров, чтобы увидеть, насколько хорошо оно масштабируется, как это теоретически возможно, но теперь стало НАМНОГО лучше, благодаря вашему руководству! Если вы хотите опубликовать ответ, я проголосую за! Спасибо!!!
Исключить задачу из очереди задач пула потоков невозможно, особенно если за это борются несколько потоков. 3,9 миллиарда дискретных задач требуют затрат 3,9 миллиарда раз, что окажет заметное влияние на любое количество потоков и (v)ЦП, если время выхода из очереди составляет значительную часть собственного вычислительного времени задачи.
Более того, удаление задач из очереди должно быть сериализовано, поэтому время удаления из очереди ограничивает количество потоков, которые могут исключить задачу из очереди в единицу времени. Между тем, количество потоков, которые хотят исключить задачи из очереди в единицу времени, обратно пропорционально времени выполнения каждой задачи. Если время выполнения каждой задачи очень мало, как в вашем случае, то наступит момент, когда количество потоков, требующих выполнения задачи в единицу времени, превысит скорость, с которой они могут удалять задачи из очереди. Дополнительные виртуальные ЦП/потоки не могут обеспечить какого-либо положительного ускорения после этой точки, но они все равно несут накладные расходы, поэтому ваша работа в целом может замедлиться. Если вы углубитесь в эту сферу, то процесс может сильно замедлиться. Кажется, это, по крайней мере, часть того, что вы наблюдали.
Рассмотрим некоторые цифры:
вы подсчитали, что каждая из ваших задач занимает 4 микросекунды.
для эффективного обслуживания 8 одновременных потоков требуется, чтобы удаление из очереди занимало менее 4 мкс / 8 = 500 нс каждый.
для эффективного обслуживания 64 одновременных потоков требуется, чтобы удаление из очереди занимало менее 63 нс каждый.
если ваши виртуальные ЦП работают, скажем, на частоте 3 ГГц, то 63 нс — это менее 200 циклов ЦП, что определенно достаточно мало, чтобы вызывать беспокойство.
Вполне вероятно, что минимальное время выхода из очереди составляет где-то около 500 нс, так что при попытке масштабирования за пределы 8 виртуальных ЦП вы несете больше затрат, чем получаете выгоды.
Как правило, производительность многопоточности повышается за счет относительно крупномасштабных рабочих нагрузок. Разумным подходом к этому в вашем случае было бы группирование этих задач по 4 мкс в (гораздо) более крупные группы. Я бы рекомендовал стрелять по достаточно большим группам, чтобы каждая из них попадала по крайней мере в миллисекундный временной диапазон, но это вам предстоит протестировать и настроить.
Кроме того, вы вполне можете обнаружить, что это ограничивает количество виртуальных ЦП, которые вы можете эффективно использовать. Не упускайте из виду возможность того, что вам будет лучше просто использовать меньше 64. Если стоимость выполнения ваших вычислений также является для вас фактором, то это еще более вероятно, потому что вам нужно ожидать, что ускорение на дополнительный виртуальный ЦП будет сублинейным.
Сколько времени занимает каждая единица работы? То, что вы отправляете десятки тысяч за раз, говорит о том, что осталось недолго. Если она достаточно короткая, то борьба за очередь задач может стоить вам больше, чем выигрыш от параллельного выполнения. Эффекты конкуренции за очередь будут масштабироваться в зависимости от количества потоков в пуле.