.net core AsyncLocal теряет контекст с System.Reactive

Я хочу использовать AsyncLocal для передачи информации через асинхронные рабочие процессы с целью отслеживания. Теперь столкнулся с проблемой RX.
. Тиос - это мой тестовый код:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public class RxTest
{
    private readonly Subject<int> test = new Subject<int>();

    private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();

    public void Test()
    {
        this.test
             // .ObserveOn(Scheduler.Default)
            .Subscribe(this.OnNextNormal);
        this.test
             // .ObserveOn(Scheduler.Default)
            .Delay(TimeSpan.FromMilliseconds(1))
            .Subscribe(this.OnNextDelayed);

        for (var i = 0; i < 2; i++)
        {
            var index = i;
            Task.Run(() =>
            {
                this.asyncContext.Value = index;
                Console.WriteLine(
                    $"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
                this.test.OnNext(index);
            });
        }

        Console.ReadKey();
    }

    private void OnNextNormal(int obj)
    {
        Console.WriteLine(
            $"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }

    private void OnNextDelayed(int obj)
    {
        Console.WriteLine(
            $"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }
}

Выход:

Main 0 (Thread: 5): AsyncLocal.Value => 0
Main 1 (Thread: 6): AsyncLocal.Value => 1
OnNextNormal 0 (Thread: 5): AsyncLocal.Value => 0
OnNextNormal 1 (Thread: 6): AsyncLocal.Value => 1
OnNextDelayed 0 (Thread: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (Thread: 4): AsyncLocal.Value => 0

Как видите, AsyncLocal.Value не передается в методы с отложенной подпиской. => AsyncValue теряется на отложенном треке

Насколько я понимаю, обычный Subscribe () не использует планировщика, а Delay () использует планировщик. Когда я использую ObserveOn () для обоих вызовов, вывод для обоих выглядит следующим образом

Main 0 (Thread: 5): AsyncLocal.Value => 0
Main 1 (Thread: 7): AsyncLocal.Value => 1
OnNextNormal 0 (Thread: 9): AsyncLocal.Value => 0
OnNextNormal 1 (Thread: 9): AsyncLocal.Value => 0
OnNextDelayed 0 (Thread: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (Thread: 4): AsyncLocal.Value => 0

=> AsyncValue теряется на каждом треке

Есть ли способ, как разрешить ExecutionContext течь с RX?
Я нашел только это, но проблема в другом. Они решили вопрос, как перетекает контекст наблюдателя. Я хочу обрисовать контекст издателя.

Я хочу добиться этого:

  1. Мне на службу приходит сообщение "извне"
  2. Распространение сообщения внутри службы (RX)
  3. При регистрации сообщения форматируйте сообщение журнала с помощью MessageId.
  4. Я не хочу никуда передавать сообщение

Заранее благодарим за ответы.

Планировщиком по умолчанию, используемым для this.test.Subscribe(this.OnNextNormal), является Scheduler.Immediate. Как только вы вводите какую-либо форму параллелизма, вы сразу же меняете планировщик. Я подозреваю, что из-за этого AsyncLocal<> не работает. Вам нужно найти другое решение, если вы не имеете дело только с простейшими запросами Rx.

Enigmativity 11.07.2018 14:48
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
1
322
1

Ответы 1

Свободный контекст выполнения в Rx - это то, что делает его отличным для большинства многопоточных сценариев. Вы можете обеспечить соблюдение контекста потока, обойдя запланированные методы следующим образом:

public static class Extensions
{
    public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
    {
        return Observable.Create<T>(
            observer => observable.Subscribe(
                onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
                onError: observer.OnError,
                onCompleted: observer.OnCompleted
            )
        );
    }
}

Выход :

OnNextDelayed   2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed   3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed   1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed   0 (Thread: 5): AsyncLocal.Value => 0

Это переносит контекст, но быстро усложняется для более крупных запросов. Я не уверен, что реализация IScheduler, которая сохраняет контекст при уведомлении, будет работать хорошо. Если копирование сообщений не вызывает слишком больших накладных расходов, это может быть лучше всего для Rx.

Другие вопросы по теме