Использование Polly для повторных подключений и тайм-аутов

У меня возникла проблема с использованием Polly при попытке выполнить следующее:

  • Логика повторного подключения - я попытался создать политику Polly, которая работает, когда вы пытаетесь выполнить StartAsync без подключения к Интернету. Однако, когда он достигает ReceiveLoop, политика больше не влияет на этот метод, и если наше соединение прервется в этот момент, оно никогда не попытается снова подключиться. Он просто выдает следующее исключение: Disconnected: The remote party closed the WebSocket connection without completing the close handshake.. Возможно, у меня должно быть два полиса: один в StartAsync и один в ReceiveLoop, но мне это почему-то кажется неправильным, поэтому я и задаю вопрос.

  • Тайм-ауты - я хочу добавить тайм-ауты для каждого вызова метода ClientWebSocket, например. ConnectAsync, SendAsync и т. д. Я не очень хорошо знаком с Polly, но думаю, что эта политика автоматически делает это за нас. Однако мне нужен кто-то, кто это подтвердит. Под таймаутом я подразумеваю аналогичную _webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(timeoutMilliseconds) логику, реализацию TimeoutAfter можно найти здесь. Пример того, как это сделали другие репозитории, можно найти здесь.

Упрощенно, я хочу сделать этот класс устойчивым, что означает, что вместо того, чтобы безуспешно пытаться подключиться к мертвому серверу веб-сокетов в течение 30 секунд, независимо от причины, он должен быстро дать сбой -> повторить попытку через 10 секунд -> быстро сбой. -> повторить попытку и так далее. Этот подождите и повторите логику должен повторяться до тех пор, пока мы не вызовем StopAsync или не избавимся от экземпляра.

Вы можете найти класс WebSocketDuplexPipe на Гитхаб.

public sealed class Client : IDisposable
{
    private const int RetrySeconds = 10;
    private readonly WebSocketDuplexPipe _webSocketPipe;
    private readonly string _url;

    public Client(string url)
    {
        _url = url;
        _webSocketPipe = new WebSocketDuplexPipe();
    }

    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        var retryPolicy = Policy
            .Handle<Exception>(e => !cancellationToken.IsCancellationRequested)
            .WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(RetrySeconds),
                (exception, calculatedWaitDuration) =>
                {
                    Console.WriteLine($"{exception.Message}. Retry in {calculatedWaitDuration.TotalSeconds} seconds.");
                });

        return retryPolicy.ExecuteAsync(async () =>
        {
            await _webSocketPipe.StartAsync(_url, cancellationToken).ConfigureAwait(false);
            _ = ReceiveLoop();
        });
    }

    public Task StopAsync()
    {
        return _webSocketPipe.StopAsync();
    }

    public async Task SendAsync(string data, CancellationToken cancellationToken = default)
    {
        var encoded = Encoding.UTF8.GetBytes(data);
        var bufferSend = new ArraySegment<byte>(encoded, 0, encoded.Length);
        await _webSocketPipe.Output.WriteAsync(bufferSend, cancellationToken).ConfigureAwait(false);
    }

    private async Task ReceiveLoop()
    {
        var input = _webSocketPipe.Input;

        try
        {
            while (true)
            {
                var result = await input.ReadAsync().ConfigureAwait(false);
                var buffer = result.Buffer;

                try
                {
                    if (result.IsCanceled)
                    {
                        break;
                    }

                    if (!buffer.IsEmpty)
                    {
                        while (MessageParser.TryParse(ref buffer, out var payload))
                        {
                            var message = Encoding.UTF8.GetString(payload);

                            _messageReceivedSubject.OnNext(message);
                        }
                    }

                    if (result.IsCompleted)
                    {
                        break;
                    }
                }
                finally
                {
                    input.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Disconnected: {ex.Message}");
        }
    }
}

Итак, какой у вас вопрос?

Peter Csala 16.03.2022 20:27

@PeterCsala, Q1: я хочу применить ту же политику для ReceiveLoop, потому что он не переподключается, если достигает этой части. Q2: Интересно, Полли автоматически заботится о тайм-аутах? Я описал ожидаемое поведение тайм-аута (TimeoutAfter), но с Полли.

nop 16.03.2022 20:32

Ваша политика повторных попыток будет успешно завершена из ExecuteAsync, пока вы ждете завершения input.ReadAsync. Поскольку вы не ждете ReceiveLoop, а просто запускаете его в виде огня и забываете, поэтому ваша политика не окажет на него никакого влияния.

Peter Csala 16.03.2022 23:26

Кстати, вы можете и должны передать параметр cancelToken в ExecuteAsync.

Peter Csala 16.03.2022 23:30

@PeterCsala, спасибо, я могу передать CancellationToken, но если я жду ReceiveLoop, он блокирует пользовательский интерфейс. Есть ли у Полли способ разблокировать тред или мне просто Task.Run?

nop 17.03.2022 01:12

Ну, дело в том, что если вы хотите правильно покрыть это повтором, то либо вам нужно переместить логику повтора внутри ReceiveLoop, либо, как вы сказали, вы можете переместить эту операцию в выделенный поток.

Peter Csala 17.03.2022 13:50

Что касается вашего вопроса о тайм-ауте: политика тайм-аута Полли может быть настроена на использование оптимистичной или пессимистичной стратегии. Бывший сильно зависит от CancellationToken. Поэтому, если вы передадите CancellationToken.NoneExecuteAsync, он создаст для вас токен и будет использовать его для отмены операции всякий раз, когда истечет время ожидания. НО имейте в виду, что это бросит TimeoutRejectedException.

Peter Csala 17.03.2022 13:58

Я ответил на ваши вопросы или вам нужны дополнительные разъяснения?

Peter Csala 17.03.2022 17:08

@PeterCsala, честно говоря, не знаю. pastebin.com/3HavmN6z. Не знаю, как напечатать сообщение, когда происходит тайм-аут или что-то в этом роде. Я могу видеть, что он повторно подключается через 30 секунд.

nop 17.03.2022 17:17

TimeoutAsync имеет перегрузку, которая ожидает делегата onTimeoutAsync. Этот делегат вызывается всякий раз, когда происходит тайм-аут.

Peter Csala 17.03.2022 17:53

@PeterCsala, посмотри, что я сделал pastebin.com/M1ZCHkff. Вот лог выполнения: pastebin.com/GzzXJTQG. Тайм-аут никогда не срабатывает, может быть, потому, что политика WaitAndRetryForever переопределяет его? Если вы также обратите внимание на журнал, когда он действительно достигает ReceiveLoop, это будем пытается подключиться, но это не удается, вероятно, потому что веб-сокет больше не жив.

nop 17.03.2022 21:08

Проблема в вашем ExecuteAsync, поскольку вы вызываете его без передачи токена отмены, например: dotnetfiddle.net/YfTomG

Peter Csala 18.03.2022 09:13

@PeterCsala, я понимаю, в чем проблема. Это прекрасно работает, если я отключаю W-Fi до того, как мы установим соединение с веб-сокетом, но когда он запускает соединение с веб-сокетом, и он в основном проходит через ReceiveLoop, а затем я отключаю Wi-Fi, он бросает WebSocketException внутри обертки, которую он обрабатывает сам. Он никогда не пытается переподключиться обратно, потому что нет открытого соединения через веб-сокет, а каналы завершены. github.com/ninjastacktech/ninja-websocket-net/blob/master/sr‌​c/…. Может надо сменить обертку?

nop 18.03.2022 12:41

@nob У меня не так много опыта работы с WebSocket, поэтому я не могу помочь в этом. Если у вас есть вопросы относительно Полли, я здесь, чтобы помочь вам.

Peter Csala 18.03.2022 12:54

@PeterCsala, я понимаю. Вы можете написать это как ответ, чтобы я мог принять его. Если это возможно, не могли бы вы отредактировать свой пример с URL-адресом веб-сокета, чтобы я знал, что он точно работает для него, то есть wss://url вместо HttpClient.

nop 18.03.2022 12:58

Хорошо, на выходных займусь. :)

Peter Csala 18.03.2022 18:19

Я что-то упустил из своего поста?

Peter Csala 21.03.2022 08:27

@PeterCsala, о, спасибо за вопрос, потому что я не заметил ответа. С нетерпением жду этого как можно скорее.

nop 21.03.2022 08:38
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
18
131
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Позвольте мне отразить в ответе суть нашего разговора через комментарии.

ReceiveLoop с повтором

Ваша политика повторных попыток будет успешно завершена из ExecuteAsync, пока вы ждете завершения input.ReadAsync. Причина в том, что вы не ждете ReceiveLoop, а просто бросаете его в огонь и забываете.

Другими словами, ваша логика повтора будет применяться только к StartAsync и коду перед ожиданием внутри ReceiveLoop.

Исправление состоит в том, чтобы переместить логику повторных попыток внутрь ReceiveLoop.

Тайм-аут

Политика тайм-аута Полли может использовать как оптимистическую, так и пессимистическую стратегию. Tон бывший сильно зависит от CancellationToken.

  • Итак, если вы передаете, например, CancellationToken.None в ExecuteAsync, то вы в основном говорите, что TimeoutPolicy обрабатывает процесс отмены.
  • Если вы передадите уже существующий токен, то украшенную Задачу можно будет отменено TimeoutPolicy или предоставленным токеном.

Пожалуйста, имейте в виду, что он выкинет TimeoutRejectedException, а не OperationCanceledException.

onTimeoutAsync

TimeoutAsync имеет несколько перегрузок, которые могут принимать одного из двух делегатов onTimeoutAsync

Func<Context, TimeSpan, Task, Task> onTimeoutAsync

или

Func<Context, TimeSpan, Task, Exception, Task> onTimeoutAsync

Это может быть полезно для регистрации факта тайм-аута, если у вас есть внешняя политика (например, повторная попытка), которая срабатывает на TimeoutRejectedException.

Сцепление политик

Я предлагаю использовать Policy.WrapAsync статический метод вместо WrapAsync метода экземпляра AsyncPolicy.

var timeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(timeoutMs), TimeoutStrategy.Optimistic,
    (context, timeSpan, task, ex) =>
    {
        Console.WriteLine($"Timeout {timeSpan.TotalSeconds} seconds");
        return Task.CompletedTask;
    });

var retryPolicy = Policy
    .Handle<Exception>(ex =>
    {
        Console.WriteLine($"Exception tralal: {ex.Message}");
        return true;
    })
    .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(retryBackOffMs),
    (ex, retryCount, calculatedWaitDuration) =>
    {
        Console.WriteLine(
            $"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {ex.Message}) (Retry count: {retryCount})");
    });

var resilientStrategy = Policy.WrapAsync(retryPolicy, timeoutPolicy);

При таком подходе определение вашей политики повторных попыток не ссылается явно на политику тайм-аута. Скорее у вас есть две отдельные политики и цепочка.

Спасибо за ваш ответ! Я написал вам сообщение в чате StackOverflow

nop 21.03.2022 09:26

Хотели бы вы дать что-то еще в качестве формы контакта? Не уверен, почему это не работает для вас. Возможно разногласия?

nop 21.03.2022 09:37

Хочу выразить особую благодарность этому парню. Если бы я мог дать ему больше репутации, я бы сделал это, но это невозможно.

nop 22.03.2022 10:33

Другие вопросы по теме