import Dispatch
class SynchronizedArray<T> {
private var array: [T] = []
private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
var get: [T] {
accessQueue.sync {
array
}
}
func append(newElement: T) {
accessQueue.async(flags: .barrier) {
self.array.append(newElement)
}
}
}
Если я запускаю следующий код, 10 000 элементов добавляются к массиву, как и ожидалось, даже если я читаю одновременно:
DispatchQueue.concurrentPerform(iterations: 10000) { i in
_ threadSafeArray.get
threadSafeArray.append(newElement: i)
}
Но когда я делаю это, только это никогда не приближается к добавлению 10 000 элементов (в последний раз, когда я запускал его, на моем компьютере было добавлено только 92 элемента).
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
}
}
Почему первое работает, а второе не работает?
Кажется, я столкнулся с взрывом потоков, когда было создано 82 потока, а в приложении закончились потоки, решение, которое я использовал, — это семафор для ограничения количества потоков:
let semaphore = DispatchSemaphore(value: 8)
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
semaphore.signal()
}
semaphore.wait()
}
Обновлено: ответ Роба объясняет некоторые проблемы с приведенным выше кодом.
Хорошо, что вы нашли решение для взрыва потока. См. обсуждение темы «Взрыв потока» на WWDC 2015 Создание отзывчивых и эффективных приложений с помощью GCD и снова на WWDC 2016 Параллельное программирование с помощью GCD в Swift 3.
При этом DispatchSemaphore
в настоящее время является чем-то вроде анти-паттерна, учитывая наличие concurrentPerform
(или OperationQueue
с его maxConcurrentOperationCount
или комбинацией с его maxPublishers
). Все они управляют степенью параллелизма более элегантно, чем диспетчерские семафоры.
Все, что было сказано, несколько замечаний по вашему шаблону семафора:
При использовании этого шаблона DispatchSemaphore
вы обычно ставите wait
перед concurrent.async { ... }
(потому что, как написано, вы получаете девять параллельных операций, а не восемь, что немного вводит в заблуждение).
Более глубокая проблема заключается в том, что вы уменьшили проблему подсчета, но она все еще сохраняется. Учитывать:
let threadSafeArray = SynchronizedArray<Int>()
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 8)
for i in 0..<10000 {
semaphore.wait()
concurrent.async {
threadSafeArray.append(newElement: i)
semaphore.signal()
}
}
print(threadSafeArray.get.count)
Когда вы выходите из цикла for
, у вас все еще может быть до восьми асинхронных задач в concurrent
, а count
(несинхронизированных по отношению к очереди concurrent
) все еще может быть меньше 10 000. Вы должны добавить еще один concurrent.async(flags: .barrier) { ... }
, который просто добавляет второй уровень синхронизации. Например.
let semaphore = DispatchSemaphore(value: 8)
for i in 0..<10000 {
semaphore.wait()
concurrent.async {
threadSafeArray.append(newElement: i)
semaphore.signal()
}
}
concurrent.async(flags: .barrier) {
print(threadSafeArray.get.count)
}
Или вы можете использовать DispatchGroup
, классический механизм для определения окончания серии асинхронно отправленных блоков:
let semaphore = DispatchSemaphore(value: 8)
let group = DispatchGroup()
for i in 0..<10000 {
semaphore.wait()
concurrent.async(group: group) {
threadSafeArray.append(newElement: i)
semaphore.signal()
}
}
group.notify(queue: .main) {
print(threadSafeArray.get.count)
}
Использование concurrentPerform
устраняет необходимость в любом из этих шаблонов, потому что он не будет продолжать выполнение, пока не будут выполнены все параллельные задачи. (Это также автоматически оптимизирует степень параллелизма для количества ядер на вашем устройстве.)
FWIW, гораздо лучшая альтернатива SynchronizedArray
— вообще не выставлять базовый массив, а просто реализовывать любые методы, которые вы хотите выставить, интегрируя необходимую синхронизацию. Это делает место вызова более чистым и решает многие проблемы.
Например, предположив, что вы хотите предоставить оператор индекса и переменную count
, вы должны сделать:
class SynchronizedArray<T> {
private var array: [T]
private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)
init(_ array: [T] = []) {
self.array = array
}
subscript(index: Int) -> T {
get { reader { $0[index] } }
set { writer { $0[index] = newValue } }
}
var count: Int {
reader { $0.count }
}
func append(newElement: T) {
writer { $0.append(newElement) }
}
func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
try accessQueue.sync { try block(array) }
}
func writer(_ block: @escaping (inout [T]) -> Void) {
accessQueue.async(flags: .barrier) { block(&self.array) }
}
}
Это решает множество проблем. Например, теперь вы можете сделать:
print(threadSafeArray.count) // get the count
print(threadSafeArray[500]) // get the 500th item
Теперь вы также можете делать такие вещи, как:
let average = threadSafeArray.reader { array -> Double in
let sum = array.reduce(0, +)
return Double(sum) / Double(array.count)
}
Но суть в том, что при работе с коллекциями (или любыми изменяемыми объектами) вы неизменно не хотите раскрывать сам изменяемый объект, а скорее пишете свои собственные синхронизированные методы для общих операций (индексы, count
, removeAll
и т. д.), и, возможно, также предоставить интерфейс чтения/записи для тех случаев, когда разработчику приложения может потребоваться более широкий механизм синхронизации.
(FWIW, изменения в этом SynchronizedArray
применяются как к семафору, так и к сценариям concurrentPerform
; просто в этом случае семафор просто проявляет проблему.)
Излишне говорить, что вы, как правило, должны выполнять больше работы в каждом потоке, потому что столь скромные, как накладные расходы на переключение контекста, здесь, вероятно, достаточно, чтобы компенсировать любые преимущества, полученные от параллельной обработки. (Но я понимаю, что это, скорее всего, была просто концептуальная демонстрация проблемы, а не предлагаемая реализация.) Просто к сведению будущих читателей.