У меня есть блок кода, который нужно запускать параллельно в разных потоках, используя CompletableFuture и ExecutorService. Блок кода выполняет сетевые запросы (с использованием RetroFit) и ожидает завершения каждого сетевого запроса, прежде чем перейти к следующему запросу и т. д. Могу ли я вызвать SingleThreadExecutor#submit() несколько раз, обходя этого исполнителя?
Я хочу, чтобы все параллельные запуски этого блока кода получали собственный поток и выполняли все свои операции в этом потоке.
По сути, блок кода должен действовать как синхронный код, но иметь несколько экземпляров этого блока, работающих параллельно, каждый экземпляр в своем собственном потоке.
В настоящее время программа просто простаивает, когда мы звоним из Service, и я подозреваю, что это связано с передачей того же SingleThreadExecutor, но я немного застрял.
ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < SIZE; i++) {
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
Future<List<String>> stringListFuture = myStringService.getStringList(executor);
// We begin stalling once the next line gets hit.
List<String> stringList = stringListFuture.get();
Future<Integer> integerDataFuture = myIntegerService.getInteger(executor);
Integer integerData = integerDataFuture.get();
executor.shutdown();
}, executor);
futures.add(future);
}
CompletableFuture<Void> completableFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
completableFutures.join();
// Do something after all blocks of code have finished running in parallel.
class MyStringFuture {
public Future<List<String>> getStringList(ExecutorService executor) {
// Could these nested uses of the same executor be the problem?
Future<Double> doubeDataFuture = doubleDataService.getFuture();
Double doubleData = doubeDataFuture.get(executor);
Call<List<Dividend>> data = retrofit.create(Api.class).getData(doubleData);
return executor.submit(() -> data.execute().body());
}
}
И, пожалуйста, напишите минимальный работоспособный пример, иллюстрирующий проблему.
Ваши второй и третий абзацы противоречивы. Отредактируйте, чтобы уточнить.
Знаете ли вы о виртуальных потоках в Java 21+? Как правило, больше нет необходимости писать сложный асинхронный код с помощью CompletableFuture. Просто напишите свою задачу в виде простого последовательного кода, как одну задачу Runnable/Callable, и отправьте ее службе исполнителя, поддерживаемой виртуальными потоками. Значительно более простой код, легко отлаживаемый и чрезвычайно производительный.
«По сути, блок кода должен действовать как синхронный код, но иметь несколько экземпляров этого блока, работающих параллельно, каждый экземпляр в своем собственном потоке». Все это просто противоречие в терминах.
Ваш код никогда не заканчивается, поскольку он передает задачу однопоточному исполнителю из задачи, уже запущенной в том же однопоточном исполнителе. Подзадача никогда не может начаться, пока не завершится первая задача, но первая задача ожидает завершения, вызывая future.get(), который, конечно, блокируется на неопределенный срок, потому что она никогда не запускается.
«У меня есть блок кода, который должен выполняться параллельно...» Какое значение имеет «параллельный запуск», если вы не позволяете ему выполняться параллельно с чем-либо еще? Ваш пример неполон, но выглядит так, будто в каждом месте, где вы передаете задачу исполнителю, следующее, что вы делаете, — это ждете завершения задачи. Если это действительно так, то вы настраиваете задачу на выполнение «параллельно» с вызывающей стороной, которая ничего не делает. С таким же успехом вы могли бы просто вызвать функцию задачи.




Ваш код использует задачу исполнителя одного потока, которая отправляет другую задачу тому же исполнителю одного потока, а затем ожидает выхода из этой подзадачи. Это то же самое, что и в этом примере, в котором будут печататься «ОДИН» и «ТРИ» и никогда не будут печататься «ДВА», «cf» или «ЧЕТЫРЕ»:
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
log("ONE");
// This subtask can never start while current task is still running:
Future<?> cf = executor.submit(() -> log("TWO"));
log("THREE");
try
{
// Blocks forever if run from single thread executor:
log("cf"+cf.get());
}
catch (Exception e)
{
throw new RuntimeException("It failed");
}
log("FOUR");
}, executor);
Подзадача может быть запущена только после выхода из основной задачи (если было напечатано «ЧЕТЫРЕ»), но зависает в ожидании cf.get().
Решение простое — вам следует обработать исходную задачу в отдельном потоке или очереди исполнителя к сервису, используемому подзадачами, или связать каждый компонент подзадач с помощью .thenRun(...).
CompletableFuture<Void> future = CompletableFuture.runAsync(subtask1, executor)
.thenRun(subtask2);
«В настоящее время программа просто простаивает» сделайте дамп потока, чтобы увидеть, что происходит.