У меня есть сценарий, в котором мне нужно поддерживать карту, которая может быть заполнена несколькими потоками, каждый из которых изменяет соответствующий список (уникальный идентификатор / ключ является именем потока), и когда размер списка для потока превышает фиксированный размер пакета, мы должны сохранять записи в БД.
Пример кода ниже:
private volatile ConcurrentHashMap<String, List<T>> instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReadWriteLock lock ;
public void addAll(List<T> entityList, String threadName) {
try {
lock.readLock().lock();
List<T> instrumentList = instrumentMap.get(threadName);
if (instrumentList == null) {
instrumentList = new ArrayList<T>(batchSize);
instrumentMap.put(threadName, instrumentList);
}
if (instrumentList.size() >= batchSize -1){
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
} finally {
lock.readLock().unlock();
}
}
Через каждые 2 минуты запускается еще один отдельный поток для сохранения всех записей в Map (чтобы убедиться, что у нас что-то сохраняется через каждые 2 минуты и размер карты не становится слишком большим), и когда он запускается, он блокирует все другие потоки (проверьте readLock и writeLock нас, где writeLock имеет более высокий приоритет)
if (//Some condition) {
Thread.sleep(//2 minutes);
aggregator.getLock().writeLock().lock();
List<T> instrumentList = instrumentMap .values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
if (instrumentList.size() > 0) {
saver.persist(instrumentList);
instrumentMap .values().parallelStream().forEach(x -> x.clear());
aggregator.getLock().writeLock().unlock();
}
Это решение работает нормально почти для каждого сценария, который мы тестировали, за исключением того, что иногда мы видим, что некоторые записи пропали, то есть вообще не сохранялись, хотя они были добавлены в Map.
У меня вопрос, в чем проблема с этим кодом? ConcurrentHashMap - не лучшее решение здесь? Есть ли здесь проблемы с использованием блокировки чтения / записи? Следует ли мне использовать последовательную обработку?
Если вы используете блокировку и одновременно HashMap, вы часто делаете что-то неправильно.
@ Энди Тернер, в чем проблема с блокировкой чтения, когда каждый поток изменяет / сохраняет свой собственный индивидуальный список?
Что вы здесь предлагаете? Должен ли я использовать только блокировку записи? Я не могу использовать здесь Normal HashMap
@AndyTurner решение, которое вы предложили, хорошее. Но есть еще одна часть проблемы, как я уже упоминал, то есть «еще один отдельный поток, запускаемый через каждые 2 минуты», теперь этот поток использует writeLock, который, безусловно, отличается от вычислений атомарных операций (блокировка, используемая внутри вычислений). Что вы предлагаете, если я должен использовать блокировку записи для обоих по справедливости?




Нет, это не потокобезопасный.
Проблема в том, что вы используете блокировку читать ReadWriteLock. Это не гарантирует эксклюзивный доступ для создания обновлений. Для этого вам нужно будет использовать блокировку написать.
Но вам вообще не нужно использовать отдельную блокировку. Вы можете просто использовать метод ConcurrentHashMap.compute:
instrumentMap.compute(threadName, (tn, instrumentList) -> {
if (instrumentList == null) {
instrumentList = new ArrayList<>();
}
if (instrumentList.size() >= batchSize -1) {
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
return instrumentList;
});
Это позволяет обновлять элементы в списке, а также гарантирует эксклюзивный доступ к списку для данного ключа.
Я подозреваю, что вы можете разделить вызов compute на computeIfAbsent (чтобы добавить список, если его нет), а затем на computeIfPresent (для обновления / сохранения списка): атомарность этих двух операций здесь не требуется. Но нет никакого смысла их разделять.
Кроме того, instrumentMap почти наверняка не должен быть нестабильным. Если вы действительно не хотите переназначить его значение (с учетом этого кода, я сомневаюсь в этом), удалите volatile и сделайте его окончательным.
Точно так же сомнительны и неокончательные блокировки. Если вы продолжаете использовать замок, сделайте и его окончательным.
Спасибо за помощь, попробую.
"InstrumentList почти наверняка не должен быть изменчивым" Вы имеете в виду instrumentMap?
@ Амит, да. Опечатка.
Не может ли этот код вызвать проблемы, если функция переназначения вызывается дважды? Из Javadoc: "The default implementation may retry these steps when multiple threads attempt updates including potentially calling the remapping function multiple times."
Фактически, его вызов в вычислениях не мешает двум потокам одновременно добавлять к одному и тому же instrumentList.
@FinnVoichick, вы смотрите на Javadoc ConcurrentMap, а не на ConcurrentHashMap?
Ах да, извините, я не осознавал, что ConcurrentHashMap отменяет реализацию по умолчанию. Но гарантированно ли ConcurrentHashMap сделает это безопасно?
@FinnVoichick CHM запускает compute* атомарно.
@AndyTurner решение, которое вы предложили, хорошее. Но есть еще одна часть проблемы, как я уже упоминал, то есть «еще один отдельный поток, запускаемый через каждые 2 минуты», теперь этот поток использует writeLock, который, безусловно, отличается от вычислений атомарных операций. Что вы предлагаете, если я должен использовать блокировку записи для обоих по справедливости?
Подождите, вы сохраняете что-то под блокировкой читать? Как вы думаете, может быть, там будет правильная блокировка записи?