我希望使用AsyncLocal通过异步工作流传递信息,以便进行跟踪。现在我遇到了RX的问题。
Thios是我的测试代码:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
public class RxTest
{
private readonly Subject<int> test = new Subject<int>();
private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();
public void Test()
{
this.test
// .ObserveOn(Scheduler.Default)
.Subscribe(this.OnNextNormal);
this.test
// .ObserveOn(Scheduler.Default)
.Delay(TimeSpan.FromMilliseconds(1))
.Subscribe(this.OnNextDelayed);
for (var i = 0; i < 2; i++)
{
var index = i;
Task.Run(() =>
{
this.asyncContext.Value = index;
Console.WriteLine(
$"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
this.test.OnNext(index);
});
}
Console.ReadKey();
}
private void OnNextNormal(int obj)
{
Console.WriteLine(
$"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
private void OnNextDelayed(int obj)
{
Console.WriteLine(
$"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
}产出如下:
主0(线程: 5):AsyncLocal.Value => 0 主1(线程: 6):AsyncLocal.Value => 1 OnNextNormal 0(线程: 5):AsyncLocal.Value => 0 OnNextNormal 1(线程: 6):AsyncLocal.Value => 1 OnNextDelayed 0(线程: 4):AsyncLocal.Value => 0 OnNextDelayed 1(线程: 4):AsyncLocal.Value => 0
如您所见,AsyncLocal.Value不会流向延迟订阅的方法。
=> AsyncValue在延迟轨道上迷路
据我所知,普通订阅()不使用调度器,延迟()使用调度器。
当我对两个调用都使用ObserveOn()时,两个调用的输出如下所示
主0(线程: 5):AsyncLocal.Value => 0 主1(线程: 7):AsyncLocal.Value => 1 OnNextNormal 0(线程: 9):AsyncLocal.Value => 0 OnNextNormal 1(线程: 9):AsyncLocal.Value => 0 OnNextDelayed 0(线程: 4):AsyncLocal.Value => 0 OnNextDelayed 1(线程: 4):AsyncLocal.Value => 0
=> AsyncValue在每一首曲目上都迷失了方向
有什么办法让ExecutionContext与RX一起流动吗?
我只找到了这,但这是另一个问题。他们解决了观察员环境如何流动的问题。我想要流出版商的上下文。
我想要做到的是:
提前谢谢你的回答。
发布于 2018-07-16 19:18:54
Rx中的自由流动执行上下文使得它在大多数多线程场景中非常出色。您可以通过绕过计划的方法来强制执行线程上下文,如下所示:
public static class Extensions
{
public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
{
return Observable.Create<T>(
observer => observable.Subscribe(
onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
onError: observer.OnError,
onCompleted: observer.OnCompleted
)
);
}
}产出:
OnNextDelayed 2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed 3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed 1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed 0 (Thread: 5): AsyncLocal.Value => 0这确实传递了上下文,但是对于更大的查询,它很快变得复杂起来。我不确定在通知时保留上下文的IScheduler的实现是否能正常工作。如果消息复制不是太多的开销,这可能是最适合Rx的。
https://stackoverflow.com/questions/51279665
复制相似问题