Я пытаюсь проверить, что mongodb ведет себя так, как я ожидаю, и заблокировать документ, когда он изменяется в транзакции. я использую
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.1</version>
и Spring-данные с набором реплик
mongo 7.0.5 community
Моя цель состоит в том, чтобы сделать следующее:
Я использую этот код:
@Getter
@Setter
class DummyDocument {
@MongoId
private String uuid;
private String lock;
private List<String> simpleCollection = new ArrayList<>();
public DummyDocument(String uuid) {
this.uuid = uuid;
}
}
public class ConcurrentDocumentAccess {
@Autowired
private MongoClient client;
@Autowired
private MongoTemplate mongoTemplate;
@After
public void cleanup() {
DummyDocument doc = mongoTemplate.findById("test", DummyDocument.class);
mongoTemplate.remove(doc);
}
@Test
public void documentLockingTest() {
// Create a document
DummyDocument doc = new DummyDocument("test");
mongoTemplate.save(doc);
// A query to find the doc
Query findDoc = new Query().addCriteria(Criteria.where("uuid").is("test"));
// An update to change the lock value in the doc
Update lock = new Update().set("lock", "locked");
// An update of the doc
Update updateCollection = new Update().addToSet("simpleCollection", "something");
try (ClientSession session = client.startSession()) {
session.startTransaction();
// Acquire lock on doc by writing on it
UpdateResult lockResult = mongoTemplate.withSession(session).updateFirst(findDoc, lock,
DummyDocument.class);
assertThat(lockResult.getModifiedCount() == 1L).isTrue();
// Try to update the collection out of the transaction
UpdateResult changeResult = mongoTemplate.updateFirst(findDoc, updateCollection, DummyDocument.class);
// assertThat(changeResult.getModifiedCount() == 0L).isTrue();
session.commitTransaction();
} catch (MongoCommandException e) {
e.printStackTrace();
}
DummyDocument updatedDoc = mongoTemplate.findOne(findDoc, DummyDocument.class);
assertThat(updatedDoc.getLock()).isEqualTo("locked");
assertThat(updatedDoc.getSimpleCollection()).doesNotContain("something");
}
}
Что я наблюдаю, так это то, что запрос вне транзакции принудительно выполняется в документе, затем транзакция зависает до тех пор, пока не произойдет сбой с ответом: {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Transaction with { txnNumber: 2 } has been aborted.", "code": 251, "codeName": "NoSuchTransaction"
Я что-то делаю неправильно или это ожидаемое поведение?
Что ж, я провел еще один тест, и поведение все еще остается загадочным. Когда я использую сеанс для обновления документа, поведение меняется. Это код, который используется:
// Create a document
DummyDocument doc = new DummyDocument("test");
mongoTemplate.save(doc);
// A query to find the doc
Query findDoc = new Query().addCriteria(Criteria.where("uuid").is("test"));
// findDoc.getQueryObject().toBsonDocument();
// An update to change the lock value in the doc
Update lock1 = new Update().set("lock", "lock1");
Update lock2 = new Update().set("lock", "lock2");
// An update of the doc
Update update1 = new Update().addToSet("simpleCollection", "update1");
Update update2 = new Update().addToSet("simpleCollection", "update2");
// Create a first session
ClientSession session1 = client.startSession();
session1.startTransaction();
// Create a second session
ClientSession session2 = client.startSession();
session2.startTransaction();
// Acquire lock on doc by writing on it
try {
UpdateResult lock1Result = mongoTemplate.withSession(session1).updateFirst(findDoc, lock1,
DummyDocument.class);
assertThat(lock1Result.getMatchedCount()).isOne();
assertThat(lock1Result.getModifiedCount()).isOne();
} catch (MongoCommandException e) {
e.printStackTrace();
} catch (UncategorizedMongoDbException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
// Try to update the collection out of the transaction
UpdateResult changeResult = null;
try {
changeResult = mongoTemplate.withSession(session2).updateFirst(findDoc, update1, DummyDocument.class);
} catch (MongoCommandException e) {
e.printStackTrace();
} catch (UncategorizedMongoDbException e) {
// This catch a write error
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
assertThat(changeResult).isNull();
// Try to commit the change on the collection before 1st session commit
try {
session2.commitTransaction();
} catch (MongoCommandException e) {
// This catch a "NoSuchTransaction" error
e.printStackTrace();
} catch (UncategorizedMongoDbException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
// Uncomment this to timeout session 1
// Thread.sleep(120005L);
// Try to commit the change oof session 1, this should work
try {
session1.commitTransaction();
} catch (MongoCommandException e) {
e.printStackTrace();
} catch (UncategorizedMongoDbException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
session1.close();
session2.close();
DummyDocument updatedDoc = mongoTemplate.findOne(findDoc, DummyDocument.class);
assertThat(updatedDoc.getLock()).isEqualTo("lock1");
assertThat(updatedDoc.getSimpleCollection()).doesNotContain("update1");
Здесь обновление документа session2
отклонено из-за конфликта записи, чего я и ожидал. Так почему же эти два метода ведут себя по-разному? Могу ли я настроить клиент так, чтобы первая реализация вела себя как вторая?
Вот логи теста:
{"t":{"$date":"2024-07-03T19:36:01.127+00:00"},"s":"I", "c":"WRITE", "id":51803, "ctx":"conn218","msg":"Slow query","attr":{"type":"update","ns":"test.dummyDocument","command":{"q":{"_id":"test"},"u":{"_id":"test","simpleCollection":[],"_class":"com.test.concurrency.DummyDocument"},"multi":false,"upsert":true},"planSummary":"IDHACK","totalOplogSlotDurationMicros":142,"keysExamined":0,"docsExamined":0,"nMatched":0,"nModified":0,"nUpserted":1,"keysInserted":1,"numYields":0,"locks":{"ParallelBatchWriterMode":{"acquireCount":{"r":2}},"FeatureCompatibilityVersion":{"acquireCount":{"w":2}},"ReplicationStateTransition":{"acquireCount":{"w":2}},"Global":{"acquireCount":{"w":2}},"Database":{"acquireCount":{"w":2}},"Collection":{"acquireCount":{"w":2}}},"flowControl":{"acquireCount":1},"readConcern":{"provenance":"implicitDefault"},"storage":{},"cpuNanos":362193,"remote":"172.18.0.1:34942","durationMillis":0}}
{"t":{"$date":"2024-07-03T19:36:01.220+00:00"},"s":"I", "c":"WRITE", "id":51803, "ctx":"conn218","msg":"Slow query","attr":{"type":"update","ns":"test.dummyDocument","command":{"q":{"_id":"test"},"u":{"$set":{"lock":"lock1"}},"multi":false,"upsert":false},"planSummary":"IDHACK","keysExamined":1,"docsExamined":1,"nMatched":1,"nModified":1,"nUpserted":0,"keysInserted":0,"keysDeleted":0,"numYields":0,"locks":{"Database":{"acquireCount":{"w":1}},"Collection":{"acquireCount":{"w":1}}},"flowControl":{"acquireCount":1},"readConcern":{"level":"local","provenance":"implicitDefault"},"storage":{},"cpuNanos":316767,"remote":"172.18.0.1:34942","durationMillis":0}}
{"t":{"$date":"2024-07-03T19:36:01.281+00:00"},"s":"I", "c":"WRITE", "id":51803, "ctx":"conn218","msg":"Slow query","attr":{"type":"update","ns":"test.dummyDocument","command":{"q":{"_id":"test"},"u":{"$addToSet":{"simpleCollection":"update1"}},"multi":false,"upsert":false},"planSummary":"IDHACK","numYields":0,"ok":0,"errMsg":"Caused by :: Write conflict during plan execution and yielding is disabled. :: Please retry your operation or multi-document transaction.","errName":"WriteConflict","errCode":112,"locks":{"Database":{"acquireCount":{"w":1}},"Collection":{"acquireCount":{"w":1}}},"flowControl":{"acquireCount":2},"readConcern":{"level":"local","provenance":"implicitDefault"},"storage":{},"cpuNanos":391742,"remote":"172.18.0.1:34942","durationMillis":0}}
Да, это ожидаемое поведение, описанное в https://www.mongodb.com/docs/manual/core/transactions/#transactions-and-atomicity, в частности:
Пока транзакция не будет зафиксирована, изменения данных, внесенные в транзакцию, не будут видны за пределами транзакции.
Mongodb блокирует документы, на самом деле существует 6 различных типов блокировок https://www.mongodb.com/docs/manual/faq/concurrency/, но это не имеет ничего общего с транзакциями с несколькими документами. Блокировки используются внутри для обработки одновременных обновлений на уровне документа.
Ошибка, с которой вы столкнулись в первом сценарии «транзакция с одновременной нетранзакционной записью», связана с тем, как mongodb обрабатывает нетранзакционные записи. Он неявно запускает транзакцию для отдельного обновления, поэтому это всегда транзакция на уровне WireTiger.
WT, в свою очередь, устанавливает исключительную блокировку для всех документов в транзакции и выдает ошибку WriteConflict при любой попытке записи в заблокированный документ. Для нетранзакционных записей предполагается, что это временная блокировка, и mongodb пытается разрешить ее с помощью повторных операций записи.
Команда update
вызывает writeConflictRetry
namespace mongo {
UpdateResult update(OperationContext* opCtx,
CollectionAcquisition& coll,
const UpdateRequest& request) {
.....
// The update stage does not create its own collection. As such, if the update is
// an upsert, create the collection that the update stage inserts into beforehand.
writeConflictRetry(opCtx, "createCollection", nsString, [&] {
и writeConflictRetry
продолжает попытки обновить заблокированный документ — запускает транзакцию WT, получает writeConflict, откатывает транзакцию и повторяет попытку:
/**
* Runs the argument function f as many times as needed for f to complete or throw an exception
* other than WriteConflictException or TemporarilyUnavailableException. For each time f throws
* one of these exceptions, logs the error, waits a spell, cleans up, and then tries f again.
* Imposes no upper limit on the number of times to re-try f after a WriteConflictException, so any
* required timeout behavior must be enforced within f. When retrying a
* TemporarilyUnavailableException, f is called a finite number of times before we eventually let
* the error escape.
*
* If we are already in a WriteUnitOfWork, we assume that we are being called within a
* WriteConflictException retry loop up the call stack. Hence, this retry loop is reduced to an
* invocation of the argument function f without any exception handling and retry logic.
*/
template <typename F>
auto writeConflictRetry(OperationContext* opCtx,
Итак, в первом сценарии происходит взаимоблокировка из-за одновременной записи в синхронном приложении:
hangs
повторная попытка записи в заблокированный документЧто произойдет с асинхронными обновлениями (например, обновлениями из разных потоков или клиентов), так это то, что нетранзакционное обновление все равно будет ждать, но транзакция будет успешно зафиксирована без истечения срока действия:
hangs
повторная попытка записи в заблокированный документПри явно запущенной транзакции во втором сценарии «транзакция с параллельной транзакцией» mongo не скрывает ошибку WT writeConflict и сразу же передает ее приложению.
Я предполагаю, что «обновление документа отклонено» относится к session1.commitTransaction()
— это потому, что session2.commitTransaction()
произошло первым. Имейте в виду, что если вы используете асинхронный драйвер, фактический порядок операций в сети может отличаться от порядка операций в коде. Вам необходимо проверить журналы базы данных, чтобы подтвердить, в каком порядке команда commitTransaction
была получена базой данных. Вам нужно будет увеличить уровень детализации журнала до 1, чтобы записать эти журналы.
На самом деле нет, это транзакция session2 терпит неудачу, это подтверждается утверждением в конце теста и журналами, которые я добавил к вопросу.
Сотрите это. Вы правы, он блокируется на всю длину транзакции. Видимо, это распространенное заблуждение. С другой стороны, код никогда не лжет — смотрите мое обновление с некоторыми ссылками. По сути, если вы хотите самостоятельно обрабатывать конфликты записи и реализовывать повторные попытки, запустите транзакцию явно. В противном случае вы будете зависеть от реализации логики повторных попыток в mongodb.
Спасибо за ответ, отправленный вами список операций имеет смысл. Меня немного беспокоит такое поведение повторных попыток, потому что кажется, что оно действительно может снизить производительность и обновления потока данных. Я попытался установить для параметра retryWrite
значение false в строке подключения, но это не сработало. Я вижу, что в коде cpp есть retryLimit, но я не знаю, смогу ли я его как-то настроить. В любом случае я оценил ссылки на реальный код.
Спасибо за попытку помочь. Я обновил свой вопрос вторым тестом. Это показывает, что один и тот же порядок операций ведет себя по-разному в зависимости от того, как я использую клиент. Транзакция блокирует документ со второй реализацией, и последующие обновления из других транзакций отклоняются. Ты понимаешь это ?