У меня есть конвейер потока данных, состоящий из нескольких блоков, обрабатывающих разнородные документы (XLS, PDF и т. д.). Каждый тип документа обрабатывается специальным TransformBlock
. В конце конвейера у меня есть ActionBlock
, который получает все обработанные документы и загружает их один за другим на веб-сервер. Моя проблема в том, что я не могу найти способ удовлетворить требование загрузки документов в том же порядке, в котором они были первоначально введены в конвейер. Например, я не могу использовать опцию VerifiedOrdered в свою пользу, потому что эта опция настраивает поведение одного блока, а не поведение нескольких блоков, которые работают параллельно. Мои требования:
Например, требуется, чтобы документ №8 загружался после документа №7, даже если он обрабатывается до документа №7.
Пятое требование означает, что я не могу дождаться обработки всех документов, затем отсортировать их по индексу и, наконец, загрузить. Загрузка должна происходить одновременно с обработкой.
Вот минимальный пример того, что я пытаюсь сделать. Для простоты я загружаю блоки не экземплярами интерфейса IDocument
, а простыми целыми числами. Значение каждого целого числа представляет порядок, в котором оно было введено в конвейер, и порядок, в котором оно должно быть загружено:
var xlsBlock = new TransformBlock<int, int>(document =>
{
int duration = 300 + document % 3 * 300;
Thread.Sleep(duration); // Simulate CPU-bound work
return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
int duration = 100 + document % 5 * 200;
Thread.Sleep(duration); // Simulate CPU-bound work
return document;
});
var uploader = new ActionBlock<int>(async document =>
{
Console.WriteLine($"Uploading document #{document}");
await Task.Delay(500); // Simulate I/O-bound work
});
xlsBlock.LinkTo(uploader);
pdfBlock.LinkTo(uploader);
foreach (var document in Enumerable.Range(1, 10))
{
if (document % 2 == 0)
xlsBlock.Post(document);
else
pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
.ContinueWith(_ => uploader.Complete());
await uploader.Completion;
Результат:
Uploading document #1
Uploading document #2
Uploading document #3
Uploading document #5
Uploading document #4
Uploading document #7
Uploading document #6
Uploading document #9
Uploading document #8
Uploading document #10
Желаемый порядок: №1, №2, №3, №4, №5, №6, №7, №8, №9, №10.
Как восстановить порядок обработанных документов до их отправки в блок uploader
?
Уточнение: кардинально изменить схему конвейера, заменив несколько конкретных TransformBlock
одним общим TransformBlock
, нельзя. Идеальным сценарием будет перехват одного блока между процессорами и загрузчиком, что восстановит порядок документов.
@jdweng, вы можете предположить, что порядковый номер уже является свойством объектов document
. Мне разрешено добавлять к ним свойство public long SequenceNumber
и правильно его инициализировать. Проблема в том, как собрать их обратно после того, как они будут обработаны всеми этими разными блоками.
Загрузчик должен добавить порядковый номер перед каждым блоком, чтобы, когда сервер получает блок, сервер мог комбинировать в правильном порядке. Вы не можете использовать номер, который удаляется во время обработки.
@jdweng Я не могу делегировать восстановление заказа веб-серверу (если вы это имеете в виду). Я должен сделать это в своей собственной программе.
Тогда вы должны сделать это перед загрузкой, и вы не можете загружать параллельно.
Параллельная обработка и загрузка @jdweng является частью моих требований. Я не могу их изменить. Если бы мне разрешили изменить требования, я бы не задавал этот вопрос прямо сейчас.
Вы не можете загружать параллельно, если вам нужно также объединить по порядку и не иметь контроля над повторным объединением. Вам необходимо проверить требования, особенно на стороне сервера, к тому, как элементы объединяются.
@jdweng, если вы считаете, что мои требования невозможно удовлетворить, вы можете опубликовать это как ответ.
Я сказал, что нужно уметь комбинировать по порядку на сервере. Ты сказал, что не можешь. Я думаю, вы ошибаетесь. Вы должны иметь возможность управлять порядком рекомбинации, если вы отправляете данные параллельно.
@jdweng Я не могу ошибаться. Если кто-то из нас лучше знает, что может и чего не может этот конкретный веб-API, то этот человек — я. У меня есть конфиденциальная информация по этому вопросу! 😃
Тогда вы не можете загружать параллельно. Нет никакой гарантии, что данные будут рекомбинированы должным образом.
Кстати, неприятная ошибка в настоящее время существует в конфигурациях потока данных, которые следуют шаблону «один ко многим к одному». Если промежуточные TransformBlock
настроены с BoundedCapacity
, отличным от -1
, они также должны быть настроены с EnsureOrdered = false
, иначе некоторые сообщения могут быть потеряны.
uploader
следует добавить документ в какой-то отсортированный список заполненных документов и проверить, является ли добавленный документ тем, который следует загрузить следующим. Если он должен удалить и загрузить все документы из отсортированного списка, пока не будет пропущен один.
Также есть проблема с синхронизацией. Доступ к этому отсортированному списку должен быть синхронизирован между потоками. Но вы хотите, чтобы все потоки что-то делали, а не ждали, пока другие потоки завершат свою работу. Итак, uploader
должен работать со списком следующим образом:
upload_in_progress
, ничего не делать и вернуться.upload_in_progress
и вернитесь.upload_in_progress
флаг,Надеюсь, я правильно это себе представлял. Как видите, сделать его одновременно безопасным и эффективным достаточно сложно. Конечно, в большинстве случаев есть способ сделать это только с одним замком, но это не сильно увеличит эффективность. Флаг upload_in_progress
распределяется между задачами, как и сам список.
Спасибо Диалектикус за ответ! Я думаю, что ваша идея склоняется к возможному решению. Как вы думаете, я мог бы использовать класс SortedList<TKey, TValue> для решения этой проблемы?
Действительно. TKey
— порядковый номер документа, TValue
— сам документ. Кстати, доступ к списку (а также сама загрузка) должен быть защищен каким-то механизмом синхронизации, например lock
. Теперь это сложно реализовать, но это также хороший новый вопрос для переполнения стека, как только вы сможете увидеть и формализовать проблему.
Хорошо, я попробую реализовать вашу идею с помощью SortedList
, и посмотрим, что получится.
Добавил возможное решение проблемы с синхронизацией. Удачи.
Спасибо! Кстати, uploader
загружает документы последовательно (по одному), поэтому я думаю, что мне может сойти с рук синхронизация. Но я буду иметь это в виду, если требования изменятся в будущем.
Если в любой момент времени работает только один загрузчик, то проблемы с синхронизацией нет, и приведенный выше алгоритм не нужен, но я считаю, что время от времени может работать более одного.
Мне удалось реализовать блок потока данных, который может восстанавливать порядок моего перетасованного конвейера, основываясь на идее Dialecticus отсортированного списка, содержащего обработанные документы. Вместо SortedList
я использовал простой Dictionary
, который, кажется, работает так же хорошо.
/// <summary>Creates a dataflow block that restores the order of
/// a shuffled pipeline.</summary>
public static IPropagatorBlock<T, T> CreateRestoreOrderBlock<T>(
Func<T, long> indexSelector,
long startingIndex = 0L,
DataflowBlockOptions options = null)
{
if (indexSelector == null) throw new ArgumentNullException(nameof(indexSelector));
var executionOptions = new ExecutionDataflowBlockOptions();
if (options != null)
{
executionOptions.CancellationToken = options.CancellationToken;
executionOptions.BoundedCapacity = options.BoundedCapacity;
executionOptions.EnsureOrdered = options.EnsureOrdered;
executionOptions.TaskScheduler = options.TaskScheduler;
executionOptions.MaxMessagesPerTask = options.MaxMessagesPerTask;
executionOptions.NameFormat = options.NameFormat;
}
var buffer = new Dictionary<long, T>();
long minIndex = startingIndex;
IEnumerable<T> Transform(T item)
{
// No synchronization needed because MaxDegreeOfParallelism = 1
long index = indexSelector(item);
if (index < startingIndex)
throw new InvalidOperationException($"Index {index} is out of range.");
if (index < minIndex)
throw new InvalidOperationException($"Index {index} has been consumed.");
if (!buffer.TryAdd(index, item)) // .NET Core only API
throw new InvalidOperationException($"Index {index} is not unique.");
while (buffer.Remove(minIndex, out var minItem)) // .NET Core only API
{
minIndex++;
yield return minItem;
}
}
// Ideally the assertion buffer.Count == 0 should be checked on the completion
// of the block.
return new TransformManyBlock<T, T>(Transform, executionOptions);
}
Пример использования:
var xlsBlock = new TransformBlock<int, int>(document =>
{
int duration = 300 + document % 3 * 300;
Thread.Sleep(duration); // Simulate CPU-bound work
return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
int duration = 100 + document % 5 * 200;
Thread.Sleep(duration); // Simulate CPU-bound work
return document;
});
var orderRestorer = CreateRestoreOrderBlock<int>(
indexSelector: document => document, startingIndex: 1L);
var uploader = new ActionBlock<int>(async document =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Uploading document #{document}");
await Task.Delay(500); // Simulate I/O-bound work
});
xlsBlock.LinkTo(orderRestorer);
pdfBlock.LinkTo(orderRestorer);
orderRestorer.LinkTo(uploader, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var document in Enumerable.Range(1, 10))
{
if (document % 2 == 0)
xlsBlock.Post(document);
else
pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
.ContinueWith(_ => orderRestorer.Complete());
await uploader.Completion;
Выход:
09:24:18.846 Uploading document #1
09:24:19.436 Uploading document #2
09:24:19.936 Uploading document #3
09:24:20.441 Uploading document #4
09:24:20.942 Uploading document #5
09:24:21.442 Uploading document #6
09:24:21.941 Uploading document #7
09:24:22.441 Uploading document #8
09:24:22.942 Uploading document #9
09:24:23.442 Uploading document #10
(Попробуйте на Fiddle с версией, совместимой с .NET Framework)
Обычный метод заключается в добавлении порядкового номера в начале передачи, чтобы блоки можно было собрать в правильном порядке.