Является ли следующий код безопасным для потоков

У меня есть сценарий, в котором мне нужно поддерживать карту, которая может быть заполнена несколькими потоками, каждый из которых изменяет соответствующий список (уникальный идентификатор / ключ является именем потока), и когда размер списка для потока превышает фиксированный размер пакета, мы должны сохранять записи в БД.

Пример кода ниже:

private volatile ConcurrentHashMap<String, List<T>>  instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReadWriteLock lock ;

public void addAll(List<T> entityList, String threadName) {
    try {
        lock.readLock().lock();
        List<T> instrumentList = instrumentMap.get(threadName);
        if (instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if (instrumentList.size() >= batchSize -1){
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList); 
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);  
        }
    } finally {
        lock.readLock().unlock();
    }

}

Через каждые 2 минуты запускается еще один отдельный поток для сохранения всех записей в Map (чтобы убедиться, что у нас что-то сохраняется через каждые 2 минуты и размер карты не становится слишком большим), и когда он запускается, он блокирует все другие потоки (проверьте readLock и writeLock нас, где writeLock имеет более высокий приоритет)

if (//Some condition) {
                    Thread.sleep(//2 minutes);
                    aggregator.getLock().writeLock().lock();
                    List<T> instrumentList = instrumentMap .values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
                    if (instrumentList.size() > 0) {

                        saver.persist(instrumentList);
                        instrumentMap .values().parallelStream().forEach(x -> x.clear());
                    aggregator.getLock().writeLock().unlock();
                }

Это решение работает нормально почти для каждого сценария, который мы тестировали, за исключением того, что иногда мы видим, что некоторые записи пропали, то есть вообще не сохранялись, хотя они были добавлены в Map.

У меня вопрос, в чем проблема с этим кодом? ConcurrentHashMap - не лучшее решение здесь? Есть ли здесь проблемы с использованием блокировки чтения / записи? Следует ли мне использовать последовательную обработку?

Подождите, вы сохраняете что-то под блокировкой читать? Как вы думаете, может быть, там будет правильная блокировка записи?

Andy Turner 07.08.2018 08:33

Если вы используете блокировку и одновременно HashMap, вы часто делаете что-то неправильно.

Andy Turner 07.08.2018 08:34

@ Энди Тернер, в чем проблема с блокировкой чтения, когда каждый поток изменяет / сохраняет свой собственный индивидуальный список?

Amit 07.08.2018 08:36

Что вы здесь предлагаете? Должен ли я использовать только блокировку записи? Я не могу использовать здесь Normal HashMap

Amit 07.08.2018 08:37

@AndyTurner решение, которое вы предложили, хорошее. Но есть еще одна часть проблемы, как я уже упоминал, то есть «еще один отдельный поток, запускаемый через каждые 2 минуты», теперь этот поток использует writeLock, который, безусловно, отличается от вычислений атомарных операций (блокировка, используемая внутри вычислений). Что вы предлагаете, если я должен использовать блокировку записи для обоих по справедливости?

Amit 10.08.2018 09:41
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
3
5
104
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Нет, это не потокобезопасный.

Проблема в том, что вы используете блокировку читать ReadWriteLock. Это не гарантирует эксклюзивный доступ для создания обновлений. Для этого вам нужно будет использовать блокировку написать.

Но вам вообще не нужно использовать отдельную блокировку. Вы можете просто использовать метод ConcurrentHashMap.compute:

instrumentMap.compute(threadName, (tn, instrumentList) -> {
  if (instrumentList == null) {
    instrumentList = new ArrayList<>();
  }

  if (instrumentList.size() >= batchSize -1) {
    instrumentList.addAll(entityList); 
    recordSaver.persist(instrumentList); 
    instrumentList.clear();
  } else {
    instrumentList.addAll(entityList);
  }

  return instrumentList;
});

Это позволяет обновлять элементы в списке, а также гарантирует эксклюзивный доступ к списку для данного ключа.

Я подозреваю, что вы можете разделить вызов compute на computeIfAbsent (чтобы добавить список, если его нет), а затем на computeIfPresent (для обновления / сохранения списка): атомарность этих двух операций здесь не требуется. Но нет никакого смысла их разделять.


Кроме того, instrumentMap почти наверняка не должен быть нестабильным. Если вы действительно не хотите переназначить его значение (с учетом этого кода, я сомневаюсь в этом), удалите volatile и сделайте его окончательным.

Точно так же сомнительны и неокончательные блокировки. Если вы продолжаете использовать замок, сделайте и его окончательным.

Спасибо за помощь, попробую.

Amit 07.08.2018 08:47

"InstrumentList почти наверняка не должен быть изменчивым" Вы имеете в виду instrumentMap?

Amit 07.08.2018 08:56

@ Амит, да. Опечатка.

Andy Turner 07.08.2018 08:56

Не может ли этот код вызвать проблемы, если функция переназначения вызывается дважды? Из Javadoc: "The default implementation may retry these steps when multiple threads attempt updates including potentially calling the remapping function multiple times."

Finn 07.08.2018 15:05

Фактически, его вызов в вычислениях не мешает двум потокам одновременно добавлять к одному и тому же instrumentList.

Finn 07.08.2018 15:06

@FinnVoichick, вы смотрите на Javadoc ConcurrentMap, а не на ConcurrentHashMap?

Andy Turner 07.08.2018 15:09

Ах да, извините, я не осознавал, что ConcurrentHashMap отменяет реализацию по умолчанию. Но гарантированно ли ConcurrentHashMap сделает это безопасно?

Finn 07.08.2018 15:17

@FinnVoichick CHM запускает compute* атомарно.

Andy Turner 07.08.2018 15:53

@AndyTurner решение, которое вы предложили, хорошее. Но есть еще одна часть проблемы, как я уже упоминал, то есть «еще один отдельный поток, запускаемый через каждые 2 минуты», теперь этот поток использует writeLock, который, безусловно, отличается от вычислений атомарных операций. Что вы предлагаете, если я должен использовать блокировку записи для обоих по справедливости?

Amit 10.08.2018 09:42

Другие вопросы по теме