Быстрый производитель, медленный потребитель, ограниченный канал, уменьшить частоту пробуждения производителя

У меня есть сценарий производитель-потребитель¹, основанный на ограниченном канале <T> .

Channel<Item> channel = Channel.CreateBounded<Item>(10);

Элементы поступают из СУБД, к которой производитель подключается и извлекает их один за другим. Особенность в том, что я не могу позволить себе роскошь поддерживать соединение с БД в течение всего срока службы производителя. Я должен закрыть соединение, когда канал заполнен, и снова открыть соединение, когда на канале снова появится место для нового элемента. Поэтому я реализовал производителя следующим образом:

// Producer
while (true)
{
    await channel.Writer.WaitToWriteAsync();
    connection.Open();
    Item item;
    while (true)
    {
        item = connection.GetNextItem(); // This is fast
        if (!channel.Writer.TryWrite(item)) break;
    }
    connection.Close();
    await channel.Writer.WriteAsync(item);
}

Производитель ждет завершения задачи channel.Writer.WaitToWriteAsync(), затем открывает соединение с БД, записывает в канал столько элементов, сколько может, пока один из них не будет отклонен, закрывает соединение с БД, асинхронно записывает отклоненный элемент, и возвращается к ожиданию.

Потребитель довольно стандартный:

// Consumer
await foreach (Item item in channel.Reader.ReadAllAsync())
{
    // Process the item (this is slow)
}

Моя проблема с этим дизайном заключается в том, что соединение с БД открывается и закрывается слишком часто. Как открытие, так и закрытие соединения имеют существенные накладные расходы, поэтому я хотел бы свести к минимуму частоту, с которой это происходит. Хотя вместимость канала равна 10, я бы предпочел, чтобы задача WaitToWriteAsync завершалась, когда канал наполовину заполнен (5 элементов), а не сразу, когда количество сохраненных элементов уменьшается с 10 до 9.

Мой вопрос: как я могу изменить своего производителя, чтобы он подключался к базе данных, когда в канале 5 или менее элементов, и закрывал соединение, когда канал заполнен 10 элементами?

Ниже приведен вывод минимального примера, который я написал, который воспроизводит нежелательное поведение:

19:20:55.811 [4] > Opening connection -->
19:20:55.933 [4] > Produced #1
19:20:55.934 [4] > Produced #2
19:20:55.934 [4] > Produced #3
19:20:55.934 [4] > Produced #4
19:20:55.934 [4] > Produced #5
19:20:55.934 [4] > Produced #6
19:20:55.935 [4] > Produced #7
19:20:55.935 [4] > Produced #8
19:20:55.935 [4] > Produced #9
19:20:55.935 [4] > Produced #10
19:20:55.935 [4] > Produced #11
19:20:55.935 [4] > Closing connection <--
19:20:55.936 [6] > Consuming: 1
19:20:56.037 [4] > Consuming: 2
19:20:56.037 [6] > Opening connection -->
19:20:56.137 [6] > Produced #12
19:20:56.137 [6] > Produced #13
19:20:56.137 [6] > Closing connection <--
19:20:56.137 [4] > Consuming: 3
19:20:56.238 [6] > Consuming: 4
19:20:56.238 [4] > Opening connection -->
19:20:56.338 [4] > Produced #14
19:20:56.338 [4] > Produced #15
19:20:56.338 [4] > Closing connection <--
19:20:56.338 [6] > Consuming: 5
19:20:56.439 [4] > Consuming: 6
19:20:56.439 [6] > Opening connection -->
19:20:56.539 [6] > Produced #16
19:20:56.539 [6] > Produced #17
19:20:56.539 [6] > Closing connection <--
19:20:56.539 [4] > Consuming: 7
19:20:56.644 [6] > Consuming: 8
19:20:56.644 [4] > Opening connection -->
19:20:56.744 [4] > Produced #18
19:20:56.745 [7] > Consuming: 9
19:20:56.745 [4] > Produced #19
19:20:56.745 [4] > Produced #20
19:20:56.745 [4] > Closing connection <--
19:20:56.846 [7] > Consuming: 10
19:20:56.847 [4] > Producer completed
19:20:56.946 [4] > Consuming: 11
19:20:57.046 [4] > Consuming: 12
19:20:57.147 [4] > Consuming: 13
19:20:57.247 [4] > Consuming: 14
19:20:57.347 [4] > Consuming: 15
19:20:57.452 [4] > Consuming: 16
19:20:57.552 [4] > Consuming: 17
19:20:57.653 [4] > Consuming: 18
19:20:57.753 [4] > Consuming: 19
19:20:57.854 [4] > Consuming: 20
19:20:57.955 [1] > Finished

Как вы можете видеть, происходит много «открытия/закрытия соединения».

Мой вопрос похож на этот старый вопрос:
Имея API внешнего производителя, который можно останавливать и запускать, эффективно останавливайте производителя, когда локальный буфер заполнен.
Разница в том, что в моем случае производитель — это цикл, а не обработчик события сервиса, как в другом вопросе.

¹ This scenario is contrived. It was inspired by a relatively recent GitHub API proposal.


Уточнение: Канал не следует полностью сливать перед повторным подключением к БД. Это связано с тем, что открытие соединения занимает некоторое время, и я не хочу, чтобы потребитель простаивал в это время. Таким образом, производитель должен повторно подключаться, когда канал упал до 5 элементов или меньше, а не когда он полностью пуст.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
95
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Эта проблема аналогична буферизации ввода-вывода. Попробуйте просто увеличить размер вашего ограниченного канала и начать запись в канал, когда он пуст, а затем прекратить запись в канал, когда он заполнен или нет больше данных для записи. Таким образом, циклы запуска-остановки производителя ограничиваются условиями пустого/заполненного канала. Ваш продюсер не должен пытаться отслеживать каждую пару действий потребителей на канале.

Следующий пример написан с использованием языка программирования Ада в ответ на просьбу автора привести пример на языке по моему выбору.

В этом примере создается задача производителя и задача потребителя, которые обмениваются данными через общий буфер. Производитель начинает производство, когда общий буфер заполняется, и останавливается, когда общий буфер становится пустым. Производитель намного быстрее, чем потребитель.

Спецификация пакета Ada для этого примера, который предоставляет задачи производителя и потребителя основной процедуре:

package prod_con is
   task fast_producer;
   task slow_consumer;
end prod_con;

Реализация этих задач и общий буфер находятся в теле пакета.

with Ada.Text_IO; use Ada.Text_IO;

package body prod_con is

   type Buf_Index is range 0 .. 10;
   type Buf_Arr is array (Buf_Index) of Integer;

   protected Buffer is
      entry Write (Value : in Integer);
      entry Read (Value : out Integer);
   private
      Write_Allowed : Boolean   := True;
      Buff          : Buf_Arr;
      Write_Index   : Buf_Index := 0;
      Read_Index    : Buf_Index := 0;
      Count         : Natural   := 0;
   end Buffer;

   protected body Buffer is
      entry Write (Value : in Integer) when Write_Allowed is
      begin
         Buff (Write_Index) := Value;
         Count              := Count + 1;
         if Write_Index < Buf_Index'Last then
            Write_Index := Write_Index + 1;
         else
            Write_Index   := 0;
            Write_Allowed := False;
         end if;
      end Write;

      entry Read (Value : out Integer) when Count > 0 is
      begin
         Value := Buff (Read_Index);
         Count := Count - 1;
         if Read_Index < Buf_Index'Last then
            Read_Index := Read_Index + 1;
         else
            Write_Allowed := True;
            Read_Index    := 0;
         end if;
      end Read;
   end Buffer;

   -------------------
   -- fast_producer --
   -------------------

   task body fast_producer is
   begin
      for I in 1 .. 22 loop
         Buffer.Write (I);
         Put_Line ("Producer produced value:" & I'Image);
      end loop;
      Put_Line ("Producer done producing.");
   end fast_producer;

   -------------------
   -- slow_consumer --
   -------------------

   task body slow_consumer is
      Next_Value : Integer;
   begin
      loop
         select
            Buffer.Read (Next_Value);
            Put_Line ("Consumer read value:" & Next_Value'Image);
            delay 1.0; -- sleep for one second
         or
            delay 0.1;
            exit;
         end select;
      end loop;
      Put_Line ("Consumer done consuming.");
   end slow_consumer;

end prod_con;
with Ada.Text_IO; use Ada.Text_IO;

package body prod_con is

   type Buf_Index is range 0 .. 10;
   type Buf_Arr is array (Buf_Index) of Integer;

   protected Buffer is
      entry Write (Value : in Integer);
      entry Read (Value : out Integer);
   private
      Write_Allowed : Boolean   := True;
      Buff          : Buf_Arr;
      Write_Index   : Buf_Index := 0;
      Read_Index    : Buf_Index := 0;
      Count         : Natural   := 0;
   end Buffer;

   protected body Buffer is
      entry Write (Value : in Integer) when Write_Allowed is
      begin
         Buff (Write_Index) := Value;
         Count              := Count + 1;
         if Write_Index < Buf_Index'Last then
            Write_Index := Write_Index + 1;
         else
            Write_Index   := 0;
            Write_Allowed := False;
         end if;
      end Write;

      entry Read (Value : out Integer) when Count > 0 is
      begin
         Value := Buff (Read_Index);
         Count := Count - 1;
         if Read_Index < Buf_Index'Last then
            Read_Index := Read_Index + 1;
         else
            Write_Allowed := True;
            Read_Index    := 0;
         end if;
      end Read;
   end Buffer;

   -------------------
   -- fast_producer --
   -------------------

   task body fast_producer is
   begin
      for I in 1 .. 22 loop
         Buffer.Write (I);
         Put_Line ("Producer produced value:" & I'Image);
      end loop;
      Put_Line ("Producer done producing.");
   end fast_producer;

   -------------------
   -- slow_consumer --
   -------------------

   task body slow_consumer is
      Next_Value : Integer;
   begin
      loop
         select
            Buffer.Read (Next_Value);
            Put_Line ("Consumer read value:" & Next_Value'Image);
            delay 1.0; -- sleep for one second
         or
            delay 0.1;
            exit;
         end select;
      end loop;
      Put_Line ("Consumer done consuming.");
   end slow_consumer;

end prod_con;

Защищенный объект Ada с именем Buffer защищен от условий гонки. В этом примере используются защищенные записи. Защищенные записи разрешают доступ для чтения и записи к защищенному объекту под контролем граничного условия. Запись записи разрешена для выполнения, когда Write_Allowed имеет значение true. Каждый раз, когда выполняется запись Write, она присваивает параметр Value внутреннему массиву с именем Buff по значению индекса, содержащемуся в Write_Index, и увеличивает количество значений в массиве Buff.

Когда Write_Index меньше максимального значения индекса, Write_Index увеличивается. Если Write_Index не меньше максимального значения индекса, то Write_Index присваивается 0, а Write_Allowed присваивается False. Буфер заполнен, и производитель будет приостановлен в очереди записи при следующем вызове Write до тех пор, пока Write_Allowed не примет значение True.

Запись Read используется потребителем для чтения следующего значения из буфера, когда буфер не пуст.

Каждый вызов записи Read присваивает значение Buff по индексу Read_Index параметру Value. Значение передается задаче потребителя. Количество уменьшается. Read_Index увеличивается, если его текущее значение меньше максимального значения индекса массива. Если Read_Index не меньше максимального значения индекса массива, Read_Index устанавливается равным 0, а Write_Allowed устанавливается равным True.

Задача Producer просто записывает значения от 1 до 22 в Buffer так быстро, как только может. Производитель будет приостановлен, пока граничное условие записи записи имеет значение False, и автоматически возобновит выполнение, когда Write_Allowed имеет значение true.

Потребитель циклически выполняет чтение из буфера, а затем засыпает на одну секунду, чтобы имитировать медленного потребителя. Вызов Buffer.Read помещается в команду выбора, которая выполняет условный вызов записи Buffer.Read. Установленное условие гласит, что вызов Buffer.Read будет отменен, если он не будет завершен в течение 0,1 секунды. Когда Buffer.Read отменяется, потребитель завершает работу.

Результат этого примера:

Producer produced value: 1
Producer produced value: 2
Producer produced value: 3
Producer produced value: 4
Producer produced value: 5
Producer produced value: 6
Producer produced value: 7
Producer produced value: 8
Producer produced value: 9
Producer produced value: 10
Producer produced value: 11
Consumer read value: 1
Consumer read value: 2
Consumer read value: 3
Consumer read value: 4
Consumer read value: 5
Consumer read value: 6
Consumer read value: 7
Consumer read value: 8
Consumer read value: 9
Consumer read value: 10
Consumer read value: 11
Producer produced value: 12
Producer produced value: 13
Producer produced value: 14
Producer produced value: 15
Producer produced value: 16
Producer produced value: 17
Producer produced value: 18
Producer produced value: 19
Producer produced value: 20
Producer produced value: 21
Producer produced value: 22
Producer done producing.
Consumer read value: 12
Consumer read value: 13
Consumer read value: 14
Consumer read value: 15
Consumer read value: 16
Consumer read value: 17
Consumer read value: 18
Consumer read value: 19
Consumer read value: 20
Consumer read value: 21
Consumer read value: 22
Consumer done consuming.

Спасибо Джим за ответ. Не могли бы вы включить код, показывающий, как мне изменить производителя, чтобы он реже подключался к базе данных?

Theodor Zoulias 30.12.2022 18:43

Я не программист на С#, но логика проста. Установите логический флаг для производителя, указывающий, что он должен создавать значения для канала, когда канал пуст. Удерживайте этот флаг в положении «ИСТИНА», пока канал не заполнится, затем установите флаг в «ЛОЖЬ». Производитель будет производить значения до тех пор, пока флаг "TRUE". Производитель прекратит производство значений до тех пор, пока канал снова не станет пустым, после чего он снова будет производить значения до тех пор, пока канал не заполнится или не останется значений для производства.

Jim Rogers 30.12.2022 19:13

Хм, означает ли это, что производитель должен безрезультатно наблюдать за значением логического флага, пока флаг не изменит значение? Это возможное решение, но оно грязное и неэффективное, а также может включать некоторые условия гонки, которые приведут к тому, что мой производитель перестанет работать. Не могли бы вы включить какой-нибудь псевдокод или код на выбранном вами языке?

Theodor Zoulias 30.12.2022 19:23

Я отредактировал свой ответ, чтобы показать, как производитель может быть приостановлен до тех пор, пока ему снова не будет разрешено производить без расточительного опроса.

Jim Rogers 30.12.2022 20:37

Спасибо Джим за пример. Из вывода я вижу, что поведение не совсем то, что я хочу. Я ожидаю увидеть запись Producer produced value: 12 после записи Consumer read value: 6, а не после Consumer read value: 11. Я не хочу, чтобы мой потребитель полностью опустошил канал до того, как производитель начнет новый круг производства. Я хочу, чтобы производитель повторно подключался к базе данных, когда канал наполовину заполнен (или наполовину пуст, если хотите). Является ли это неотъемлемым ограничением вашего решения или его можно исправить?

Theodor Zoulias 30.12.2022 20:49

Почему вы хотите избежать слива канала?

Jim Rogers 30.12.2022 21:06

Потому что connection.Open(); занимает значительное количество времени, и я не хочу, чтобы потребитель бездействовал в это время.

Theodor Zoulias 30.12.2022 21:08

Затем вы будете либо медленным потребителем, как в вашей версии, либо ваш производитель перезапишет значения до того, как они будут использованы. В моем решении вы перезапускаете канал после каждого заполнения буфера, в то время как ваше перезапускает канал при каждой второй записи после запуска потребителя. Мое решение сводит к минимуму перезапуски канала.

Jim Rogers 30.12.2022 21:20
Ответ принят как подходящий

Один из способов решить эту проблему — использовать два канала: ограниченный Channel<T> с желаемой пропускной способностью и второй ограниченный Channel<int> с пропускной способностью 1, который используется только для функциональности WaitToWriteAsync. Синхронизация двух каналов не тривиальна, поэтому я написал собственную Channel<T> реализацию, которая объединяет эти два канала и выполняет внутреннюю синхронизацию:

public sealed class DoubleCapacityChannel<T> : Channel<T>
{
    private readonly Channel<T> _channel;
    private readonly Channel<int> _channelLow;
    private readonly int _lowCapacity;

    public DoubleCapacityChannel(int highCapacity, int lowCapacity)
    {
        if (highCapacity < 1)
            throw new ArgumentOutOfRangeException(nameof(highCapacity));
        if (lowCapacity < 1 || lowCapacity > highCapacity)
            throw new ArgumentOutOfRangeException(nameof(lowCapacity));
        _lowCapacity = lowCapacity;
        _channel = Channel.CreateBounded<T>(highCapacity);
        Debug.Assert(_channel.Reader.CanCount);
        _channelLow = Channel.CreateBounded<int>(1);
        this.Writer = new ChannelWriter(this);
        this.Reader = new ChannelReader(this);
    }

    private class ChannelWriter : ChannelWriter<T>
    {
        private readonly DoubleCapacityChannel<T> _parent;
        public ChannelWriter(DoubleCapacityChannel<T> parent) => _parent = parent;
        public override bool TryComplete(Exception error = null)
        {
            lock (_parent._channel)
            {
                bool success = _parent._channel.Writer.TryComplete(error);
                if (success) _parent._channelLow.Writer.TryComplete(error);
                return success;
            }
        }
        public override bool TryWrite(T item)
        {
            lock (_parent._channel)
            {
                bool success = _parent._channel.Writer.TryWrite(item);
                if (!success || _parent._channel.Reader.Count >= _parent._lowCapacity)
                    _parent._channelLow.Writer.TryWrite(0);
                return success;
            }
        }
        public override async ValueTask WriteAsync(T item,
            CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            while (true)
            {
                if (this.TryWrite(item)) break;
                try
                {
                    if (await _parent._channel.Writer.WaitToWriteAsync(
                        cancellationToken).ConfigureAwait(false)) continue;
                }
                catch (Exception ex) { throw new ChannelClosedException(ex); }
                throw new ChannelClosedException();
            }
        }
        public override ValueTask<bool> WaitToWriteAsync(
            CancellationToken cancellationToken = default)
               => _parent._channelLow.Writer.WaitToWriteAsync(cancellationToken);
    }

    private class ChannelReader : ChannelReader<T>
    {
        private readonly DoubleCapacityChannel<T> _parent;
        public ChannelReader(DoubleCapacityChannel<T> parent) => _parent = parent;
        public override Task Completion => _parent._channel.Reader.Completion;
        public override bool CanCount => _parent._channel.Reader.CanCount;
        public override int Count => _parent._channel.Reader.Count;
        public override bool TryRead(out T item)
        {
            lock (_parent._channel)
            {
                bool success = _parent._channel.Reader.TryRead(out item);
                if (!success || _parent._channel.Reader.Count < _parent._lowCapacity)
                    _parent._channelLow.Reader.TryRead(out _);
                return success;
            }
        }
        public override ValueTask<T> ReadAsync(
            CancellationToken cancellationToken = default)
                => _parent._channel.Reader.ReadAsync(cancellationToken);
        public override ValueTask<bool> WaitToReadAsync(
            CancellationToken cancellationToken = default)
                => _parent._channel.Reader.WaitToReadAsync(cancellationToken);
        public override bool CanPeek => _parent._channel.Reader.CanPeek;
        public override bool TryPeek(out T item)
            => _parent._channel.Reader.TryPeek(out item);
    }
}

lowCapacity влияет только на метод WaitToWriteAsync.

Класс DoubleCapacityChannel<T> может решить проблему в вопросе, изменив строку:

Channel<Item> channel = Channel.CreateBounded<Item>(10);

к

Channel<Item> channel = new DoubleCapacityChannel<Item>(10, 5);

Ниже приведен вывод исходного минимального примера, модифицированного для использования DoubleCapacityChannel<T>:

19:26:19.119 [4] > Opening connection -->
19:26:19.241 [4] > Produced #1
19:26:19.243 [4] > Produced #2
19:26:19.243 [4] > Produced #3
19:26:19.243 [4] > Produced #4
19:26:19.243 [4] > Produced #5
19:26:19.243 [4] > Produced #6
19:26:19.243 [4] > Produced #7
19:26:19.243 [4] > Produced #8
19:26:19.243 [4] > Produced #9
19:26:19.243 [4] > Produced #10
19:26:19.243 [4] > Produced #11
19:26:19.243 [4] > Closing connection <--
19:26:19.244 [6] > Consuming: 1
19:26:19.345 [6] > Consuming: 2
19:26:19.446 [6] > Consuming: 3
19:26:19.547 [6] > Consuming: 4
19:26:19.651 [6] > Consuming: 5
19:26:19.752 [6] > Consuming: 6
19:26:19.852 [6] > Consuming: 7
19:26:19.853 [4] > Opening connection -->
19:26:19.953 [6] > Consuming: 8
19:26:19.953 [4] > Produced #12
19:26:19.953 [4] > Produced #13
19:26:19.953 [4] > Produced #14
19:26:19.953 [4] > Produced #15
19:26:19.953 [4] > Produced #16
19:26:19.953 [4] > Produced #17
19:26:19.953 [4] > Produced #18
19:26:19.953 [4] > Produced #19
19:26:19.953 [4] > Closing connection <--
19:26:20.053 [6] > Consuming: 9
19:26:20.154 [4] > Consuming: 10
19:26:20.254 [4] > Consuming: 11
19:26:20.355 [4] > Consuming: 12
19:26:20.455 [4] > Consuming: 13
19:26:20.556 [4] > Consuming: 14
19:26:20.656 [4] > Consuming: 15
19:26:20.656 [6] > Opening connection -->
19:26:20.757 [6] > Produced #20
19:26:20.757 [6] > Produced #21
19:26:20.757 [6] > Produced #22
19:26:20.757 [6] > Produced #23
19:26:20.757 [6] > Produced #24
19:26:20.757 [6] > Produced #25
19:26:20.757 [6] > Produced #26
19:26:20.757 [6] > Produced #27
19:26:20.757 [6] > Closing connection <--
19:26:20.757 [4] > Consuming: 16
19:26:20.858 [4] > Consuming: 17
19:26:20.859 [6] > Producer completed
19:26:20.959 [6] > Consuming: 18
19:26:21.059 [6] > Consuming: 19
19:26:21.160 [6] > Consuming: 20
19:26:21.260 [6] > Consuming: 21
19:26:21.361 [6] > Consuming: 22
19:26:21.461 [6] > Consuming: 23
19:26:21.562 [6] > Consuming: 24
19:26:21.662 [6] > Consuming: 25
19:26:21.763 [6] > Consuming: 26
19:26:21.863 [7] > Consuming: 27
19:26:21.964 [1] > Finished

Другие вопросы по теме