Как обеспечить потокобезопасность для removeAtIndex, когда массив имеет одновременные чтения в Swift?

Я просматривал этот ответ, который предоставляет код для потокобезопасного массива с одновременным чтением. Как отмечает @tombardey в комментариях, код (соответствующий фрагмент ниже) не совсем безопасен:

public func removeAtIndex(index: Int) {

    self.accessQueue.async(flags:.barrier) {
        self.array.remove(at: index)
    }
}

public var count: Int {
    var count = 0

    self.accessQueue.sync {
        count = self.array.count
    }

    return count
}

... Скажем, синхронизированный массив имеет один элемент, разве это не потерпит неудачу? если synchronizedArray.count == 1 { synchronizedArray.remove(at: 0) } Это состояние гонки, скажем, два потоки выполняют оператор. Оба считывают количество 1 одновременно, оба ставят в очередь блок записи одновременно. Блоки записи выполняются последовательно второй выйдет из строя... (продолжение)

@Роб отвечает:

@tombardey - Вы абсолютно правы, что этот уровень синхронизация (на уровне свойств/методов) часто недостаточно для достижения настоящей потокобезопасности в более широких приложениях. Ваш пример легко решается (добавляя метод, который отправляет блокировать в очередь), но есть и другие, которые не являются (например. «синхронизированный» массив, одновременно используемый UITableViewDataSource и изменено какой-либо фоновой операцией). В этих случаях вы должны реализовать собственную высокоуровневую синхронизацию. Но выше техника, тем не менее, очень полезна в определенных, очень ограниченных ситуации.

Я изо всех сил пытаюсь понять, что означает @Rob под «Ваш пример легко решается (путем добавления метода, который отправляет блок в очередь)». Мне было бы интересно увидеть пример реализации этого метода (или любого другого) метода решения проблемы.

@Rob Как вы думаете, Apple решила заблокировать вызывающую сторону для самой операции записи, потому что использование очереди предположительно блокирует вызывающую сторону на практике дольше, чем блокировка для дешевых операций (таких как добавление)? Или вы думаете, что может быть другая причина? Интересно, что они сделали это, так что спасибо, что направили меня на это

asj123 25.12.2020 19:25

Да, они используют замки. Таким образом, они не могут писать асинхронно (и не могут читать одновременно по отношению к другим операциям чтения), но, тем не менее, в целом это быстрее.

Rob 25.12.2020 21:00
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
3
1 195
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Я вижу несколько проблем с опубликованным кодом и примером:

  1. Функция removeAtIndex не проверяет, действительно ли она может удалить указанный индекс. Поэтому его следует изменить на

    public func removeAtIndex(index: Int) {
    
        // Check if it even makes sense to schedule an update
        // This is optional, but IMO just a better practice
        guard count > index else { return }
    
        self.accessQueue.async(flags: .barrier) {
    
            // Check again before removing to make sure array didn't change
            // Here we can actually check the size of the array, since other threads are blocked
            guard self.array.count > index else { return }
            self.array.remove(at: index)
        }
    }
    
  2. Использование класса потокобезопасности также подразумевает, что вы используете одну операцию для проверки и выполнения операции над элементом, который должен быть потокобезопасным. Поэтому, если вы проверяете размер массива, а затем удаляете его, вы нарушаете эту оболочку безопасности потоков, это неправильное использование класса. Конкретный случай synchronizedArray.count == 1 { synchronizedArray.remove(at: 0) } решается с корректировками функции выше (вам больше не нужно проверять количество, так как функция уже делает это). Но если вам по-прежнему нужна функция, которая одновременно проверяет количество и затем удаляет элемент, вам придется создать функцию в потокобезопасном классе, которая выполняет обе операции без возможности того, что другие потоки изменят промежуточный массив. Вам даже могут понадобиться 2 функции: synchronizedArray.getCountAndRemove (получить количество, затем удалить) , and synchronizedArray.removeAndGetCount` (удалить, затем получить количество).

    public func getCountAndRemoveAtIndex(index: Int) -> Int {
    
        var currentCount = count
        guard currentCount > index else { return currentCount }
    
        // Has to run synchronously to ensure the value is returned
        self.accessQueue.sync {
    
            currentCount = self.array.count
            guard currentCount > index else { break }
            self.array.remove(at: index)
        }
    
        return currentCount
    }
    
  3. В общем, удаление элемента по индексу для массива, который используется из нескольких потоков, совершенно бессмысленно. Вы даже не можете быть уверены, что удаляете. Возможно, в некоторых случаях это имело бы смысл, но обычно имеет смысл либо удалить по какой-то логике (например, определенному значению), либо иметь функцию, которая возвращает значение удаляемого элемента (например, func getAndRemoveAtIndex(index: Int) -> T)

  4. Всегда проверяйте каждую функцию и их комбинацию. Например, если исходный постер протестирован на удаление следующим образом:

    let array = SynchronizedArray<Int>()
    array.append(newElement: 1)
    array.append(newElement: 2)
    array.append(newElement: 3)
    
    DispatchQueue.concurrentPerform(iterations: 5) {_ in
    
        array.removeAtIndex(index: 0)
    }
    

Он получит Fatal error: Index out of range: file Swift/Array.swift, line 1298 в 2 из 5 потоков, поэтому будет ясно, что исходная реализация этой функции неверна. Попробуйте тот же тест с функцией, которую я разместил выше, и вы увидите разницу.

Кстати, мы говорим только о removeAtIndex, но у subscript тоже есть похожая проблема. Но что интересно first() реализован правильно.

@Rob, второй пункт, у вас нет выбора: если вы хотите подсчитать и добавить/обновить/удалить атомарно, вы должны сделать это с помощью sync. Вот почему я сказал ранее, что remove сам по себе не нужен, и вы должны использовать его только в том случае, если вам все же нужна функция, которая и проверяет количество, и затем удаляет элемент. По поводу первого пункта: реализация по книге требует, чтобы вы проверили условие (что дешевле) перед планированием условия блокировки - это то, что я определенно сделал бы на C++. Но если вы сможете доказать, что именно в Swift это неоптимально, спорить не буду.

klangfarb 15.12.2020 18:25

@ Роб, ты вырываешь из контекста. Смысл моего поста был 1 - продемонстрировать, как исправить функцию remove, поэтому вам не нужно делать что-то вроде if synchronizedArray.count == 1 { synchronizedArray.remove(at: 0) }, что является недопустимым использованием класса потокобезопасности. 2 - я продемонстрировал, что вы могли бы сделать, если бы вы все еще хотели выполнить 2 операции потокобезопасным способом. И я сразу сказал, что «это не обязательно, так как фиксированная функция remove позаботится о подсчете, но если вам все еще нужно... и т. д.». Я мог бы использовать "getandRemove" для этого примера - та же идея.

klangfarb 15.12.2020 23:42

И все, что я хочу сказать, это то, что вы можете реализовать «проверку счетчика перед удалением» потокобезопасным способом, не теряя преимущества производительности чтения-записи, как здесь.

Rob 16.12.2020 00:35

Это очень хороший пример того, почему «атомарных» изменяемых операций над отдельными свойствами редко бывает достаточно, и их опасно добавлять без особой осторожности.

Фундаментальная проблема в этом примере заключается в том, что при каждом изменении массива существующие индексы становятся недействительными. Чтобы безопасно использовать индекс, вы должны убедиться, что вся операция «извлечение индекса, использование индекса» является атомарной. Вы не можете просто гарантировать, что каждая часть является атомарной. Нет безопасного способа написать removeAtIndex изолированно, потому что нет безопасного способа получить индекс, который вы передаете. Между моментом, когда вы извлекаете индекс, и временем, когда вы его используете, массив может быть изменен произвольным образом.

Дело в том, что не существует такого понятия, как "поточно-ориентированный (изменяемый) массив", который можно использовать как обычный массив и не беспокоиться о проблемах параллелизма. Поточно-ориентированный изменяемый массив не может возвращать или принимать индексы, поскольку его индексы нестабильны. Какая именно структура данных подходит, зависит от проблемы, которую вы пытаетесь решить. Здесь нет ни одного ответа.

В большинстве случаев ответом является «меньше параллелизма». Вместо того, чтобы пытаться управлять одновременным доступом к отдельным структурам данных, подумайте о более крупных «единицах работы», которые несут все свои собственные данные и имеют к ним эксклюзивный доступ. Поместите эти более крупные единицы работы в очереди. (Во многих случаях даже это является излишним. Вы будете шокированы тем, как часто добавление валюты замедляет работу, если вы не продумаете ее очень тщательно.) Дополнительные рекомендации см. в разделе Модернизация использования Grand Central Dispatch.

Это имеет смысл, поскольку использование индексов в «поточно-ориентированном» массиве не имеет смысла, но прежде чем я приму этот ответ, просто хочу проверить значение «не могу» принимать индексы: может ли предоставленный Кирилом ответ привести к выходу индекса ошибки диапазона? или под «не могу» вы имеете в виду только с точки зрения практического использования?

asj123 13.12.2020 00:26
Ответ принят как подходящий

Вы сказали:

Я изо всех сил пытаюсь понять, что @Rob имеет в виду под «Ваш пример легко решается (путем добавления [a] метода, который отправляет блок в очередь)». Мне было бы интересно увидеть пример реализации этого метода (или любого другого) метода решения проблемы.

Давайте расширим пример, который я разместил в ответ на ваш другой вопрос (см. пункт 3 в этого ответа), добавив еще несколько Array методов:

class SynchronizedArray<T> {
    private var array: [T]
    private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)

    init(_ array: [T] = []) {
        self.array = array
    }

    subscript(index: Int) -> T {
        get { reader { $0[index] } }
        set { writer { $0[index] = newValue } }
    }

    var count: Int {
        reader { $0.count }
    }

    func append(newElement: T) {
        writer { $0.append(newElement) }
    }

    func remove(at index: Int) {
        writer { $0.remove(at: index) }
    }

    func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
        try accessQueue.sync { try block(array) }
    }

    func writer(_ block: @escaping (inout [T]) -> Void) {
        accessQueue.async(flags: .barrier) { block(&self.array) }
    }
}

Итак, давайте представим, что вы хотите удалить элемент, если в массиве был только один элемент. Учитывать:

let numbers = SynchronizedArray([42])

...

if numbers.count == 1 { 
    numbers.remove(at: 0) 
}

Это выглядит достаточно невинно, но это не потокобезопасно. У вас может возникнуть состояние гонки, если другие потоки вставляют или удаляют значения. Например, если какой-то другой поток добавил значение между временем, когда вы тестировали count, и тем, когда вы удалили значение.

Вы можете исправить это, обернув всю операцию (тест if и последующее удаление) в один блок, который синхронизируется. Таким образом, вы могли:

numbers.writer { array in
    if array.count == 1 { 
        array.remove(at: 0) 
    } 
}

Этот метод writer (в этой синхронизации на основе чтения-записи) является примером того, что я имел в виду под «методом, который отправляет блок в очередь».


Теперь, очевидно, вы также можете дать своему SynchronizedArray собственный метод, который сделает это за вас, например:

func safelyRemove(at index: Int) {
    writer { array in
        if index < array.count {
            array.remove(at: index)
        }
    }
}

Затем вы можете сделать:

numbers.safelyRemove(at: index)

... и это потокобезопасно, но по-прежнему пользуется преимуществами производительности синхронизации чтения-записи.

Но общая идея заключается в том, что при работе с потокобезопасной коллекцией у вас всегда будет ряд задач, которые вы захотите синхронизировать вместе на более высоком уровне абстракции. Предоставляя методы синхронизации reader и writer, вы получаете простой универсальный механизм для этого.


Все это было сказано, как говорили другие, лучший способ написать потокобезопасный код — полностью избежать параллельного доступа. Но если вы должны сделать изменяемый объект потокобезопасным, то идентификация последовательности задач, которые должны выполняться как одна синхронизированная операция, лежит на вызывающем объекте.

Другие вопросы по теме