Я просматривал этот ответ, который предоставляет код для потокобезопасного массива с одновременным чтением. Как отмечает @tombardey в комментариях, код (соответствующий фрагмент ниже) не совсем безопасен:
public func removeAtIndex(index: Int) {
self.accessQueue.async(flags:.barrier) {
self.array.remove(at: index)
}
}
public var count: Int {
var count = 0
self.accessQueue.sync {
count = self.array.count
}
return count
}
... Скажем, синхронизированный массив имеет один элемент, разве это не потерпит неудачу? если synchronizedArray.count == 1 { synchronizedArray.remove(at: 0) } Это состояние гонки, скажем, два потоки выполняют оператор. Оба считывают количество 1 одновременно, оба ставят в очередь блок записи одновременно. Блоки записи выполняются последовательно второй выйдет из строя... (продолжение)
@Роб отвечает:
@tombardey - Вы абсолютно правы, что этот уровень синхронизация (на уровне свойств/методов) часто недостаточно для достижения настоящей потокобезопасности в более широких приложениях. Ваш пример легко решается (добавляя метод, который отправляет блокировать в очередь), но есть и другие, которые не являются (например. «синхронизированный» массив, одновременно используемый UITableViewDataSource и изменено какой-либо фоновой операцией). В этих случаях вы должны реализовать собственную высокоуровневую синхронизацию. Но выше техника, тем не менее, очень полезна в определенных, очень ограниченных ситуации.
Я изо всех сил пытаюсь понять, что означает @Rob под «Ваш пример легко решается (путем добавления метода, который отправляет блок в очередь)». Мне было бы интересно увидеть пример реализации этого метода (или любого другого) метода решения проблемы.
@Rob Как вы думаете, Apple решила заблокировать вызывающую сторону для самой операции записи, потому что использование очереди предположительно блокирует вызывающую сторону на практике дольше, чем блокировка для дешевых операций (таких как добавление)? Или вы думаете, что может быть другая причина? Интересно, что они сделали это, так что спасибо, что направили меня на это
Да, они используют замки. Таким образом, они не могут писать асинхронно (и не могут читать одновременно по отношению к другим операциям чтения), но, тем не менее, в целом это быстрее.
Я вижу несколько проблем с опубликованным кодом и примером:
Функция removeAtIndex
не проверяет, действительно ли она может удалить указанный индекс. Поэтому его следует изменить на
public func removeAtIndex(index: Int) {
// Check if it even makes sense to schedule an update
// This is optional, but IMO just a better practice
guard count > index else { return }
self.accessQueue.async(flags: .barrier) {
// Check again before removing to make sure array didn't change
// Here we can actually check the size of the array, since other threads are blocked
guard self.array.count > index else { return }
self.array.remove(at: index)
}
}
Использование класса потокобезопасности также подразумевает, что вы используете одну операцию для проверки и выполнения операции над элементом, который должен быть потокобезопасным. Поэтому, если вы проверяете размер массива, а затем удаляете его, вы нарушаете эту оболочку безопасности потоков, это неправильное использование класса. Конкретный случай synchronizedArray.count == 1 { synchronizedArray.remove(at: 0) }
решается с корректировками функции выше (вам больше не нужно проверять количество, так как функция уже делает это). Но если вам по-прежнему нужна функция, которая одновременно проверяет количество и затем удаляет элемент, вам придется создать функцию в потокобезопасном классе, которая выполняет обе операции без возможности того, что другие потоки изменят промежуточный массив. Вам даже могут понадобиться 2 функции: synchronizedArray.getCountAndRemove
(получить количество, затем удалить) , and
synchronizedArray.removeAndGetCount` (удалить, затем получить количество).
public func getCountAndRemoveAtIndex(index: Int) -> Int {
var currentCount = count
guard currentCount > index else { return currentCount }
// Has to run synchronously to ensure the value is returned
self.accessQueue.sync {
currentCount = self.array.count
guard currentCount > index else { break }
self.array.remove(at: index)
}
return currentCount
}
В общем, удаление элемента по индексу для массива, который используется из нескольких потоков, совершенно бессмысленно. Вы даже не можете быть уверены, что удаляете. Возможно, в некоторых случаях это имело бы смысл, но обычно имеет смысл либо удалить по какой-то логике (например, определенному значению), либо иметь функцию, которая возвращает значение удаляемого элемента (например, func getAndRemoveAtIndex(index: Int) -> T
)
Всегда проверяйте каждую функцию и их комбинацию. Например, если исходный постер протестирован на удаление следующим образом:
let array = SynchronizedArray<Int>()
array.append(newElement: 1)
array.append(newElement: 2)
array.append(newElement: 3)
DispatchQueue.concurrentPerform(iterations: 5) {_ in
array.removeAtIndex(index: 0)
}
Он получит Fatal error: Index out of range: file Swift/Array.swift, line 1298
в 2 из 5 потоков, поэтому будет ясно, что исходная реализация этой функции неверна. Попробуйте тот же тест с функцией, которую я разместил выше, и вы увидите разницу.
Кстати, мы говорим только о removeAtIndex
, но у subscript
тоже есть похожая проблема. Но что интересно first()
реализован правильно.
@Rob, второй пункт, у вас нет выбора: если вы хотите подсчитать и добавить/обновить/удалить атомарно, вы должны сделать это с помощью sync
. Вот почему я сказал ранее, что remove
сам по себе не нужен, и вы должны использовать его только в том случае, если вам все же нужна функция, которая и проверяет количество, и затем удаляет элемент. По поводу первого пункта: реализация по книге требует, чтобы вы проверили условие (что дешевле) перед планированием условия блокировки - это то, что я определенно сделал бы на C++. Но если вы сможете доказать, что именно в Swift это неоптимально, спорить не буду.
@ Роб, ты вырываешь из контекста. Смысл моего поста был 1 - продемонстрировать, как исправить функцию remove
, поэтому вам не нужно делать что-то вроде if synchronizedArray.count == 1 { synchronizedArray.remove(at: 0) }
, что является недопустимым использованием класса потокобезопасности. 2 - я продемонстрировал, что вы могли бы сделать, если бы вы все еще хотели выполнить 2 операции потокобезопасным способом. И я сразу сказал, что «это не обязательно, так как фиксированная функция remove
позаботится о подсчете, но если вам все еще нужно... и т. д.». Я мог бы использовать "getandRemove" для этого примера - та же идея.
И все, что я хочу сказать, это то, что вы можете реализовать «проверку счетчика перед удалением» потокобезопасным способом, не теряя преимущества производительности чтения-записи, как здесь.
Это очень хороший пример того, почему «атомарных» изменяемых операций над отдельными свойствами редко бывает достаточно, и их опасно добавлять без особой осторожности.
Фундаментальная проблема в этом примере заключается в том, что при каждом изменении массива существующие индексы становятся недействительными. Чтобы безопасно использовать индекс, вы должны убедиться, что вся операция «извлечение индекса, использование индекса» является атомарной. Вы не можете просто гарантировать, что каждая часть является атомарной. Нет безопасного способа написать removeAtIndex
изолированно, потому что нет безопасного способа получить индекс, который вы передаете. Между моментом, когда вы извлекаете индекс, и временем, когда вы его используете, массив может быть изменен произвольным образом.
Дело в том, что не существует такого понятия, как "поточно-ориентированный (изменяемый) массив", который можно использовать как обычный массив и не беспокоиться о проблемах параллелизма. Поточно-ориентированный изменяемый массив не может возвращать или принимать индексы, поскольку его индексы нестабильны. Какая именно структура данных подходит, зависит от проблемы, которую вы пытаетесь решить. Здесь нет ни одного ответа.
В большинстве случаев ответом является «меньше параллелизма». Вместо того, чтобы пытаться управлять одновременным доступом к отдельным структурам данных, подумайте о более крупных «единицах работы», которые несут все свои собственные данные и имеют к ним эксклюзивный доступ. Поместите эти более крупные единицы работы в очереди. (Во многих случаях даже это является излишним. Вы будете шокированы тем, как часто добавление валюты замедляет работу, если вы не продумаете ее очень тщательно.) Дополнительные рекомендации см. в разделе Модернизация использования Grand Central Dispatch.
Это имеет смысл, поскольку использование индексов в «поточно-ориентированном» массиве не имеет смысла, но прежде чем я приму этот ответ, просто хочу проверить значение «не могу» принимать индексы: может ли предоставленный Кирилом ответ привести к выходу индекса ошибки диапазона? или под «не могу» вы имеете в виду только с точки зрения практического использования?
Вы сказали:
Я изо всех сил пытаюсь понять, что @Rob имеет в виду под «Ваш пример легко решается (путем добавления [a] метода, который отправляет блок в очередь)». Мне было бы интересно увидеть пример реализации этого метода (или любого другого) метода решения проблемы.
Давайте расширим пример, который я разместил в ответ на ваш другой вопрос (см. пункт 3 в этого ответа), добавив еще несколько Array
методов:
class SynchronizedArray<T> {
private var array: [T]
private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)
init(_ array: [T] = []) {
self.array = array
}
subscript(index: Int) -> T {
get { reader { $0[index] } }
set { writer { $0[index] = newValue } }
}
var count: Int {
reader { $0.count }
}
func append(newElement: T) {
writer { $0.append(newElement) }
}
func remove(at index: Int) {
writer { $0.remove(at: index) }
}
func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
try accessQueue.sync { try block(array) }
}
func writer(_ block: @escaping (inout [T]) -> Void) {
accessQueue.async(flags: .barrier) { block(&self.array) }
}
}
Итак, давайте представим, что вы хотите удалить элемент, если в массиве был только один элемент. Учитывать:
let numbers = SynchronizedArray([42])
...
if numbers.count == 1 {
numbers.remove(at: 0)
}
Это выглядит достаточно невинно, но это не потокобезопасно. У вас может возникнуть состояние гонки, если другие потоки вставляют или удаляют значения. Например, если какой-то другой поток добавил значение между временем, когда вы тестировали count
, и тем, когда вы удалили значение.
Вы можете исправить это, обернув всю операцию (тест if
и последующее удаление) в один блок, который синхронизируется. Таким образом, вы могли:
numbers.writer { array in
if array.count == 1 {
array.remove(at: 0)
}
}
Этот метод writer
(в этой синхронизации на основе чтения-записи) является примером того, что я имел в виду под «методом, который отправляет блок в очередь».
Теперь, очевидно, вы также можете дать своему SynchronizedArray
собственный метод, который сделает это за вас, например:
func safelyRemove(at index: Int) {
writer { array in
if index < array.count {
array.remove(at: index)
}
}
}
Затем вы можете сделать:
numbers.safelyRemove(at: index)
... и это потокобезопасно, но по-прежнему пользуется преимуществами производительности синхронизации чтения-записи.
Но общая идея заключается в том, что при работе с потокобезопасной коллекцией у вас всегда будет ряд задач, которые вы захотите синхронизировать вместе на более высоком уровне абстракции. Предоставляя методы синхронизации reader
и writer
, вы получаете простой универсальный механизм для этого.
Все это было сказано, как говорили другие, лучший способ написать потокобезопасный код — полностью избежать параллельного доступа. Но если вы должны сделать изменяемый объект потокобезопасным, то идентификация последовательности задач, которые должны выполняться как одна синхронизированная операция, лежит на вызывающем объекте.
FWIW, это пример того, как Apple реализовала свой потокобезопасный массив.