Я хотел реализовать что-то вроде протокола двухфазной фиксации для потребления сообщений.
Для этого я реализовал ITargetBlock сам:
public class Worker : ITargetBlock<Message>
{
// Is connected to remote server
// Maintaining connection removed for brevity in this example
private bool _isConnectionAlive;
private readonly ActionBlock<MessageWithSource> _action;
public Worker()
{
_action = new ActionBlock<MessageWithSource>(DoWork);
}
public DataflowMessageStatus OfferMessage(
DataflowMessageHeader messageHeader, Message messageValue,
ISourceBlock<Message> source, bool consumeToAccept)
{
if (consumeToAccept || source == null)
{
return DataflowMessageStatus.Declined;
}
if (!_isConnectionAlive)
{
return DataflowMessageStatus.Postponed;
}
var reservedMessage = source.ReserveMessage(messageHeader, this);
if (reservedMessage)
{
_action.Post(new MessageWithSource(messageValue, source, messageHeader));
}
return DataflowMessageStatus.Postponed;
}
// Other methods removed for brevity
private async Task DoWork(MessageWithSource value)
{
try
{
// sending message to the server removed for brevity
// commit that we finished processing without error
var message = value.SourceBlock.ConsumeMessage(value.MessageHeader, this, out _);
if (message != value.Message)
{
// In which cases can we get here?
throw new InvalidOperationException("Consumed some other message... oh my");
}
}
catch (WebSocketException)
{
// Release reservation if we can't finish work, so other Workers can pickup this message and process it
value.SourceBlock.ReleaseReservation(value.MessageHeader, this);
}
}
private class MessageWithSource
{
public Message Message { get; }
public ISourceBlock<Message> SourceBlock { get; }
public DataflowMessageHeader MessageHeader { get; }
}
}
В документы сказано, что ConsumeMessage может вернуть экземпляр, отличный от предложенного ранее.
Интересно, в каких случаях и как это происходит?
Зачем использовать семантику транзакций в поток данных, а тем более двухфазную фиксацию? Какие координаторы транзакций участвуют? Потоковый способ обработки проблем заключается в перенаправлении ошибочного сообщения или сообщения об ошибке в другой блок, например, с помощью предиката в LinkTo(). Таким образом вы избежите блокировки других сообщений в очереди ввода.
Перенаправление становится намного проще, если сообщения завернуты в «конверт», который указывает, является ли это хорошим или «плохим» сообщением. Конверт может включать счетчик повторных попыток, чтобы одно и то же сообщение не повторялось бесконечно.
@PanagiotisKanavos Мне нужна семантика транзакций, потому что мне нужно обеспечить порядок, а некоторые из этих Worker экземпляров могут быть нарушены на некоторое время. Может быть, есть другой хороший способ сделать это?
Моя первоначальная задача звучит так: многие клиенты отправляют сообщения на сервер (A). Сервер обрабатывает каждое сообщение, а затем отправляет его дальше другому серверу (B) через веб-сокеты. Мне нужно обеспечить порядок сообщений для каждого клиента, но мне все равно, отправляются ли сообщения второго клиента раньше первого клиента. У меня есть несколько соединений (также известных как Workers) с сервером B, поэтому, если одно соединение разорвано, другое соединение должно получить это сообщение, по-прежнему обеспечьте заказ
То, что вы описываете, - это очередь, обмен сообщениями и повторные попытки. Это не имеет ничего общего с семантикой транзакций. Семантика транзакций означает, что либо все серверы принимают и фиксируют операцию, либо все серверы ее отбрасывают. 2PC, как они это делают.





@StephenCleary будет очень признателен, если вы сможете взглянуть