c# Parallel类的使用
Parallel类是对线程的抽象,提供数据与任务的并行性。类定义了静态方法For和ForEach,使用多个任务来完成多个作业。Parallel.For和Parallel.ForEach方法在每次迭代的时候调用相同的代码,而Parallel.Invoke()方法允许同时调用不同的方法。Parallel.ForEach()方法用于数据的并行性,Parallel.Invoke()方法用于任务的并行性。
1、For()方法
For()方法用于多次执行一个任务,可以并行运行迭代,但迭代的顺序并没指定。For()方法前两个参数为定义循环的开始和结束,第三个参数为Action
staticvoidParallelFor()
{
ParallelLoopResultresult=
Parallel.For(0,10,asynci=>
{
Console.WriteLine("{0},task:{1},thread:{2}",i,
Task.CurrentId,Thread.CurrentThread.ManagedThreadId);
awaitTask.Delay(10);//异步方法,用于释放线程供其他任务使用。完成后,可能看不到方法的输出,因为主(前台线)程结束,所有的后台线程也将结束
Console.WriteLine("{0},task:{1},thread:{2}",i,Task.CurrentId,Thread.CurrentThread.ManagedThreadId);
});
Console.WriteLine("Iscompleted:{0}",result.IsCompleted);
}
异步功能虽然方便,但是知道后台发生了什么仍然重要,必须留意。
提前停止For()方法
可以根据条件提前停止For()方法,而不必完成全部的迭代。,传入参数ParallelLoopState的对象,调用Break()方法或者Stop()方法。如调用Break()方法,当迭代值大于15的时候中断(当前线程结束,类似于普通for的Continue),但其他任务可以同时运行,有其他值的任务也可以运行(如果当前线程是主线程,那么就等同于Stop(),结束所有线程)。Stop()方法结束的是所有操作(类似于普通for的Break)。利用LowestBreakIteration属性可以忽略其他任务的结果:
staticvoidParallelFor()
{
ParallelLoopResultresult=Parallel.For(10,40,(inti,ParallelLoopStatepls)=>
{
Console.WriteLine("i:{0}task{1}",i,Task.CurrentId);
Thread.Sleep(10);
if(i>15)
pls.Break();
});
Console.WriteLine("Iscompleted:{0}",result.IsCompleted);
if(!result.IsCompleted)
Console.WriteLine("lowestbreakiteration:{0}",result.LowestBreakIteration);
}
For()方法可以使用几个线程执行循环。如果要对每个线程进行初始化,就需要使用到For
- 前两个参数是对应的循环起始和终止条件;
- 第二个参数类型是Func
,返回一个值,传递给第三个参数。 - 第三个参数类型是Func
,是循环体的委托,其内部的第一个参数是循环迭代,内部第二个参数允许停止迭代,内部第三个参数用于接收For()方法的前一个参数的返回值。循环体应当返回与For()循环泛型类型一致的值。 - 第四个参数是指定的一个委托,用于执行相关后续操作。
staticvoidParallelFor()
{
Parallel.For(0,20,()=>
{
//invokedonceforeachthread
Console.WriteLine("initthread{0},task{1}",Thread.CurrentThread.ManagedThreadId,Task.CurrentId);
returnString.Format("t{0}",Thread.CurrentThread.ManagedThreadId);
},
(i,pls,str1)=>
{
//invokedforeachmember
Console.WriteLine("bodyi{0}str1{1}thread{2}task{3}",i,str1,Thread.CurrentThread.ManagedThreadId,Task.CurrentId);
Thread.Sleep(10);
returnString.Format("i{0}",i);
},
(str1)=>
{
//finalactiononeachthread
Console.WriteLine("finally{0}",str1);
});
}
2、使用ForEach()方法循环
ForEach()方法遍历实现了IEnumerable的集合,其方式类似于foreach语句,但是以异步方式遍历,没有确定的顺序。如果要中断循环,同样可以采用ParallelLoopState参数。ForEach
staticvoidParallelForeach()
{
string[]data={"zero","one","two","three","four","five","six","seven","eight","nine","ten","eleven","twelve"};
ParallelLoopResultresult=Parallel.ForEach(data,s=>
{
Console.WriteLine(s);
});
Parallel.ForEach(data,(s,pls,l)=>
{
Console.WriteLine("{0}{1}",s,l);
});
}
3、调用多个方法
如果有多个任务并行,可以使用Parallel.Invoke()方法,它提供任务的并行性模式:
staticvoidParallelInvoke()
{
Parallel.Invoke(Foo,Bar);
}
staticvoidFoo()
{
Console.WriteLine("foo");
}
staticvoidBar()
{
Console.WriteLine("bar");
}
4、For()方法的取消
在For()方法的重载方法中,可以传递一个ParallelOptions类型的参数,利用此参数可以传递一个CancellationToken参数。使用CancellationTokenSource对象用于注册CancellationToken,并允许调用Cancel方法用于取消操作。
一旦取消操作,For()方法就抛出一个OperationCanceledException类型的异常,使用CancellationToken可以注册取消操作时的信息。调用Register方法,传递一个在取消操作时调用的委托。通过取消操作,可以将其他的迭代操作在启动之前取消,但已经启动的迭代操作允许完成。取消操作是以协作方式进行的,以避免在取消迭代操作的中间泄露资源。
staticvoidCancelParallelLoop()
{
varcts=newCancellationTokenSource();
cts.Token.ThrowIfCancellationRequested();
cts.Token.Register(()=>Console.WriteLine("**tokencancelled"));
//在500ms后取消标记
cts.CancelAfter(500);
try
{
ParallelLoopResultresult=Parallel.For(0,100,
newParallelOptions()
{
CancellationToken=cts.Token
},
x=>
{
Console.WriteLine("loop{0}started",x);
intsum=0;
for(inti=0;i<100;i++)
{
Thread.Sleep(2);
sum+=i;
}
Console.WriteLine("loop{0}finished",x);
});
}
catch(OperationCanceledExceptionex)
{
Console.WriteLine(ex.Message);
}
}
5、发现存在的问题
使用并行循环时,若出现以下两个问题,需要使用Partitioner(命名空间System.Collections.Concurrent中)解决。
- 使用并行循环时,应确保每次迭代的工作量要明显大于同步共享状态的开销。如果循环把时间都耗在了阻塞式的访问共享的循环变量上,那么并行执行的好处就很容易完全丧失。尽可能让每次循环迭代都只是在局部进行,避免阻塞式访问造成的损耗。见示例1
- 并行循环的每一次迭代都会生成一个委托,如果每次生成委托或方法的开销比迭代完成的工作量大,使用并行方案就不适合了(委托会设计两类开销:构造开销和调用开销。大多数调用开销和普通方法的调用差不多。但委托是一种对象,构造开销可能相当大,最好是只做一次构造,然后把对象缓存起来)。见示例2
示例1中,求1000000000以内所有自然数开方的和。第一部分采用直接计算的方式,第二部分采用分区计算。第二部分的Partitioner会把需要迭代的区间分拆为多个不同的空间,并存入Tuple对象中。
/* 示例1 */publicstaticvoidPartitionerTest()
{
//使用计时器
System.Diagnostics.Stopwatchstopwatch=newSystem.Diagnostics.Stopwatch();
constintmaxValue=1000000000;
longsum=0;
stopwatch.Restart();//开始计时
Parallel.For(0,maxValue,(i)=>{
Interlocked.Add(refsum,(long)Math.Sqrt(i));//Interlocked是原子操作,多线程访问时的线程互斥操作
});
stopwatch.Stop();
Console.WriteLine($"Parallel.For:{stopwatch.Elapsed}");//我的机器运行出的时间是:00:01:37.0391204
varpartitioner=System.Collections.Concurrent.Partitioner.Create(0,maxValue);//拆分区间
sum=0;
stopwatch.Restart();
Parallel.ForEach(partitioner,(rang)=>{
longpartialSum=0;
//迭代区间的数据
for(inti=rang.Item1;i
Partitioner的分区是静态的,只要迭代分区划分完成,每个分区上都会运行一个委托。如果某一段区间的迭代次数提前完成,也不会尝试重新分区并让处理器分担工作。对于任意IEnumerable类型都可以创建不指定区间的分区,但这样就会让每个迭代项目都创建一个委托,而不是对每个区间创建委托。创建自定义的Partitioner可以解决这个问题,代码比较复杂。请自行参阅:http://www.writinghighperf.net/go/20
示例2中,采用一个委托方法来计算两个数之间的关系值。前一种是每次运行都重新构造委托,后一种是先构造出委托的方法而后每一次调用。
//声明一个委托
privatedelegateintMathOp(intx,inty);
privateintAdd(intx,inty)
{
returnx+y;
}
privateintDoOperation(MathOpop,intx,inty)
{
returnop(x,y);
}
/*
*委托会设计两类开销:构造开销和调用开销。大多数调用开销和普通方法的调用差不多。但委托是一种对象,构造开销可能相当大,最好是只做一次构造,然后把对象缓存起来。
*/
publicvoidTest()
{
System.Diagnostics.Stopwatchstopwatch=newSystem.Diagnostics.Stopwatch();
stopwatch.Restart();
for(inti=0;i<10;i++)
{
//每一次遍历循环,都会产生一次构造和调用开销
DoOperation(Add,1,2);
}
stopwatch.Stop();
Console.WriteLine("Constructionandinvocation:{0}",stopwatch.Elapsed);//00:00:00.0003812
stopwatch.Restart();
MathOpop=Add;//只产生一次构造开销
for(inti=0;i<10;i++)
{
DoOperation(op,1,2);//每一次遍历都只产生遍历开销
}
stopwatch.Stop();
Console.WriteLine("OnceConstructionandinvocation:{0}",stopwatch.Elapsed);//00:00:00.0000011
}
以上就是c#Parallel类的使用的详细内容,更多关于c#Parallel类的资料请关注毛票票其它相关文章!