首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ConcurrentQueue一个元素由两个线程获取

ConcurrentQueue一个元素由两个线程获取
EN

Stack Overflow用户
提问于 2013-09-05 23:43:00
回答 2查看 1.7K关注 0票数 1

我希望两个线程处理一个队列。第一个线程应该每2秒调用一次,第二个线程应该每3秒调用一次。两个线程应该同时启动。在访问队列的第一个元素时,我遇到了问题。两个线程都接受索引0的元素。有时,这种情况会发生在队列的其他元素中,而不仅仅是第一个元素。我在控制台上有这样一个输出:

  • 项目0经1次处理: 3:27:8
  • 项目0经2次处理: 3:27:8
  • 项目2 1次处理: 3:27:10
  • 项目3经2次处理: 3:27:11
  • 项目4经1次处理: 3:27:12

以此类推。

下面是我使用的代码:

代码语言:javascript
复制
    ConcurrentQueue<int> sharedQueue = new ConcurrentQueue<int>();

    for (int i = 0; i < 10; i++)
    {
        sharedQueue.Enqueue(i);
    }


    int itemCount= 0;


    Task[] tasks = new Task[2];
    for (int i = 0; i < tasks.Length; i++)
    {
        // create the new task
        tasks[i] = new Task(() =>
        {
            while (sharedQueue.Count > 0)
            {
                // define a variable for the dequeue requests
                int queueElement;
                // take an item from the queue
                bool gotElement = sharedQueue.TryDequeue(out queueElement);
                // increment the count of items processed
                if (gotElement)
                {
                    DateTime dt = DateTime.Now;
                    Console.WriteLine("Item " + itemCount + "processed by " 
                        + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
                    Interlocked.Increment(ref itemCount);   
               if (Task.CurrentId == 1) 
                    Thread.Sleep(2000);
                else 
                    Thread.Sleep(3000);                       
                }

            }
        });
        // start the new task
        tasks[i].Start();


    }
    // wait for the tasks to complete
    Task.WaitAll(tasks);
    // report on the number of items processed
    Console.WriteLine("Items processed: {0}", itemCount);
    // wait for input before exiting
    Console.WriteLine("Press enter to finish");
    Console.ReadLine();
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2013-09-06 00:44:43

替换以下一行:

代码语言:javascript
复制
Console.WriteLine("Item " + itemCount + "processed by " ...);

用这一行:

代码语言:javascript
复制
Console.WriteLine("Item " + queueElement + "processed by " ...);

您所看到的问题很可能是由于几乎同时执行Console.WriteLine的任务造成的,而且两者都看到了itemCount的相同值,因为它们交织在一起,以至于Interlocked.Increment调用还没有发生。打印queueElement可能更有意义,因为它更有意义。

票数 5
EN

Stack Overflow用户

发布于 2013-09-06 19:26:01

关于您的布赖恩·吉迪恩的好答案问题,请参阅itemCount

您可以考虑重写代码以使用BlockingCollection而不是ConcurrentQueue<T>。工作起来容易多了。BlockingCollection是并发集合的包装器。在其默认配置中,备份存储是一个ConcurrentQueue。因此,您可以获得相同的并发队列功能,但接口要好得多。

代码语言:javascript
复制
BlockingCollection<int> sharedQueue = new BlockingCollection<int>();

for (int i = 0; i < 10; i++)
{
    sharedQueue.Add(i);
}

// CompleteAdding marks the queue as "complete for adding,"
// meaning that no more items will be added.
sharedQueue.CompleteAdding();

int itemCount= 0;

Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
    // create the new task
    tasks[i] = new Task(() =>
    {
        foreach (var queueElement in sharedQueue.GetConsumingEnumerable())
        {
            DateTime dt = DateTime.Now;
            Console.WriteLine("Item " + itemCount + "processed by " 
                + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
            Interlocked.Increment(ref itemCount);   
            if (Task.CurrentId == 1) 
                Thread.Sleep(2000);
            else 
                Thread.Sleep(3000);                       
        }
    });

    // start the new task
    tasks[i].Start();
}

GetConsumingEnumerable返回一个枚举数,该枚举数将从队列中获取下一项,直到队列为空。它还能很好地处理取消,这对ConcurrentQueue来说有点困难。

通常,每当您想到使用ConcurrentQueue<T>时,都可能需要BlockingCollection<T>

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/18647678

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档