Асинхронный вызов grpc-java в ServerInterceptor

У меня есть сервер grpc-java, и мне нужно выполнить асинхронный вызов службы аутентификации перед обработкой запроса. Я думаю, что это должно быть сделано в перехватчике, но для этого требуется синхронно возвращать Listener из interceptCall ()

class AuthInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        Metadata headers,
        ServerCallHandler<ReqT, RespT> next
    ) {
        String token = ""; //get token from headers
        authService.authorize(token).subscribe(
            ok -> // process the request
            error -> call.close(Status.UNAUTHENTICATED, headers)
        );
        // Here we need to return a Listener, but we haven't started a call yet
    }
}

Итак, возникают вопросы: как выполнить асинхронный вызов от ServerInterceptor, и если это невозможно, как правильно выполнить асинхронную аутентификацию запросов на его grpc? Я знаю, что это можно сделать прямо в сервисах grpc с помощью StreamObservers, но авторизация запросов - это сквозная проблема, и перехватчики кажутся идеальным местом для этого.

Можете ли вы обновить свой вопрос, показывая, что у вас получилось, используя DelayedListener? Мне непонятно, как соединить точки.

Abhijit Sarkar 11.03.2021 23:57
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
4
1
1 340
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вам необходимо вернуть ServerCall.Listener. Но поскольку вы не знаете, какой Listener делегировать, вы можете переопределить каждый метод в Listener, чтобы добавить обратный вызов в очередь. После завершения аутентификации опустошите очередь.

class DelayedListener<ReqT> extends Listener<ReqT> {
  private Listener<ReqT> delegate;
  private List<Runnable> events = new ArrayList<Runnable>();

  @Override public synchronized void onMessage(ReqT message) {
    if (delegate == null) {
      events.add(() -> delegate.onMessage(message));
    } else {
      delegate.onMessage(message);
    }
  }
  ...
  public synchronized void setDelegate(Listener<ReqT> delegate) {
    this.delegate = delegate;
    for (Runnable runnable : events) {
      runnable.run();
    }
    events = null;
  }
}

Хорошее решение. Будет ли он работать с ConcurrentLinkedQueue вместо синхронизации обратных вызовов и setDelegate ()?

Eugene 10.12.2018 20:01

ваше решение отлично работает и решает общую проблему. Я нашел другое возможное решение: похоже, что SimpleForwardingServerCallListener, который перенаправляет все обратные вызовы как обычно и только задерживает обратный вызов onHalfClosed(), также работает для меня. Или у этого решения проблемы с потоками?

Eugene 10.12.2018 23:56

Перед прямой пересылкой новых событий необходимо убедиться, что все события в очереди «слиты». Если вы просто использовали ConcurrentLinkedQueue + изменчивый делегат, то у вас есть одна из двух гонок: 1) this.delegate = delegate находится в начале setDelegate() и позволяет новым событиям «гоняться» перед старыми событиями, или 2) this.delegate = delegate находится в конце setDelegate(), и события могут «застрять» в очереди. Вам действительно нужна блокировка при постановке в очередь. Можно избежать блокировки, если delegate != null и при вызове runnable.run(), но это более сложно. Смотрите DelayedStream в grpc

Eric Anderson 11.12.2018 18:26

Верно, что заглушка для унарных и потоковых вызовов задерживает вызов приложения до onHalfClosed(). Однако 1) другие перехватчики по-прежнему будут работать, 2) это неверно для потоковой передачи клиентов и двунаправленных потоков, и 3) в экосистеме есть несколько альтернативных заглушек сервера, и они могут вести себя иначе.

Eric Anderson 11.12.2018 18:27

Я думаю, что могу жить с более простым решением, по крайней мере, сейчас: у нас есть только унарные вызовы, и мы управляем и сервером, и клиентом, поэтому альтернативных реализаций нет. Спасибо за развернутый ответ!

Eugene 11.12.2018 19:16

Мне непонятно, как работает DelayedListener. Вернем ли мы его без установки делегата, а обратный вызов authorize вызывает setDelegate с serverCallHandler.startCall(call, headers)?

Abhijit Sarkar 11.03.2021 22:44

@AbhijitSarkar, да. Он стоит в очереди, ожидая завершения авторизации, а затем вы опустошаете очередь с помощью setDelegate ().

Eric Anderson 13.03.2021 00:15

@EricAnderson Если serverCallHandler.startCall не вызывается до обратного вызова аутентификации, слушатель onMessage все равно вызывается? На первый взгляд кажется, что пока звонок не начнется, onMessage не должен звонить.

Abhijit Sarkar 13.03.2021 00:36

@AbhijitSarkar, вызов запускается, как только вызывается перехватчик. Тем не менее, вызов onMessage () должен, естественно, быть отложен до вызова call.request (), но идея состоит в том, чтобы буферизовать все обратные вызовы, и onMessage () был только примером.

Eric Anderson 15.03.2021 17:30

Другие вопросы по теме