我希望两个线程处理一个队列。第一个线程应该每2秒调用一次,第二个线程应该每3秒调用一次。两个线程应该同时启动。在访问队列的第一个元素时,我遇到了问题。两个线程都接受索引0的元素。有时,这种情况会发生在队列的其他元素中,而不仅仅是第一个元素。我在控制台上有这样一个输出:
以此类推。
下面是我使用的代码:
ConcurrentQueue<int> sharedQueue = new ConcurrentQueue<int>();
for (int i = 0; i < 10; i++)
{
sharedQueue.Enqueue(i);
}
int itemCount= 0;
Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
// create the new task
tasks[i] = new Task(() =>
{
while (sharedQueue.Count > 0)
{
// define a variable for the dequeue requests
int queueElement;
// take an item from the queue
bool gotElement = sharedQueue.TryDequeue(out queueElement);
// increment the count of items processed
if (gotElement)
{
DateTime dt = DateTime.Now;
Console.WriteLine("Item " + itemCount + "processed by "
+ Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
Interlocked.Increment(ref itemCount);
if (Task.CurrentId == 1)
Thread.Sleep(2000);
else
Thread.Sleep(3000);
}
}
});
// start the new task
tasks[i].Start();
}
// wait for the tasks to complete
Task.WaitAll(tasks);
// report on the number of items processed
Console.WriteLine("Items processed: {0}", itemCount);
// wait for input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}发布于 2013-09-06 00:44:43
替换以下一行:
Console.WriteLine("Item " + itemCount + "processed by " ...);用这一行:
Console.WriteLine("Item " + queueElement + "processed by " ...);您所看到的问题很可能是由于几乎同时执行Console.WriteLine的任务造成的,而且两者都看到了itemCount的相同值,因为它们交织在一起,以至于Interlocked.Increment调用还没有发生。打印queueElement可能更有意义,因为它更有意义。
发布于 2013-09-06 19:26:01
关于您的布赖恩·吉迪恩的好答案问题,请参阅itemCount。
您可以考虑重写代码以使用BlockingCollection而不是ConcurrentQueue<T>。工作起来容易多了。BlockingCollection是并发集合的包装器。在其默认配置中,备份存储是一个ConcurrentQueue。因此,您可以获得相同的并发队列功能,但接口要好得多。
BlockingCollection<int> sharedQueue = new BlockingCollection<int>();
for (int i = 0; i < 10; i++)
{
sharedQueue.Add(i);
}
// CompleteAdding marks the queue as "complete for adding,"
// meaning that no more items will be added.
sharedQueue.CompleteAdding();
int itemCount= 0;
Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
// create the new task
tasks[i] = new Task(() =>
{
foreach (var queueElement in sharedQueue.GetConsumingEnumerable())
{
DateTime dt = DateTime.Now;
Console.WriteLine("Item " + itemCount + "processed by "
+ Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
Interlocked.Increment(ref itemCount);
if (Task.CurrentId == 1)
Thread.Sleep(2000);
else
Thread.Sleep(3000);
}
});
// start the new task
tasks[i].Start();
}GetConsumingEnumerable返回一个枚举数,该枚举数将从队列中获取下一项,直到队列为空。它还能很好地处理取消,这对ConcurrentQueue来说有点困难。
通常,每当您想到使用ConcurrentQueue<T>时,都可能需要BlockingCollection<T>。
https://stackoverflow.com/questions/18647678
复制相似问题