У меня есть сценарий производитель-потребитель¹, основанный на ограниченном канале <T> .
Channel<Item> channel = Channel.CreateBounded<Item>(10);
Элементы поступают из СУБД, к которой производитель подключается и извлекает их один за другим. Особенность в том, что я не могу позволить себе роскошь поддерживать соединение с БД в течение всего срока службы производителя. Я должен закрыть соединение, когда канал заполнен, и снова открыть соединение, когда на канале снова появится место для нового элемента. Поэтому я реализовал производителя следующим образом:
// Producer
while (true)
{
await channel.Writer.WaitToWriteAsync();
connection.Open();
Item item;
while (true)
{
item = connection.GetNextItem(); // This is fast
if (!channel.Writer.TryWrite(item)) break;
}
connection.Close();
await channel.Writer.WriteAsync(item);
}
Производитель ждет завершения задачи channel.Writer.WaitToWriteAsync(), затем открывает соединение с БД, записывает в канал столько элементов, сколько может, пока один из них не будет отклонен, закрывает соединение с БД, асинхронно записывает отклоненный элемент, и возвращается к ожиданию.
Потребитель довольно стандартный:
// Consumer
await foreach (Item item in channel.Reader.ReadAllAsync())
{
// Process the item (this is slow)
}
Моя проблема с этим дизайном заключается в том, что соединение с БД открывается и закрывается слишком часто. Как открытие, так и закрытие соединения имеют существенные накладные расходы, поэтому я хотел бы свести к минимуму частоту, с которой это происходит. Хотя вместимость канала равна 10, я бы предпочел, чтобы задача WaitToWriteAsync
завершалась, когда канал наполовину заполнен (5 элементов), а не сразу, когда количество сохраненных элементов уменьшается с 10 до 9.
Мой вопрос: как я могу изменить своего производителя, чтобы он подключался к базе данных, когда в канале 5 или менее элементов, и закрывал соединение, когда канал заполнен 10 элементами?
Ниже приведен вывод минимального примера, который я написал, который воспроизводит нежелательное поведение:
19:20:55.811 [4] > Opening connection -->
19:20:55.933 [4] > Produced #1
19:20:55.934 [4] > Produced #2
19:20:55.934 [4] > Produced #3
19:20:55.934 [4] > Produced #4
19:20:55.934 [4] > Produced #5
19:20:55.934 [4] > Produced #6
19:20:55.935 [4] > Produced #7
19:20:55.935 [4] > Produced #8
19:20:55.935 [4] > Produced #9
19:20:55.935 [4] > Produced #10
19:20:55.935 [4] > Produced #11
19:20:55.935 [4] > Closing connection <--
19:20:55.936 [6] > Consuming: 1
19:20:56.037 [4] > Consuming: 2
19:20:56.037 [6] > Opening connection -->
19:20:56.137 [6] > Produced #12
19:20:56.137 [6] > Produced #13
19:20:56.137 [6] > Closing connection <--
19:20:56.137 [4] > Consuming: 3
19:20:56.238 [6] > Consuming: 4
19:20:56.238 [4] > Opening connection -->
19:20:56.338 [4] > Produced #14
19:20:56.338 [4] > Produced #15
19:20:56.338 [4] > Closing connection <--
19:20:56.338 [6] > Consuming: 5
19:20:56.439 [4] > Consuming: 6
19:20:56.439 [6] > Opening connection -->
19:20:56.539 [6] > Produced #16
19:20:56.539 [6] > Produced #17
19:20:56.539 [6] > Closing connection <--
19:20:56.539 [4] > Consuming: 7
19:20:56.644 [6] > Consuming: 8
19:20:56.644 [4] > Opening connection -->
19:20:56.744 [4] > Produced #18
19:20:56.745 [7] > Consuming: 9
19:20:56.745 [4] > Produced #19
19:20:56.745 [4] > Produced #20
19:20:56.745 [4] > Closing connection <--
19:20:56.846 [7] > Consuming: 10
19:20:56.847 [4] > Producer completed
19:20:56.946 [4] > Consuming: 11
19:20:57.046 [4] > Consuming: 12
19:20:57.147 [4] > Consuming: 13
19:20:57.247 [4] > Consuming: 14
19:20:57.347 [4] > Consuming: 15
19:20:57.452 [4] > Consuming: 16
19:20:57.552 [4] > Consuming: 17
19:20:57.653 [4] > Consuming: 18
19:20:57.753 [4] > Consuming: 19
19:20:57.854 [4] > Consuming: 20
19:20:57.955 [1] > Finished
Как вы можете видеть, происходит много «открытия/закрытия соединения».
Мой вопрос похож на этот старый вопрос:
Имея API внешнего производителя, который можно останавливать и запускать, эффективно останавливайте производителя, когда локальный буфер заполнен.
Разница в том, что в моем случае производитель — это цикл, а не обработчик события сервиса, как в другом вопросе.
¹ This scenario is contrived. It was inspired by a relatively recent GitHub API proposal.
Уточнение: Канал не следует полностью сливать перед повторным подключением к БД. Это связано с тем, что открытие соединения занимает некоторое время, и я не хочу, чтобы потребитель простаивал в это время. Таким образом, производитель должен повторно подключаться, когда канал упал до 5 элементов или меньше, а не когда он полностью пуст.
Эта проблема аналогична буферизации ввода-вывода. Попробуйте просто увеличить размер вашего ограниченного канала и начать запись в канал, когда он пуст, а затем прекратить запись в канал, когда он заполнен или нет больше данных для записи. Таким образом, циклы запуска-остановки производителя ограничиваются условиями пустого/заполненного канала. Ваш продюсер не должен пытаться отслеживать каждую пару действий потребителей на канале.
Следующий пример написан с использованием языка программирования Ада в ответ на просьбу автора привести пример на языке по моему выбору.
В этом примере создается задача производителя и задача потребителя, которые обмениваются данными через общий буфер. Производитель начинает производство, когда общий буфер заполняется, и останавливается, когда общий буфер становится пустым. Производитель намного быстрее, чем потребитель.
Спецификация пакета Ada для этого примера, который предоставляет задачи производителя и потребителя основной процедуре:
package prod_con is
task fast_producer;
task slow_consumer;
end prod_con;
Реализация этих задач и общий буфер находятся в теле пакета.
with Ada.Text_IO; use Ada.Text_IO;
package body prod_con is
type Buf_Index is range 0 .. 10;
type Buf_Arr is array (Buf_Index) of Integer;
protected Buffer is
entry Write (Value : in Integer);
entry Read (Value : out Integer);
private
Write_Allowed : Boolean := True;
Buff : Buf_Arr;
Write_Index : Buf_Index := 0;
Read_Index : Buf_Index := 0;
Count : Natural := 0;
end Buffer;
protected body Buffer is
entry Write (Value : in Integer) when Write_Allowed is
begin
Buff (Write_Index) := Value;
Count := Count + 1;
if Write_Index < Buf_Index'Last then
Write_Index := Write_Index + 1;
else
Write_Index := 0;
Write_Allowed := False;
end if;
end Write;
entry Read (Value : out Integer) when Count > 0 is
begin
Value := Buff (Read_Index);
Count := Count - 1;
if Read_Index < Buf_Index'Last then
Read_Index := Read_Index + 1;
else
Write_Allowed := True;
Read_Index := 0;
end if;
end Read;
end Buffer;
-------------------
-- fast_producer --
-------------------
task body fast_producer is
begin
for I in 1 .. 22 loop
Buffer.Write (I);
Put_Line ("Producer produced value:" & I'Image);
end loop;
Put_Line ("Producer done producing.");
end fast_producer;
-------------------
-- slow_consumer --
-------------------
task body slow_consumer is
Next_Value : Integer;
begin
loop
select
Buffer.Read (Next_Value);
Put_Line ("Consumer read value:" & Next_Value'Image);
delay 1.0; -- sleep for one second
or
delay 0.1;
exit;
end select;
end loop;
Put_Line ("Consumer done consuming.");
end slow_consumer;
end prod_con;
with Ada.Text_IO; use Ada.Text_IO;
package body prod_con is
type Buf_Index is range 0 .. 10;
type Buf_Arr is array (Buf_Index) of Integer;
protected Buffer is
entry Write (Value : in Integer);
entry Read (Value : out Integer);
private
Write_Allowed : Boolean := True;
Buff : Buf_Arr;
Write_Index : Buf_Index := 0;
Read_Index : Buf_Index := 0;
Count : Natural := 0;
end Buffer;
protected body Buffer is
entry Write (Value : in Integer) when Write_Allowed is
begin
Buff (Write_Index) := Value;
Count := Count + 1;
if Write_Index < Buf_Index'Last then
Write_Index := Write_Index + 1;
else
Write_Index := 0;
Write_Allowed := False;
end if;
end Write;
entry Read (Value : out Integer) when Count > 0 is
begin
Value := Buff (Read_Index);
Count := Count - 1;
if Read_Index < Buf_Index'Last then
Read_Index := Read_Index + 1;
else
Write_Allowed := True;
Read_Index := 0;
end if;
end Read;
end Buffer;
-------------------
-- fast_producer --
-------------------
task body fast_producer is
begin
for I in 1 .. 22 loop
Buffer.Write (I);
Put_Line ("Producer produced value:" & I'Image);
end loop;
Put_Line ("Producer done producing.");
end fast_producer;
-------------------
-- slow_consumer --
-------------------
task body slow_consumer is
Next_Value : Integer;
begin
loop
select
Buffer.Read (Next_Value);
Put_Line ("Consumer read value:" & Next_Value'Image);
delay 1.0; -- sleep for one second
or
delay 0.1;
exit;
end select;
end loop;
Put_Line ("Consumer done consuming.");
end slow_consumer;
end prod_con;
Защищенный объект Ada с именем Buffer защищен от условий гонки. В этом примере используются защищенные записи. Защищенные записи разрешают доступ для чтения и записи к защищенному объекту под контролем граничного условия. Запись записи разрешена для выполнения, когда Write_Allowed имеет значение true. Каждый раз, когда выполняется запись Write, она присваивает параметр Value внутреннему массиву с именем Buff по значению индекса, содержащемуся в Write_Index, и увеличивает количество значений в массиве Buff.
Когда Write_Index меньше максимального значения индекса, Write_Index увеличивается. Если Write_Index не меньше максимального значения индекса, то Write_Index присваивается 0, а Write_Allowed присваивается False. Буфер заполнен, и производитель будет приостановлен в очереди записи при следующем вызове Write до тех пор, пока Write_Allowed не примет значение True.
Запись Read используется потребителем для чтения следующего значения из буфера, когда буфер не пуст.
Каждый вызов записи Read присваивает значение Buff по индексу Read_Index параметру Value. Значение передается задаче потребителя. Количество уменьшается. Read_Index увеличивается, если его текущее значение меньше максимального значения индекса массива. Если Read_Index не меньше максимального значения индекса массива, Read_Index устанавливается равным 0, а Write_Allowed устанавливается равным True.
Задача Producer просто записывает значения от 1 до 22 в Buffer так быстро, как только может. Производитель будет приостановлен, пока граничное условие записи записи имеет значение False, и автоматически возобновит выполнение, когда Write_Allowed имеет значение true.
Потребитель циклически выполняет чтение из буфера, а затем засыпает на одну секунду, чтобы имитировать медленного потребителя. Вызов Buffer.Read помещается в команду выбора, которая выполняет условный вызов записи Buffer.Read. Установленное условие гласит, что вызов Buffer.Read будет отменен, если он не будет завершен в течение 0,1 секунды. Когда Buffer.Read отменяется, потребитель завершает работу.
Результат этого примера:
Producer produced value: 1
Producer produced value: 2
Producer produced value: 3
Producer produced value: 4
Producer produced value: 5
Producer produced value: 6
Producer produced value: 7
Producer produced value: 8
Producer produced value: 9
Producer produced value: 10
Producer produced value: 11
Consumer read value: 1
Consumer read value: 2
Consumer read value: 3
Consumer read value: 4
Consumer read value: 5
Consumer read value: 6
Consumer read value: 7
Consumer read value: 8
Consumer read value: 9
Consumer read value: 10
Consumer read value: 11
Producer produced value: 12
Producer produced value: 13
Producer produced value: 14
Producer produced value: 15
Producer produced value: 16
Producer produced value: 17
Producer produced value: 18
Producer produced value: 19
Producer produced value: 20
Producer produced value: 21
Producer produced value: 22
Producer done producing.
Consumer read value: 12
Consumer read value: 13
Consumer read value: 14
Consumer read value: 15
Consumer read value: 16
Consumer read value: 17
Consumer read value: 18
Consumer read value: 19
Consumer read value: 20
Consumer read value: 21
Consumer read value: 22
Consumer done consuming.
Я не программист на С#, но логика проста. Установите логический флаг для производителя, указывающий, что он должен создавать значения для канала, когда канал пуст. Удерживайте этот флаг в положении «ИСТИНА», пока канал не заполнится, затем установите флаг в «ЛОЖЬ». Производитель будет производить значения до тех пор, пока флаг "TRUE". Производитель прекратит производство значений до тех пор, пока канал снова не станет пустым, после чего он снова будет производить значения до тех пор, пока канал не заполнится или не останется значений для производства.
Хм, означает ли это, что производитель должен безрезультатно наблюдать за значением логического флага, пока флаг не изменит значение? Это возможное решение, но оно грязное и неэффективное, а также может включать некоторые условия гонки, которые приведут к тому, что мой производитель перестанет работать. Не могли бы вы включить какой-нибудь псевдокод или код на выбранном вами языке?
Я отредактировал свой ответ, чтобы показать, как производитель может быть приостановлен до тех пор, пока ему снова не будет разрешено производить без расточительного опроса.
Спасибо Джим за пример. Из вывода я вижу, что поведение не совсем то, что я хочу. Я ожидаю увидеть запись Producer produced value: 12
после записи Consumer read value: 6
, а не после Consumer read value: 11
. Я не хочу, чтобы мой потребитель полностью опустошил канал до того, как производитель начнет новый круг производства. Я хочу, чтобы производитель повторно подключался к базе данных, когда канал наполовину заполнен (или наполовину пуст, если хотите). Является ли это неотъемлемым ограничением вашего решения или его можно исправить?
Почему вы хотите избежать слива канала?
Потому что connection.Open();
занимает значительное количество времени, и я не хочу, чтобы потребитель бездействовал в это время.
Затем вы будете либо медленным потребителем, как в вашей версии, либо ваш производитель перезапишет значения до того, как они будут использованы. В моем решении вы перезапускаете канал после каждого заполнения буфера, в то время как ваше перезапускает канал при каждой второй записи после запуска потребителя. Мое решение сводит к минимуму перезапуски канала.
Один из способов решить эту проблему — использовать два канала: ограниченный Channel<T>
с желаемой пропускной способностью и второй ограниченный Channel<int>
с пропускной способностью 1, который используется только для функциональности WaitToWriteAsync
. Синхронизация двух каналов не тривиальна, поэтому я написал собственную Channel<T>
реализацию, которая объединяет эти два канала и выполняет внутреннюю синхронизацию:
public sealed class DoubleCapacityChannel<T> : Channel<T>
{
private readonly Channel<T> _channel;
private readonly Channel<int> _channelLow;
private readonly int _lowCapacity;
public DoubleCapacityChannel(int highCapacity, int lowCapacity)
{
if (highCapacity < 1)
throw new ArgumentOutOfRangeException(nameof(highCapacity));
if (lowCapacity < 1 || lowCapacity > highCapacity)
throw new ArgumentOutOfRangeException(nameof(lowCapacity));
_lowCapacity = lowCapacity;
_channel = Channel.CreateBounded<T>(highCapacity);
Debug.Assert(_channel.Reader.CanCount);
_channelLow = Channel.CreateBounded<int>(1);
this.Writer = new ChannelWriter(this);
this.Reader = new ChannelReader(this);
}
private class ChannelWriter : ChannelWriter<T>
{
private readonly DoubleCapacityChannel<T> _parent;
public ChannelWriter(DoubleCapacityChannel<T> parent) => _parent = parent;
public override bool TryComplete(Exception error = null)
{
lock (_parent._channel)
{
bool success = _parent._channel.Writer.TryComplete(error);
if (success) _parent._channelLow.Writer.TryComplete(error);
return success;
}
}
public override bool TryWrite(T item)
{
lock (_parent._channel)
{
bool success = _parent._channel.Writer.TryWrite(item);
if (!success || _parent._channel.Reader.Count >= _parent._lowCapacity)
_parent._channelLow.Writer.TryWrite(0);
return success;
}
}
public override async ValueTask WriteAsync(T item,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
while (true)
{
if (this.TryWrite(item)) break;
try
{
if (await _parent._channel.Writer.WaitToWriteAsync(
cancellationToken).ConfigureAwait(false)) continue;
}
catch (Exception ex) { throw new ChannelClosedException(ex); }
throw new ChannelClosedException();
}
}
public override ValueTask<bool> WaitToWriteAsync(
CancellationToken cancellationToken = default)
=> _parent._channelLow.Writer.WaitToWriteAsync(cancellationToken);
}
private class ChannelReader : ChannelReader<T>
{
private readonly DoubleCapacityChannel<T> _parent;
public ChannelReader(DoubleCapacityChannel<T> parent) => _parent = parent;
public override Task Completion => _parent._channel.Reader.Completion;
public override bool CanCount => _parent._channel.Reader.CanCount;
public override int Count => _parent._channel.Reader.Count;
public override bool TryRead(out T item)
{
lock (_parent._channel)
{
bool success = _parent._channel.Reader.TryRead(out item);
if (!success || _parent._channel.Reader.Count < _parent._lowCapacity)
_parent._channelLow.Reader.TryRead(out _);
return success;
}
}
public override ValueTask<T> ReadAsync(
CancellationToken cancellationToken = default)
=> _parent._channel.Reader.ReadAsync(cancellationToken);
public override ValueTask<bool> WaitToReadAsync(
CancellationToken cancellationToken = default)
=> _parent._channel.Reader.WaitToReadAsync(cancellationToken);
public override bool CanPeek => _parent._channel.Reader.CanPeek;
public override bool TryPeek(out T item)
=> _parent._channel.Reader.TryPeek(out item);
}
}
lowCapacity
влияет только на метод WaitToWriteAsync
.
Класс DoubleCapacityChannel<T>
может решить проблему в вопросе, изменив строку:
Channel<Item> channel = Channel.CreateBounded<Item>(10);
к
Channel<Item> channel = new DoubleCapacityChannel<Item>(10, 5);
Ниже приведен вывод исходного минимального примера, модифицированного для использования DoubleCapacityChannel<T>
:
19:26:19.119 [4] > Opening connection -->
19:26:19.241 [4] > Produced #1
19:26:19.243 [4] > Produced #2
19:26:19.243 [4] > Produced #3
19:26:19.243 [4] > Produced #4
19:26:19.243 [4] > Produced #5
19:26:19.243 [4] > Produced #6
19:26:19.243 [4] > Produced #7
19:26:19.243 [4] > Produced #8
19:26:19.243 [4] > Produced #9
19:26:19.243 [4] > Produced #10
19:26:19.243 [4] > Produced #11
19:26:19.243 [4] > Closing connection <--
19:26:19.244 [6] > Consuming: 1
19:26:19.345 [6] > Consuming: 2
19:26:19.446 [6] > Consuming: 3
19:26:19.547 [6] > Consuming: 4
19:26:19.651 [6] > Consuming: 5
19:26:19.752 [6] > Consuming: 6
19:26:19.852 [6] > Consuming: 7
19:26:19.853 [4] > Opening connection -->
19:26:19.953 [6] > Consuming: 8
19:26:19.953 [4] > Produced #12
19:26:19.953 [4] > Produced #13
19:26:19.953 [4] > Produced #14
19:26:19.953 [4] > Produced #15
19:26:19.953 [4] > Produced #16
19:26:19.953 [4] > Produced #17
19:26:19.953 [4] > Produced #18
19:26:19.953 [4] > Produced #19
19:26:19.953 [4] > Closing connection <--
19:26:20.053 [6] > Consuming: 9
19:26:20.154 [4] > Consuming: 10
19:26:20.254 [4] > Consuming: 11
19:26:20.355 [4] > Consuming: 12
19:26:20.455 [4] > Consuming: 13
19:26:20.556 [4] > Consuming: 14
19:26:20.656 [4] > Consuming: 15
19:26:20.656 [6] > Opening connection -->
19:26:20.757 [6] > Produced #20
19:26:20.757 [6] > Produced #21
19:26:20.757 [6] > Produced #22
19:26:20.757 [6] > Produced #23
19:26:20.757 [6] > Produced #24
19:26:20.757 [6] > Produced #25
19:26:20.757 [6] > Produced #26
19:26:20.757 [6] > Produced #27
19:26:20.757 [6] > Closing connection <--
19:26:20.757 [4] > Consuming: 16
19:26:20.858 [4] > Consuming: 17
19:26:20.859 [6] > Producer completed
19:26:20.959 [6] > Consuming: 18
19:26:21.059 [6] > Consuming: 19
19:26:21.160 [6] > Consuming: 20
19:26:21.260 [6] > Consuming: 21
19:26:21.361 [6] > Consuming: 22
19:26:21.461 [6] > Consuming: 23
19:26:21.562 [6] > Consuming: 24
19:26:21.662 [6] > Consuming: 25
19:26:21.763 [6] > Consuming: 26
19:26:21.863 [7] > Consuming: 27
19:26:21.964 [1] > Finished
Спасибо Джим за ответ. Не могли бы вы включить код, показывающий, как мне изменить производителя, чтобы он реже подключался к базе данных?