Могу ли я отправить заявку одному и тому же однопоточному исполнителю несколько раз?

У меня есть блок кода, который нужно запускать параллельно в разных потоках, используя CompletableFuture и ExecutorService. Блок кода выполняет сетевые запросы (с использованием RetroFit) и ожидает завершения каждого сетевого запроса, прежде чем перейти к следующему запросу и т. д. Могу ли я вызвать SingleThreadExecutor#submit() несколько раз, обходя этого исполнителя?

Я хочу, чтобы все параллельные запуски этого блока кода получали собственный поток и выполняли все свои операции в этом потоке.

По сути, блок кода должен действовать как синхронный код, но иметь несколько экземпляров этого блока, работающих параллельно, каждый экземпляр в своем собственном потоке.

В настоящее время программа просто простаивает, когда мы звоним из Service, и я подозреваю, что это связано с передачей того же SingleThreadExecutor, но я немного застрял.

ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();

for (int i = 0; i < SIZE; i++) {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        Future<List<String>> stringListFuture = myStringService.getStringList(executor);
        // We begin stalling once the next line gets hit.
        List<String> stringList = stringListFuture.get();
        
        Future<Integer> integerDataFuture = myIntegerService.getInteger(executor);
        Integer integerData = integerDataFuture.get();

        executor.shutdown();
    }, executor);

    futures.add(future);
}

CompletableFuture<Void> completableFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        completableFutures.join();

// Do something after all blocks of code have finished running in parallel.
class MyStringFuture {
    public Future<List<String>> getStringList(ExecutorService executor) {

        // Could these nested uses of the same executor be the problem?
        Future<Double> doubeDataFuture = doubleDataService.getFuture();
        Double doubleData = doubeDataFuture.get(executor);

        Call<List<Dividend>> data = retrofit.create(Api.class).getData(doubleData);

        return executor.submit(() -> data.execute().body());
    }
}

«В настоящее время программа просто простаивает» сделайте дамп потока, чтобы увидеть, что происходит.

tgdavies 10.03.2024 04:45

И, пожалуйста, напишите минимальный работоспособный пример, иллюстрирующий проблему.

tgdavies 10.03.2024 04:46

Ваши второй и третий абзацы противоречивы. Отредактируйте, чтобы уточнить.

Basil Bourque 10.03.2024 06:14

Знаете ли вы о виртуальных потоках в Java 21+? Как правило, больше нет необходимости писать сложный асинхронный код с помощью CompletableFuture. Просто напишите свою задачу в виде простого последовательного кода, как одну задачу Runnable/Callable, и отправьте ее службе исполнителя, поддерживаемой виртуальными потоками. Значительно более простой код, легко отлаживаемый и чрезвычайно производительный.

Basil Bourque 10.03.2024 06:25

«По сути, блок кода должен действовать как синхронный код, но иметь несколько экземпляров этого блока, работающих параллельно, каждый экземпляр в своем собственном потоке». Все это просто противоречие в терминах.

user207421 10.03.2024 10:43

Ваш код никогда не заканчивается, поскольку он передает задачу однопоточному исполнителю из задачи, уже запущенной в том же однопоточном исполнителе. Подзадача никогда не может начаться, пока не завершится первая задача, но первая задача ожидает завершения, вызывая future.get(), который, конечно, блокируется на неопределенный срок, потому что она никогда не запускается.

DuncG 10.03.2024 11:13

«У меня есть блок кода, который должен выполняться параллельно...» Какое значение имеет «параллельный запуск», если вы не позволяете ему выполняться параллельно с чем-либо еще? Ваш пример неполон, но выглядит так, будто в каждом месте, где вы передаете задачу исполнителю, следующее, что вы делаете, — это ждете завершения задачи. Если это действительно так, то вы настраиваете задачу на выполнение «параллельно» с вызывающей стороной, которая ничего не делает. С таким же успехом вы могли бы просто вызвать функцию задачи.

Solomon Slow 10.03.2024 11:33
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
7
106
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Ваш код использует задачу исполнителя одного потока, которая отправляет другую задачу тому же исполнителю одного потока, а затем ожидает выхода из этой подзадачи. Это то же самое, что и в этом примере, в котором будут печататься «ОДИН» и «ТРИ» и никогда не будут печататься «ДВА», «cf» или «ЧЕТЫРЕ»:

ExecutorService executor = Executors.newSingleThreadExecutor();

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    log("ONE");
    // This subtask can never start while current task is still running:
    Future<?> cf = executor.submit(() -> log("TWO"));
    log("THREE");
    try
    {
        // Blocks forever if run from single thread executor:
        log("cf"+cf.get());
    }
    catch (Exception e)
    {
        throw new RuntimeException("It failed");
    }
    log("FOUR");
}, executor);

Подзадача может быть запущена только после выхода из основной задачи (если было напечатано «ЧЕТЫРЕ»), но зависает в ожидании cf.get().

Решение простое — вам следует обработать исходную задачу в отдельном потоке или очереди исполнителя к сервису, используемому подзадачами, или связать каждый компонент подзадач с помощью .thenRun(...).

CompletableFuture<Void> future = CompletableFuture.runAsync(subtask1, executor)
                                                  .thenRun(subtask2);

Другие вопросы по теме