У меня есть сервер grpc-java, и мне нужно выполнить асинхронный вызов службы аутентификации перед обработкой запроса. Я думаю, что это должно быть сделано в перехватчике, но для этого требуется синхронно возвращать Listener из interceptCall ()
class AuthInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next
) {
String token = ""; //get token from headers
authService.authorize(token).subscribe(
ok -> // process the request
error -> call.close(Status.UNAUTHENTICATED, headers)
);
// Here we need to return a Listener, but we haven't started a call yet
}
}
Итак, возникают вопросы: как выполнить асинхронный вызов от ServerInterceptor, и если это невозможно, как правильно выполнить асинхронную аутентификацию запросов на его grpc? Я знаю, что это можно сделать прямо в сервисах grpc с помощью StreamObservers, но авторизация запросов - это сквозная проблема, и перехватчики кажутся идеальным местом для этого.




Вам необходимо вернуть ServerCall.Listener. Но поскольку вы не знаете, какой Listener делегировать, вы можете переопределить каждый метод в Listener, чтобы добавить обратный вызов в очередь. После завершения аутентификации опустошите очередь.
class DelayedListener<ReqT> extends Listener<ReqT> {
private Listener<ReqT> delegate;
private List<Runnable> events = new ArrayList<Runnable>();
@Override public synchronized void onMessage(ReqT message) {
if (delegate == null) {
events.add(() -> delegate.onMessage(message));
} else {
delegate.onMessage(message);
}
}
...
public synchronized void setDelegate(Listener<ReqT> delegate) {
this.delegate = delegate;
for (Runnable runnable : events) {
runnable.run();
}
events = null;
}
}
Хорошее решение. Будет ли он работать с ConcurrentLinkedQueue вместо синхронизации обратных вызовов и setDelegate ()?
ваше решение отлично работает и решает общую проблему. Я нашел другое возможное решение: похоже, что SimpleForwardingServerCallListener, который перенаправляет все обратные вызовы как обычно и только задерживает обратный вызов onHalfClosed(), также работает для меня. Или у этого решения проблемы с потоками?
Перед прямой пересылкой новых событий необходимо убедиться, что все события в очереди «слиты». Если вы просто использовали ConcurrentLinkedQueue + изменчивый делегат, то у вас есть одна из двух гонок: 1) this.delegate = delegate находится в начале setDelegate() и позволяет новым событиям «гоняться» перед старыми событиями, или 2) this.delegate = delegate находится в конце setDelegate(), и события могут «застрять» в очереди. Вам действительно нужна блокировка при постановке в очередь. Можно избежать блокировки, если delegate != null и при вызове runnable.run(), но это более сложно. Смотрите DelayedStream в grpc
Верно, что заглушка для унарных и потоковых вызовов задерживает вызов приложения до onHalfClosed(). Однако 1) другие перехватчики по-прежнему будут работать, 2) это неверно для потоковой передачи клиентов и двунаправленных потоков, и 3) в экосистеме есть несколько альтернативных заглушек сервера, и они могут вести себя иначе.
Я думаю, что могу жить с более простым решением, по крайней мере, сейчас: у нас есть только унарные вызовы, и мы управляем и сервером, и клиентом, поэтому альтернативных реализаций нет. Спасибо за развернутый ответ!
Мне непонятно, как работает DelayedListener. Вернем ли мы его без установки делегата, а обратный вызов authorize вызывает setDelegate с serverCallHandler.startCall(call, headers)?
@AbhijitSarkar, да. Он стоит в очереди, ожидая завершения авторизации, а затем вы опустошаете очередь с помощью setDelegate ().
@EricAnderson Если serverCallHandler.startCall не вызывается до обратного вызова аутентификации, слушатель onMessage все равно вызывается? На первый взгляд кажется, что пока звонок не начнется, onMessage не должен звонить.
@AbhijitSarkar, вызов запускается, как только вызывается перехватчик. Тем не менее, вызов onMessage () должен, естественно, быть отложен до вызова call.request (), но идея состоит в том, чтобы буферизовать все обратные вызовы, и onMessage () был только примером.
Можете ли вы обновить свой вопрос, показывая, что у вас получилось, используя
DelayedListener? Мне непонятно, как соединить точки.