У нас есть временные наблюдения, которые обрабатываются для тысяч показателей, и все они сводятся к одному и тому же методу для обработки через соответствующие процессоры. Я отправляю наблюдения на соответствующий процессор в многопоточном режиме, но future.get () блокирует основной поток дольше, чем ожидалось. Есть ли способ его улучшить?
ExecutorService[] pools = new ExecutorService[Runtime.getRuntime().availableProcessors()];
...
@Override
public Observation apply(final Observation observation) {
Callable<Observation> task = new Processor(observation);
int hash = Math.abs(observation.getMetricId().hashCode());
Future<Observation> result = pools[hash % pools.length].submit(task);
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
}
Является ли цель метода Processor#call
возвращать модифицированный Observation
? Я пытаюсь прояснить, почему у вас Observation
os и на входе, и на выходе ..
@BasilBourque да Процессор изменяет наблюдение. И измененное наблюдение возвращается процессором, а затем Future.get ()
@BasilBourque - это наблюдение, опечатка при редактировании.
Если у текущего потока нет другой работы, если вы сразу же запрашиваете результат, почему вы обрабатываете объект Observation в фоновом потоке?
почему вы должны работать в основном потоке? а почему надо запускать в режиме блокировки?
@Zabuzard для метрики, все его наблюдения должны обрабатываться по порядку, поэтому, если поток A обрабатывает наблюдение a, наблюдение b должно обрабатываться A только после того, как наблюдение a было обработано. Вот почему я распространяю хэш-код
@dungtavan, потому что мне нужно поддерживать порядок входящих наблюдений
Вы можете использовать для этого один пул и настроить очередь задач, таких как pool.submit(firstTask).andThen(secondTask).andThen(thirdTask).andThen(fourthTask)...
, просто взгляните на CompletableFuture
, как сказано. На самом деле, вы также можете быть в порядке с пулом по умолчанию, поэтому вы можете просто написать CompletableFuture.submit(firstTask).andThen(secondTask)...
. На самом деле, есть гораздо лучшие решения того, что вы здесь пытаетесь.
Вы просто хотите освободить свою основную ветку, верно? (Я имею в виду, можно ли запускать в другом потоке?)
Почему у вас несколько пулов и вы сами занимаетесь распределением? Пулы умеют распределять сами себя лучше, чем вы. Это своего рода поражение цели бассейнов. Невозможно вернуть результат будущего, не дождавшись его. Это просто ошибочно. Вы должны изменить свой дизайн. Пусть ваш метод вернет
Future<Observation>
, а затем пользователь сможет решить, как его обрабатывать, когда блокировать или хочет ли он запланировать задачу по завершении и так далее. Предлагаю вам взглянуть на современную многопоточность в Java, основанную наCompletableFuture
.