Я только что понял, что переслать событие от издателя субъекту можно двумя способами:
publisher.sink { subject.send() }publisher.subscribe(subject)Первый вариант стандартный, а второй мне кажется более симпатичным. Однако, похоже, за оператором subscribe стоит какой-то дьявол, который поддерживает подписку по-другому. Мне интересно, в чем причина разницы.
Ниже приведен минимальный код, иллюстрирующий разницу:
func pipeToSubject(from publisher: AnyPublisher<Void, Never>) -> AnyCancellable {
// Create an internal subject that never falls out of the method
let subject = PassthroughSubject<Void, Never>()
// Pipe the event from the publisher to the subject
return publisher.print().sink { subject.send() } // <--- the standard way to notify subject
}
// Create a never ending publisher
let publisher = Empty<Void, Never>(completeImmediately: false).eraseToAnyPublisher()
// Get the subscription
let cancellable = pipeToSubject(from: publisher)
// Cancel the subscription
cancellable.cancel()
Вот результат печати:
receive subscription: (Empty)
request unlimited
receive cancel
Это ожидаемо, поскольку предмет является внутренним и никто, кроме sink, не хранит его. Как только подписка будет отменена, sink будет освобожден, как и тема.
Теперь, когда дело доходит до другого способа пересылки к теме:
func pipeToSubject(from publisher: AnyPublisher<Void, Never>) -> AnyCancellable {
// Create an internal subject that never falls out of the method
let subject = PassthroughSubject<Void, Never>()
// Pipe the event from the publisher to the subject
return publisher.print().subscribe(subject) // <--- notify subject by `subscribe`
}
// Create a never ending publisher
let publisher = Empty<Void, Never>(completeImmediately: false).eraseToAnyPublisher()
// Get the subscription
let cancellable = pipeToSubject(from: publisher)
// Cancel the subscription
cancellable.cancel()
Вот результат печати:
receive subscription: (Empty)
Никакой отмены вообще, даже если мы отменим подписку, возвращенную subscribe(subject) — очевидно, что subject сохраняется в подписке внутри страны. Но я не понимаю, subscribe(subject) уже вернул отменяемый объект, почему он все еще сохраняет ссылку внутри? Это по определению или это баг?





Я думаю, что свидетельство того, что здесь происходит, находится в строке вывода:
request unlimited
Комбинат использует модель вытягивания. Издатель будет реагировать и запускать код только тогда, когда subscriber запросит значения.
Для каждого SubscriberPublisher создаёт Subscription. Subscriber выпускает «спрос» (формально Subscribers.Demand) через Subscription к Publisher. Когда Publisher получает этот запрос, он может начать асинхронный цикл для реагирования на новые входные данные или события жизненного цикла Subscription.
Когда sink получает Subscription, он выдает .unlimited требование со своего Subscription. Publisher создает код для активного мониторинга Subscription.
Если вы просто используете subscribe, ваша подписка инертна, она не требует ничего от Publisher и не выполняется асинхронный код, который мог бы реагировать на события жизненного цикла. Когда вы запускаете cancel, Subscription знает, что оно было отменено, но у Publisher нет возможности побежать, чтобы что-то с этим сделать.
Мы можем форсировать проблему и дать Publisher такой шанс:
func pipeToSubject(from publisher: AnyPublisher<Void, Never>) -> AnyCancellable {
// Create an internal subject that never falls out of the method
let subject = PassthroughSubject<Void, Never>()
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + .seconds(5)) {
subject.send()
}
// Pipe the event from the publisher to the subject
return publisher.print().subscribe(subject) // <--- notify subject by `subscribe`
}
// Create a never ending publisher
let publisher = Empty<Void, Never>(completeImmediately: false).eraseToAnyPublisher()
// Get the subscription
var cancellable = pipeToSubject(from: publisher)
// Cancel the subscription
cancellable.cancel()
Теперь, через 5 секунд, попытка отправить что-то дает Publisher шанс убежать. У него есть возможность заметить, что один из его Subscriptions отменен, и что-то с этим сделать. Теперь вывод:
receive subscription: (Empty)
receive cancel
Таким образом, sink генерирует спрос и заставляет Publisher создавать активный код для реагирования на этот спрос и на события жизненного цикла.
subscribe сам по себе не требует Publisher, поэтому он никогда не запускает код для этого Subscription. Вам нужно каким-то образом вызвать Publisher и дать ему возможность запуститься, прежде чем он заметит отмену и отреагирует на нее.
Я думаю, то, что вы видите, не является ошибкой. Это ожидаемое поведение, основанное на том факте, что Комбайн — это «вытягивающая» система, основанная на спросе, позволяющая оказывать противодавление, а не модель выталкивания.
Мой еще один вопрос: почему отправка темы дает издателю такую возможность? Я думал, что субъект должен был получать то, что приходит от издателя, но заставить субъекта отправить звучит наоборот.
Это интересный вопрос, и я не знаю ответа. Интересно, что Publisher имеет специальную версию subscribe только для подписки Subjects на конвейеры и что эта конкретная реализация возвращает AnyCancellable. Поскольку субъект появляется в середине конвейера, возможно, он отслеживает вышестоящих издателей и каким-то образом уведомляет их. Возможно, было бы полезно попытаться написать собственный издатель, который позволял бы вам подписываться на тему.
Приятно, приятно услышать основную концепцию вытягивающей системы. Спасибо за ответ!