Skip to content

C# 多线程 10-并行编程模式 03-使用 TPL 数据流实现并行管道

🏷️ 《C# 多线程》

TIP

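需要添加对 Microsoft.Tpl.Dataflow 包的引用(通过 nuget 搜【Microsoft.Tpl.Dataflow】)。注意:该包在 NuGet 上已被标记为弃用,新项目请改用其后继包 System.Threading.Tasks.Dataflow,命名空间同为 System.Threading.Tasks.Dataflow。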

示例代码

csharp
/// <summary>
/// Entry point of the sample: starts the TPL Dataflow parallel pipeline and
/// blocks the main thread until the returned task has finished.
/// </summary>
/// <param name="args">Command-line arguments; not used by this sample.</param>
static void Main(string[] args) =>
    // GetAwaiter().GetResult() rethrows the task's original exception
    // instead of wrapping it in an AggregateException.
    ProcessAsynchronously().GetAwaiter().GetResult();

/// <summary>
/// Builds a four-stage TPL Dataflow pipeline (buffer -> int-to-decimal ->
/// decimal-to-string -> print), feeds it the integers 0..19 from a parallel
/// loop, and waits for the last stage to complete. Pressing the 'c' key
/// cancels every stage through a shared cancellation token.
/// </summary>
/// <returns>
/// A task that completes once the pipeline has finished or was canceled
/// and the user has pressed ENTER.
/// </returns>
static async Task ProcessAsynchronously()
{
    var cts = new CancellationTokenSource();

    // System.Random is not thread-safe, and both transform stages below run up
    // to four delegates concurrently, so every access is serialized by a lock.
    var rnd = new Random(DateTime.Now.Millisecond);
    var rndGate = new object();
    Func<TimeSpan> nextDelay = () =>
    {
        lock (rndGate)
        {
            return TimeSpan.FromMilliseconds(rnd.Next(200));
        }
    };

    // Looked up once instead of once per formatted item.
    CultureInfo usCulture = CultureInfo.GetCultureInfo("en-us");

    // Key listener: cancels the pipeline when the first key pressed is 'c'.
    // It is deliberately not awaited because it blocks on Console.ReadKey;
    // keeping it in a local makes the fire-and-forget explicit.
    Task cancelOnKeyPress = Task.Run(() =>
    {
        if (Console.ReadKey().KeyChar == 'c')
        {
            cts.Cancel();
        }
    }, cts.Token);

    // BufferBlock: hands elements over to the next block in the flow.
    // BoundedCapacity: once this many elements are buffered, new ones are not
    // accepted until an existing element has been passed to the next block.
    var inputBlock = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = cts.Token });

    // TransformBlock: used for data transformation steps.
    //   MaxDegreeOfParallelism: upper bound on concurrently running delegates.
    // Stage 1: convert int to decimal.
    var convertToDecimalBlock = new TransformBlock<int, decimal>(n =>
    {
        decimal result = Convert.ToDecimal(n * 100);
        Console.WriteLine($"Decimal Converter sent {result} to the next stage on {Thread.CurrentThread.ManagedThreadId}");
        // Simulated work of random duration.
        Thread.Sleep(nextDelay());
        return result;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

    // Stage 2: convert decimal to a currency-formatted string.
    var stringifyBlock = new TransformBlock<decimal, string>(n =>
    {
        string result = $"--{n.ToString("C", usCulture)}--";
        Console.WriteLine($"String Formatter sent {result} to the next stage on {Thread.CurrentThread.ManagedThreadId}");
        // Simulated work of random duration.
        Thread.Sleep(nextDelay());
        return result;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

    // ActionBlock: runs the given action for every incoming element.
    var outputBlock = new ActionBlock<string>(s =>
    {
        Console.WriteLine($"The final result is {s} on thread id {Thread.CurrentThread.ManagedThreadId}");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

    // Wire the blocks together with LinkTo.
    // PropagateCompletion = true: when a stage completes (or faults), its
    // completion is forwarded automatically to the next stage.
    var propagate = new DataflowLinkOptions { PropagateCompletion = true };
    inputBlock.LinkTo(convertToDecimalBlock, propagate);
    convertToDecimalBlock.LinkTo(stringifyBlock, propagate);
    stringifyBlock.LinkTo(outputBlock, propagate);

    try
    {
        // Feed items into the first block from up to four producers.
        var producerOptions = new ParallelOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token };
        Parallel.For(0, 20, producerOptions, i =>
        {
            Console.WriteLine($"added {i} to source data on thread id {Thread.CurrentThread.ManagedThreadId}");
            // Blocking on SendAsync is intentional: it makes the producer wait
            // until the bounded input buffer accepts the item.
            inputBlock.SendAsync(i).GetAwaiter().GetResult();
        });

        // Complete must be called once all items have been added.
        inputBlock.Complete();

        // Wait for the last block to finish.
        await outputBlock.Completion;
        Console.WriteLine("Press ENTER to exit.");
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
    }

    Console.ReadLine();
}

运行结果

txt
added 10 to source data on thread id 5
added 0 to source data on thread id 1
added 5 to source data on thread id 4
added 15 to source data on thread id 6
added 11 to source data on thread id 5
added 12 to source data on thread id 5
added 16 to source data on thread id 6
added 1 to source data on thread id 1
added 6 to source data on thread id 4
Decimal Converter sent 1000 to the next stage on 7
Decimal Converter sent 0 to the next stage on 7
Decimal Converter sent 1500 to the next stage on 7
Decimal Converter sent 500 to the next stage on 7
Decimal Converter sent 1100 to the next stage on 7
String Formatter sent --$1,000.00-- to the next stage on 7
String Formatter sent --$0.00-- to the next stage on 7
String Formatter sent --$1,500.00-- to the next stage on 7
String Formatter sent --$500.00-- to the next stage on 7
String Formatter sent --$1,100.00-- to the next stage on 7
added 7 to source data on thread id 4
added 8 to source data on thread id 4
added 9 to source data on thread id 4
added 13 to source data on thread id 4
added 17 to source data on thread id 6
added 2 to source data on thread id 1
Decimal Converter sent 1200 to the next stage on 8
added 14 to source data on thread id 4
added 18 to source data on thread id 6
The final result is --$1,000.00-- on thread id 7
The final result is --$0.00-- on thread id 7
The final result is --$1,500.00-- on thread id 7
The final result is --$500.00-- on thread id 7
The final result is --$1,100.00-- on thread id 7
Decimal Converter sent 1600 to the next stage on 7
Decimal Converter sent 100 to the next stage on 8
Decimal Converter sent 600 to the next stage on 7
Decimal Converter sent 700 to the next stage on 7
Decimal Converter sent 800 to the next stage on 8
Decimal Converter sent 900 to the next stage on 8
Decimal Converter sent 1300 to the next stage on 7
Decimal Converter sent 1700 to the next stage on 8
Decimal Converter sent 200 to the next stage on 7
added 3 to source data on thread id 4
added 4 to source data on thread id 4
Decimal Converter sent 1400 to the next stage on 4
added 19 to source data on thread id 6
Decimal Converter sent 1800 to the next stage on 6
String Formatter sent --$1,200.00-- to the next stage on 5
Decimal Converter sent 300 to the next stage on 7
Decimal Converter sent 400 to the next stage on 8
Decimal Converter sent 1900 to the next stage on 7
String Formatter sent --$1,600.00-- to the next stage on 5
String Formatter sent --$100.00-- to the next stage on 9
String Formatter sent --$600.00-- to the next stage on 6
String Formatter sent --$700.00-- to the next stage on 4
The final result is --$1,200.00-- on thread id 7
String Formatter sent --$800.00-- to the next stage on 5
The final result is --$1,600.00-- on thread id 7
String Formatter sent --$900.00-- to the next stage on 5
String Formatter sent --$1,300.00-- to the next stage on 6
String Formatter sent --$1,700.00-- to the next stage on 6
String Formatter sent --$200.00-- to the next stage on 4
String Formatter sent --$1,400.00-- to the next stage on 5
String Formatter sent --$1,800.00-- to the next stage on 6
The final result is --$100.00-- on thread id 8
The final result is --$700.00-- on thread id 8
The final result is --$800.00-- on thread id 8
The final result is --$900.00-- on thread id 8
The final result is --$600.00-- on thread id 7
String Formatter sent --$300.00-- to the next stage on 9
The final result is --$1,300.00-- on thread id 8
The final result is --$1,700.00-- on thread id 7
String Formatter sent --$400.00-- to the next stage on 9
String Formatter sent --$1,900.00-- to the next stage on 9
The final result is --$200.00-- on thread id 4
The final result is --$1,400.00-- on thread id 9
The final result is --$300.00-- on thread id 7
The final result is --$1,900.00-- on thread id 9
The final result is --$1,800.00-- on thread id 5
The final result is --$400.00-- on thread id 8
Press ENTER to exit.

运行结果分析

将打印结果分类整理后如下。

可以看出同上一个使用 BlockingCollection 的例子还是有区别的。

管道的执行顺序同加入块中的顺序有一小部分是不一致的,而且管道的各个阶段也不再是各自只运行在一个线程上。

可以通过将各个块的 MaxDegreeOfParallelism(最大并行度)设定为 1 来实现顺序执行的效果。

txt
added 10 to source data on thread id 5
added 0 to source data on thread id 1
added 5 to source data on thread id 4
added 15 to source data on thread id 6
added 11 to source data on thread id 5
added 12 to source data on thread id 5
added 16 to source data on thread id 6
added 1 to source data on thread id 1
added 6 to source data on thread id 4
added 7 to source data on thread id 4
added 8 to source data on thread id 4
added 9 to source data on thread id 4
added 13 to source data on thread id 4
added 17 to source data on thread id 6
added 2 to source data on thread id 1
added 14 to source data on thread id 4
added 18 to source data on thread id 6
added 3 to source data on thread id 4
added 4 to source data on thread id 4
added 19 to source data on thread id 6
txt
Decimal Converter sent 1000 to the next stage on 7
Decimal Converter sent 0 to the next stage on 7
Decimal Converter sent 1500 to the next stage on 7
Decimal Converter sent 500 to the next stage on 7
Decimal Converter sent 1100 to the next stage on 7
Decimal Converter sent 1200 to the next stage on 8
Decimal Converter sent 1600 to the next stage on 7
Decimal Converter sent 100 to the next stage on 8
Decimal Converter sent 600 to the next stage on 7
Decimal Converter sent 700 to the next stage on 7
Decimal Converter sent 800 to the next stage on 8
Decimal Converter sent 900 to the next stage on 8
Decimal Converter sent 1300 to the next stage on 7
Decimal Converter sent 1700 to the next stage on 8
Decimal Converter sent 200 to the next stage on 7
Decimal Converter sent 1400 to the next stage on 4
Decimal Converter sent 1800 to the next stage on 6
Decimal Converter sent 300 to the next stage on 7
Decimal Converter sent 400 to the next stage on 8
Decimal Converter sent 1900 to the next stage on 7
txt
String Formatter sent --$1,000.00-- to the next stage on 7
String Formatter sent --$0.00-- to the next stage on 7
String Formatter sent --$1,500.00-- to the next stage on 7
String Formatter sent --$500.00-- to the next stage on 7
String Formatter sent --$1,100.00-- to the next stage on 7
String Formatter sent --$1,200.00-- to the next stage on 5
String Formatter sent --$1,600.00-- to the next stage on 5
String Formatter sent --$100.00-- to the next stage on 9
String Formatter sent --$600.00-- to the next stage on 6
String Formatter sent --$700.00-- to the next stage on 4
String Formatter sent --$800.00-- to the next stage on 5
String Formatter sent --$900.00-- to the next stage on 5
String Formatter sent --$1,300.00-- to the next stage on 6
String Formatter sent --$1,700.00-- to the next stage on 6
String Formatter sent --$200.00-- to the next stage on 4
String Formatter sent --$1,400.00-- to the next stage on 5
String Formatter sent --$1,800.00-- to the next stage on 6
String Formatter sent --$300.00-- to the next stage on 9
String Formatter sent --$400.00-- to the next stage on 9
String Formatter sent --$1,900.00-- to the next stage on 9
txt
The final result is --$1,000.00-- on thread id 7
The final result is --$0.00-- on thread id 7
The final result is --$1,500.00-- on thread id 7
The final result is --$500.00-- on thread id 7
The final result is --$1,100.00-- on thread id 7
The final result is --$1,200.00-- on thread id 7
The final result is --$1,600.00-- on thread id 7
The final result is --$100.00-- on thread id 8
The final result is --$700.00-- on thread id 8
The final result is --$800.00-- on thread id 8
The final result is --$900.00-- on thread id 8
The final result is --$600.00-- on thread id 7
The final result is --$1,300.00-- on thread id 8
The final result is --$1,700.00-- on thread id 7
The final result is --$200.00-- on thread id 4
The final result is --$1,400.00-- on thread id 9
The final result is --$300.00-- on thread id 7
The final result is --$1,900.00-- on thread id 9
The final result is --$1,800.00-- on thread id 5
The final result is --$400.00-- on thread id 8