Почему .async в параллельной очереди в цикле for не ведет себя так же, как DispatchQueue.concurrentPerform?

import Dispatch

class SynchronizedArray<T> {
    private var array: [T] = []
    private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
    
    var get: [T] {
        accessQueue.sync {
            array
        }
    }
    
    func append(newElement: T) {
        accessQueue.async(flags: .barrier) {
            self.array.append(newElement)
        }
    }
}

Если я запускаю следующий код, 10 000 элементов добавляются к массиву, как и ожидалось, даже если я читаю одновременно:

DispatchQueue.concurrentPerform(iterations: 10000) { i in
    _ threadSafeArray.get
    threadSafeArray.append(newElement: i)
}

Но когда я делаю это, только это никогда не приближается к добавлению 10 000 элементов (в последний раз, когда я запускал его, на моем компьютере было добавлено только 92 элемента).

let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
    concurrent.async {
        _ = threadSafeArray.get
        threadSafeArray.append(newElement: i)
    }
}

Почему первое работает, а второе не работает?

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
334
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Кажется, я столкнулся с взрывом потоков, когда было создано 82 потока, а в приложении закончились потоки, решение, которое я использовал, — это семафор для ограничения количества потоков:

let semaphore = DispatchSemaphore(value: 8)
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
    concurrent.async {
        _ = threadSafeArray.get
        threadSafeArray.append(newElement: i)
        semaphore.signal()
    }
    
    semaphore.wait()
}

Обновлено: ответ Роба объясняет некоторые проблемы с приведенным выше кодом.

Ответ принят как подходящий

Хорошо, что вы нашли решение для взрыва потока. См. обсуждение темы «Взрыв потока» на WWDC 2015 Создание отзывчивых и эффективных приложений с помощью GCD и снова на WWDC 2016 Параллельное программирование с помощью GCD в Swift 3.

При этом DispatchSemaphore в настоящее время является чем-то вроде анти-паттерна, учитывая наличие concurrentPerform (или OperationQueue с его maxConcurrentOperationCount или комбинацией с его maxPublishers). Все они управляют степенью параллелизма более элегантно, чем диспетчерские семафоры.

Все, что было сказано, несколько замечаний по вашему шаблону семафора:

  1. При использовании этого шаблона DispatchSemaphore вы обычно ставите wait перед concurrent.async { ... } (потому что, как написано, вы получаете девять параллельных операций, а не восемь, что немного вводит в заблуждение).

  2. Более глубокая проблема заключается в том, что вы уменьшили проблему подсчета, но она все еще сохраняется. Учитывать:

    let threadSafeArray = SynchronizedArray<Int>()
    
    let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
    let semaphore = DispatchSemaphore(value: 8)
    for i in 0..<10000 {
        semaphore.wait()
        concurrent.async {
            threadSafeArray.append(newElement: i)
            semaphore.signal()
        }
    }
    
    print(threadSafeArray.get.count)
    

    Когда вы выходите из цикла for, у вас все еще может быть до восьми асинхронных задач в concurrent, а count (несинхронизированных по отношению к очереди concurrent) все еще может быть меньше 10 000. Вы должны добавить еще один concurrent.async(flags: .barrier) { ... }, который просто добавляет второй уровень синхронизации. Например.

    let semaphore = DispatchSemaphore(value: 8)
    for i in 0..<10000 {
        semaphore.wait()
        concurrent.async {
            threadSafeArray.append(newElement: i)
            semaphore.signal()
        }
    }
    
    concurrent.async(flags: .barrier) {
        print(threadSafeArray.get.count)
    }
    

    Или вы можете использовать DispatchGroup, классический механизм для определения окончания серии асинхронно отправленных блоков:

    let semaphore = DispatchSemaphore(value: 8)
    let group = DispatchGroup()
    
    for i in 0..<10000 {
        semaphore.wait()
        concurrent.async(group: group) {
            threadSafeArray.append(newElement: i)
            semaphore.signal()
        }
    }
    
    group.notify(queue: .main) {
        print(threadSafeArray.get.count)
    }
    

    Использование concurrentPerform устраняет необходимость в любом из этих шаблонов, потому что он не будет продолжать выполнение, пока не будут выполнены все параллельные задачи. (Это также автоматически оптимизирует степень параллелизма для количества ядер на вашем устройстве.)

  3. FWIW, гораздо лучшая альтернатива SynchronizedArray — вообще не выставлять базовый массив, а просто реализовывать любые методы, которые вы хотите выставить, интегрируя необходимую синхронизацию. Это делает место вызова более чистым и решает многие проблемы.

    Например, предположив, что вы хотите предоставить оператор индекса и переменную count, вы должны сделать:

    class SynchronizedArray<T> {
        private var array: [T]
        private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)
    
        init(_ array: [T] = []) {
            self.array = array
        }
    
        subscript(index: Int) -> T {
            get { reader { $0[index] } }
            set { writer { $0[index] = newValue } }
        }
    
        var count: Int {
            reader { $0.count }
        }
    
        func append(newElement: T) {
            writer { $0.append(newElement) }
        }
    
        func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
            try accessQueue.sync { try block(array) }
        }
    
        func writer(_ block: @escaping (inout [T]) -> Void) {
            accessQueue.async(flags: .barrier) { block(&self.array) }
        }
    }
    

    Это решает множество проблем. Например, теперь вы можете сделать:

    print(threadSafeArray.count) // get the count
    print(threadSafeArray[500])  // get the 500th item
    

    Теперь вы также можете делать такие вещи, как:

    let average = threadSafeArray.reader { array -> Double in
        let sum = array.reduce(0, +)
        return Double(sum) / Double(array.count)
    }
    

    Но суть в том, что при работе с коллекциями (или любыми изменяемыми объектами) вы неизменно не хотите раскрывать сам изменяемый объект, а скорее пишете свои собственные синхронизированные методы для общих операций (индексы, count, removeAll и т. д.), и, возможно, также предоставить интерфейс чтения/записи для тех случаев, когда разработчику приложения может потребоваться более широкий механизм синхронизации.

    (FWIW, изменения в этом SynchronizedArray применяются как к семафору, так и к сценариям concurrentPerform; просто в этом случае семафор просто проявляет проблему.)

  4. Излишне говорить, что вы, как правило, должны выполнять больше работы в каждом потоке, потому что столь скромные, как накладные расходы на переключение контекста, здесь, вероятно, достаточно, чтобы компенсировать любые преимущества, полученные от параллельной обработки. (Но я понимаю, что это, скорее всего, была просто концептуальная демонстрация проблемы, а не предлагаемая реализация.) Просто к сведению будущих читателей.

Другие вопросы по теме