Java: как отключить текущий поток

У меня есть многопоточное приложение Java, которое использует поля ThreadLocal для изоляции потоков друг от друга. В рамках этого приложения у меня также есть требование реализовать тайм-ауты для определенных функций для предотвращения атак DOS.

Я ищу способ отключить функцию Java, работающую в текущем потоке.

Я видел множество решений, таких как Как отключить поток, который создаст будущее для выполнения некоторого кода, запустит его в новом потоке и дождется его завершения. Я хочу, чтобы это работало наоборот.

Рассмотрим следующий код, который будет выполняться в многопоточной среде:

class MyClass {
    // ThreadLocal is not private so callback can access it
    ThreadLocal<AtomicInteger> counter = ThreadLocal.withInitial(AtomicInteger::new);

    public void entry(Function<?, ?> callback) {
        counter.get().set(10);                      // Calling thread performs set up
        I_need_a_timeout(callback, 110);            // Call a function which might take a long time
        int result = counter.get().get();           // If there is no time out this will be 110
    }

    private void I_need_a_timeout(Function<?, ?> callback, int loop) {
        while (loop-- >= 0) {
            counter.get().incrementAndGet();
            callback.apply(null);                   // This may take some time
        }
    }
}

Мне нужно иметь возможность завершить I_need_a_timeout, если он выполняется слишком долго, но если бы я выполнил его в будущем, у него был бы собственный поток и, следовательно, собственный экземпляр AtomicInteger, поэтому значение, считываемое вызывающим кодом, всегда было бы значение, которое я инициализирую (в данном случае 10)

Обновление: я обновил пример кода, чтобы он был ближе к моему реальному приложению. Клиент передает функцию I_need_a_timeout, возврат которой может занять любое время (или потенциально может никогда не вернуться), поэтому решения для опроса не будут работать.

Что вы подразумеваете под «если функция выполняется слишком долго»? Не можете ли вы просто добавить в цикл while дополнительное условие, например атомарный флаг? while (loop-- >= 0 && enableINeedATimeout)

darclander 24.05.2024 16:16

@darclander, К сожалению, я не могу. Приведенный мной пример кода представляет собой (очень) упрощенную версию того, с чем я имею дело. Фактический код включает функцию обратного вызова, которой я не могу управлять и которая может ожидать во внешних системах.

Stormcloud 24.05.2024 16:30

Могу я спросить, почему вы хотите сделать это именно так? Почему бы не выполнить работу над пулом потоков и не отменить будущее, если это занимает слишком много времени?

lance-java 24.05.2024 17:06

@lance-java, мне нужно сделать так, чтобы переменные ThreadLocal всегда считывались из одного и того же потока. Если I_need_a_timeout был запущен из потока в пуле потоков, то counter.get() вернет разные экземпляры AtomicInteger в зависимости от того, кто его вызывает.

Stormcloud 24.05.2024 18:01

Похоже, ваш запрос сводится к тому, как вызвать внешний обратный вызов, ограничивая время его выполнения, не выполняя выполнение в другом потоке. Это правильно?

John Bollinger 24.05.2024 18:43

Отвечает ли это на ваш вопрос? Как убить поток в Java?

Progman 24.05.2024 19:48

Чтобы это работало, операция в вызове callback.apply(null) должна проверять какую-то переменную, например Thread.isInterrupted(). В Java нет другого способа. Это сделано специально.

Lii 24.05.2024 20:43

@Джон Боллинджер - В принципе да. Я собираюсь выполнить некоторый код, и он получит обратный вызов. Поскольку обратный вызов может заблокироваться, мне нужно ограничить его выполнение, но я не могу для этого перейти в новый поток.

Stormcloud 25.05.2024 18:56

Тогда тебе не повезло, @Stormcloud. Вам нужно вернуться к части «Я не могу перейти на новую тему, чтобы сделать это». Пока поток выполняет обратный вызов, он именно это и делает. Этот поток занят, поэтому, если вам нужно, чтобы его операцией управляли или контролировали его, за исключением кода, который он выполняет, то это можно сделать только через другой поток.

John Bollinger 26.05.2024 14:37

@JohnBollinger Я думаю, ОП понял, что отмену «можно выполнить только через другой поток». Суть вопроса в том, что другой поток должен выполнять не операцию, а отмену, в отличие от большинства примеров (цитата: «Я хочу, чтобы все работало наоборот»).

Holger 27.05.2024 12:31
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
3
10
146
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Непрерывная проверка isInterrupted() внутри функции I_need_a_timeout поможет, или определение переменной Volatile или Atomic Boolean для продолжения проверки тайм-аута также поможет.

Проблема ОП, похоже, заключается в том, что реализация callback не находится под их контролем, поэтому они не могут размещать там вызовы isInterrupted() (или что-то еще). Они обеспокоены тем, что выполнение callback.apply() может зависнуть или занять слишком много времени.

John Bollinger 26.05.2024 14:41

ExecutorService тайм-аут работает

Вопрос, на который вы ссылаетесь, действительно дает решение.

Я ищу способ отключить функцию Java, работающую в текущем потоке.

Синтаксис try-with-resources со службой-исполнителем блокируется до завершения фоновых задач. Таким образом, вы получите тот же эффект, что и выполнение этой задачи в том же потоке.

ThreadLocal необходим… Проблема с Callable в том, что он выполняется в другом потоке и, следовательно, извлекает неправильный объект из Threadlocal.

Любые объекты, необходимые фоновому потоку, могут быть переданы конструктору этого экземпляра задачи.

В приведенном ниже примере аргументы final Integer start , final Integer limit могут исходить от ThreadLocal vars.

Пример кода

Определите свою задачу как Callable. Отправьте эту задачу в службу исполнителя, указав время ожидания.

При отправке через invokeAll вы получаете список будущих объектов. Проверьте каждый Future на наличие isCancelled, чтобы определить, что тайм-аут истек.

В вашем примере AtomicInteger не нужен. Задача должна реализовать собственное целое число, а затем вернуть результат через Future.

И не надо ThreadLocal.

class Incrementation implements Callable < Integer >
{
    Integer start , limit ;

    // Constructor
    Incrementation ( final Integer start , final Integer limit ) 
    {
        this.start = start ;
        this.limit = limit ;
    }

    Integer call() 
    {
        int x = this.start ;
        for ( int index = 1 ; index <= this.limit ; index ++ )
        {
            x = x + 1 ;
        }
        return Integer.valueOf( x ) ;
    }

}

Использование:

List < Incrementation > tasks = List.of( new Incrementation( 1 , 100 ) ;
try (
    ExecutorService executorService = Executors. … ;
)
{
    List < Future < Integer > > futures = executorService.invokeAll( tasks , 20 , TimeUnit.SECONDS ) ;
    Future< Integer > future = futures.getFirst() ;
    if ( future.isCancelled() ) 
    {
        … // Likely caused by time-out expiring. 
    }
}

См. этот ответ Дрю Уиллса с моими дополнениями.

Мой пример немного устроен, чтобы код был коротким. В реальном приложении ThreadLocal необходим, потому что запускается много потоков, но он содержит гораздо более сложный тип, чем AtomicInteger. Проблема с Callable заключается в том, что он выполняется в другом потоке и, следовательно, извлекает неправильный объект из Threadlocal. Связанный ответ был одним из примеров кода, который я читал, но не принял во внимание, поскольку исполняемый код выполняется не в том потоке.

Stormcloud 24.05.2024 18:23

Возможно, я неправильно читаю вопрос, но, похоже, это описывает именно тот подход, который ФП явно отвергает («Я хочу, чтобы это работало наоборот»).

John Bollinger 24.05.2024 18:37

@JohnBollinger Я пытаюсь показать, что решение, отклоненное ОП, действительно работает, и работает довольно аккуратно. Я добавил больше прозы, чтобы прояснить эту мысль более четко. Спасибо за ваш комментарий.

Basil Bourque 24.05.2024 19:19

@Basil Bourque: В вашем решении, даже когда таймаут задания исполнителя истечет и getFirst вернется, цикл внутри Incrementation#call продолжит работать в фоновом режиме. Верно? Таким образом, сама операция не истечет по тайм-ауту. Этот поток продолжит работу, а не будет остановлен исключением ThreadDeath или чем-то в этом роде. Я не уверен, приемлемо ли это для постера.

Lii 24.05.2024 20:33

@Basil Bourque: Спасибо, что нашли время помочь. Переменные ThreadLocal необходимы в реальном приложении (они используются функцией обратного вызова). Переосмысление проблемы — не вариант. Насколько я понимаю предлагаемое вами решение, Thread1 создаст экземпляр Incrementation, затем создаст Future для этого экземпляра, а затем будет ждать экземпляра в блоке try. Thread2 будет создан ExecutorService для запуска кода в классе Incrementation. Извините, это противоположно тому, о чем я просил: Thread1 должен выполнить приращение, а Thread2 должен опросить будущее.

Stormcloud 24.05.2024 22:42
Ответ принят как подходящий

Вы можете напрямую создать будущее, операция которого выполняется в текущем потоке, и отменить его из другого потока. Например.

public class CancelCurrentThread {
    public static void main(String[] args) {
        try(var canceler = Executors.newScheduledThreadPool(1)) {
            for(long timeout: List.of(500, 1500)) {
                FutureTask<String> task
                    = new FutureTask<>(CancelCurrentThread::operation);
                System.out.println("Timeout: " + timeout + "ms");
                canceler.schedule(
                    () -> task.cancel(true), timeout, TimeUnit.MILLISECONDS);
                task.run();
                try {
                    System.out.println("Result: " + task.get());
                }
                catch(CancellationException ex) {
                    System.out.println("Canceled");
                }
                catch(InterruptedException|ExecutionException e) {
                    System.out.println("Failed: " + e);
                }
            }
        }
    }
  
    static String operation() throws InterruptedException {
        System.out.println("performing operation in " + Thread.currentThread());
        Thread.sleep(1000);
        return "result";
    }
}
Timeout: 500ms
performing operation in Thread[#1,main,5,main]
Canceled
Timeout: 1500ms
performing operation in Thread[#1,main,5,main]
Result: result

Важно помнить, что применяются те же ограничения, что и в случае с фьючерсами на ExecutorService. Чтобы быстро вернуться после отмены, операция должна реагировать на прерывание либо с помощью операций, которые прерываются при прерывании (например, sleep в примере), либо путем опроса Thread.interrupted() через разумные промежутки времени.

Другие вопросы по теме