У меня возникла проблема с использованием Polly при попытке выполнить следующее:
Логика повторного подключения - я попытался создать политику Polly, которая работает, когда вы пытаетесь выполнить StartAsync
без подключения к Интернету. Однако, когда он достигает ReceiveLoop
, политика больше не влияет на этот метод, и если наше соединение прервется в этот момент, оно никогда не попытается снова подключиться. Он просто выдает следующее исключение: Disconnected: The remote party closed the WebSocket connection without completing the close handshake.
. Возможно, у меня должно быть два полиса: один в StartAsync
и один в ReceiveLoop
, но мне это почему-то кажется неправильным, поэтому я и задаю вопрос.
Тайм-ауты - я хочу добавить тайм-ауты для каждого вызова метода ClientWebSocket
, например. ConnectAsync, SendAsync и т. д. Я не очень хорошо знаком с Polly, но думаю, что эта политика автоматически делает это за нас. Однако мне нужен кто-то, кто это подтвердит. Под таймаутом я подразумеваю аналогичную _webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(timeoutMilliseconds)
логику, реализацию TimeoutAfter можно найти здесь. Пример того, как это сделали другие репозитории, можно найти здесь.
Упрощенно, я хочу сделать этот класс устойчивым, что означает, что вместо того, чтобы безуспешно пытаться подключиться к мертвому серверу веб-сокетов в течение 30 секунд, независимо от причины, он должен быстро дать сбой -> повторить попытку через 10 секунд -> быстро сбой. -> повторить попытку и так далее. Этот подождите и повторите логику должен повторяться до тех пор, пока мы не вызовем StopAsync
или не избавимся от экземпляра.
Вы можете найти класс WebSocketDuplexPipe на Гитхаб.
public sealed class Client : IDisposable
{
private const int RetrySeconds = 10;
private readonly WebSocketDuplexPipe _webSocketPipe;
private readonly string _url;
public Client(string url)
{
_url = url;
_webSocketPipe = new WebSocketDuplexPipe();
}
public Task StartAsync(CancellationToken cancellationToken = default)
{
var retryPolicy = Policy
.Handle<Exception>(e => !cancellationToken.IsCancellationRequested)
.WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(RetrySeconds),
(exception, calculatedWaitDuration) =>
{
Console.WriteLine($"{exception.Message}. Retry in {calculatedWaitDuration.TotalSeconds} seconds.");
});
return retryPolicy.ExecuteAsync(async () =>
{
await _webSocketPipe.StartAsync(_url, cancellationToken).ConfigureAwait(false);
_ = ReceiveLoop();
});
}
public Task StopAsync()
{
return _webSocketPipe.StopAsync();
}
public async Task SendAsync(string data, CancellationToken cancellationToken = default)
{
var encoded = Encoding.UTF8.GetBytes(data);
var bufferSend = new ArraySegment<byte>(encoded, 0, encoded.Length);
await _webSocketPipe.Output.WriteAsync(bufferSend, cancellationToken).ConfigureAwait(false);
}
private async Task ReceiveLoop()
{
var input = _webSocketPipe.Input;
try
{
while (true)
{
var result = await input.ReadAsync().ConfigureAwait(false);
var buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
break;
}
if (!buffer.IsEmpty)
{
while (MessageParser.TryParse(ref buffer, out var payload))
{
var message = Encoding.UTF8.GetString(payload);
_messageReceivedSubject.OnNext(message);
}
}
if (result.IsCompleted)
{
break;
}
}
finally
{
input.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Disconnected: {ex.Message}");
}
}
}
@PeterCsala, Q1: я хочу применить ту же политику для ReceiveLoop
, потому что он не переподключается, если достигает этой части. Q2: Интересно, Полли автоматически заботится о тайм-аутах? Я описал ожидаемое поведение тайм-аута (TimeoutAfter), но с Полли.
Ваша политика повторных попыток будет успешно завершена из ExecuteAsync, пока вы ждете завершения input.ReadAsync. Поскольку вы не ждете ReceiveLoop, а просто запускаете его в виде огня и забываете, поэтому ваша политика не окажет на него никакого влияния.
Кстати, вы можете и должны передать параметр cancelToken в ExecuteAsync.
@PeterCsala, спасибо, я могу передать CancellationToken, но если я жду ReceiveLoop, он блокирует пользовательский интерфейс. Есть ли у Полли способ разблокировать тред или мне просто Task.Run
?
Ну, дело в том, что если вы хотите правильно покрыть это повтором, то либо вам нужно переместить логику повтора внутри ReceiveLoop
, либо, как вы сказали, вы можете переместить эту операцию в выделенный поток.
Что касается вашего вопроса о тайм-ауте: политика тайм-аута Полли может быть настроена на использование оптимистичной или пессимистичной стратегии. Бывший сильно зависит от CancellationToken
. Поэтому, если вы передадите CancellationToken.None
ExecuteAsync
, он создаст для вас токен и будет использовать его для отмены операции всякий раз, когда истечет время ожидания. НО имейте в виду, что это бросит TimeoutRejectedException
.
Я ответил на ваши вопросы или вам нужны дополнительные разъяснения?
@PeterCsala, честно говоря, не знаю. pastebin.com/3HavmN6z. Не знаю, как напечатать сообщение, когда происходит тайм-аут или что-то в этом роде. Я могу видеть, что он повторно подключается через 30 секунд.
TimeoutAsync имеет перегрузку, которая ожидает делегата onTimeoutAsync. Этот делегат вызывается всякий раз, когда происходит тайм-аут.
@PeterCsala, посмотри, что я сделал pastebin.com/M1ZCHkff. Вот лог выполнения: pastebin.com/GzzXJTQG. Тайм-аут никогда не срабатывает, может быть, потому, что политика WaitAndRetryForever
переопределяет его? Если вы также обратите внимание на журнал, когда он действительно достигает ReceiveLoop
, это будем пытается подключиться, но это не удается, вероятно, потому что веб-сокет больше не жив.
Проблема в вашем ExecuteAsync
, поскольку вы вызываете его без передачи токена отмены, например: dotnetfiddle.net/YfTomG
@PeterCsala, я понимаю, в чем проблема. Это прекрасно работает, если я отключаю W-Fi до того, как мы установим соединение с веб-сокетом, но когда он запускает соединение с веб-сокетом, и он в основном проходит через ReceiveLoop
, а затем я отключаю Wi-Fi, он бросает WebSocketException
внутри обертки, которую он обрабатывает сам. Он никогда не пытается переподключиться обратно, потому что нет открытого соединения через веб-сокет, а каналы завершены. github.com/ninjastacktech/ninja-websocket-net/blob/master/src/…. Может надо сменить обертку?
@nob У меня не так много опыта работы с WebSocket, поэтому я не могу помочь в этом. Если у вас есть вопросы относительно Полли, я здесь, чтобы помочь вам.
@PeterCsala, я понимаю. Вы можете написать это как ответ, чтобы я мог принять его. Если это возможно, не могли бы вы отредактировать свой пример с URL-адресом веб-сокета, чтобы я знал, что он точно работает для него, то есть wss://url
вместо HttpClient.
Хорошо, на выходных займусь. :)
Я что-то упустил из своего поста?
@PeterCsala, о, спасибо за вопрос, потому что я не заметил ответа. С нетерпением жду этого как можно скорее.
Позвольте мне отразить в ответе суть нашего разговора через комментарии.
ReceiveLoop
с повторомВаша политика повторных попыток будет успешно завершена из ExecuteAsync
, пока вы ждете завершения input.ReadAsync
. Причина в том, что вы не ждете ReceiveLoop
, а просто бросаете его в огонь и забываете.
Другими словами, ваша логика повтора будет применяться только к StartAsync
и коду перед ожиданием внутри ReceiveLoop
.
Исправление состоит в том, чтобы переместить логику повторных попыток внутрь ReceiveLoop
.
Политика тайм-аута Полли может использовать как оптимистическую, так и пессимистическую стратегию. Tон бывший сильно зависит от CancellationToken
.
CancellationToken.None
в ExecuteAsync
, то вы в основном говорите, что TimeoutPolicy обрабатывает процесс отмены.Пожалуйста, имейте в виду, что он выкинет TimeoutRejectedException
, а не OperationCanceledException
.
onTimeoutAsync
TimeoutAsync
имеет несколько перегрузок, которые могут принимать одного из двух делегатов onTimeoutAsync
Func<Context, TimeSpan, Task, Task> onTimeoutAsync
или
Func<Context, TimeSpan, Task, Exception, Task> onTimeoutAsync
Это может быть полезно для регистрации факта тайм-аута, если у вас есть внешняя политика (например, повторная попытка), которая срабатывает на TimeoutRejectedException
.
Я предлагаю использовать Policy.WrapAsync
статический метод вместо WrapAsync
метода экземпляра AsyncPolicy
.
var timeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(timeoutMs), TimeoutStrategy.Optimistic,
(context, timeSpan, task, ex) =>
{
Console.WriteLine($"Timeout {timeSpan.TotalSeconds} seconds");
return Task.CompletedTask;
});
var retryPolicy = Policy
.Handle<Exception>(ex =>
{
Console.WriteLine($"Exception tralal: {ex.Message}");
return true;
})
.WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(retryBackOffMs),
(ex, retryCount, calculatedWaitDuration) =>
{
Console.WriteLine(
$"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {ex.Message}) (Retry count: {retryCount})");
});
var resilientStrategy = Policy.WrapAsync(retryPolicy, timeoutPolicy);
При таком подходе определение вашей политики повторных попыток не ссылается явно на политику тайм-аута. Скорее у вас есть две отдельные политики и цепочка.
Спасибо за ваш ответ! Я написал вам сообщение в чате StackOverflow
Хотели бы вы дать что-то еще в качестве формы контакта? Не уверен, почему это не работает для вас. Возможно разногласия?
Хочу выразить особую благодарность этому парню. Если бы я мог дать ему больше репутации, я бы сделал это, но это невозможно.
Итак, какой у вас вопрос?