Как ограничить параллелизм с FlatMap в Combine, при котором все исходные события все еще обрабатываются?

Если я укажу параметр maxPublishers, тогда исходные события после первых событий maxPublishers не будут отображаться в плоском отображении. Пока хочу ограничить только параллелизм. То есть для продолжения обработки следующих событий после того, как некоторые из первых издателей плоских карт maxPublishers завершили работу.

Publishers.Merge(
    addImageRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
        .compactMap { $0 }
        .flatMap(maxPublishers: .max(3)) { self.addImage($0) },
    addVideoRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

Я также пытался ограничить параллелизм с помощью OperationQueue. Но maxConcurrentOperationCount вроде не действует.

Publishers.Merge(
    addImageRequestSubject
        .receive(on: imageCompressionQueue)
        .flatMap { self.compressImage($0) }
        .compactMap { $0 }
        .receive(on: mediaAddingQueue)
        .flatMap { self.addImage($0) },
    addVideoRequestSubject
        .receive(on: mediaAddingQueue)
        .flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

private lazy var imageCompressionQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

private lazy var mediaAddingQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

Издатели плоских карт выглядят так:

func compressImage(_ image: UIImage) -> Future<Data?, Never> {
    Future { promise in
        DispatchQueue.global().async {
            let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
            promise(Result.success(result))
        }
    }
}

@matt спасибо за ваш ответ. Я попытался отложить свои фьючерсы, но все еще обрабатывает только 3 изображения. Где вы имеете в виду, что я переключаю потоки неправильным образом и должен использовать receiveOn?

Ihor Vovk 07.04.2021 13:43

@matt в отношении maxPublishers игнорируются те исходные события, которые отправляются, пока загружается оператор flatMap с указанным максимальным количеством издателей. После завершения операций с flatMap я могу отправлять новые исходные события, и они будут успешно обработаны. Вот что я могу наблюдать. Но мне нужно, чтобы все исходные события обрабатывались с учетом спроса на операторы flatMap.

Ihor Vovk 08.04.2021 11:14

Хорошо, может тебе нужно добавить буфер?

matt 08.04.2021 11:15

@matt ты прав, буфер - это то, что я пропустил! Хотите добавить ответ?

Ihor Vovk 08.04.2021 13:47
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
4
45
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы очень красиво наткнулись на вариант использования оператора .buffer. Его цель - компенсировать противодавление .flatMap путем накопления значений, которые в противном случае были бы потеряны.

Проиллюстрирую полностью искусственным примером:

class ViewController: UIViewController {
    let sub = PassthroughSubject<Int,Never>()
    var storage = Set<AnyCancellable>()
    var timer : Timer!
    override func viewDidLoad() {
        super.viewDidLoad()
        sub
            .flatMap(maxPublishers:.max(3)) { i in
                return Just(i)
                    .delay(for: 3, scheduler: DispatchQueue.main)
                    .eraseToAnyPublisher()
            }
            .sink { print($0) }
            .store(in: &storage)
        
        var count = 0
        self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { 
            _ in
            count += 1
            self.sub.send(count)
        }
    }
}

Итак, наш издатель выдает увеличивающееся целое число каждую секунду, но наш flatMap имеет .max(3) и повторно публикует значение за 3 секунды. В результате мы начинаем пропускать значения:

1
2
3
5
6
7
9
10
11
...

Решение состоит в том, чтобы поставить буфер перед flatMap. Он должен быть достаточно большим, чтобы удерживать любые пропущенные значения достаточно долго, чтобы их можно было запросить:

        sub
            .buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
            .flatMap(maxPublishers:.max(3)) { i in

В результате все числовые значения действительно поступают в sink. Конечно, в реальной жизни мы мог все равно теряем значения, если буфер недостаточно велик, чтобы компенсировать несоответствие между скоростью передачи значения от издателя и скоростью передачи значения от flatMap с противодавлением.

Другие вопросы по теме