并行LINQ
并行查询
.NET4在System.Linq名称空间中包含一个新类ParalleIEnumerable ,可以分解查询的工作使其分布在多个线程上。尽管Enmerable类给IEnunerable<T>接口定义了扩展方法,但
ParalleIEnumerable 类的大多数扩展方法是ParallelQuery<TSource>类的扩展。一个重要的例外是AsParallel()方法,它扩展了IEnumerable<TSource>接口,返回ParallelQuery<TSource>类,所以正常的集合类可以以平行方式查询。
例:
const int arraySize = 100000000;
var data = new int[arraySize];
var r = new Random();
for (int i = 0; i < arraySize; i++)
{
data[i] = r.Next(40);
}
现在可以使用LINQ查询筛选数据,获取筛选数据的总和。该查询用where子句定义了一个筛选器,仅会中对应值小于20的项,接着调用聚合函数Sum()方法 。与前面的LINQ查询的唯一区别是,这次调用了AsParallel()方法。
var sum = (from x in data.AsParallel()
where x < 20
select x).Sum();
与前面的LINQ查询一样,编译器会修改语法,以调用AsParallel()、Where()、Select()和Sum()方法。AsParallel()方法用ParallelEnumerable类定义,以扩展IEnumerable<T>接口,所以对简单的数据调用它。AsParallel()方法返回ParallelQuery<TSource>。因为返回的类型,所以编译器选择的Where()方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。在下面的代码中Select()和Sum()方法也来自ParallelEnumerable类。与Enumerable类的实现代码相反,对于ParallelEnumerable类,查询是分区的,以便多个线程可以同时处理该查询。数组可以分为多个部分,其中每个部分由不同的线程处理,以筛选其余项。完成分区的工作后,就需要合并,获得所有部分的总和。
var sum=data.AsParallel().Where(x=>x<20).Select(x=>x).Sum();
运行这行代码就会启动任务管理器,这样就可以看出系统的所有CPU都在忙碌。如果删除AsParallel()方法,就不可能使用多个CPU。当然,如果系统上没有多个CPU,就不会看到并行版本带来改进。
分区器
AsParallel()方法不仅扩展了IEnumerable<T>接口,还扩展了Partition类。通过它,可以影响要创建的分区。
Partitioner类用System.Collection.Concurrent命名空间定义,并且有不同变体。Create方法接受实现了IList<T>类的数组或对象。根据这一点,以及类型的参数loadBalance和该方法的一些重载版本,会返回一个不同的Partitioner类型。对于数组,.Net4包含派生自抽象基类OrderablePartitioner<TSource>的DynamicPartitionerForArray<TSource>类和StaticPartitionerForArray<TSource>类。
var q1 = (from x in Partitioner.Create(data).AsParallel()
where x < 20
select x).Sum();
也可以调用WithExecutionMode()和WithDegreeOfParallelism()方法可以传递ParallelExecutionMode的一个Default值或者ForceParallelism值。默认情况下,并行LINQ避免使用系统开销很高的并行机制。对于WithDegreeOfParallelism()方法,可以传递一个整数值,以指定并行运行的最大任务数。
const int arraySize = 100000000;
Stopwatch watch = new Stopwatch();
watch.Start();
//一种写法,没有添加动态负载均衡,执行完所需要的时间1300毫秒
var q1 = (from x in Partitioner.Create(data).AsParallel()
where x < 80
//第二种写法,添加了动态负载均衡,执行完所需要的时间为660毫秒。
var q1 = (from x in Partitioner.Create(data,true).AsParallel()
watch.Stop();
Console.WriteLine(watch.ElapsedMilliseconds.ToString());
取消
.Net提供了一种标准方式,来取消长时间运行的任务,这也适用于并行LINQ。要取消长时间的查询,可以给查询添加WithCancellation()方法,并传递一个CancellationToken令牌作为参数。CancellationToken令牌从CancellationTokenSource类中创建。该查询在单独的线程中运行,在该线程中,捕获一个OperationCancelException类型的异常。如果取消了查询,就触发这个异常。在主线程中,调用CancellationTokenSource类的Cancel()方法可以取消任务。
var cts = new CancellationTokenSource();
new Thread(() =>
{
try
{
var sum = (from x in data.AsParallel().WithCancellation(cts.Token)
where x < 80
Console.WriteLine("query finished, sum: {0}", sum);
}
catch (OperationCanceledException ex)
Console.WriteLine(ex.Message);
}).Start();
Console.WriteLine("query started");
Console.Write("cancel? ");
int input = Console.Read();
if (input == 'Y' || input == 'y')
// cancel!
cts.Cancel();
}