Как использовать MDC с parallelStream в Java и выполнить логбэк

Мне нужно зарегистрировать несколько атрибутов запроса, таких как идентификатор запроса и локаль, но при использовании parallelStream кажется, что ThreadLocal MDC теряет информацию.

Я проанализировал решение передачи контекста MDC между потоками при создании parallelStream, но оно кажется грязным, и у меня также много случаев использования parallelStream.

Есть ли другой способ сделать это?

Спасибо

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
6
0
1 840
2

Ответы 2

Единственное решение, которое я нашел, - скопировать контекст в конечную переменную вне потока и применить его для каждой итерации:

Map<String, String> contextMap = MDC.getCopyOfContextMap();
Stream.iterate(0, i -> i + 1).parallel()
    .peek(i -> MDC.setContextMap(contextMap))
    // ...logic...
    // in case you're using a filter, you need to use a predicate and combine it with a clear step:
    filter(yourPredicate.or(i -> {
                MDC.clear();
                return false;
            }))
    // clear right before terminal operation
    .peek(i -> MDC.clear())
    .findFirst();

// since the initial thread is also used within the stream and the context is cleared there, 
// we need to set it again to its initial state
MDC.setContextMap(contextMap);    

Стоимость этого решения составляет 1) несколько микросекунд на 100 итераций и 2) худшая читаемость, но оба приемлемы:

  1. Это тест, сравнивающий IntStream.range(0, 100).parallel().sum() (= базовый уровень) с тем же потоком, который использует эту логику копирования MDC:
Benchmark               Mode  Cnt   Score   Error   Units
MDC_CopyTest.baseline  thrpt    5   0,038 ± 0,005  ops/us
MDC_CopyTest.withMdc   thrpt    5   0,024 ± 0,001  ops/us
MDC_CopyTest.baseline   avgt    5  28,239 ± 1,308   us/op
MDC_CopyTest.withMdc    avgt    5  40,178 ± 0,761   us/op
  1. Чтобы улучшить читаемость, его можно обернуть в небольшой вспомогательный класс:
public class MDCCopyHelper {
    private Map<String, String> contextMap = MDC.getCopyOfContextMap();

    public void set(Object... any) {
        MDC.setContextMap(contextMap);
    }

    public void clear(Object... any) {
        MDC.clear();
    }

    public boolean clearAndFail() {
        MDC.clear();
        return false;
    }
}

Тогда потоковый код выглядит немного лучше:

MDCCopyHelper mdcHelper = new MDCCopyHelper();
Optional<Integer> findFirst = Stream.iterate(0, i -> i + 1)
        .parallel()
        .peek(mdcHelper::set)
        // ...logic...
        // filters predicates should be combined with clear step
        .filter(yourPredicate.or(mdcHelper::clearAndFail))
        // before terminal call:
        .peek(mdcHelper::clear)
        .findFirst();
mdcHelper.set();

Что делать, если внутри логики возникает исключение? Разве это не оставило бы MDC грязным? Удалось ли придумать решение с учетом исключений?

fasfsfgs 18.11.2019 16:12

Нет, я не знаю, как с этим справиться. Потоки ужасны с точки зрения обработки исключений.

rudi 09.04.2020 16:23

Мое решение - обернуть их функциональный интерфейс. Аналогичен шаблону статического прокси. Например

public static void main(String[] args) {
    System.err.println(Thread.currentThread().getName());
    String traceId = "100";
    MDC.put("id", traceId);
    System.err.println("------------------------");
    Stream.of(1, 2, 3, 4)
          .parallel()
          .forEach((num -> {
              System.err.println(Thread.currentThread().getName()+" "+ traceId.equals(MDC.get("id")));
          }));
    System.err.println("------------------------");
    Stream.of(1, 2, 3, 4)
          .parallel()
          // the key is the TraceableConsumer
          .forEach(new TraceableConsumer<>(num -> {
              System.err.println(Thread.currentThread().getName() + " " + traceId.equals(MDC.get("id")));
          }));
}

public class TraceableConsumer<T> extends MDCTraceable implements Consumer<T> {

    private final Consumer<T> target;

    public TraceableConsumer(Consumer<T> target) {
        this.target = target;
    }

    @Override
    public void accept(T t) {
        setMDC();
        target.accept(t);
    }
}

public abstract class MDCTraceable {

    private final Long id;

    private final Long userId;

    public MDCTraceable() {
        id = Optional.ofNullable(MDC.get("id")).map(Long::parseLong).orElse(0L);
        userId = Optional.ofNullable(MDC.get("userId")).map(Long::parseLong).orElse(0L);
    }

    public void setMDC(){
        MDC.put("id", id.toString());
        MDC.put("userId", userId.toString());
    }

    public void cleanMDC(){
        MDC.clear();
    }
}

Другие вопросы по теме