首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >.net核心AsyncLocal与System.Reactive失去上下文

.net核心AsyncLocal与System.Reactive失去上下文
EN

Stack Overflow用户
提问于 2018-07-11 07:32:45
回答 1查看 423关注 0票数 0

我希望使用AsyncLocal通过异步工作流传递信息,以便进行跟踪。现在我遇到了RX的问题。

Thios是我的测试代码:

代码语言:javascript
复制
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public class RxTest
{
    private readonly Subject<int> test = new Subject<int>();

    private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();

    public void Test()
    {
        this.test
             // .ObserveOn(Scheduler.Default)
            .Subscribe(this.OnNextNormal);
        this.test
             // .ObserveOn(Scheduler.Default)
            .Delay(TimeSpan.FromMilliseconds(1))
            .Subscribe(this.OnNextDelayed);

        for (var i = 0; i < 2; i++)
        {
            var index = i;
            Task.Run(() =>
            {
                this.asyncContext.Value = index;
                Console.WriteLine(
                    $"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
                this.test.OnNext(index);
            });
        }

        Console.ReadKey();
    }

    private void OnNextNormal(int obj)
    {
        Console.WriteLine(
            $"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }

    private void OnNextDelayed(int obj)
    {
        Console.WriteLine(
            $"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }
}

产出如下:

主0(线程: 5):AsyncLocal.Value => 0 主1(线程: 6):AsyncLocal.Value => 1 OnNextNormal 0(线程: 5):AsyncLocal.Value => 0 OnNextNormal 1(线程: 6):AsyncLocal.Value => 1 OnNextDelayed 0(线程: 4):AsyncLocal.Value => 0 OnNextDelayed 1(线程: 4):AsyncLocal.Value => 0

如您所见,AsyncLocal.Value不会流向延迟订阅的方法。

=> AsyncValue在延迟轨道上迷路

据我所知,普通订阅()不使用调度器,延迟()使用调度器。

当我对两个调用都使用ObserveOn()时,两个调用的输出如下所示

主0(线程: 5):AsyncLocal.Value => 0 主1(线程: 7):AsyncLocal.Value => 1 OnNextNormal 0(线程: 9):AsyncLocal.Value => 0 OnNextNormal 1(线程: 9):AsyncLocal.Value => 0 OnNextDelayed 0(线程: 4):AsyncLocal.Value => 0 OnNextDelayed 1(线程: 4):AsyncLocal.Value => 0

=> AsyncValue在每一首曲目上都迷失了方向

有什么办法让ExecutionContext与RX一起流动吗?

我只找到了,但这是另一个问题。他们解决了观察员环境如何流动的问题。我想要流出版商的上下文。

我想要做到的是:

  1. 来自“外面”的信息来为我服务
  2. 在服务中分发消息(RX)
  3. 记录消息时,用MessageId格式化日志消息
  4. 我不想把信息传递到任何地方

提前谢谢你的回答。

EN

回答 1

Stack Overflow用户

发布于 2018-07-16 19:18:54

Rx中的自由流动执行上下文使得它在大多数多线程场景中非常出色。您可以通过绕过计划的方法来强制执行线程上下文,如下所示:

代码语言:javascript
复制
public static class Extensions
{
    public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
    {
        return Observable.Create<T>(
            observer => observable.Subscribe(
                onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
                onError: observer.OnError,
                onCompleted: observer.OnCompleted
            )
        );
    }
}

产出:

代码语言:javascript
复制
OnNextDelayed   2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed   3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed   1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed   0 (Thread: 5): AsyncLocal.Value => 0

这确实传递了上下文,但是对于更大的查询,它很快变得复杂起来。我不确定在通知时保留上下文的IScheduler的实现是否能正常工作。如果消息复制不是太多的开销,这可能是最适合Rx的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51279665

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档