首先,我要说,我对反应性扩展和DynamicData都很陌生,所以我可能在这里遗漏了一些非常明显的东西。
目的:我想根据新的市场价格计算一些交易(类似于:https://dynamic-data.org/2014/11/22/trading-example-part-2-manage-market-data/)的利润(或亏损)。为了简单起见,让我们说,在最后一秒钟收到的所有价格都被认为是新的。
我的问题是:即使使用批处理编辑,同一货币在1秒内也会发生多个更新事件(如EURUSD)。理想情况下,我只想根据最新的可用值引发1个事件,以避免进行不必要的计算。
到目前为止我的代码是:
Main.cs
using System.Reactive.Linq;
using DynamicData;
TickService tickService = new();
tickService.NewTicks
.Connect()
.Watch("EURUSD")
.Subscribe(price => Console.WriteLine(price));
/*
* In the actual project the prices come from an external system,
* but that is irrelevant here
* so for the sake of simplicity I'm adding them manually (with a delay)
* to TickService
*/
Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
var newTicks = new List<Tick>() { new Tick() { Name = "EURUSD", Price = i, LastUpdate = DateTime.UtcNow } };
tickService.AddTicks(newTicks);
Console.WriteLine($"Added: i -> {i}");
Thread.Sleep(250);
}
});
Console.ReadLine();Tick.cs
public class Tick
{
public string Name { get; set; } = "";
public double Price { get; set; }
public DateTime LastUpdate { get; set; }
public override string ToString()
{
return $"Name: {Name}, Price: {Price}, LastUpdate: {LastUpdate}";
}
}TickService.cs (以最后一秒的价格公开一个IObservableCache )
using DynamicData;
public class TickService
{
private readonly SourceCache<Tick, string> _ticksCache = new(x => x.Name);
public IObservableCache<Tick, string> NewTicks { get; }
public TickService()
{
NewTicks = _ticksCache
.Connect()
.Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
.Batch(TimeSpan.FromSeconds(1))
.AsObservableCache();
}
public void AddTicks(ICollection<Tick> newTicks)
{
_ticksCache.Edit(innerCache => innerCache.AddOrUpdate(newTicks));
}
}上述代码的结果如下:
Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Add, Key: EURUSD, Current: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm, Previous: <None>
Update, Key: EURUSD, Current: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm, Previous: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm我想要的是:
Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm由于我使用的是批处理编辑,所以我只希望每秒钟为EURUSD生成一个更新事件。关于我在这里缺少的东西,有什么想法或建议吗?
谢谢
发布于 2022-05-23 09:39:12
这是几个人提出的一个长期的请求,从来没有变成一个操作符,所以我在动态数据中添加了一个新的操作符,以实现更改集通知中的唯一性。
新运算符称为EnsureUniqueKeys,可应用于缓存。
var uniqueChanges = _myCache.Connect().EnsureUniqueKeys()哪里
_myCache.Edit(innerCache =>
{
innerCache.AddOrUpdate(new Person("Me", 20));
innerCache.AddOrUpdate(new Person("Me", 21));
innerCache.AddOrUpdate(new Person("Me", 22));
});将为个人生成一个单独的添加通知(“Me”,22)
和
_myCache.Edit(innerCache =>
{
innerCache.AddOrUpdate(new Person("Me", 20));
innerCache.AddOrUpdate(new Person("Me", 21));
innerCache.RemoveKey("Me");
});将产生一个空的更改集,因为该项是在同一个编辑中添加、更新和删除的。
运算符可以在TickService中应用,如下所示:
NewTicks = _ticksCache
.Connect()
.Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
.Batch(TimeSpan.FromSeconds(1))
.EnsureUniqueKeys()
.AsObservableCache();https://stackoverflow.com/questions/71471772
复制相似问题