Я использую TPL DataFlow с BatchBlock, который запускается автоматически, когда количество текущих в очереди или отложенных элементов меньше размера партии после тайм-аута, как указано ниже:
Timer triggerBatchTimer = new(_ => _batchBlock.TriggerBatch());
TransformBlock<string, string> timeoutTransformBlock = new((value) =>
{
triggerBatchTimer.Change(_options.Value.TriggerBatchTime, Timeout.Infinite);
return value;
});
var actionBlock = new ActionBlock<IEnumerable<string>>(action =>
{
GenerateFile(action);
});
_buffer.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(_batchBlock);
_batchBlock.LinkTo(actionBlock);
Т.е. с максимальным размером пакета: 4 и тайм-аутом 10 секунд.
Current behavior
BatchBlock (items): +---------1------------2------3---------------------------------|
Timeout (sec) : +-10--9--10--9--8--7--10--9--10--9--8--7--6--5--4--3--2--1--0---|
ActionBlock : +----------------------------------------------------------Call-|
Expected
BatchBlock (items): +--------1-----------2-----3---------|
Timeout (sec) : +-10--9--8--7--6--5--4--3--2--1--0---|
ActionBlock : +-------------------------------Call-|
Но моя проблема заключается в том, как избежать сброса тайм-аута на 0 каждый раз, когда блок получает новый элемент?
«Моя проблема заключается в том, как избежать сброса тайм-аута на 0 каждый раз, когда блок получает новый элемент». -- Итак, в какой момент и при каких условиях вы хотите сбросить Timer
? Или, если этот вопрос неясен, какое именно поведение вы хотите применить?
Когда Timer
инициируется, каждый раз, когда блок получает элемент, таймер не должен изменять время запуска только при вызове GenerateFile
. То есть: тайм-аут начинается с 30 секунд, блокирует получение первого элемента и тайм-аут не перезапускается через 30 секунд. Я не знаю, ясно ли...
Жульен, если честно, твое объяснение мне непонятно. Возможно, добавление мраморной диаграммы в вопрос поможет прояснить желаемое поведение.
Получится ли желаемое поведение, если вместо вызова triggerBatchTimer.Change
вы просто инициализируете таймер одним и тем же значением для dueTime
и period
? Таким образом, пакеты будут выдаваться периодически или быстрее, если достигнуто значение batchSize
.
Просто инициализировал пакетный блок следующим образом: Timer triggerBatchTimer = new(_ => _batchBlock.TriggerBatch(), null, 10000, 10000);
и удалил triggerBatchTimer.Change
?
Ага. Достигаете ли вы желаемого поведения?
Да, я реализовал один тест на данный момент, и все в порядке. Я сделаю несколько тестов и вернусь к вам за результатом.
Если вы нашли решение, вы можете опубликовать его как самостоятельный ответ. Честно говоря, мраморная диаграмма внутри вопроса, озаглавленная «Ожидается», не имеет достаточной длины, чтобы сделать желаемый шаблон однозначным, поэтому любой, кто попытается ответить на вопрос, будет случайным.
Спасибо @Theodor Zoulias за помощь, это решение:
Просто инициализируйте таймер с одинаковым значением для dueTime
и period
и удалите triggerBatchTimer.Change
в TransformBlock
.
Таким образом, пакетный блок срабатывает каждые X секунд или по достижении размера пакета, а таймер не сбрасывается для каждого нового элемента в пакетном блоке.
Timer triggerBatchTimer = new(_ => _batchBlock.TriggerBatch(), null, options.Value.TriggerBatchTime, options.Value.TriggerBatchTime);
TransformBlock<string, string> timeoutTransformBlock = new((value) =>
{
return value;
});
var actionBlock = new ActionBlock<IEnumerable<string>>(action =>
{
GenerateFile(action);
});
_buffer.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(_batchBlock);
_batchBlock.LinkTo(actionBlock);
Связано: Пакетирование по продолжительности или порогу с использованием потока данных TPL