Skip to content

C# 多线程 07-使用 PLINQ 03-调整 PLINQ 查询的参数

🏷️ 《C# 多线程》

调整 PLINQ 查询的参数

ParallelQuery

本例中使用了 ParallelQuery 中的如下方法

  • WithDegreeOfParallelism

    设置要在查询中使用的并行度。并行度是将用于处理查询的同时执行的任务的最大数目。

  • WithExecutionMode

    设置查询的执行模式

    • 参数

      • ParallelExecutionMode 查询执行模式

        • Default

          此设置为默认设置。PLINQ 将检查查询的结构,仅在可能带来加速时才对查询进行并行化。如果查询结构指示不可能获得加速,则 PLINQ 会将查询当作普通的 LINQ to Objects 查询来执行。

        • ForceParallelism

          并行化整个查询,即使要使用系统开销大的算法。如果认为并行执行查询将带来加速,则使用此标志,但处于默认模式的 PLINQ 将按顺序执行它。

  • WithMergeOptions

    设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。

    • 参数

      • ParallelMergeOptions

        指定查询中要使用的输出合并的首选类型。换言之,它指示 PLINQ 如何将多个分区的结果合并回单个结果序列。这仅是一个提示,系统在并行处理所有查询时可能不会考虑这一点。

        • Default

          使用默认合并类型,即 AutoBuffered

        • NotBuffered

          不利用输出缓冲区进行合并。一旦计算出结果元素,就向查询使用者提供这些元素。

        • AutoBuffered

          利用系统选定大小的输出缓冲区进行合并。在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。

        • FullyBuffered

          利用整个输出缓冲区进行合并。在向查询使用者提供任何结果之前,系统会先累计所有结果。

  • WithCancellation

    设置要与查询关联的 System.Threading.CancellationToken

  • AsOrdered

    启用将数据源视为“已经排序”的处理方法,重写默认的将数据源视为“未经排序”的处理方法。只可以对由 AsParallelParallelEnumerable.RangeParallelEnumerable.Repeat 返回的泛型序列调用 AsOrdered

示例代码

点击查看代码
csharp
/// <summary>
/// 调整 PLINQ 查询的参数
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var parallelQuery = from t in GetTypes().AsParallel()
                        select EmulateProcessing(t);
    var cts = new CancellationTokenSource();
    cts.CancelAfter(TimeSpan.FromSeconds(3));

    try
    {
        parallelQuery
            // 设置要在查询中使用的并行度。并行度是将用于处理查询的同时执行的任务的最大数目。
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            // 设置查询的执行模式
            // (ForceParallelism:并行化整个查询,即使要使用系统开销大的算法。)
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            // 设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。
            // (Default:使用默认合并类型,即 AutoBuffered。)
            // (AutoBuffered:利用系统选定大小的输出缓冲区进行合并。在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。)
            .WithMergeOptions(ParallelMergeOptions.Default)
            // 设置要与查询关联的 System.Threading.CancellationToken。
            .WithCancellation(cts.Token)
            .ForAll(Console.WriteLine);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("---");
        Console.WriteLine("操作已被取消");
    }

    Console.WriteLine("---");
    Console.WriteLine("执行未排序的 PLINQ 查询");

    var unorderedQuery = from i in ParallelEnumerable.Range(1, 30)
                            select i;

    foreach (var i in unorderedQuery)
    {
        Console.WriteLine(i);
    }

    Console.WriteLine("---");
    Console.WriteLine("执行已排序的 PLINQ 查询");
    var orderedQuery = from i in ParallelEnumerable.Range(1, 30).AsOrdered()
                        select i;

    foreach (var i in orderedQuery)
    {
        Console.WriteLine(i);
    }

    Console.ReadLine();
}

static string EmulateProcessing(string typeName)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(new Random(DateTime.Now.Millisecond).Next(250,350)));
    Console.WriteLine($"{typeName} type was processed on a thread id {Thread.CurrentThread.ManagedThreadId}");
    return typeName;
}

/// <summary>
/// 使用反射 API 查询加载到当前应用程序域中的所有组件中名称以“Web”开头的类型
/// </summary>
/// <returns></returns>
static IEnumerable<string
GetTypes()
{
    return from assembly in AppDomain.CurrentDomain.GetAssemblies()
            from type in assembly.GetExportedTypes()
            where type.Name.StartsWith("Web")
            orderby type.Name.Length
            select type.Name;
}

运行结果

点击查看运行结果
txt
WebSocket type was processed on a thread id 1
WebClient type was processed on a thread id 5
WebClient
WebSocket
WebRequest type was processed on a thread id 4
WebProxy type was processed on a thread id 6
WebProxy
WebRequest
WebPermission type was processed on a thread id 4
WebPermission
WebException type was processed on a thread id 6
WebException
WebUtility type was processed on a thread id 5
WebUtility
WebResponse type was processed on a thread id 1
WebResponse
WebSocketState type was processed on a thread id 6
WebSocketState
WebSocketError type was processed on a thread id 4
WebSocketError
WebSocketContext type was processed on a thread id 5
WebSocketContext
WebRequestMethods type was processed on a thread id 1
WebRequestMethods
WebUtilityElement type was processed on a thread id 6
WebUtilityElement
WebExceptionStatus type was processed on a thread id 4
WebExceptionStatus
WebHeaderCollection type was processed on a thread id 1
WebHeaderCollection
WebSocketException type was processed on a thread id 5
WebSocketException
WebSocketCloseStatus type was processed on a thread id 6
WebSocketCloseStatus
WebSocketMessageType type was processed on a thread id 4
WebSocketMessageType
WebProxyScriptElement type was processed on a thread id 1
WebProxyScriptElement
WebPermissionAttribute type was processed on a thread id 5
WebPermissionAttribute
WebSocketReceiveResult type was processed on a thread id 6
WebSocketReceiveResult
WebRequestModuleElement type was processed on a thread id 4
WebRequestModuleElement
WebRequestModulesSection type was processed on a thread id 1
WebRequestModulesSection
WebRequestModuleElementCollection type was processed on a thread id 5
WebRequestModuleElementCollection
---
执行未排序的PLINQ查询
1
9
17
24
2
10
18
25
3
11
19
26
4
12
20
27
5
13
21
28
6
14
22
29
7
15
23
30
8
16
---
执行已排序的PLINQ查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30