首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么Parallel.ForEach比AsParallel().ForAll()要快得多,尽管MSDN的建议并非如此?

为什么Parallel.ForEach比AsParallel().ForAll()要快得多,尽管MSDN的建议并非如此?
EN

Stack Overflow用户
提问于 2014-09-18 08:32:54
回答 4查看 15.3K关注 0票数 33

我一直在做一些调查,看看我们如何创建一个在树中运行的多线程应用程序。

为了找到如何以最好的方式实现这一点,我创建了一个测试应用程序,该应用程序运行在我的C:\ disk中,并打开所有目录。

代码语言:javascript
复制
class Program
{
    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunction(startDirectory);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }

    public static void ThisIsARecursiveFunction(String currentDirectory)
    {
        var lastBit = Path.GetFileName(currentDirectory);
        var depth = currentDirectory.Count(t => t == '\\');
        //Console.WriteLine(depth + ": " + currentDirectory);

        try
        {
            var children = Directory.GetDirectories(currentDirectory);

            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;

            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunction(child);
                    }
                    break;
                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                default:
                    break;
            }

        }
        catch (Exception eee)
        {
            //Exception might occur for directories that can't be accessed.
        }
    }
}

然而,我遇到的情况是,当在模式3 (Parallel.ForEach)中运行这个程序时,代码大约在2.5秒内完成(是的,我有一个SSD ;)。在没有并行化的情况下运行代码,它大约在8秒内完成。并且在模式2 (AsParalle.ForAll())中运行代码,几乎需要无限的时间。

在签入process时,我还会遇到一些奇怪的事实:

代码语言:javascript
复制
Mode1 (No Parallelization):
Cpu:     ~25%
Threads: 3
Time to complete: ~8 seconds

Mode2 (AsParallel().ForAll()):
Cpu:     ~0%
Threads: Increasing by one per second (I find this strange since it seems to be waiting on the other threads to complete or a second timeout.)
Time to complete: 1 second per node so about 3 days???

Mode3 (Parallel.ForEach()):
Cpu:     100%
Threads: At most 29-30
Time to complete: ~2.5 seconds

特别奇怪的是,Parallel.ForEach似乎忽略了仍在运行的任何父线程/任务,而AsParallel().ForAll()似乎在等待前面的任务完成(因为所有父任务仍在等待它们的子任务完成)。

我在MSDN上读到的还有:“如果可能的话,我更喜欢ForAll而不是ForEach。”

来源:http://msdn.microsoft.com/en-us/library/dd997403(v=vs.110).aspx

有人知道为什么会这样吗?

编辑1:

根据马修·沃森的要求,我首先在记忆中加载了这棵树,然后再遍历它。现在,树的加载按顺序进行。

然而,结果是一样的。非并行化和Parallel.ForEach现在大约在0.05秒内完成整个树,而AsParallel().ForAll仍然仅为每秒1步左右。

代码:

代码语言:javascript
复制
class Program
{
    private static DirWithSubDirs RootDir;

    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        Console.WriteLine("Loading file system into memory...");
        RootDir = new DirWithSubDirs(startDirectory);
        Console.WriteLine("Done");


        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunctionInMemory(RootDir);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }        

    public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
    {
        var depth = currentDirectory.Path.Count(t => t == '\\');
        Console.WriteLine(depth + ": " + currentDirectory.Path);

        var children = currentDirectory.SubDirs;

        //Edit this mode to switch what way of parallelization it should use
        int mode = 2;

        switch (mode)
        {
            case 1:
                foreach (var child in children)
                {
                    ThisIsARecursiveFunctionInMemory(child);
                }
                break;
            case 2:
                children.AsParallel().ForAll(t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            case 3:
                Parallel.ForEach(children, t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            default:
                break;
        }
    }
}

class DirWithSubDirs
{
    public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
    public String Path { get; private set; }

    public DirWithSubDirs(String path)
    {
        this.Path = path;
        try
        {
            SubDirs = Directory.GetDirectories(path).Select(t => new DirWithSubDirs(t)).ToList();
        }
        catch (Exception eee)
        {
            //Ignore directories that can't be accessed
        }
    }
}

编辑2:

在阅读了Matthew评论的更新后,我尝试在程序中添加以下代码:

代码语言:javascript
复制
ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

然而,这并不改变AsParallel的peform方式。尽管如此,头8个步骤在减慢到每秒1步之前,都会在瞬间执行。

(另外请注意,我目前忽略了当我无法通过Directory.GetDirectories()周围的that块访问目录时出现的异常)

编辑3:

另外,我主要感兴趣的是Parallel.ForEach和AsParallel.ForAll之间的区别,因为对我来说,奇怪的是,由于某种原因,第二个线程为每个递归创建一个线程,而第一个线程在最多30个线程中处理所有事情。(以及MSDN建议使用AsParallel的原因,即使它创建了如此多的线程,超时时间为1秒)

编辑4:

我发现的另一件奇怪的事情是:当我试图将线程池上的MinThreads设置在1023以上时,它似乎忽略了这个值,并将其缩小到大约8或16: ThreadPool.SetMinThreads(1023,16);

尽管如此,当我使用1023时,它会非常快地完成第一个1023元素,然后回到我一直在经历的慢节奏。

注意:现在还创建了1000多个线程(相比之下,整个Parallel.ForEach线程创建了30个线程)。

这是否意味着Parallel.ForEach在处理任务时更聪明?

更多信息,当您将值设置在1023之上时,此代码会打印两次8-8:(当您将值设置为1023或更低时,它会打印正确的值)。

代码语言:javascript
复制
        int threadsMin;
        int completionMin;
        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Cur min threads: " + threadsMin + " and the other thing: " + completionMin);

        ThreadPool.SetMinThreads(1023, 16);
        ThreadPool.SetMaxThreads(1023, 16);

        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Now min threads: " + threadsMin + " and the other thing: " + completionMin);

编辑5:

根据Dean的请求,我创建了另一个用于手动创建任务的案例:

代码语言:javascript
复制
case 4:
    var taskList = new List<Task>();
    foreach (var todo in children)
    {
        var itemTodo = todo;
        taskList.Add(Task.Run(() => ThisIsARecursiveFunctionInMemory(itemTodo)));
    }
    Task.WaitAll(taskList.ToArray());
    break;

这也和Parallel.ForEach()循环一样快。因此,我们仍然无法解释为什么AsParallel().ForAll()要慢得多。

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2014-09-20 15:27:02

这个问题是相当可调试的,当您遇到线程问题时,这是一个不寻常的奢侈品。这里的基本工具是Debug > Windows >线程调试器窗口。显示活动线程,并让您查看它们的堆栈跟踪。您将很容易地看到,一旦速度变慢,就会有数十个线程处于活动状态,而这些线程都被卡住了。它们的堆栈跟踪看起来都是一样的:

代码语言:javascript
复制
    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) + 0x16 bytes  
    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) + 0x7 bytes 
    mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x182 bytes    
    mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x93 bytes   
    mscorlib.dll!System.Threading.Tasks.Task.InternalRunSynchronously(System.Threading.Tasks.TaskScheduler scheduler, bool waitForCompletion) + 0xba bytes  
    mscorlib.dll!System.Threading.Tasks.Task.RunSynchronously(System.Threading.Tasks.TaskScheduler scheduler) + 0x13 bytes  
    System.Core.dll!System.Linq.Parallel.SpoolingTask.SpoolForAll<ConsoleApplication1.DirWithSubDirs,int>(System.Linq.Parallel.QueryTaskGroupState groupState, System.Linq.Parallel.PartitionedStream<ConsoleApplication1.DirWithSubDirs,int> partitions, System.Threading.Tasks.TaskScheduler taskScheduler) Line 172  C#
// etc..

每当你看到这样的事情,你应该立即想到消防水管的问题.可能是第三大最常见的线程错误,仅次于竞争和死锁。

您可以推理,现在您已经知道原因了,代码的问题是每个完成的线程都会增加N个线程。其中N是目录中子目录的平均数量。实际上,线程的数量呈指数增长,这总是很糟糕的。只有当N= 1时,它才能保持控制,当然,这在典型的磁盘上永远不会发生。

请注意,与几乎任何线程问题一样,这种错误行为往往会很糟糕地重复。你机器里的SSD会把它藏起来。计算机中的RAM也是如此,程序很可能在第二次运行时完成得很快并且没有问题。因为您现在将从文件系统缓存(而不是磁盘)读取数据,所以非常快。修改ThreadPool.SetMinThreads()也会隐藏它,但它无法修复它。它从来不解决任何问题,它只是隐藏它们。因为无论发生什么,指数型数总是超过设定的最小线程数。您只能希望在这种情况发生之前,它已经完成了对驱动器的迭代。对于一个拥有大驱动器的用户来说,希望渺茫。

ParallelEnumerable.ForAll()和Parallel.ForEach()之间的区别现在可能也很容易解释。从堆栈跟踪可以看出,ForAll()做了一些顽皮的事情,RunSynchronously()方法会阻塞,直到所有线程都完成。阻塞是线程池线程不应该做的事情,它会阻塞线程池,不允许它为另一个任务调度处理器。并且有您观察到的效果,线程池很快就会被等待其他线程来完成的线程淹没。这是没有发生的,他们在池中等待,没有得到安排,因为他们已经有那么多的活动。

这是一个非常常见的死锁场景,但是线程池管理器有一个解决办法。当活动线程没有及时完成时,它会监视活动线程和步骤。然后它允许启动一个额外的线程,比SetMinThreads()设置的最小线程多一个线程。但是不超过SetMaxThreads()设置的最大值,即有太多活动的tp线程是危险的,很可能触发OOM。这确实解决了死锁,它将获得一个ForAll()调用来完成。但是这种情况发生的速度非常慢,线程池每秒只执行两次。在它迎头赶上之前你会失去耐心的。

Parallel.ForEach()没有这个问题,它没有阻塞,所以没有阻塞池。

这似乎是解决方案,但请记住,您的程序仍然在激发您的计算机内存,添加更多等待的tp线程到池。这也会使您的程序崩溃,只是不太可能,因为您有大量的内存,并且线程池没有使用大量的内存来跟踪请求。然而,有些程序员却是accomplish that as well

解决方案非常简单,只是不要使用线程。这是有害的,只有一个磁盘时没有并发性。它也不喜欢被多个线程强占。尤其是在主轴驱动器上,头部寻找的速度非常非常慢。SSD做得更好,但是它仍然需要50微秒的时间,这是你不想要或不需要的开销。访问磁盘的理想线程数(否则无法很好地缓存)始终是one

票数 51
EN

Stack Overflow用户

发布于 2014-09-18 08:40:52

首先要注意的是,您正在尝试并行执行一个IO绑定操作,这将严重扭曲时间。

要注意的第二件事是并行任务的性质:您正在递归地降目录树。如果创建多个线程来执行此操作,则每个线程可能同时访问磁盘的不同部分--这将导致磁盘读取头在各处跳跃,并大大减慢速度。

尝试更改您的测试以创建内存中的树,并使用多个线程进行访问.然后,您将能够正确地比较时间,而结果被扭曲超过所有的有用性。

此外,您可能正在创建大量线程,它们(默认情况下)将是线程池线程。当线程超过处理器内核的数量时,拥有大量的线程实际上会减慢速度。

还请注意,当您超过线程池最小线程(由ThreadPool.GetMinThreads()定义)时,线程池管理器将在每个新线程池线程创建之间引入延迟。(我认为这大约是每一个新线程0.5s )。

此外,如果线程数超过ThreadPool.GetMaxThreads()返回的值,则创建线程将阻塞,直到其他一个线程退出为止。我认为这很可能会发生。

您可以通过调用ThreadPool.SetMaxThreads()ThreadPool.SetMinThreads()来增加这些值来检验这一假设,并查看它是否有任何不同。

(最后,请注意,如果您真的试图递归地从C:\下降,那么当IO异常到达受保护的OS文件夹时,您几乎肯定会得到它。)

注意:设置max/min线程池线程如下:

代码语言:javascript
复制
ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

跟踪

我尝试过使用上面描述的线程池线程计数设置的测试代码,结果如下(不是运行在整个C:\驱动器上,而是在一个较小的子集上运行):

  • 模式1花了06.5秒。
  • 模式2用了15.7秒。
  • 模式3用了16.4秒。

这与我的预期是一致的;添加大量线程来完成这一任务实际上使其速度比单线程慢,这两种并行方法所用的时间大致相同。

如果其他人想研究这个问题,这里有一些决定性的测试代码(OP的代码不能重复,因为我们不知道他的目录结构)。

代码语言:javascript
复制
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private static DirWithSubDirs RootDir;

        private static void Main()
        {
            Console.WriteLine("Loading file system into memory...");
            RootDir = new DirWithSubDirs("Root", 4, 4);
            Console.WriteLine("Done");

            //ThreadPool.SetMinThreads(4000, 16);
            //ThreadPool.SetMaxThreads(4000, 16);

            var w = Stopwatch.StartNew();
            ThisIsARecursiveFunctionInMemory(RootDir);

            Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
            Console.ReadKey();
        }

        public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
        {
            var depth = currentDirectory.Path.Count(t => t == '\\');
            Console.WriteLine(depth + ": " + currentDirectory.Path);

            var children = currentDirectory.SubDirs;

            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;

            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunctionInMemory(child);
                    }
                    break;

                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunctionInMemory(t);
                    });
                    break;

                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunctionInMemory(t);
                    });
                    break;

                default:
                    break;
            }
        }
    }

    internal class DirWithSubDirs
    {
        public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();

        public String Path { get; private set; }

        public DirWithSubDirs(String path, int width, int depth)
        {
            this.Path = path;

            if (depth > 0)
                for (int i = 0; i < width; ++i)
                    SubDirs.Add(new DirWithSubDirs(path + "\\" + i, width, depth - 1));
        }
    }
}
票数 7
EN

Stack Overflow用户

发布于 2014-09-24 20:24:35

Parallel.For和.ForEach方法是在内部实现的,相当于在任务中运行迭代,例如,循环如下:

代码语言:javascript
复制
Parallel.For(0, N, i => 
{ 
  DoWork(i); 
});

相当于:

代码语言:javascript
复制
var tasks = new List<Task>(N); 
for(int i=0; i<N; i++) 
{ 
tasks.Add(Task.Factory.StartNew(state => DoWork((int)state), i)); 
} 
Task.WaitAll(tasks.ToArray());

从每一次迭代可能与其他迭代并行运行的角度来看,这是一个正常的心理模型,但在现实中并没有发生。实际上,并行并不一定每次迭代都使用一个任务,因为这比所需的开销要大得多。Parallel.ForEach尝试使用尽可能快地完成循环所需的最小任务数。当线程可用来处理这些任务时,它会增加任务,而这些任务中的每一个都参与了一个管理方案(我认为它被称为分块):一个任务要求进行多次迭代,得到它们,然后处理这些任务,然后返回到更多。块大小根据参与任务的数量、机器上的负载等而不同。

PLINQ的.AsParallel()有一个不同的实现,但它仍然可以同样地将多个迭代提取到一个临时存储中,在线程中进行计算(但不是作为一个任务),并将查询结果放入一个小缓冲区。(您获得了基于ParallelQuery的内容,然后进一步将.Whatever()函数绑定到提供并行实现的一组扩展方法中)。

因此,既然我们对这两种机制的工作方式有了一点了解,我将尝试为您最初的问题提供一个答案:

那么为什么.AsParallel()比Parallel.ForEach慢呢?原因有以下几点。任务(或它们在这里的等效实现)在I/O类调用上执行而不是块。他们“等待”并释放CPU来做其他的事情。但是(引用C#概述):“不阻塞线程,PLINQ就不能执行I/O绑定的工作”。呼叫是同步的。它们的编写意图是,如果(而且只有在)您正在做的事情,例如下载每个任务不占用CPU时间的情况下,就会增加并行性的程度。

函数调用与I/O绑定调用完全类似的原因是:一个线程(称为T)阻塞,在所有子线程完成之前什么也不做,这在这里可能是一个缓慢的过程。T本身并不是CPU密集型的,当它等待子程序解除阻塞时,它只是在等待。因此,它与典型的I/O绑定函数调用相同。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25907829

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档