Я хочу использовать AsyncLocal для передачи информации через асинхронные рабочие процессы с целью отслеживания. Теперь столкнулся с проблемой RX.
.
Тиос - это мой тестовый код:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
public class RxTest
{
private readonly Subject<int> test = new Subject<int>();
private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();
public void Test()
{
this.test
// .ObserveOn(Scheduler.Default)
.Subscribe(this.OnNextNormal);
this.test
// .ObserveOn(Scheduler.Default)
.Delay(TimeSpan.FromMilliseconds(1))
.Subscribe(this.OnNextDelayed);
for (var i = 0; i < 2; i++)
{
var index = i;
Task.Run(() =>
{
this.asyncContext.Value = index;
Console.WriteLine(
$"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
this.test.OnNext(index);
});
}
Console.ReadKey();
}
private void OnNextNormal(int obj)
{
Console.WriteLine(
$"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
private void OnNextDelayed(int obj)
{
Console.WriteLine(
$"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
}
Выход:
Main 0 (Thread: 5): AsyncLocal.Value => 0
Main 1 (Thread: 6): AsyncLocal.Value => 1
OnNextNormal 0 (Thread: 5): AsyncLocal.Value => 0
OnNextNormal 1 (Thread: 6): AsyncLocal.Value => 1
OnNextDelayed 0 (Thread: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (Thread: 4): AsyncLocal.Value => 0
Как видите, AsyncLocal.Value не передается в методы с отложенной подпиской. => AsyncValue теряется на отложенном треке
Насколько я понимаю, обычный Subscribe () не использует планировщика, а Delay () использует планировщик. Когда я использую ObserveOn () для обоих вызовов, вывод для обоих выглядит следующим образом
Main 0 (Thread: 5): AsyncLocal.Value => 0
Main 1 (Thread: 7): AsyncLocal.Value => 1
OnNextNormal 0 (Thread: 9): AsyncLocal.Value => 0
OnNextNormal 1 (Thread: 9): AsyncLocal.Value => 0
OnNextDelayed 0 (Thread: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (Thread: 4): AsyncLocal.Value => 0
=> AsyncValue теряется на каждом треке
Есть ли способ, как разрешить ExecutionContext течь с RX?
Я нашел только это, но проблема в другом. Они решили вопрос, как перетекает контекст наблюдателя. Я хочу обрисовать контекст издателя.
Я хочу добиться этого:
Заранее благодарим за ответы.





Свободный контекст выполнения в Rx - это то, что делает его отличным для большинства многопоточных сценариев. Вы можете обеспечить соблюдение контекста потока, обойдя запланированные методы следующим образом:
public static class Extensions
{
public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
{
return Observable.Create<T>(
observer => observable.Subscribe(
onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
onError: observer.OnError,
onCompleted: observer.OnCompleted
)
);
}
}
Выход :
OnNextDelayed 2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed 3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed 1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed 0 (Thread: 5): AsyncLocal.Value => 0
Это переносит контекст, но быстро усложняется для более крупных запросов. Я не уверен, что реализация IScheduler, которая сохраняет контекст при уведомлении, будет работать хорошо. Если копирование сообщений не вызывает слишком больших накладных расходов, это может быть лучше всего для Rx.
Планировщиком по умолчанию, используемым для
this.test.Subscribe(this.OnNextNormal), являетсяScheduler.Immediate. Как только вы вводите какую-либо форму параллелизма, вы сразу же меняете планировщик. Я подозреваю, что из-за этогоAsyncLocal<>не работает. Вам нужно найти другое решение, если вы не имеете дело только с простейшими запросами Rx.