PLINQ

What is a Parallel Query?

Language-Integrated Query (LINQ) was introduced in the .NET Framework version 3.5. It features a unified model for querying any System.Collections.IEnumerable or System.Collections.Generic.IEnumerable<T> data source in a type-safe manner. LINQ to Objects is the name for LINQ queries that are run against in-memory collections such as List<T> and arrays. This article assumes that you have a basic understanding of LINQ.

Parallel LINQ (PLINQ) is a parallel implementation of the LINQ pattern. A PLINQ query in many ways resembles a non-parallel LINQ to Objects query. PLINQ queries, just like sequential LINQ queries, operate on any in-memory IEnumerable or IEnumerable<T> data source, and have deferred execution, which means they do not begin executing until the query is enumerated. The primary difference is that PLINQ attempts to make full use of all the processors on the system. It does this by partitioning the data source into segments and then executing the query on each segment on separate worker threads, in parallel, on multiple processors. In many cases, parallel execution means that the query runs significantly faster.
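Deferred execution can be observed directly. The following C# sketch counts how many times the filter delegate runs before and after the query is enumerated; the counter and variable names are illustrative only. The counter should read 0 before ToList is called and 100 afterward.

```csharp
using System;
using System.Linq;
using System.Threading;

int predicateCalls = 0;

// Define the query. Like LINQ to Objects, PLINQ defers execution, so nothing runs here.
var query = Enumerable.Range(1, 100)
    .AsParallel()
    .Where(n =>
    {
        // The predicate runs on worker threads, so update the counter atomically.
        Interlocked.Increment(ref predicateCalls);
        return n % 2 == 0;
    });

int callsBeforeEnumeration = predicateCalls;

// ToList enumerates the query, which is what starts the parallel work.
var evens = query.ToList();

Console.WriteLine($"Predicate calls before enumeration: {callsBeforeEnumeration}");
Console.WriteLine($"Predicate calls after enumeration: {predicateCalls}");
Console.WriteLine($"Even numbers found: {evens.Count}");
```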

Through parallel execution, PLINQ can achieve significant performance improvements over legacy code for certain kinds of queries, often just by adding the AsParallel query operation to the data source. However, parallelism can introduce its own complexities, and not all query operations run faster in PLINQ. In fact, parallelization can slow down certain queries. Therefore, you should understand how issues such as ordering affect parallel queries.

The remainder of this article gives an overview of the main PLINQ classes and discusses how to create PLINQ queries.

The ParallelEnumerable Class

The System.Linq.ParallelEnumerable class exposes almost all of PLINQ’s functionality. It and the rest of the System.Linq namespace types are compiled into the System.Core.dll assembly. The default C# and Visual Basic projects in Visual Studio both reference the assembly and import the namespace.

ParallelEnumerable includes implementations of all the standard query operators that LINQ to Objects supports, although it does not attempt to parallelize each one.

In addition to the standard query operators, the ParallelEnumerable class contains a set of methods that enable behaviors specific to parallel execution.

AsParallel: The entry point for PLINQ. Specifies that the rest of the query should be parallelized, if it is possible.

AsSequential: Specifies that the rest of the query should be run sequentially, as a non-parallel LINQ query.

AsOrdered: Specifies that PLINQ should preserve the ordering of the source sequence for the rest of the query, or until the ordering is changed, for example by the use of an orderby (Order By in Visual Basic) clause.

AsUnordered: Specifies that PLINQ is not required to preserve the ordering of the source sequence for the rest of the query.

WithCancellation: Specifies that PLINQ should periodically monitor the state of the provided cancellation token and cancel execution if it is requested.

WithDegreeOfParallelism: Specifies the maximum number of processors that PLINQ should use to parallelize the query.

WithMergeOptions: Provides a hint about how PLINQ should, if it is possible, merge parallel results back into just one sequence on the consuming thread.

WithExecutionMode: Specifies whether PLINQ should parallelize the query even when the default behavior would be to run it sequentially.

ForAll: A multithreaded enumeration method that, unlike iterating over the results of the query, enables results to be processed in parallel without first merging back to the consumer thread.

Aggregate overload: An overload that is unique to PLINQ and enables intermediate aggregation over thread-local partitions, plus a final aggregation function to combine the results of all partitions.
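As an illustration of the PLINQ-specific Aggregate overload, the following sketch computes a sum of squares. It uses the overload that takes a seed factory, an update function applied within each partition, a combine function that merges the per-partition results, and a final result selector. The computation itself is only an example.

```csharp
using System;
using System.Linq;

// Sum of squares of 1..1000 using the PLINQ-specific Aggregate overload.
long sumOfSquares = Enumerable.Range(1, 1000).AsParallel().Aggregate(
    () => 0L,                        // seedFactory: one fresh accumulator per partition
    (acc, n) => acc + (long)n * n,   // updateAccumulatorFunc: folds elements within a partition
    (left, right) => left + right,   // combineAccumulatorsFunc: merges the partition totals
    total => total);                 // resultSelector: final projection (identity here)

Console.WriteLine(sumOfSquares);
```

Because the seed factory creates a separate accumulator for each partition, the update function never shares state across threads; only the combine step sees results from more than one partition.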


The Opt-in Model

When you write a query, opt in to PLINQ by invoking the ParallelEnumerable.AsParallel extension method on the data source, as shown in the following example.

var source = Enumerable.Range(1, 10000);

// Opt in to PLINQ with AsParallel.
// The query does not run until evenNums is enumerated.
var evenNums = from num in source.AsParallel()
               where num % 2 == 0
               select num;

The AsParallel extension method binds the subsequent query operators, in this case, where and select, to the System.Linq.ParallelEnumerable implementations.
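To make the binding visible, the following sketch gives a sequential and a parallel version of the same query explicit types. The assignment to ParallelQuery<int> compiles only because Where and Select bind to the ParallelEnumerable implementations after AsParallel. The filter and projection are arbitrary examples.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var source = Enumerable.Range(1, 10000);

// Without AsParallel, Where and Select bind to System.Linq.Enumerable,
// and the query is an IEnumerable<int>.
IEnumerable<int> sequentialQuery = source.Where(n => n % 2 == 0).Select(n => n * 10);

// After AsParallel, the same operators bind to System.Linq.ParallelEnumerable,
// so the query is a ParallelQuery<int>. This line would not compile if
// Where or Select had bound to the Enumerable versions.
ParallelQuery<int> parallelQuery = source.AsParallel().Where(n => n % 2 == 0).Select(n => n * 10);

// Both produce the same elements; the parallel results are not guaranteed to be in order,
// so sort them (and materialize to an array) before comparing.
bool sameElements = parallelQuery.OrderBy(n => n).ToArray().SequenceEqual(sequentialQuery);
Console.WriteLine($"Same elements: {sameElements}");
```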

Execution Modes

By default, PLINQ is conservative. At run time, the PLINQ infrastructure analyzes the overall structure of the query. If the query is likely to yield speedups by parallelization, PLINQ partitions the source sequence into tasks that can be run concurrently. If it is not safe to parallelize a query, PLINQ just runs the query sequentially. If PLINQ has a choice between a potentially expensive parallel algorithm and an inexpensive sequential algorithm, it chooses the sequential algorithm by default. You can use the WithExecutionMode method and the System.Linq.ParallelExecutionMode enumeration to instruct PLINQ to select the parallel algorithm. This is useful when you know by testing and measurement that a particular query executes faster in parallel.
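The following minimal sketch shows where WithExecutionMode goes in a query. The query itself is simple and would not necessarily fall back to sequential execution without the hint; it only illustrates the syntax. Whether ForceParallelism helps a real query is something to confirm by measurement, as noted above.

```csharp
using System;
using System.Linq;

var source = Enumerable.Range(1, 1000);

// ForceParallelism asks PLINQ to use a parallel algorithm even where its default,
// conservative analysis would choose to run the query sequentially.
int multiplesOfThree = source.AsParallel()
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .Where(n => n % 3 == 0)
    .Count();

Console.WriteLine(multiplesOfThree);
```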

Degree of Parallelism

By default, PLINQ uses all of the processors on the host computer up to a maximum of 64. You can instruct PLINQ to use no more than a specified number of processors by using the WithDegreeOfParallelism method. This is useful when you want to make sure that other processes running on the computer receive a certain amount of CPU time. The following snippet limits the query to using a maximum of two processors.


// Compute is a placeholder for your own per-item work, not a library method.
var query = from item in source.AsParallel().WithDegreeOfParallelism(2)
            where Compute(item) > 42
            select item;

In cases where a query is performing a significant amount of non-compute-bound work, such as file I/O, it might be beneficial to specify a degree of parallelism greater than the number of cores on the machine.
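The following sketch illustrates that case under stated assumptions: Thread.Sleep stands in for a blocking I/O call, and the degree of parallelism of 8 is an arbitrary example value. The delegate tracks how many calls are in flight at once, so the reported peak should never exceed the value passed to WithDegreeOfParallelism.

```csharp
using System;
using System.Linq;
using System.Threading;

int inFlight = 0;      // delegate calls currently executing
int observedMax = 0;   // highest value of inFlight seen so far

var results = Enumerable.Range(1, 40)
    .AsParallel()
    .WithDegreeOfParallelism(8)   // example value; may exceed the core count for I/O-bound work
    .Select(n =>
    {
        int now = Interlocked.Increment(ref inFlight);

        // Record a new peak atomically with a compare-and-swap loop.
        int seen;
        while (now > (seen = Volatile.Read(ref observedMax)) &&
               Interlocked.CompareExchange(ref observedMax, now, seen) != seen)
        {
        }

        Thread.Sleep(20); // stand-in for a blocking I/O call
        Interlocked.Decrement(ref inFlight);
        return n * 2;
    })
    .ToList();

Console.WriteLine($"Processed {results.Count} items; peak concurrency was {observedMax}");
```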

Ordered Versus Unordered Parallel Queries

In some queries, a query operator must produce results that preserve the ordering of the source sequence. PLINQ provides the AsOrdered operator for this purpose. AsOrdered is distinct from AsSequential. An AsOrdered sequence is still processed in parallel, but its results are buffered and sorted. Because order preservation typically involves extra work, an AsOrdered sequence might be processed more slowly than the default AsUnordered sequence. Whether a particular ordered parallel operation is faster than a sequential version of the operation depends on many factors.

The following code example shows how to opt in to order preservation.

var numbers = Enumerable.Range(1, 100);

var evenNums = from num in numbers.AsParallel().AsOrdered()
               where num % 2 == 0
               select num;
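To compare the two behaviors, the following sketch runs the same filter with and without AsOrdered. Only the AsOrdered results are guaranteed to match the source order; the unordered results contain the same elements but may or may not come back in order, depending on scheduling.

```csharp
using System;
using System.Linq;

var numbers = Enumerable.Range(1, 1000);

// Order is preserved: results are buffered and emitted in source order.
int[] ordered = (from num in numbers.AsParallel().AsOrdered()
                 where num % 2 == 0
                 select num).ToArray();

// Order is not guaranteed.
int[] unordered = (from num in numbers.AsParallel()
                   where num % 2 == 0
                   select num).ToArray();

bool orderedMatches = ordered.SequenceEqual(numbers.Where(n => n % 2 == 0));
bool sameElements = unordered.OrderBy(n => n).SequenceEqual(ordered);

Console.WriteLine($"Ordered matches source order: {orderedMatches}");
Console.WriteLine($"Unordered has the same elements: {sameElements}");
```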

Parallel vs. Sequential Queries

Some operations require that the source data be delivered in a sequential manner. The ParallelEnumerable query operators revert to sequential mode automatically when it is required. For user-defined query operators and user delegates that require sequential execution, PLINQ provides the AsSequential method. When you use AsSequential, all subsequent operators in the query are executed sequentially until AsParallel is called again.
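As a sketch of that pattern, the following query filters in parallel and then calls AsSequential before a delegate that keeps a running total in a shared variable, which is not safe to update from multiple threads. AsOrdered is included so that the sequential part sees the elements in source order. The running-total delegate is an illustrative example of a delegate that requires sequential execution.

```csharp
using System;
using System.Linq;

var numbers = Enumerable.Range(1, 100);
int runningTotal = 0; // shared state: must only be touched by one thread at a time

var totals = numbers.AsParallel().AsOrdered()
    .Where(n => n % 5 == 0)           // runs in parallel
    .AsSequential()                   // everything after this runs as LINQ to Objects
    .Select(n => runningTotal += n)   // side-effecting delegate, now run sequentially in order
    .ToList();

Console.WriteLine(string.Join(", ", totals));
```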

Options for Merging Query Results

When a PLINQ query executes in parallel, its results from each worker thread must be merged back onto the consuming thread for consumption by a foreach loop (For Each in Visual Basic), or insertion into a list or array. In some cases, it might be beneficial to specify a particular kind of merge operation, for example, to begin producing results more quickly. For this purpose, PLINQ supports the WithMergeOptions method and the ParallelMergeOptions enumeration.
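The following minimal sketch shows WithMergeOptions with ParallelMergeOptions.NotBuffered, which asks PLINQ to pass each result to the consuming thread as soon as it is produced rather than buffering. Because merge options are only a hint, the code does not depend on when individual elements arrive.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var source = Enumerable.Range(1, 50);

// NotBuffered: hand each result to the consumer as soon as it is ready (a hint only).
var query = source.AsParallel()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(n => n * n);

var received = new List<int>();
foreach (var square in query)   // the loop body runs only on this thread
{
    received.Add(square);
}

Console.WriteLine($"Received {received.Count} results");
```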

The ForAll Operator

In sequential LINQ queries, execution is deferred until the query is enumerated, either in a foreach (For Each in Visual Basic) loop or by invoking a method such as ToList, ToArray, or ToDictionary. In PLINQ, you can also use foreach to execute the query and iterate through the results. However, foreach itself does not run in parallel, and therefore it requires that the output from all parallel tasks be merged back into the thread on which the loop is running. In PLINQ, you can use foreach when you must preserve the final ordering of the query results, and also whenever you are processing the results in a serial manner, for example when you are calling Console.WriteLine for each element.

For faster query execution when order preservation is not required and when the processing of the results can itself be parallelized, use the ForAll method to execute a PLINQ query. ForAll does not perform this final merge step. The following code example shows how to use the ForAll method. System.Collections.Concurrent.ConcurrentBag<T> is used here because it is optimized for multiple threads adding concurrently without attempting to remove any items.


var nums = Enumerable.Range(10, 10000);
var concurrentBag = new System.Collections.Concurrent.ConcurrentBag<int>();

var query = from num in nums.AsParallel()
            where num % 10 == 0
            select num;

// Process the results as each thread completes
// and add them to a System.Collections.Concurrent.ConcurrentBag<int>,
// which can safely accept concurrent add operations.
// Compute is a placeholder for your own per-element work.
query.ForAll(e => concurrentBag.Add(Compute(e)));
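For a side-by-side comparison, the following self-contained sketch consumes the same query both ways. The doubling step is a stand-in for real per-element work. Because the query runs once per enumeration, it executes twice here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

var nums = Enumerable.Range(10, 10000);

var query = from num in nums.AsParallel()
            where num % 10 == 0
            select num;

// foreach: results are merged back to this thread, and the loop body runs here,
// so an ordinary List<int> is safe.
var viaForeach = new List<int>();
foreach (var n in query)
{
    viaForeach.Add(n * 2);
}

// ForAll: the delegate runs on the worker threads with no final merge,
// so the target collection must be safe for concurrent adds.
var viaForAll = new ConcurrentBag<int>();
query.ForAll(n => viaForAll.Add(n * 2));

Console.WriteLine($"foreach: {viaForeach.Count} items, ForAll: {viaForAll.Count} items");
```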

Cancellation

PLINQ is integrated with the cancellation types in .NET Framework 4. (For more information, see Cancellation.) Therefore, unlike sequential LINQ to Objects queries, PLINQ queries can be canceled. To create a cancelable PLINQ query, use the WithCancellation operator on the query and provide a CancellationToken instance as the argument. When cancellation is requested on the token, typically by calling Cancel on the CancellationTokenSource that created it, the token's IsCancellationRequested property becomes true. PLINQ notices this, stops processing on all threads, and throws an OperationCanceledException.

It is possible that a PLINQ query might continue to process some elements after cancellation is requested.
For greater responsiveness, you can also respond to cancellation requests in long-running user delegates.
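The following sketch shows one way to wire this up. To keep the example deterministic, the delegate itself requests cancellation when it reaches a particular element; in real code the request usually comes from another thread. The delegate also checks the token on its own, as suggested above. Because the exact exception shape can depend on timing, the sketch handles both a plain OperationCanceledException and an AggregateException that contains only cancellations.

```csharp
using System;
using System.Linq;
using System.Threading;

using var cts = new CancellationTokenSource();
bool canceled = false;

try
{
    double[] roots = Enumerable.Range(1, 1_000_000)
        .AsParallel()
        .WithCancellation(cts.Token)
        .Select(n =>
        {
            // For a deterministic demo, request cancellation from inside the query.
            if (n == 1000) cts.Cancel();

            // A long-running delegate can also check the token itself.
            cts.Token.ThrowIfCancellationRequested();
            return Math.Sqrt(n);
        })
        .ToArray();
    Console.WriteLine($"Completed with {roots.Length} results");
}
catch (OperationCanceledException)
{
    canceled = true;
}
catch (AggregateException ae) when (ae.InnerExceptions.All(e => e is OperationCanceledException))
{
    canceled = true;
}

Console.WriteLine($"Canceled: {canceled}");
```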

Exceptions

When a PLINQ query executes, multiple exceptions might be thrown from different threads simultaneously. Also, the code to handle the exception might be on a different thread than the code that threw the exception. PLINQ uses the AggregateException type to encapsulate all the exceptions that were thrown by a query, and marshals those exceptions back to the calling thread. On the calling thread, only one try-catch block is required. However, you can iterate through all of the exceptions that are encapsulated in the AggregateException and handle any that you can safely recover from. In rare cases, some exceptions may be thrown that are not wrapped in an AggregateException, and ThreadAbortExceptions are also not wrapped.

When exceptions are allowed to bubble up to the joining thread, a query may continue to process some items after the exception is raised.
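The following sketch shows the single try-catch pattern described above. The input deliberately contains zeros so that some delegates throw DivideByZeroException. AggregateException.Handle marks those as recovered; any other exception type would be rethrown in a new AggregateException. Because PLINQ may stop processing after the first failure, the number of recovered exceptions can be one or two.

```csharp
using System;
using System.Linq;

int[] data = { 1, 2, 0, 4, 0, 6 };
int recovered = 0;

try
{
    // 100 / 0 throws DivideByZeroException on whichever worker thread handles a zero.
    int[] quotients = data.AsParallel()
        .Select(n => 100 / n)
        .ToArray();
    Console.WriteLine($"Computed {quotients.Length} quotients");
}
catch (AggregateException ae)
{
    // One catch block on the calling thread sees every exception the query threw.
    // Handle returns normally only if the predicate returns true for all of them;
    // otherwise it throws a new AggregateException with the unhandled ones.
    ae.Handle(ex =>
    {
        if (ex is DivideByZeroException)
        {
            recovered++;
            return true;
        }
        return false;
    });
}

Console.WriteLine($"Recovered from {recovered} DivideByZeroException(s)");
```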

Custom Partitioners

In some cases, you can improve query performance by writing a custom partitioner that takes advantage of some characteristic of the source data. In the query, the custom partitioner itself is the enumerable object that is queried.


[Visual Basic]
Dim arr(10000) As Integer
Dim partitioner = New MyArrayPartitioner(Of Integer)(arr)
Dim query = partitioner.AsParallel().Select(Function(x) SomeFunction(x))


[C#]
int[] arr = ...;
// MyArrayPartitioner<T> is a user-defined Partitioner<T> subclass (not shown).
Partitioner<int> partitioner = new MyArrayPartitioner<int>(arr);
var q = partitioner.AsParallel().Select(x => SomeFunction(x));
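Writing a complete Partitioner<T> subclass such as the MyArrayPartitioner above is beyond a short sketch. As a stand-in, the following code uses the built-in Partitioner.Create factory from System.Collections.Concurrent, which returns a partitioner over an array; it shows the same pattern of calling AsParallel on the partitioner object itself. The doubling projection stands in for SomeFunction.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

int[] arr = Enumerable.Range(0, 10001).ToArray();

// Built-in array partitioner. loadBalance: true requests dynamic load balancing
// between partitions; false requests static partitioning.
OrderablePartitioner<int> partitioner = Partitioner.Create(arr, loadBalance: true);

// As in the text, the partitioner itself is the object that is queried.
long total = partitioner.AsParallel()
    .Select(x => (long)x * 2)   // stand-in for SomeFunction(x)
    .Sum();

Console.WriteLine(total);
```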

PLINQ supports a fixed number of partitions (although data may be dynamically reassigned to those partitions during run time for load balancing). Parallel.For and Parallel.ForEach support only dynamic partitioning, which means that the number of partitions changes at run time.

Measuring PLINQ Performance

In many cases, a query can be parallelized, but the overhead of setting up the parallel query outweighs the performance benefit gained. If a query does not perform much computation or if the data source is small, a PLINQ query may be slower than a sequential LINQ to Objects query. You can use the Parallel Performance Analyzer in Visual Studio Team Server to compare the performance of various queries, to locate processing bottlenecks, and to determine whether your query is running in parallel or sequentially.
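As a rough first check before reaching for a profiler, the following sketch times a sequential and a parallel version of the same CPU-heavy query with System.Diagnostics.Stopwatch. The trial-division primality test is just an example workload. A single timing like this is affected by JIT compilation, machine load, and core count, so treat the numbers as indicative only; on a single-core machine, or with a much cheaper predicate, the parallel version may well be slower.

```csharp
using System;
using System.Diagnostics;
using System.Linq;

int[] source = Enumerable.Range(2, 2_000_000).ToArray();

// Deliberately CPU-heavy per-element work, where parallelism has a chance to pay off.
static bool IsPrime(int n)
{
    if (n < 2) return false;
    for (int d = 2; (long)d * d <= n; d++)
    {
        if (n % d == 0) return false;
    }
    return true;
}

var sw = Stopwatch.StartNew();
int sequentialCount = source.Count(IsPrime);
sw.Stop();
TimeSpan sequentialTime = sw.Elapsed;

sw.Restart();
int parallelCount = source.AsParallel().Count(IsPrime);
sw.Stop();
TimeSpan parallelTime = sw.Elapsed;

Console.WriteLine($"Sequential: {sequentialCount} primes in {sequentialTime.TotalMilliseconds:F0} ms");
Console.WriteLine($"Parallel:   {parallelCount} primes in {parallelTime.TotalMilliseconds:F0} ms");
```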