我有一个后台服务,将在应用程序执行启动时启动。后台服务将根据设置的员工数量开始创建多个任务。就像我做各种试验一样,监视DB上的开放连接。打开的连接总是与我设置的工作人员相同的值。假设我设置了32个工作人员,那么当我使用查询检查连接时,连接将始终是32个打开的连接。FYI我使用Postgres作为DB服务器。为了检查打开的连接,我使用下面的查询来检查应用程序运行时的连接。
select * from pg_stat_activity where application_name = 'myapplication';下面是后台服务代码。
public class MessagingService : BackgroundService {
private int worker = 32;
protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
var tasks = new List<Task>();
for (int i=0; i<worker; i++) {
tasks.Add(DoJob(cancellationToken));
}
while (!cancellationToken.IsCancellationRequested) {
try {
var completed = await Task.WhenAny(tasks);
tasks.Remove(completed);
} catch (Exception) {
await Task.Delay(1000, cancellationToken);
}
if (!cancellationToken.IsCancellationRequested) {
tasks.Add(DoJob(cancellationToken));
}
}
}
private async Task DoJob(CancellationToken cancellationToken) {
using (var scope = _services.CreateScope()) {
var service = scope.ServiceProvider
.GetRequiredService<MessageService>();
try {
//do select and update query on db if null return false otherwise send mail
if (!await service.Run(cancellationToken)) {
await Task.Delay(1000, cancellationToken);
}
} catch (Exception) {
await Task.Delay(1000, cancellationToken);
}
}
}
} 工作流是不正确的,因为它将继续创建任务,并使连接处于打开状态和空闲状态。此外,在运行这些任务时,CPU和内存使用率很高。当DB上没有记录时,我如何才能使一个工作人员保持当前的运行状态?如果找到一个记录或更多的记录,它将不断增加,直到预置的最大工作人员,然后当记录小于最大工作人员时减少工作人员。如果这个问题过于模糊或基于意见,请让我知道,我会尽力使它尽可能具体。
更新目的
此服务的目的是执行电子邮件传递。还有另一个API将用于创建计划作业。一旦作业被添加到DB中,此服务将在预定时间执行电子邮件传递。例如,5k计划作业被添加到DB中,执行作业的计划时间是'2021-12-31 08:00:00‘,创建计划作业的时间是2021-12-31 :00:00’。服务将从00:00一直循环到08:00,32名员工同时运行,然后开始发送电子邮件。我怎样才能把它提高到更高的效率,就像正常情况下,没有作业调度,只有一个工人在运行。当它检查有5k的计划作业时,它将充分利用所有的工人。在5k的工作完成后,它将回到1名工人。
发布于 2021-12-31 06:05:56
我的建议是通过使用来自ActionBlock库的TPL数据流来避免手工创建和维护工作任务的负担。此组件是输入队列和Action<T>委托的组合。在其构造函数中指定委托,并使用其Post方法向其提供消息。该组件为它接收到的每个消息调用委托,并具有指定的并行度。当没有更多的消息要发送时,您可以通过调用它的Complete方法通知它,然后调用它的Completion来通知它,这样您就可以知道所有委托给它的工作都已经完成。
下面是一个粗略的演示,如果您可以使用这个组件:
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
var processor = new ActionBlock<Job>(async job =>
{
await ProcessJob(job);
await MarkJobAsCompleted(job);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 32
});
try
{
while (true)
{
Task delayTask = Task.Delay(TimeSpan.FromSeconds(60), cancellationToken);
Job[] jobs = await FetchReadyToProcessJobs();
foreach (var job in jobs)
{
await MarkJobAsPending(job);
processor.Post(job);
}
await delayTask; // Will throw when the token is canceled
}
}
finally
{
processor.Complete();
await processor.Completion;
}
}FetchReadyToProcessJobs方法应该连接到数据库,并获取所有需要处理的作业。在上面的示例中,该方法每60秒调用一次。Task.Delay是在调用该方法之前创建的,并在返回的作业被发布到ActionBlock<T>之后等待。这样,调用之间的间隔将是稳定和一致的。
https://stackoverflow.com/questions/70538462
复制相似问题