我一直在做一些调查,看看我们如何创建一个在树中运行的多线程应用程序。
为了找到如何以最好的方式实现这一点,我创建了一个测试应用程序,该应用程序运行在我的C:\ disk中,并打开所有目录。
class Program
{
static void Main(string[] args)
{
//var startDirectory = @"C:\The folder\RecursiveFolder";
var startDirectory = @"C:\";
var w = Stopwatch.StartNew();
ThisIsARecursiveFunction(startDirectory);
Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
Console.ReadKey();
}
public static void ThisIsARecursiveFunction(String currentDirectory)
{
var lastBit = Path.GetFileName(currentDirectory);
var depth = currentDirectory.Count(t => t == '\\');
//Console.WriteLine(depth + ": " + currentDirectory);
try
{
var children = Directory.GetDirectories(currentDirectory);
//Edit this mode to switch what way of parallelization it should use
int mode = 3;
switch (mode)
{
case 1:
foreach (var child in children)
{
ThisIsARecursiveFunction(child);
}
break;
case 2:
children.AsParallel().ForAll(t =>
{
ThisIsARecursiveFunction(t);
});
break;
case 3:
Parallel.ForEach(children, t =>
{
ThisIsARecursiveFunction(t);
});
break;
default:
break;
}
}
catch (Exception eee)
{
//Exception might occur for directories that can't be accessed.
}
}
}然而,我遇到的情况是,当在模式3 (Parallel.ForEach)中运行这个程序时,代码大约在2.5秒内完成(是的,我有一个SSD ;)。在没有并行化的情况下运行代码,它大约在8秒内完成。并且在模式2 (AsParalle.ForAll())中运行代码,几乎需要无限的时间。
在签入process时,我还会遇到一些奇怪的事实:
Mode1 (No Parallelization):
Cpu: ~25%
Threads: 3
Time to complete: ~8 seconds
Mode2 (AsParallel().ForAll()):
Cpu: ~0%
Threads: Increasing by one per second (I find this strange since it seems to be waiting on the other threads to complete or a second timeout.)
Time to complete: 1 second per node so about 3 days???
Mode3 (Parallel.ForEach()):
Cpu: 100%
Threads: At most 29-30
Time to complete: ~2.5 seconds特别奇怪的是,Parallel.ForEach似乎忽略了仍在运行的任何父线程/任务,而AsParallel().ForAll()似乎在等待前面的任务完成(因为所有父任务仍在等待它们的子任务完成)。
我在MSDN上读到的还有:“如果可能的话,我更喜欢ForAll而不是ForEach。”
来源:http://msdn.microsoft.com/en-us/library/dd997403(v=vs.110).aspx
有人知道为什么会这样吗?
编辑1:
根据马修·沃森的要求,我首先在记忆中加载了这棵树,然后再遍历它。现在,树的加载按顺序进行。
然而,结果是一样的。非并行化和Parallel.ForEach现在大约在0.05秒内完成整个树,而AsParallel().ForAll仍然仅为每秒1步左右。
代码:
class Program
{
private static DirWithSubDirs RootDir;
static void Main(string[] args)
{
//var startDirectory = @"C:\The folder\RecursiveFolder";
var startDirectory = @"C:\";
Console.WriteLine("Loading file system into memory...");
RootDir = new DirWithSubDirs(startDirectory);
Console.WriteLine("Done");
var w = Stopwatch.StartNew();
ThisIsARecursiveFunctionInMemory(RootDir);
Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
Console.ReadKey();
}
public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
{
var depth = currentDirectory.Path.Count(t => t == '\\');
Console.WriteLine(depth + ": " + currentDirectory.Path);
var children = currentDirectory.SubDirs;
//Edit this mode to switch what way of parallelization it should use
int mode = 2;
switch (mode)
{
case 1:
foreach (var child in children)
{
ThisIsARecursiveFunctionInMemory(child);
}
break;
case 2:
children.AsParallel().ForAll(t =>
{
ThisIsARecursiveFunctionInMemory(t);
});
break;
case 3:
Parallel.ForEach(children, t =>
{
ThisIsARecursiveFunctionInMemory(t);
});
break;
default:
break;
}
}
}
class DirWithSubDirs
{
public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
public String Path { get; private set; }
public DirWithSubDirs(String path)
{
this.Path = path;
try
{
SubDirs = Directory.GetDirectories(path).Select(t => new DirWithSubDirs(t)).ToList();
}
catch (Exception eee)
{
//Ignore directories that can't be accessed
}
}
}编辑2:
在阅读了Matthew评论的更新后,我尝试在程序中添加以下代码:
ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);然而,这并不改变AsParallel的peform方式。尽管如此,头8个步骤在减慢到每秒1步之前,都会在瞬间执行。
(另外请注意,我目前忽略了当我无法通过Directory.GetDirectories()周围的that块访问目录时出现的异常)
编辑3:
另外,我主要感兴趣的是Parallel.ForEach和AsParallel.ForAll之间的区别,因为对我来说,奇怪的是,由于某种原因,第二个线程为每个递归创建一个线程,而第一个线程在最多30个线程中处理所有事情。(以及MSDN建议使用AsParallel的原因,即使它创建了如此多的线程,超时时间为1秒)
编辑4:
我发现的另一件奇怪的事情是:当我试图将线程池上的MinThreads设置在1023以上时,它似乎忽略了这个值,并将其缩小到大约8或16: ThreadPool.SetMinThreads(1023,16);
尽管如此,当我使用1023时,它会非常快地完成第一个1023元素,然后回到我一直在经历的慢节奏。
注意:现在还创建了1000多个线程(相比之下,整个Parallel.ForEach线程创建了30个线程)。
这是否意味着Parallel.ForEach在处理任务时更聪明?
更多信息,当您将值设置在1023之上时,此代码会打印两次8-8:(当您将值设置为1023或更低时,它会打印正确的值)。
int threadsMin;
int completionMin;
ThreadPool.GetMinThreads(out threadsMin, out completionMin);
Console.WriteLine("Cur min threads: " + threadsMin + " and the other thing: " + completionMin);
ThreadPool.SetMinThreads(1023, 16);
ThreadPool.SetMaxThreads(1023, 16);
ThreadPool.GetMinThreads(out threadsMin, out completionMin);
Console.WriteLine("Now min threads: " + threadsMin + " and the other thing: " + completionMin);编辑5:
根据Dean的请求,我创建了另一个用于手动创建任务的案例:
case 4:
var taskList = new List<Task>();
foreach (var todo in children)
{
var itemTodo = todo;
taskList.Add(Task.Run(() => ThisIsARecursiveFunctionInMemory(itemTodo)));
}
Task.WaitAll(taskList.ToArray());
break;这也和Parallel.ForEach()循环一样快。因此,我们仍然无法解释为什么AsParallel().ForAll()要慢得多。
发布于 2014-09-20 15:27:02
这个问题是相当可调试的,当您遇到线程问题时,这是一个不寻常的奢侈品。这里的基本工具是Debug > Windows >线程调试器窗口。显示活动线程,并让您查看它们的堆栈跟踪。您将很容易地看到,一旦速度变慢,就会有数十个线程处于活动状态,而这些线程都被卡住了。它们的堆栈跟踪看起来都是一样的:
mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) + 0x16 bytes
mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) + 0x7 bytes
mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x182 bytes
mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x93 bytes
mscorlib.dll!System.Threading.Tasks.Task.InternalRunSynchronously(System.Threading.Tasks.TaskScheduler scheduler, bool waitForCompletion) + 0xba bytes
mscorlib.dll!System.Threading.Tasks.Task.RunSynchronously(System.Threading.Tasks.TaskScheduler scheduler) + 0x13 bytes
System.Core.dll!System.Linq.Parallel.SpoolingTask.SpoolForAll<ConsoleApplication1.DirWithSubDirs,int>(System.Linq.Parallel.QueryTaskGroupState groupState, System.Linq.Parallel.PartitionedStream<ConsoleApplication1.DirWithSubDirs,int> partitions, System.Threading.Tasks.TaskScheduler taskScheduler) Line 172 C#
// etc..每当你看到这样的事情,你应该立即想到消防水管的问题.可能是第三大最常见的线程错误,仅次于竞争和死锁。
您可以推理,现在您已经知道原因了,代码的问题是每个完成的线程都会增加N个线程。其中N是目录中子目录的平均数量。实际上,线程的数量呈指数增长,这总是很糟糕的。只有当N= 1时,它才能保持控制,当然,这在典型的磁盘上永远不会发生。
请注意,与几乎任何线程问题一样,这种错误行为往往会很糟糕地重复。你机器里的SSD会把它藏起来。计算机中的RAM也是如此,程序很可能在第二次运行时完成得很快并且没有问题。因为您现在将从文件系统缓存(而不是磁盘)读取数据,所以非常快。修改ThreadPool.SetMinThreads()也会隐藏它,但它无法修复它。它从来不解决任何问题,它只是隐藏它们。因为无论发生什么,指数型数总是超过设定的最小线程数。您只能希望在这种情况发生之前,它已经完成了对驱动器的迭代。对于一个拥有大驱动器的用户来说,希望渺茫。
ParallelEnumerable.ForAll()和Parallel.ForEach()之间的区别现在可能也很容易解释。从堆栈跟踪可以看出,ForAll()做了一些顽皮的事情,RunSynchronously()方法会阻塞,直到所有线程都完成。阻塞是线程池线程不应该做的事情,它会阻塞线程池,不允许它为另一个任务调度处理器。并且有您观察到的效果,线程池很快就会被等待其他线程来完成的线程淹没。这是没有发生的,他们在池中等待,没有得到安排,因为他们已经有那么多的活动。
这是一个非常常见的死锁场景,但是线程池管理器有一个解决办法。当活动线程没有及时完成时,它会监视活动线程和步骤。然后它允许启动一个额外的线程,比SetMinThreads()设置的最小线程多一个线程。但是不超过SetMaxThreads()设置的最大值,即有太多活动的tp线程是危险的,很可能触发OOM。这确实解决了死锁,它将获得一个ForAll()调用来完成。但是这种情况发生的速度非常慢,线程池每秒只执行两次。在它迎头赶上之前你会失去耐心的。
Parallel.ForEach()没有这个问题,它没有阻塞,所以没有阻塞池。
这似乎是解决方案,但请记住,您的程序仍然在激发您的计算机内存,添加更多等待的tp线程到池。这也会使您的程序崩溃,只是不太可能,因为您有大量的内存,并且线程池没有使用大量的内存来跟踪请求。然而,有些程序员却是accomplish that as well。
解决方案非常简单,只是不要使用线程。这是有害的,只有一个磁盘时没有并发性。它也不喜欢被多个线程强占。尤其是在主轴驱动器上,头部寻找的速度非常非常慢。SSD做得更好,但是它仍然需要50微秒的时间,这是你不想要或不需要的开销。访问磁盘的理想线程数(否则无法很好地缓存)始终是one。
发布于 2014-09-18 08:40:52
首先要注意的是,您正在尝试并行执行一个IO绑定操作,这将严重扭曲时间。
要注意的第二件事是并行任务的性质:您正在递归地降目录树。如果创建多个线程来执行此操作,则每个线程可能同时访问磁盘的不同部分--这将导致磁盘读取头在各处跳跃,并大大减慢速度。
尝试更改您的测试以创建内存中的树,并使用多个线程进行访问.然后,您将能够正确地比较时间,而结果被扭曲超过所有的有用性。
此外,您可能正在创建大量线程,它们(默认情况下)将是线程池线程。当线程超过处理器内核的数量时,拥有大量的线程实际上会减慢速度。
还请注意,当您超过线程池最小线程(由ThreadPool.GetMinThreads()定义)时,线程池管理器将在每个新线程池线程创建之间引入延迟。(我认为这大约是每一个新线程0.5s )。
此外,如果线程数超过ThreadPool.GetMaxThreads()返回的值,则创建线程将阻塞,直到其他一个线程退出为止。我认为这很可能会发生。
您可以通过调用ThreadPool.SetMaxThreads()和ThreadPool.SetMinThreads()来增加这些值来检验这一假设,并查看它是否有任何不同。
(最后,请注意,如果您真的试图递归地从C:\下降,那么当IO异常到达受保护的OS文件夹时,您几乎肯定会得到它。)
注意:设置max/min线程池线程如下:
ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);跟踪
我尝试过使用上面描述的线程池线程计数设置的测试代码,结果如下(不是运行在整个C:\驱动器上,而是在一个较小的子集上运行):
这与我的预期是一致的;添加大量线程来完成这一任务实际上使其速度比单线程慢,这两种并行方法所用的时间大致相同。
如果其他人想研究这个问题,这里有一些决定性的测试代码(OP的代码不能重复,因为我们不知道他的目录结构)。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
namespace Demo
{
internal class Program
{
private static DirWithSubDirs RootDir;
private static void Main()
{
Console.WriteLine("Loading file system into memory...");
RootDir = new DirWithSubDirs("Root", 4, 4);
Console.WriteLine("Done");
//ThreadPool.SetMinThreads(4000, 16);
//ThreadPool.SetMaxThreads(4000, 16);
var w = Stopwatch.StartNew();
ThisIsARecursiveFunctionInMemory(RootDir);
Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
Console.ReadKey();
}
public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
{
var depth = currentDirectory.Path.Count(t => t == '\\');
Console.WriteLine(depth + ": " + currentDirectory.Path);
var children = currentDirectory.SubDirs;
//Edit this mode to switch what way of parallelization it should use
int mode = 3;
switch (mode)
{
case 1:
foreach (var child in children)
{
ThisIsARecursiveFunctionInMemory(child);
}
break;
case 2:
children.AsParallel().ForAll(t =>
{
ThisIsARecursiveFunctionInMemory(t);
});
break;
case 3:
Parallel.ForEach(children, t =>
{
ThisIsARecursiveFunctionInMemory(t);
});
break;
default:
break;
}
}
}
internal class DirWithSubDirs
{
public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
public String Path { get; private set; }
public DirWithSubDirs(String path, int width, int depth)
{
this.Path = path;
if (depth > 0)
for (int i = 0; i < width; ++i)
SubDirs.Add(new DirWithSubDirs(path + "\\" + i, width, depth - 1));
}
}
}发布于 2014-09-24 20:24:35
Parallel.For和.ForEach方法是在内部实现的,相当于在任务中运行迭代,例如,循环如下:
Parallel.For(0, N, i =>
{
DoWork(i);
});相当于:
var tasks = new List<Task>(N);
for(int i=0; i<N; i++)
{
tasks.Add(Task.Factory.StartNew(state => DoWork((int)state), i));
}
Task.WaitAll(tasks.ToArray());从每一次迭代可能与其他迭代并行运行的角度来看,这是一个正常的心理模型,但在现实中并没有发生。实际上,并行并不一定每次迭代都使用一个任务,因为这比所需的开销要大得多。Parallel.ForEach尝试使用尽可能快地完成循环所需的最小任务数。当线程可用来处理这些任务时,它会增加任务,而这些任务中的每一个都参与了一个管理方案(我认为它被称为分块):一个任务要求进行多次迭代,得到它们,然后处理这些任务,然后返回到更多。块大小根据参与任务的数量、机器上的负载等而不同。
PLINQ的.AsParallel()有一个不同的实现,但它仍然可以同样地将多个迭代提取到一个临时存储中,在线程中进行计算(但不是作为一个任务),并将查询结果放入一个小缓冲区。(您获得了基于ParallelQuery的内容,然后进一步将.Whatever()函数绑定到提供并行实现的一组扩展方法中)。
因此,既然我们对这两种机制的工作方式有了一点了解,我将尝试为您最初的问题提供一个答案:
那么为什么.AsParallel()比Parallel.ForEach慢呢?原因有以下几点。任务(或它们在这里的等效实现)在I/O类调用上执行而不是块。他们“等待”并释放CPU来做其他的事情。但是(引用C#概述):“不阻塞线程,PLINQ就不能执行I/O绑定的工作”。呼叫是同步的。它们的编写意图是,如果(而且只有在)您正在做的事情,例如下载每个任务不占用CPU时间的情况下,就会增加并行性的程度。
函数调用与I/O绑定调用完全类似的原因是:一个线程(称为T)阻塞,在所有子线程完成之前什么也不做,这在这里可能是一个缓慢的过程。T本身并不是CPU密集型的,当它等待子程序解除阻塞时,它只是在等待。因此,它与典型的I/O绑定函数调用相同。
https://stackoverflow.com/questions/25907829
复制相似问题