ThreadPool.RegisterWaitForSingleObject пропускает объекты RegisteredWaitHandle (и память) с течением времени

Я создаю метод расширения для WaitHandles (особенно ManualResetEventSlim), чтобы их можно было использовать в асинхронном коде. Использование выглядит следующим образом:

public class WaitHandleExampleClass : IDisposable
{
    private readonly ManualResetEventSlim _mre;
    public WaitHandleExampleClass()
    {
        _mre = new ManualResetEventSlim(false);
    }

    public async Task WaitForHandle()
    {
        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
        {
            var cancelled = await _mre.WaitHandle.WaitForSignalOrCancelAsync(cts.Token)
.ConfigureAwait(false);
            
            if (cancelled)
            {
                DoCancelAction();
                return;
            }

            Proceed();
            // later, _mre.Reset() is called
        }
    }

    public void TriggerWaitHandleFromOtherThread()
    {
        _mre.Set();
    }

    public void Dispose()
    {
        _mre.Dispose();
    }
}

Мой метод расширения WaitForSignalOrCancelAsync() выглядит следующим образом:

public static Task<bool> WaitForSignalOrCancelAsync(this WaitHandle waitHandle, CancellationToken ct)
{
    if (waitHandle == null)
        throw new ArgumentNullException(nameof(waitHandle));
    
    var tcs = new TaskCompletionSource<bool>();
    var task = tcs.Task;
    var container = new CleanupContainer();
    // dont pass ct to ContinueWith - otherwise rwh is not unregistered (even with TaskContinuationOptions.None)
    task.ContinueWith(TaskContinueWith, container, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    container.Ctr = ct.Register(SetCancel, tcs, false);
    container.Rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle, RwhCallback, tcs, -1, true);
    
    return task;
}
private static void SetCancel(object state) => ((TaskCompletionSource<bool>)state).TrySetResult(true);

private static void RwhCallback(object state, bool o) => ((TaskCompletionSource<bool>)state).TrySetResult(false);

private static void TaskContinueWith(Task t, object state)
{
    try
    {
        ((CleanupContainer)state).Cleanup();            
    }
    catch (Exception e)
    {
        DebugAssert.ShouldNotBeCalled();
    }
}

private class CleanupContainer
{
    public RegisteredWaitHandle Rwh { get; set; }
    public CancellationTokenRegistration Ctr { get; set; }

    public void Cleanup()
    {
        // so this just waits until the condition is true (returns true) or timeout is reached (returns false)
        var conditionOk = SpinWait.WaitFor(() => Rwh != null && Ctr != default, timeoutMs: 200);
        DebugAssert.MustBeTrue(conditionOk); // always true in debug -> Rwh and Ctr are set

        var rwhUnregisterOkay = Rwh?.Unregister(null);
        DebugAssert.MustBeTrue(rwhUnregisterOkay == true); // also always true
        Ctr.Dispose();

        Rwh = null;
        Ctr = default;
    }
}

После развертывания и запуска этого кода в качестве службы Windows в течение нескольких часов я вижу, что использование памяти увеличивается. Также увеличивается загрузка ЦП, но я не уверен, связано ли это с этим. Итак, я прикрепил dotmemory. Со временем у меня появляется все больше и больше объектов:

  • TaskCompletionSource<bool> -> 241 890
  • ОтменаCallbackInfo-> 241,984
  • ThreadPoolWaitOrTimerCallback -> 241 890

Скриншот Dotmemory

Не поможет, если я заставлю GC с помощью dotmemory

Я предполагаю, что это связано с описанным выше методом расширения, но я не понимаю, почему. За основу я взял этот код MSDN («От дескрипторов ожидания до TAP»): https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/interop-with-other-asynchronous -шаблоны-и-типы#см.также

Я также написал свой первый модульный тест с интеграцией dotmemory, не уверен, что он правильный и полезный. Но при этом я знаю, что Rwh.Unregister() делает другое, так как количество объектов отличается, если оно закомментировано в методе Cleanup. Он красный, потому что зарегистрированныйWaitHandles.ObjectsCount == 1, а не 0.

[TestMethod]
[DotMemoryUnit(FailIfRunWithoutSupport = false)]
public async Task WaitForSignalOrCancelAsync_ResourcesAreReleased()
{
using (var waitHandle = new ManualResetEventSlim(false))
{
var cancellationTokenSource = new CancellationTokenSource();
var task = waitHandle.WaitHandle.WaitForSignalOrCancelAsync(cancellationTokenSource.Token);

        cancellationTokenSource.Cancel();
        await task;
        
        for (var i = 0; i < 10; i++)
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
        }
        
        Thread.Sleep(200);
    
        for (var i = 0; i < 10; i++)
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
        }
        
        dotMemory.Check(memory =>
        {
            var registeredWaitHandles = memory.GetObjects(where => where.Type.Is<RegisteredWaitHandle>());
            // if i comment out the Rwh.Unregister() in CleanupContainer.Cleanup() i get 2 as count here
            // with Rwh.Unregister(), i get 1 here
            registeredWaitHandles.ObjectsCount.MustBeEqualTo(0, "rwh > 1");
    
            var cancellationTokenRegistrations = memory.GetObjects(where => where.Type.Is<CancellationTokenRegistration>());
            cancellationTokenRegistrations.ObjectsCount.MustBeEqualTo(0, "ctr > 0");
        });
    }

}

Редактировать: Я попробовал решение Ивана (ссылка на stackoverflow), но столкнулся с той же проблемой. Мой модульный тест все еще красный с двумя оставшимися ссылками. Я кратко проверил это на сервере, через 20 минут вижу увеличение новых объектов 3-х типов (TaskCompletionSource,...)

Я быстро попробовал AsyncEx, но при этом мой модульный тест тоже стал красным

Код, который я пробовал:

public static async Task<bool> WaitForSignalOrCancelAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
{
    try
    {
        await waitHandle.WaitOneAsync(cancellationToken, timeoutMilliseconds).ConfigureAwait(false);
        return false; // no cancel
    }
    catch (OperationCanceledException)
    {
        return true; // cancel
    }
}

private static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
{
    if (waitHandle == null)
        throw new ArgumentNullException(nameof(waitHandle));

    TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
    CancellationTokenRegistration ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
    TimeSpan timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;

    RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
        (_, timedOut) =>
        {
            if (timedOut)
            {
                tcs.TrySetCanceled();
            }
            else
            {
                tcs.TrySetResult(true);
            }
        },
        null, timeout, true);

    Task<bool> task = tcs.Task;

    _ = task.ContinueWith(_ =>
    {
        var ok = rwh.Unregister(null);
        var ok2 = rwh.Unregister(waitHandle);
        ctr.Dispose();
    }, CancellationToken.None);

    return task;
}

возможно, посмотрите (используете) эту библиотеку и ее реализацию , а также этот ответ

Ivan Petrov 13.07.2024 03:32

Спасибо, я пробовал, но не получилось :( Я обновил свой вопрос

behindwhere 13.07.2024 14:29

Вы компилируете для отладки или выпуска?

Charlieface 14.07.2024 02:53
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
3
59
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Попробуйте немного другой подход — не полагайтесь на Task.ContinueWith.

Кроме того, SpinWait.WaitFor в вашем коде немного странный, необходимость в нем показывает, что у вас есть проблемы с состоянием гонки.

Попробуйте следующий код:

public static class WaitHandleExtensions
{
    public static Task<bool> WaitForSignalOrCancelAsync(this WaitHandle waitHandle, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(waitHandle);

        if (ct.IsCancellationRequested)
            return Task.FromResult(true);

        if (waitHandle.WaitOne(0))
            return Task.FromResult(false);

        var op = new WaitForSignalOrCancelOperation(waitHandle, ct);
        return op.Task;
    }

    private sealed class WaitForSignalOrCancelOperation
    {
        private const int State_Awaiting = 0, State_Completed = 1;
        private static readonly WaitOrTimerCallback _onWaitCompleteCallback;
        private static readonly Action<object?> _onCancelCallback;
        private readonly TaskCompletionSource<bool> _tcs = new();
        private volatile int _state = State_Awaiting;
        private CancellationTokenRegistration _ctr;
        private RegisteredWaitHandle? _rwh;

        public Task<bool> Task
        {
            get => _tcs.Task;
        }

        static WaitForSignalOrCancelOperation()
        {
            _onWaitCompleteCallback = static (object? state, bool timedOut) =>
            {
                var self = (WaitForSignalOrCancelOperation)state!;
                self.HandleWaitComplete();
            };

            _onCancelCallback = static (object? state) =>
            {
                var self = (WaitForSignalOrCancelOperation)state!;
                self.HandleCanceled();
            };
        }

        public WaitForSignalOrCancelOperation(WaitHandle waitHandle, CancellationToken ct)
        {
            if (ct.CanBeCanceled)
            {
                _ctr = ct.Register(_onCancelCallback, this, useSynchronizationContext: false);

                if (_state != State_Awaiting)
                {
                    // CancellationToken is already canceled.
                    // _onCancelCallback has been called already, the result is already set.
                    UnregisterAll();
                    return;
                }
            }

            _rwh = ThreadPool.RegisterWaitForSingleObject(
                waitHandle,
                _onWaitCompleteCallback,
                state: this,
                Timeout.Infinite,
                executeOnlyOnce: true);

            if (_state != State_Awaiting && _rwh != null)
            {
                // waitHandle is already signaled.
                // _onWaitCompleteCallback has been called already, the result is already set.
                UnregisterAll();
            }
        }

        private void HandleWaitComplete()
        {
            // Check if already called and mark as called if not.
            if (Interlocked.Exchange(ref _state, State_Completed) == State_Awaiting)
            {
                UnregisterAll();
                _tcs.TrySetResult(false);
            }
        }

        private void HandleCanceled()
        {
            // Check if already called and mark as called if not.
            if (Interlocked.Exchange(ref _state, State_Completed) == State_Awaiting)
            {
                UnregisterAll();
                _tcs.TrySetResult(true);
            }
        }

        private void UnregisterAll()
        {
            // Unregister and clear CancellationTokenRegistration
            try
            {
                _ctr.Unregister();
                _ctr = default;
            }
            catch (ObjectDisposedException)
            {
                // Ignore.
            }
            catch (Exception)
            {
                // Ignore or log, whatever...
            }

            // Unregister and clear RegisteredWaitHandle
            if (_rwh != null)
            {
                try
                {
                    _rwh.Unregister(null);
                    _rwh = null;
                }
                catch (ObjectDisposedException)
                {
                    // Ignore.
                }
                catch (Exception)
                {
                    // Ignore or log, whatever...
                }
            }
        }
    }
}

Обратите внимание, что среда тестирования также может использовать RegisteredWaitHandle сама по себе, поэтому registeredWaitHandles.ObjectsCount может быть больше нуля, даже если вы вообще не используете WaitForSignalOrCancelAsync.

Большое спасибо, попробую!

behindwhere 15.07.2024 11:46

Большое спасибо, у меня это работает, также развернуто как сервис!

behindwhere 16.07.2024 09:46

Другие вопросы по теме