Vert.x: как обработать HttpRequest с операцией блокировки

Я только начал работать с Vert.x и хотел бы понять, как правильно обрабатывать потенциально длительные (блокирующие) операции в рамках обработки REST HttpRequest. Само приложение является приложением Spring.

Вот упрощенный сервис REST, который у меня есть:

public class MainApp { 
   // instantiated by Spring
   private AlertsRestService alertsRestService;

   @PostConstruct
   public void init() {
       Vertx.vertx().deployVerticle(alertsRestService);
   }
}

public class AlertsRestService extends AbstractVerticle {
  // instantiated by Spring
    private PostgresService pgService;
    @Value("${rest.endpoint.port:8080}")
    private int restEndpointPort;

    @Override
    public void start(Future<Void> futureStartResult) {
        HttpServer server = vertx.createHttpServer();
        Router router = Router.router(vertx);
        //enable reading of the request body for all routes 
        router.route().handler(BodyHandler.create());

        router.route(HttpMethod.GET, "/allDefinitions")
              .handler(this::handleGetAllDefinitions);

        server.requestHandler(router)
            .listen(restEndpointPort, 
                result -> {
                    if (result.succeeded()) {
                        futureStartResult.complete();
                    } else {
                futureStartResult.fail(result.cause());
                    }
                }
            );
    }

  private void handleGetAllDefinitions( RoutingContext routingContext) {
        HttpServerResponse response = routingContext.response();
        Collection<AlertDefinition> allDefinitions = null;
        try {
            allDefinitions = pgService.getAllDefinitions();
        } catch (Exception e) {
            response.setStatusCode(500).end(e.getMessage());
        }           
        response.putHeader("content-type", "application/json")
            .setStatusCode(200)
            .end(Json.encodePrettily(allAlertDefinitions));
    }

}

Весенняя конфигурация:

    <bean id = "alertsRestService" class = "com.my.AlertsRestService"
      p:pgService-ref = "postgresService"
      p:restEndpointPort = "${rest.endpoint.port}"
    />
    <bean id = "mainApp" class = "com.my.MainApp"
      p:alertsRestService-ref = "alertsRestService"
    />

Теперь вопрос: как правильно обрабатывать (блокирующий) вызов моего postgresService, что может занять больше времени, если нужно получить/вернуть много предметов?

Изучив и просмотрев несколько примеров, я вижу несколько способов сделать это, но не совсем понимаю различия между ними:

Опция 1. преобразует мой AlertsRestService в Вертикаль рабочего и использует пул рабочих потоков:

public class MainApp { 
   private AlertsRestService alertsRestService;
        
   @PostConstruct
   public void init() {
       DeploymentOptions options = new DeploymentOptions().setWorker(true);
       Vertx.vertx().deployVerticle(alertsRestService, options);
   }
}

Что меня здесь смущает, так это утверждение из документации Vert.x: «Экземпляры рабочих вершин никогда не выполняются Vert.x одновременно более чем одним потоком, но могут [выполняться] разными потоками в разное время»

Означает ли это, что все HTTP-запросы к моему alertsRestService будут эффективно регулироваться для последовательного выполнения одним потоком за раз? Это не то, что я хотел бы: этот сервис не имеет состояния и должен нормально обрабатывать одновременные запросы ....

Итак, возможно, мне нужно посмотреть на следующий вариант:

Вариант 2. преобразовать мой сервис в многопоточная рабочая версия, выполнив что-то похожее на пример в документации:

public class MainApp { 
   private AlertsRestService alertsRestService;
        
   @PostConstruct
   public void init() {
     DeploymentOptions options = new DeploymentOptions()
	  	.setWorker(true)
	  	.setInstances(5) // matches the worker pool size below
	  	.setWorkerPoolName("the-specific-pool")
	  	.setWorkerPoolSize(5);
     Vertx.vertx().deployVerticle(alertsRestService, options);
   }
}

Итак, в этом примере — что именно будет происходить? Насколько я понимаю, директива «.setInstances(5)» означает, что будет создано 5 экземпляров моего «alertsRestService». Я настроил эту службу как компонент Spring с зависимостями, связанными с инфраструктурой Spring. Однако в этом случае мне кажется, что 5 экземпляров будут созданы не Spring, а Vert.x - это правда? и как я могу изменить это, чтобы вместо этого использовать Spring?

Вариант 3. использует 'обработчик блокировки' для маршрутизации. Единственное изменение в коде будет в методе AlertsRestService.start() в том, как я определяю обработчик для маршрутизатора:

boolean ordered = false;
router.route(HttpMethod.GET, "/allDefinitions")
	.blockingHandler(this::handleGetAllDefinitions, ordered);

Насколько я понимаю, установка параметра «ordered» в значение TRUE означает, что обработчик может быть вызван одновременно. Означает ли это, что этот вариант эквивалентен варианту № 2 с многопоточными рабочими вертикалями? В чем разница? что асинхронное многопоточное выполнение относится только к одному конкретному HTTP-запросу (тому, что для пути /allDefinitions), а не ко всей вершине AlertsRestService?

Вариант 4., и последний вариант, который я нашел, — использовать директиву 'выполнить блокировку()' явно для запуска только вложенного кода в рабочих потоках. Я не смог найти много примеров того, как это сделать с обработкой HTTP-запросов, поэтому ниже моя попытка - возможно, неправильная. Разница здесь только в реализации метода-обработчика, handleGetAllAlertDefinitions() - но он довольно сложный... :

private void handleGetAllAlertDefinitions(RoutingContext routingContext)    {
  vertx.executeBlocking(
      fut -> { fut.complete( sendAsyncRequestToDB(routingContext)); },
      false,
      res -> { handleAsyncResponse(res, routingContext); }
  );
}

public Collection<AlertDefinition> sendAsyncRequestToDB(RoutingContext routingContext) {
  Collection<AlertDefinition> allAlertDefinitions = new LinkedList<>();
  try {
      alertDefinitionsDao.getAllAlertDefinitions();
  } catch (Exception e) {
      routingContext.response().setStatusCode(500)
        .end(e.getMessage());
  }
  return allAlertDefinitions;
}

private void handleAsyncResponse(AsyncResult<Object> asyncResult, RoutingContext routingContext){
  if (asyncResult.succeeded()){
      try { 
          routingContext.response().putHeader("content-type", "application/json")
            .setStatusCode(200)
            .end(Json.encodePrettily(asyncResult.result()));
      } catch(EncodeException e) {
           routingContext.response().setStatusCode(500)
             .end(e.getMessage());
      }
   } else {
      routingContext.response().setStatusCode(500)
        .end(asyncResult.cause());
   }
}

Как это отличается от других вариантов? А вариант 4 обеспечивает параллельное выполнение обработчика или однопоточное, как в варианте 1?

Наконец, возвращаясь к исходному вопросу: каков наиболее подходящий вариант для обработки более длительных операций при обработке запросов REST?

Извините за такой длинный пост.... :)

Спасибо!

0
0
624
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Это большой вопрос, и я не уверен, что смогу ответить на него полностью. Но давайте попробуем:

В варианте № 1 на самом деле это означает, что вы не должны использовать ThreadLocal в своих рабочих вершинах, если вы используете более одного рабочего одного и того же типа. Использование только одного работника означает, что ваши запросы будут сериализованы.

Вариант №2 просто неверный. Вы не можете использовать setInstances с экземпляром класса, только с его именем. Однако вы правы в том, что если вы решите использовать имя класса, Vert.x создаст их экземпляры.

Вариант № 3 менее одновременен, чем использование Workers, и его не следует использовать.

Вариант № 4 executeBlocking в основном делает вариант № 3, и тоже довольно плохой.

Спасибо, Алексей! Итак, похоже, лучший вариант - № 1... Я все еще не уверен, как я могу по-настоящему масштабировать его, хотя я определенно не хочу серийный доступ к своим обработчикам... Похоже, один из способов - создать и развернуть несколько экземпляров моих вершин AlertsRestService. Если это правда, должен ли каждый экземпляр вершины создавать свой собственный HttpServer и Router, возможно, на разных портах? Или мой MainApp должен сделать это и передать один и тот же экземпляр HttpServer каждому экземпляру вершины? Кроме того, похоже, мне придется создавать экземпляры AlertsRestService через контекст Spring.getBean() ... Спасибо!

Marina 05.02.2019 01:50

Путь к настоящему масштабированию — создать конвейер для вершин, которые взаимодействуют через EventBus. Одного HTTP-сервера более чем достаточно для маршрутизации. Вы также можете следить за этой презентацией: slideshare.net/АлексейСошин/concurrency-with-vertx

Alexey Soshin 05.02.2019 09:05

Другие вопросы по теме