Как восстановить порядок перетасованного конвейера потока данных?

У меня есть конвейер потока данных, состоящий из нескольких блоков, обрабатывающих разнородные документы (XLS, PDF и т. д.). Каждый тип документа обрабатывается специальным TransformBlock. В конце конвейера у меня есть ActionBlock, который получает все обработанные документы и загружает их один за другим на веб-сервер. Моя проблема в том, что я не могу найти способ удовлетворить требование загрузки документов в том же порядке, в котором они были первоначально введены в конвейер. Например, я не могу использовать опцию VerifiedOrdered в свою пользу, потому что эта опция настраивает поведение одного блока, а не поведение нескольких блоков, которые работают параллельно. Мои требования:

  1. Вставьте документы в конвейер в определенном порядке.
  2. Обрабатывайте каждый документ по-разному, в зависимости от его типа.
  3. Документы определенного типа должны обрабатываться последовательно.
  4. Документы разных типов можно (и нужно) обрабатывать параллельно.
  5. Все документы должны быть загружены как можно скорее после их обработки.
  6. Документы должны загружаться последовательно и в том же порядке, в котором они были введены в конвейер.

Например, требуется, чтобы документ №8 загружался после документа №7, даже если он обрабатывается до документа №7.

Пятое требование означает, что я не могу дождаться обработки всех документов, затем отсортировать их по индексу и, наконец, загрузить. Загрузка должна происходить одновременно с обработкой.

Вот минимальный пример того, что я пытаюсь сделать. Для простоты я загружаю блоки не экземплярами интерфейса IDocument, а простыми целыми числами. Значение каждого целого числа представляет порядок, в котором оно было введено в конвейер, и порядок, в котором оно должно быть загружено:

var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});

var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});

xlsBlock.LinkTo(uploader);
pdfBlock.LinkTo(uploader);

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => uploader.Complete());

await uploader.Completion;

Результат:

Uploading document #1
Uploading document #2
Uploading document #3
Uploading document #5
Uploading document #4
Uploading document #7
Uploading document #6
Uploading document #9
Uploading document #8
Uploading document #10

(Попробуйте на Fiddle)

Желаемый порядок: №1, №2, №3, №4, №5, №6, №7, №8, №9, №10.

Как восстановить порядок обработанных документов до их отправки в блок uploader?

Уточнение: кардинально изменить схему конвейера, заменив несколько конкретных TransformBlock одним общим TransformBlock, нельзя. Идеальным сценарием будет перехват одного блока между процессорами и загрузчиком, что восстановит порядок документов.

Обычный метод заключается в добавлении порядкового номера в начале передачи, чтобы блоки можно было собрать в правильном порядке.

jdweng 18.12.2020 15:14

@jdweng, вы можете предположить, что порядковый номер уже является свойством объектов document. Мне разрешено добавлять к ним свойство public long SequenceNumber и правильно его инициализировать. Проблема в том, как собрать их обратно после того, как они будут обработаны всеми этими разными блоками.

Theodor Zoulias 18.12.2020 15:18

Загрузчик должен добавить порядковый номер перед каждым блоком, чтобы, когда сервер получает блок, сервер мог комбинировать в правильном порядке. Вы не можете использовать номер, который удаляется во время обработки.

jdweng 18.12.2020 15:24

@jdweng Я не могу делегировать восстановление заказа веб-серверу (если вы это имеете в виду). Я должен сделать это в своей собственной программе.

Theodor Zoulias 18.12.2020 15:28

Тогда вы должны сделать это перед загрузкой, и вы не можете загружать параллельно.

jdweng 18.12.2020 15:34

Параллельная обработка и загрузка @jdweng является частью моих требований. Я не могу их изменить. Если бы мне разрешили изменить требования, я бы не задавал этот вопрос прямо сейчас.

Theodor Zoulias 18.12.2020 15:42

Вы не можете загружать параллельно, если вам нужно также объединить по порядку и не иметь контроля над повторным объединением. Вам необходимо проверить требования, особенно на стороне сервера, к тому, как элементы объединяются.

jdweng 18.12.2020 15:54

@jdweng, если вы считаете, что мои требования невозможно удовлетворить, вы можете опубликовать это как ответ.

Theodor Zoulias 18.12.2020 16:03

Я сказал, что нужно уметь комбинировать по порядку на сервере. Ты сказал, что не можешь. Я думаю, вы ошибаетесь. Вы должны иметь возможность управлять порядком рекомбинации, если вы отправляете данные параллельно.

jdweng 18.12.2020 16:19

@jdweng Я не могу ошибаться. Если кто-то из нас лучше знает, что может и чего не может этот конкретный веб-API, то этот человек — я. У меня есть конфиденциальная информация по этому вопросу! 😃

Theodor Zoulias 18.12.2020 16:26

Тогда вы не можете загружать параллельно. Нет никакой гарантии, что данные будут рекомбинированы должным образом.

jdweng 18.12.2020 16:30

Кстати, неприятная ошибка в настоящее время существует в конфигурациях потока данных, которые следуют шаблону «один ко многим к одному». Если промежуточные TransformBlock настроены с BoundedCapacity, отличным от -1, они также должны быть настроены с EnsureOrdered = false, иначе некоторые сообщения могут быть потеряны.

Theodor Zoulias 24.11.2022 18:38
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
12
180
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

uploader следует добавить документ в какой-то отсортированный список заполненных документов и проверить, является ли добавленный документ тем, который следует загрузить следующим. Если он должен удалить и загрузить все документы из отсортированного списка, пока не будет пропущен один.

Также есть проблема с синхронизацией. Доступ к этому отсортированному списку должен быть синхронизирован между потоками. Но вы хотите, чтобы все потоки что-то делали, а не ждали, пока другие потоки завершат свою работу. Итак, uploader должен работать со списком следующим образом:

  • В блокировке синхронизации добавьте новый документ в список и снимите блокировку.
  • В петле
    • снова введите ту же блокировку синхронизации,
    • если установлен флаг upload_in_progress, ничего не делать и вернуться.
    • проверить, должен ли быть загружен документ в верхней части списка,
      • если нет, то сбросьте флаг upload_in_progress и вернитесь.
      • в противном случае удалить документ из списка,
      • установить upload_in_progress флаг,
      • открыть замок,
      • загрузить документ.

Надеюсь, я правильно это себе представлял. Как видите, сделать его одновременно безопасным и эффективным достаточно сложно. Конечно, в большинстве случаев есть способ сделать это только с одним замком, но это не сильно увеличит эффективность. Флаг upload_in_progress распределяется между задачами, как и сам список.

Спасибо Диалектикус за ответ! Я думаю, что ваша идея склоняется к возможному решению. Как вы думаете, я мог бы использовать класс SortedList<TKey, TValue> для решения этой проблемы?

Theodor Zoulias 18.12.2020 16:13

Действительно. TKey — порядковый номер документа, TValue — сам документ. Кстати, доступ к списку (а также сама загрузка) должен быть защищен каким-то механизмом синхронизации, например lock. Теперь это сложно реализовать, но это также хороший новый вопрос для переполнения стека, как только вы сможете увидеть и формализовать проблему.

Dialecticus 18.12.2020 16:17

Хорошо, я попробую реализовать вашу идею с помощью SortedList, и посмотрим, что получится.

Theodor Zoulias 18.12.2020 16:25

Добавил возможное решение проблемы с синхронизацией. Удачи.

Dialecticus 18.12.2020 16:36

Спасибо! Кстати, uploader загружает документы последовательно (по одному), поэтому я думаю, что мне может сойти с рук синхронизация. Но я буду иметь это в виду, если требования изменятся в будущем.

Theodor Zoulias 18.12.2020 16:38

Если в любой момент времени работает только один загрузчик, то проблемы с синхронизацией нет, и приведенный выше алгоритм не нужен, но я считаю, что время от времени может работать более одного.

Dialecticus 18.12.2020 16:41

Мне удалось реализовать блок потока данных, который может восстанавливать порядок моего перетасованного конвейера, основываясь на идее Dialecticus отсортированного списка, содержащего обработанные документы. Вместо SortedList я использовал простой Dictionary, который, кажется, работает так же хорошо.

/// <summary>Creates a dataflow block that restores the order of
/// a shuffled pipeline.</summary>
public static IPropagatorBlock<T, T> CreateRestoreOrderBlock<T>(
    Func<T, long> indexSelector,
    long startingIndex = 0L,
    DataflowBlockOptions options = null)
{
    if (indexSelector == null) throw new ArgumentNullException(nameof(indexSelector));
    var executionOptions = new ExecutionDataflowBlockOptions();
    if (options != null)
    {
        executionOptions.CancellationToken = options.CancellationToken;
        executionOptions.BoundedCapacity = options.BoundedCapacity;
        executionOptions.EnsureOrdered = options.EnsureOrdered;
        executionOptions.TaskScheduler = options.TaskScheduler;
        executionOptions.MaxMessagesPerTask = options.MaxMessagesPerTask;
        executionOptions.NameFormat = options.NameFormat;
    }

    var buffer = new Dictionary<long, T>();
    long minIndex = startingIndex;

    IEnumerable<T> Transform(T item)
    {
        // No synchronization needed because MaxDegreeOfParallelism = 1
        long index = indexSelector(item);
        if (index < startingIndex)
            throw new InvalidOperationException($"Index {index} is out of range.");
        if (index < minIndex)
            throw new InvalidOperationException($"Index {index} has been consumed.");
        if (!buffer.TryAdd(index, item)) // .NET Core only API
            throw new InvalidOperationException($"Index {index} is not unique.");
        while (buffer.Remove(minIndex, out var minItem)) // .NET Core only API
        {
            minIndex++;
            yield return minItem;
        }
    }

    // Ideally the assertion buffer.Count == 0 should be checked on the completion
    // of the block.
    return new TransformManyBlock<T, T>(Transform, executionOptions);
}

Пример использования:

var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});

var orderRestorer = CreateRestoreOrderBlock<int>(
    indexSelector: document => document, startingIndex: 1L);

var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});

xlsBlock.LinkTo(orderRestorer);
pdfBlock.LinkTo(orderRestorer);
orderRestorer.LinkTo(uploader, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => orderRestorer.Complete());

await uploader.Completion;

Выход:

09:24:18.846 Uploading document #1
09:24:19.436 Uploading document #2
09:24:19.936 Uploading document #3
09:24:20.441 Uploading document #4
09:24:20.942 Uploading document #5
09:24:21.442 Uploading document #6
09:24:21.941 Uploading document #7
09:24:22.441 Uploading document #8
09:24:22.942 Uploading document #9
09:24:23.442 Uploading document #10

(Попробуйте на Fiddle с версией, совместимой с .NET Framework)

Другие вопросы по теме