Как вы используете WebFlux для анализа потока событий, который не соответствует событиям, отправленным сервером?

Я пытаюсь использовать WebClient для работы с конечной точкой Docker /events. Однако это не соответствует контракту text/eventstream в том смысле, что каждое сообщение разделяется LF 2. Он просто отправляет его как один документ JSON, за которым следует другой.

Он также устанавливает тип MIME на application/json, а не на text/eventstream.

То, о чем я думаю, но еще не реализовано, - это создать прокси-сервер узла, который добавит требуемый перевод строки и поместит его между ними, но я надеялся избежать такого обходного пути.

1
0
56
1

Ответы 1

Вместо того, чтобы пытаться обрабатывать ServerSentEvent, просто примите его как String. Затем попытайтесь проанализировать его как JSON (игнорируя те, которые терпят неудачу, что, как я предполагаю, может произойти, но я сам не ударил его)

@PostConstruct
public void setUpStreamer() {
    final Map<String, List<String>> filters = new HashMap<>();
    filters.put("type", Collections.singletonList("service"));

    WebClient.create(daemonEndpoint)
        .get()
        .uri("/events?filters = {filters}",
           mapper.writeValueAsString(filters))
        .retrieve()
        .bodyToFlux(String.class)
        .flatMap(Mono::justOrEmpty)
        .map(s -> {
            try {
                return mapper.readValue(s, Map.class);
            } catch (IOException e) {
                log.warn("unable to parse {} as JSON", s);
                return null;
            }
        })
        .flatMap(Mono::justOrEmpty)
        .subscribe(
            event -> {
                log.trace("event = {}", event);
                refreshRoutes();
            },
            throwable -> log.error("Error on event stream: {}", throwable.getMessage(), throwable),
            () -> log.warn("event stream completed")
        );
}

Другие вопросы по теме