У меня есть многопоточное приложение Java, которое использует поля ThreadLocal для изоляции потоков друг от друга. В рамках этого приложения у меня также есть требование реализовать тайм-ауты для определенных функций для предотвращения атак DOS.
Я ищу способ отключить функцию Java, работающую в текущем потоке.
Я видел множество решений, таких как Как отключить поток, который создаст будущее для выполнения некоторого кода, запустит его в новом потоке и дождется его завершения. Я хочу, чтобы это работало наоборот.
Рассмотрим следующий код, который будет выполняться в многопоточной среде:
class MyClass {
// ThreadLocal is not private so callback can access it
ThreadLocal<AtomicInteger> counter = ThreadLocal.withInitial(AtomicInteger::new);
public void entry(Function<?, ?> callback) {
counter.get().set(10); // Calling thread performs set up
I_need_a_timeout(callback, 110); // Call a function which might take a long time
int result = counter.get().get(); // If there is no time out this will be 110
}
private void I_need_a_timeout(Function<?, ?> callback, int loop) {
while (loop-- >= 0) {
counter.get().incrementAndGet();
callback.apply(null); // This may take some time
}
}
}
Мне нужно иметь возможность завершить I_need_a_timeout, если он выполняется слишком долго, но если бы я выполнил его в будущем, у него был бы собственный поток и, следовательно, собственный экземпляр AtomicInteger, поэтому значение, считываемое вызывающим кодом, всегда было бы значение, которое я инициализирую (в данном случае 10)
Обновление: я обновил пример кода, чтобы он был ближе к моему реальному приложению. Клиент передает функцию I_need_a_timeout, возврат которой может занять любое время (или потенциально может никогда не вернуться), поэтому решения для опроса не будут работать.
@darclander, К сожалению, я не могу. Приведенный мной пример кода представляет собой (очень) упрощенную версию того, с чем я имею дело. Фактический код включает функцию обратного вызова, которой я не могу управлять и которая может ожидать во внешних системах.
Могу я спросить, почему вы хотите сделать это именно так? Почему бы не выполнить работу над пулом потоков и не отменить будущее, если это занимает слишком много времени?
@lance-java, мне нужно сделать так, чтобы переменные ThreadLocal всегда считывались из одного и того же потока. Если I_need_a_timeout был запущен из потока в пуле потоков, то counter.get() вернет разные экземпляры AtomicInteger в зависимости от того, кто его вызывает.
Похоже, ваш запрос сводится к тому, как вызвать внешний обратный вызов, ограничивая время его выполнения, не выполняя выполнение в другом потоке. Это правильно?
Отвечает ли это на ваш вопрос? Как убить поток в Java?
Чтобы это работало, операция в вызове callback.apply(null) должна проверять какую-то переменную, например Thread.isInterrupted(). В Java нет другого способа. Это сделано специально.
@Джон Боллинджер - В принципе да. Я собираюсь выполнить некоторый код, и он получит обратный вызов. Поскольку обратный вызов может заблокироваться, мне нужно ограничить его выполнение, но я не могу для этого перейти в новый поток.
Тогда тебе не повезло, @Stormcloud. Вам нужно вернуться к части «Я не могу перейти на новую тему, чтобы сделать это». Пока поток выполняет обратный вызов, он именно это и делает. Этот поток занят, поэтому, если вам нужно, чтобы его операцией управляли или контролировали его, за исключением кода, который он выполняет, то это можно сделать только через другой поток.
@JohnBollinger Я думаю, ОП понял, что отмену «можно выполнить только через другой поток». Суть вопроса в том, что другой поток должен выполнять не операцию, а отмену, в отличие от большинства примеров (цитата: «Я хочу, чтобы все работало наоборот»).




Непрерывная проверка isInterrupted() внутри функции I_need_a_timeout поможет, или определение переменной Volatile или Atomic Boolean для продолжения проверки тайм-аута также поможет.
Как сейчас написано, ваш ответ неясен. Пожалуйста, отредактируйте , чтобы добавить дополнительную информацию, которая поможет другим понять, как это относится к заданному вопросу. Более подробную информацию о том, как писать хорошие ответы, вы можете найти в справочном центре.
Проблема ОП, похоже, заключается в том, что реализация callback не находится под их контролем, поэтому они не могут размещать там вызовы isInterrupted() (или что-то еще). Они обеспокоены тем, что выполнение callback.apply() может зависнуть или занять слишком много времени.
ExecutorService тайм-аут работаетВопрос, на который вы ссылаетесь, действительно дает решение.
Я ищу способ отключить функцию Java, работающую в текущем потоке.
Синтаксис try-with-resources со службой-исполнителем блокируется до завершения фоновых задач. Таким образом, вы получите тот же эффект, что и выполнение этой задачи в том же потоке.
ThreadLocal необходим… Проблема с Callable в том, что он выполняется в другом потоке и, следовательно, извлекает неправильный объект из Threadlocal.
Любые объекты, необходимые фоновому потоку, могут быть переданы конструктору этого экземпляра задачи.
В приведенном ниже примере аргументы final Integer start , final Integer limit могут исходить от ThreadLocal vars.
Определите свою задачу как Callable. Отправьте эту задачу в службу исполнителя, указав время ожидания.
При отправке через invokeAll вы получаете список будущих объектов. Проверьте каждый Future на наличие isCancelled, чтобы определить, что тайм-аут истек.
В вашем примере AtomicInteger не нужен. Задача должна реализовать собственное целое число, а затем вернуть результат через Future.
И не надо ThreadLocal.
class Incrementation implements Callable < Integer >
{
Integer start , limit ;
// Constructor
Incrementation ( final Integer start , final Integer limit )
{
this.start = start ;
this.limit = limit ;
}
Integer call()
{
int x = this.start ;
for ( int index = 1 ; index <= this.limit ; index ++ )
{
x = x + 1 ;
}
return Integer.valueOf( x ) ;
}
}
Использование:
List < Incrementation > tasks = List.of( new Incrementation( 1 , 100 ) ;
try (
ExecutorService executorService = Executors. … ;
)
{
List < Future < Integer > > futures = executorService.invokeAll( tasks , 20 , TimeUnit.SECONDS ) ;
Future< Integer > future = futures.getFirst() ;
if ( future.isCancelled() )
{
… // Likely caused by time-out expiring.
}
}
См. этот ответ Дрю Уиллса с моими дополнениями.
Мой пример немного устроен, чтобы код был коротким. В реальном приложении ThreadLocal необходим, потому что запускается много потоков, но он содержит гораздо более сложный тип, чем AtomicInteger. Проблема с Callable заключается в том, что он выполняется в другом потоке и, следовательно, извлекает неправильный объект из Threadlocal. Связанный ответ был одним из примеров кода, который я читал, но не принял во внимание, поскольку исполняемый код выполняется не в том потоке.
Возможно, я неправильно читаю вопрос, но, похоже, это описывает именно тот подход, который ФП явно отвергает («Я хочу, чтобы это работало наоборот»).
@JohnBollinger Я пытаюсь показать, что решение, отклоненное ОП, действительно работает, и работает довольно аккуратно. Я добавил больше прозы, чтобы прояснить эту мысль более четко. Спасибо за ваш комментарий.
@Basil Bourque: В вашем решении, даже когда таймаут задания исполнителя истечет и getFirst вернется, цикл внутри Incrementation#call продолжит работать в фоновом режиме. Верно? Таким образом, сама операция не истечет по тайм-ауту. Этот поток продолжит работу, а не будет остановлен исключением ThreadDeath или чем-то в этом роде. Я не уверен, приемлемо ли это для постера.
@Basil Bourque: Спасибо, что нашли время помочь. Переменные ThreadLocal необходимы в реальном приложении (они используются функцией обратного вызова). Переосмысление проблемы — не вариант. Насколько я понимаю предлагаемое вами решение, Thread1 создаст экземпляр Incrementation, затем создаст Future для этого экземпляра, а затем будет ждать экземпляра в блоке try. Thread2 будет создан ExecutorService для запуска кода в классе Incrementation. Извините, это противоположно тому, о чем я просил: Thread1 должен выполнить приращение, а Thread2 должен опросить будущее.
Вы можете напрямую создать будущее, операция которого выполняется в текущем потоке, и отменить его из другого потока. Например.
public class CancelCurrentThread {
public static void main(String[] args) {
try(var canceler = Executors.newScheduledThreadPool(1)) {
for(long timeout: List.of(500, 1500)) {
FutureTask<String> task
= new FutureTask<>(CancelCurrentThread::operation);
System.out.println("Timeout: " + timeout + "ms");
canceler.schedule(
() -> task.cancel(true), timeout, TimeUnit.MILLISECONDS);
task.run();
try {
System.out.println("Result: " + task.get());
}
catch(CancellationException ex) {
System.out.println("Canceled");
}
catch(InterruptedException|ExecutionException e) {
System.out.println("Failed: " + e);
}
}
}
}
static String operation() throws InterruptedException {
System.out.println("performing operation in " + Thread.currentThread());
Thread.sleep(1000);
return "result";
}
}
Timeout: 500ms
performing operation in Thread[#1,main,5,main]
Canceled
Timeout: 1500ms
performing operation in Thread[#1,main,5,main]
Result: result
Важно помнить, что применяются те же ограничения, что и в случае с фьючерсами на ExecutorService. Чтобы быстро вернуться после отмены, операция должна реагировать на прерывание либо с помощью операций, которые прерываются при прерывании (например, sleep в примере), либо путем опроса Thread.interrupted() через разумные промежутки времени.
Что вы подразумеваете под «если функция выполняется слишком долго»? Не можете ли вы просто добавить в цикл while дополнительное условие, например атомарный флаг?
while (loop-- >= 0 && enableINeedATimeout)