Я пытаюсь НАДЕЖНО реализовать этот шаблон. Для практических целей предположим, что у нас есть что-то похожее на клон Twitter (в cassandra и nodejs).
Итак, у пользователя А 500 тысяч подписчиков. Когда пользователь А публикует твит, нам нужно написать об этом в 500 тысяч каналов/лент.
Концептуально это легко: найдите подписчиков для пользователя А, для каждого из них: напишите твит в его/ее ленту. Но это не «атомарно» (под атомарностью я имею в виду, что в какой-то момент все записи будут успешными или ни одна из них не будет успешной).
async function updateFeeds(userId, tweet) {
let followers = await fetchFollowersFor(userId)
for(let f of followers) {
await insertIntoFeed(f, tweet)
}
}
Это похоже на DoS-атаку:
async function updateFeeds(userId, tweet) {
let followers = await fetchFollowersFor(userId)
await Promise.all(followers.map(f => insertIntoFeed(f, tweet)))
}
Как мне отслеживать процесс? Как возобновить работу в случае неудачи? Я не прошу учебник или что-то в этом роде, просто укажите мне правильное направление (ключевые слова для поиска), если можете.
Promise.all()
никогда не будет тем, что вам нужно, если вы хотите отслеживать, какие каналы были успешными, а какие нет, поскольку он замыкается накоротко при первой неудаче и ничего не сообщает обо всех остальных запросах. И да, это будет выглядеть как минимум нарушением ограничения скорости, если не DOS-атакой.
@jfriend00, конечно, я отредактировал «атомную» часть.
Я бы начал с настройки брокера сообщений (например, Kafka) и записывал все твиты в тему.
Затем разработайте агент, который будет получать сообщения. Для каждого сообщения агент выбирает группу пользователей, которые являются подписчиками, но еще не имеют твита в своей ленте, и вставляет твит в ленту каждого пользователя. Когда больше нет пользователей, которые являются подписчиками, но не имеют твита, агент фиксирует сообщение и обрабатывает следующие сообщения. Причиной такого поведения является устойчивость: если по какой-либо причине агент будет перезапущен, он возобновит работу с того места, где остановился.
Настройте тему с большим количеством разделов, чтобы иметь возможность масштабировать обработку сообщений. Если у вас ОДИН раздел, у вас может быть ОДИН агент для обработки сообщений. Если у вас N разделов, вы можете иметь до N агентов для параллельной обработки сообщений.
Чтобы отслеживать общую обработку, вы можете отслеживать «задержку» в брокере сообщений, которая представляет собой количество сообщений, которые еще предстоит обработать в теме. Если оно слишком велико и слишком долго, вам придется увеличить количество агентов.
Если вы хотите отслеживать обработку определенного сообщения, агент может запросить, сколько пользователей еще предстоит обработать, прежде чем обрабатывать пакет пользователей. Затем агент может зарегистрировать это число, или предоставить его через свой API, или представить его как метрику Prometheus...
Я не знаю, знакомы ли вы с cassandra db, но что бы вы считали группой последователей в данном конкретном случае?
Партия представляет собой ограниченное количество элементов. Вы не хотите запрашивать ВСЕ элементы одновременно, если их сотни или тысячи (у вас возникнет исключение OutOfMemory). Итак, просто запросите сотню элементов, обработайте их, запросите еще сотню, обработайте их и так далее, пока ни один не будет возвращен.
Помимо ответа @christophe-quintard есть еще одна хитрость, которую следует учитывать. То есть... не использовать здесь шаблон записи в виде разветвления.
По сути, вместо того, чтобы писать большое количество твитов в 500 тысячах лент, вы просто создаете отдельную абстракцию для «популярных»/«горячих» учетных записей (ее можно подсчитать, например, на основе количества подписчиков или количества подписчиков, возможно, количества твитов в день). тоже может быть рассмотрено) и на лету строить график для своих подписчиков. Следовательно, вы выбираете «обычную» временную шкалу и присоединяете ее ко всем «популярным» для пользователя, когда она запрашивается, таким образом вы можете уменьшить объем хранимых и обрабатываемых данных.
Для «негорячих» учетных записей вы просто выполняете некоторую пакетную обработку плюс, в конечном итоге, согласованную обработку, т. е. вы отправляете сообщение какому-то фоновому процессору, который выполнит некоторую пакетную обработку (здесь есть несколько вариантов/вещей, которые следует учитывать).
верно. не могли бы вы рассказать подробнее об этом последнем абзаце? по крайней мере 2 или 3 варианта, которые следует рассмотреть здесь
Расширение @InglouriousBastard по сути повторит ответ Кристофа - вы перемещаете обработку в некоторый асинхронный конвейер, который будет обрабатывать чтение списка подписчиков и отправку разветвленных сообщений. Существуют некоторые различия в том, как вы можете это сделать, но это может во многом зависеть от того, как хранятся данные и какие технологии вы используете. Я не работал с Cassandra, поэтому мне мало что говорит о том, как данные хранятся в вашем случае.
@GuruStron привет, мастер Sql. как ты думаешь, ты мог бы помочь в этом? stackoverflow.com/questions/78552841/…
Не существует атомарного способа записи в 500-тысячные каналы, поэтому непонятно, почему вы об этом упоминаете. Что касается ошибок, просто добавьте любой канал, из-за которого вы получили ошибку, в список каналов, чтобы повторить попытку, когда вы закончите с остальными. Затем повторите их после небольшой задержки максимум N раз (где N — небольшое число), и каждая повторная попытка имеет более длительную задержку по сравнению с предыдущей.