Данные не упорядочены должным образом в очереди spsc без блокировки

Я пытаюсь написать ограниченную очередь без блокировки с одним производителем и одним потребителем, однако порядок элементов, вставленных в очередь, не совпадает с порядком элементов, удаленных из очереди. Тестовый код в main.cpp добавляет 100 000 000 последовательных целых чисел в очередь в одном потоке и извлекает значения в другой, проверяя, что значения являются последовательными.

В настоящее время при запуске программы она печатает «О нет! Последнее значение было: 219718, но это значение — 220231» (числовые значения различаются в зависимости от запуска программы). Диапазон чисел, по-видимому, зависит от yield_frequency в writer_thread, более низкие значения yield_frequency приводят к тому, что тест (часто, но не всегда) завершается неудачно при числе, превышающем 100.

(Код является незавершенным, поэтому, возможно, какой-то код не соответствует лучшим практикам C++)

spsc_queue.hpp

#pragma once

#include <memory>
#include <utility>
#include <bit>
#include <atomic>
#include <optional>

template <typename T> class ChannelReader;
template <typename T> class ChannelWriter;

// instances of this class cannot be created directly, make_queue returns a pair of ChannelReader and ChannelWriter instances that wrap the queue to prevent reads (or writes) from multiple threads
template <typename T>
class SpscQueue {
    friend ChannelReader<T>;
    friend ChannelWriter<T>;
    public:
    static std::pair<ChannelReader<T>, ChannelWriter<T>> make_queue(size_t size) {
        if (!std::__has_single_bit(size)) throw "";
        std::shared_ptr<SpscQueue<T>> queue_ptr(new SpscQueue<T>(size));
        return std::make_pair(ChannelReader<T>(queue_ptr), ChannelWriter<T>(queue_ptr));
    }
    ~SpscQueue() {
        delete[] data;
    }
    private:
    SpscQueue (size_t _size): data(new T[_size]()), size(_size) {}
    alignas(64) T* data;
    const size_t size;
    alignas(64) std::atomic<size_t> read_idx{0};
    alignas(64) std::atomic<size_t> write_idx{0};
};

// wrapper class that allows ONE thread to read from a SPSC queue
template <typename T>
class ChannelReader {
    friend SpscQueue<T>;
    public:
    // attempt to read from the queue, returning the value if successful, and std::nullopt otherwise
    std::optional<T> try_get_next() {
        size_t read_idx = queue->read_idx;
        size_t write_idx = cached_write_idx;
        if (write_idx <= read_idx) {
            write_idx = queue->write_idx;
            cached_write_idx = write_idx;
        }
        if (write_idx <= read_idx) {
            return std::nullopt;
        } else {
            queue->read_idx++;
            return std::move(queue->data[read_idx % queue->size]);
        }
    }
    ChannelReader(const ChannelReader&) = delete;
    ChannelReader& operator=(const ChannelReader&) = delete;
    ChannelReader(ChannelReader&&) = default;
    ChannelReader& operator=(ChannelReader&&) = default;
    private:
    ChannelReader(std::shared_ptr<SpscQueue<T>> ptr): queue(ptr) {}
    std::shared_ptr<SpscQueue<T>> queue;
    alignas(64) size_t cached_write_idx{0};
};

// wrapper class that allows ONE thread to write to a SPSC queue
template <typename T>
class ChannelWriter {
    friend SpscQueue<T>;
    public:
    // attempt to write to the queue, returns true if the write was successful, and false otherwise
    bool try_write_next(const T& obj) {
        size_t read_idx = cached_read_idx;
        size_t write_idx = queue->write_idx;
        if (write_idx >= read_idx + queue->size) {
            read_idx = queue->read_idx;
            cached_read_idx = read_idx;          
        }
        if (write_idx >= read_idx + queue->size) {
            return false;
        } else {
            queue->data[write_idx % queue->size] = obj;
            ++queue->write_idx;
            return true;
        }
    }
    ChannelWriter(const ChannelWriter&) = delete;
    ChannelWriter& operator=(const ChannelWriter&) = delete;
    ChannelWriter(ChannelWriter&&) = default;
    ChannelWriter& operator=(ChannelWriter&&) = default;
    private:
    ChannelWriter(std::shared_ptr<SpscQueue<T>> ptr): queue(ptr) {}
    std::shared_ptr<SpscQueue<T>> queue;
    alignas(64) size_t cached_read_idx{0};
};

main.cpp

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

#include "../include/spsc_queue.hpp"

namespace chrono = std::chrono;

int main() {
  std::atomic<bool> latch{false};
  auto [reader, writer] = SpscQueue<int>::make_queue(512);
  std::thread reader_thread([_reader = std::move(reader), &latch]() mutable {
    latch.wait(false);
    std::cerr << "Starting reader...\n";
    int last_val = -1;
    while (last_val != 100'000'000 - 1) {
      if (auto data = _reader.try_get_next()) {
        if (*data != last_val + 1) {
          std::cerr << "Oh no! last value was: " << last_val
                    << " but this value is " << *data << '\n';
          std::exit(1);
        }
        last_val = *data;
        // std::cerr << last_val << " Read\n";
      }
    }
  });
  std::thread writer_thread([_writer = std::move(writer), &latch]() mutable {
    latch.wait(false);
    std::cerr << "Starting writer...\n";
    for (int i = 0; i < 100'000'000; ++i) {
      for (int j = 0; !_writer.try_write_next(i); ++j) {
        constexpr int yield_frequency = 1 << 0;
        if (j % yield_frequency)
          std::this_thread::yield();
      }
      // std::cerr << "writer wrote value" << i << '\n';
      // if (i == 10'000'000) std::exit(1);
    }
  });
  std::cout << "Start" << std::endl;
  {
    auto start = chrono::steady_clock::now();
    latch = true;
    latch.notify_all();
    reader_thread.join();
    writer_thread.join();
    auto finish = chrono::steady_clock::now();
    auto elapsed_seconds =
        chrono::duration_cast<chrono::duration<double>>(finish - start).count();
    std::cout << elapsed_seconds << std::endl;
  }
  std::cout << "End" << std::endl;
}

Код не похож на очередь чтения и записи без блокировки. При написании параллельного кода C++ необходимо как можно более четко описать, для чего предназначен каждый метод, и почему в целом это приводит к правильному поведению. Это позволяет читателям (а) убедиться, что ваш план действителен, и б) убедиться, что ваш код соответствует вашему плану. Вместо этого вы хотите, чтобы люди вывели ваш план из вашего кода, доказали себе, что выведенный план верен, а затем проверили ваш код на соответствие воображаемому плану. А это в тысячи раз сложнее, в миллионы, если у вас допущена глупая опечатка.

Yakk - Adam Nevraumont 04.09.2024 21:52

Подождите, может быть, вы хотите, чтобы ваш код поддерживал одновременно только 1 программу чтения и 1 запись?! Это могло бы объяснить некоторые пробелы в конструкции. Но тогда почему вы используете общие PTR, когда есть четкий вариант владения?

Yakk - Adam Nevraumont 04.09.2024 21:55

Кроме того, в некоторых кодах подразумевается кольцевой буфер, в других такая возможность игнорируется? Не уверен, что с этим связано.

Yakk - Adam Nevraumont 04.09.2024 21:56

@Yakk-AdamNevraumont 1) Не могли бы вы рассказать об этом подробнее? Просто документация для каждого метода? 2) SPSC означает один производитель и один потребитель, я ожидаю, что это подразумевает одного читателя и одного писателя (я отредактировал сообщение, чтобы использовать полную форму вместо аббревиатуры, если это помогает) 3) Где не подразумевается кольцевой буфер?

ayaan098 04.09.2024 22:01
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
4
54
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

220231 - 219718 = 513 и размер кольцевого буфера равен 512, поэтому записывающее устройство и средство чтения состязаются по этому последнему значению.... вы увеличиваете индекс чтения перед чтением значения, записывающее устройство может перезаписать значение, которое собирается прочитать считывающее устройство.

queue->read_idx++; // tell writer data is already read
// writer modifies data here
return std::move(queue->data[read_idx % queue->size]); // read new data

решение состоит в том, чтобы прочитать значение перед увеличением, так же, как и при записи: сначала запишите значение, а затем увеличьте индекс записи.

auto result = std::move(queue->data[read_idx % queue->size]);
++queue->read_idx;
return result;

Похоже, что (read_idx + 1) приводит к ошибке на единицу, однако замена ее на read_idx работает отлично. Я внес изменения, чтобы исправить это, как только оно будет одобрено, я приму этот ответ. Спасибо за вашу помощь!

ayaan098 04.09.2024 23:54

Другие вопросы по теме