Если я укажу параметр maxPublishers, тогда исходные события после первых событий maxPublishers не будут отображаться в плоском отображении. Пока хочу ограничить только параллелизм. То есть для продолжения обработки следующих событий после того, как некоторые из первых издателей плоских карт maxPublishers завершили работу.
Publishers.Merge(
addImageRequestSubject
.flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
.compactMap { $0 }
.flatMap(maxPublishers: .max(3)) { self.addImage($0) },
addVideoRequestSubject
.flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)
Я также пытался ограничить параллелизм с помощью OperationQueue. Но maxConcurrentOperationCount вроде не действует.
Publishers.Merge(
addImageRequestSubject
.receive(on: imageCompressionQueue)
.flatMap { self.compressImage($0) }
.compactMap { $0 }
.receive(on: mediaAddingQueue)
.flatMap { self.addImage($0) },
addVideoRequestSubject
.receive(on: mediaAddingQueue)
.flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)
private lazy var imageCompressionQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
return queue
}()
private lazy var mediaAddingQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
return queue
}()
Издатели плоских карт выглядят так:
func compressImage(_ image: UIImage) -> Future<Data?, Never> {
Future { promise in
DispatchQueue.global().async {
let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
promise(Result.success(result))
}
}
}
@matt в отношении maxPublishers игнорируются те исходные события, которые отправляются, пока загружается оператор flatMap с указанным максимальным количеством издателей. После завершения операций с flatMap я могу отправлять новые исходные события, и они будут успешно обработаны. Вот что я могу наблюдать. Но мне нужно, чтобы все исходные события обрабатывались с учетом спроса на операторы flatMap.
Хорошо, может тебе нужно добавить буфер?
@matt ты прав, буфер - это то, что я пропустил! Хотите добавить ответ?





Вы очень красиво наткнулись на вариант использования оператора .buffer. Его цель - компенсировать противодавление .flatMap путем накопления значений, которые в противном случае были бы потеряны.
Проиллюстрирую полностью искусственным примером:
class ViewController: UIViewController {
let sub = PassthroughSubject<Int,Never>()
var storage = Set<AnyCancellable>()
var timer : Timer!
override func viewDidLoad() {
super.viewDidLoad()
sub
.flatMap(maxPublishers:.max(3)) { i in
return Just(i)
.delay(for: 3, scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
.sink { print($0) }
.store(in: &storage)
var count = 0
self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) {
_ in
count += 1
self.sub.send(count)
}
}
}
Итак, наш издатель выдает увеличивающееся целое число каждую секунду, но наш flatMap имеет .max(3) и повторно публикует значение за 3 секунды. В результате мы начинаем пропускать значения:
1
2
3
5
6
7
9
10
11
...
Решение состоит в том, чтобы поставить буфер перед flatMap. Он должен быть достаточно большим, чтобы удерживать любые пропущенные значения достаточно долго, чтобы их можно было запросить:
sub
.buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
.flatMap(maxPublishers:.max(3)) { i in
В результате все числовые значения действительно поступают в sink. Конечно, в реальной жизни мы мог все равно теряем значения, если буфер недостаточно велик, чтобы компенсировать несоответствие между скоростью передачи значения от издателя и скоростью передачи значения от flatMap с противодавлением.
@matt спасибо за ваш ответ. Я попытался отложить свои фьючерсы, но все еще обрабатывает только 3 изображения. Где вы имеете в виду, что я переключаю потоки неправильным образом и должен использовать receiveOn?