Двухэтапная фиксация потока данных TPL

Я хотел реализовать что-то вроде протокола двухфазной фиксации для потребления сообщений.

Для этого я реализовал ITargetBlock сам:

  public class Worker : ITargetBlock<Message>
  {
    // Is connected to remote server
    // Maintaining connection removed for brevity in this example
    private bool _isConnectionAlive;
    private readonly ActionBlock<MessageWithSource> _action;

    public Worker()
    {
      _action = new ActionBlock<MessageWithSource>(DoWork);
    }

    public DataflowMessageStatus OfferMessage(
      DataflowMessageHeader messageHeader, Message messageValue,
      ISourceBlock<Message> source, bool consumeToAccept)
    {
      if (consumeToAccept || source == null)
      {
        return DataflowMessageStatus.Declined;
      }

      if (!_isConnectionAlive)
      {
        return DataflowMessageStatus.Postponed;
      }

      var reservedMessage = source.ReserveMessage(messageHeader, this);
      if (reservedMessage)
      {
        _action.Post(new MessageWithSource(messageValue, source, messageHeader));
      }

      return DataflowMessageStatus.Postponed;
    }

    // Other methods removed for brevity

    private async Task DoWork(MessageWithSource value)
    {
      try
      {
        // sending message to the server removed for brevity


        // commit that we finished processing without error
        var message = value.SourceBlock.ConsumeMessage(value.MessageHeader, this, out _);

        if (message != value.Message)
        {
          // In which cases can we get here?
          throw new InvalidOperationException("Consumed some other message... oh my");
        }
      }
      catch (WebSocketException)
      {
        // Release reservation if we can't finish work, so other Workers can pickup this message and process it
        value.SourceBlock.ReleaseReservation(value.MessageHeader, this);
      }
    }

    private class MessageWithSource
    {
      public Message Message { get; }
      public ISourceBlock<Message> SourceBlock { get; }
      public DataflowMessageHeader MessageHeader { get; }
    }
  }

В документы сказано, что ConsumeMessage может вернуть экземпляр, отличный от предложенного ранее.

Интересно, в каких случаях и как это происходит?

@StephenCleary будет очень признателен, если вы сможете взглянуть

Veikedo 15.03.2019 15:23

Зачем использовать семантику транзакций в поток данных, а тем более двухфазную фиксацию? Какие координаторы транзакций участвуют? Потоковый способ обработки проблем заключается в перенаправлении ошибочного сообщения или сообщения об ошибке в другой блок, например, с помощью предиката в LinkTo(). Таким образом вы избежите блокировки других сообщений в очереди ввода.

Panagiotis Kanavos 15.03.2019 16:00

Перенаправление становится намного проще, если сообщения завернуты в «конверт», который указывает, является ли это хорошим или «плохим» сообщением. Конверт может включать счетчик повторных попыток, чтобы одно и то же сообщение не повторялось бесконечно.

Panagiotis Kanavos 15.03.2019 16:02

@PanagiotisKanavos Мне нужна семантика транзакций, потому что мне нужно обеспечить порядок, а некоторые из этих Worker экземпляров могут быть нарушены на некоторое время. Может быть, есть другой хороший способ сделать это?

Veikedo 18.03.2019 09:46

Моя первоначальная задача звучит так: многие клиенты отправляют сообщения на сервер (A). Сервер обрабатывает каждое сообщение, а затем отправляет его дальше другому серверу (B) через веб-сокеты. Мне нужно обеспечить порядок сообщений для каждого клиента, но мне все равно, отправляются ли сообщения второго клиента раньше первого клиента. У меня есть несколько соединений (также известных как Workers) с сервером B, поэтому, если одно соединение разорвано, другое соединение должно получить это сообщение, по-прежнему обеспечьте заказ

Veikedo 18.03.2019 09:52

То, что вы описываете, - это очередь, обмен сообщениями и повторные попытки. Это не имеет ничего общего с семантикой транзакций. Семантика транзакций означает, что либо все серверы принимают и фиксируют операцию, либо все серверы ее отбрасывают. 2PC, как они это делают.

Panagiotis Kanavos 18.03.2019 11:17
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
6
118
0

Другие вопросы по теме