C# 多线程 10-并行编程模式 02-使用 BlockingCollection 实现并行管道
🏷️ 《C# 多线程》
示例代码
csharp
private const int CollectionsNumber = 4;
private const int Count = 5;
/// <summary>
/// 使用 BlockingCollection 实现并行管道
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
// 监视 c 键(按下 c 键取消执行)
Task.Run(() =>
{
if (Console.ReadKey().KeyChar == 'c')
{
cts.Cancel();
}
}, cts.Token);
// 创建 4 个集合,每个集合中有 5 个元素
var sourceArrays = new BlockingCollection<int>[CollectionsNumber];
for (int i = 0; i < sourceArrays.Length; i++)
{
sourceArrays[i] = new BlockingCollection<int>(Count);
}
// 第一个管道:将 int 型数据转换成 Decimal 型
var convertToDecimal = new PipelineWorker<int, decimal>(
sourceArrays,
n => Convert.ToDecimal(n * 100),
cts.Token,
"Decimal Converter"
);
// 第二个管道:格式化 Decimal 数据为金额字符串
var stringifyNumber = new PipelineWorker<decimal, string>(
convertToDecimal.Output,
s => $"--{s.ToString("C", CultureInfo.GetCultureInfo("en-us"))}--",
cts.Token,
"Console Formatter"
);
// 第三个管道:打印格式化后的结果
var outputResultToConsole = new PipelineWorker<string, string>(
stringifyNumber.Output,
s => Console.WriteLine($"The final result is {s} on thread id {Thread.CurrentThread.ManagedThreadId}"),
cts.Token,
"Console Output"
);
try
{
Parallel.Invoke(
// 初始化集合数据
() => CreateInitialValues(sourceArrays, cts),
// 将 int 型数据转换成 Decimal 型
() => convertToDecimal.Run(),
// 格式化 Decimal 数据为金额字符串
() => stringifyNumber.Run(),
// 打印格式化后的结果
() => outputResultToConsole.Run()
);
}
catch (AggregateException ae)
{
foreach (var ex in ae.InnerExceptions)
{
Console.WriteLine(ex.Message + ex.StackTrace);
}
}
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
}
else
{
Console.WriteLine("Press ENTER to exit.");
}
Console.ReadLine();
}
/// <summary>
/// 初始化集合数据
/// </summary>
/// <param name="sourceArrays"></param>
/// <param name="cts"></param>
static void CreateInitialValues(BlockingCollection<int>[] sourceArrays, CancellationTokenSource cts)
{
Parallel.For(0, sourceArrays.Length * Count, (j, state) =>
{
if (cts.Token.IsCancellationRequested)
{
state.Stop();
}
int number = GetRandomNumber(j);
int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
if (k >= 0)
{
Console.WriteLine($"added {j} to source data on thread id {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(TimeSpan.FromMilliseconds(number));
}
});
foreach (var arr in sourceArrays)
{
arr.CompleteAdding();
}
}
static int GetRandomNumber(int seed)
{
return new Random(seed).Next(500);
}
/// <summary>
/// 管道类
/// </summary>
/// <typeparam name="TInput"></typeparam>
/// <typeparam name="TOutput"></typeparam>
class PipelineWorker<TInput, TOutput>
{
Func<TInput, TOutput> _processor;
Action<TInput> _outputProcessor;
BlockingCollection<TInput>[] _input;
CancellationToken _token;
Random _rnd;
public BlockingCollection<TOutput>[] Output { get; private set; }
public string Name { get; private set; }
public PipelineWorker(
BlockingCollection<TInput>[] input,
Func<TInput, TOutput> processor,
CancellationToken token,
string name)
{
_input = input;
Output = new BlockingCollection<TOutput>[_input.Length];
for (int i = 0; i < Output.Length; i++)
{
Output[i] = null == input[i] ? null : new BlockingCollection<TOutput>(Count);
}
_processor = processor;
_token = token;
Name = name;
_rnd = new Random(DateTime.Now.Millisecond);
}
public PipelineWorker(
BlockingCollection<TInput>[] input,
Action<TInput> renderer,
CancellationToken token,
string name)
{
_input = input;
_outputProcessor = renderer;
_token = token;
Name = name;
Output = null;
_rnd = new Random(DateTime.Now.Millisecond);
}
public void Run()
{
Console.WriteLine($"{Name} is running");
while (!_input.All(bc => bc.IsCompleted) && !_token.IsCancellationRequested)
{
TInput receivedItem;
// 尝试从集合中获取元素;如没有元素则会等待
int i = BlockingCollection<TInput>.TryTakeFromAny(_input, out receivedItem, 50, _token);
if (i >= 0)
{
if (Output != null)
{
TOutput outputItem = _processor(receivedItem);
BlockingCollection<TOutput>.AddToAny(Output, outputItem);
Console.WriteLine($"{Name} sent {outputItem} to next, on thread id {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(TimeSpan.FromMilliseconds(_rnd.Next(200)));
}
else
{
_outputProcessor(receivedItem);
}
}
else
{
Thread.Sleep(TimeSpan.FromMilliseconds(50));
}
}
if (Output != null)
{
foreach (var bc in Output)
{
bc.CompleteAdding();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
运行结果
txt
Decimal Converter is running
Console Output is running
Console Formatter is running
added 0 to source data on thread id 1
Decimal Converter sent 0 to next, on thread id 4
Console Formatter sent --$0.00-- to next, on thread id 6
The final result is --$0.00-- on thread id 5
added 1 to source data on thread id 1
Console Formatter sent --$100.00-- to next, on thread id 6
Decimal Converter sent 100 to next, on thread id 4
The final result is --$100.00-- on thread id 5
added 2 to source data on thread id 1
Decimal Converter sent 200 to next, on thread id 4
The final result is --$200.00-- on thread id 5
Console Formatter sent --$200.00-- to next, on thread id 6
added 3 to source data on thread id 1
Decimal Converter sent 300 to next, on thread id 4
Console Formatter sent --$300.00-- to next, on thread id 6
The final result is --$300.00-- on thread id 5
added 4 to source data on thread id 1
Console Formatter sent --$400.00-- to next, on thread id 6
Decimal Converter sent 400 to next, on thread id 4
added 5 to source data on thread id 7
The final result is --$400.00-- on thread id 5
added 6 to source data on thread id 7
Decimal Converter sent 500 to next, on thread id 4
Console Formatter sent --$500.00-- to next, on thread id 6
Decimal Converter sent 600 to next, on thread id 4
Console Formatter sent --$600.00-- to next, on thread id 6
The final result is --$500.00-- on thread id 5
The final result is --$600.00-- on thread id 5
added 8 to source data on thread id 1
Console Formatter sent --$800.00-- to next, on thread id 6
Decimal Converter sent 800 to next, on thread id 4
The final result is --$800.00-- on thread id 5
added 10 to source data on thread id 8
The final result is --$1,000.00-- on thread id 5
Console Formatter sent --$1,000.00-- to next, on thread id 6
Decimal Converter sent 1000 to next, on thread id 4
added 7 to source data on thread id 7
Decimal Converter sent 700 to next, on thread id 4
Console Formatter sent --$700.00-- to next, on thread id 6
The final result is --$700.00-- on thread id 5
added 11 to source data on thread id 7
Decimal Converter sent 1100 to next, on thread id 4
Console Formatter sent --$1,100.00-- to next, on thread id 6
The final result is --$1,100.00-- on thread id 5
added 9 to source data on thread id 1
The final result is --$900.00-- on thread id 5
Console Formatter sent --$900.00-- to next, on thread id 6
Decimal Converter sent 900 to next, on thread id 4
added 15 to source data on thread id 8
Decimal Converter sent 1500 to next, on thread id 4
Console Formatter sent --$1,500.00-- to next, on thread id 6
added 12 to source data on thread id 7
The final result is --$1,500.00-- on thread id 5
added 17 to source data on thread id 1
Decimal Converter sent 1200 to next, on thread id 4
Console Formatter sent --$1,200.00-- to next, on thread id 6
The final result is --$1,200.00-- on thread id 5
Decimal Converter sent 1700 to next, on thread id 4
Console Formatter sent --$1,700.00-- to next, on thread id 6
The final result is --$1,700.00-- on thread id 5
added 16 to source data on thread id 8
Decimal Converter sent 1600 to next, on thread id 4
Console Formatter sent --$1,600.00-- to next, on thread id 6
The final result is --$1,600.00-- on thread id 5
added 18 to source data on thread id 1
Decimal Converter sent 1800 to next, on thread id 4
Console Formatter sent --$1,800.00-- to next, on thread id 6
The final result is --$1,800.00-- on thread id 5
added 19 to source data on thread id 1
Decimal Converter sent 1900 to next, on thread id 4
Console Formatter sent --$1,900.00-- to next, on thread id 6
The final result is --$1,900.00-- on thread id 5
added 13 to source data on thread id 7
Decimal Converter sent 1300 to next, on thread id 4
Console Formatter sent --$1,300.00-- to next, on thread id 6
The final result is --$1,300.00-- on thread id 5
added 14 to source data on thread id 7
Decimal Converter sent 1400 to next, on thread id 4
Console Formatter sent --$1,400.00-- to next, on thread id 6
The final result is --$1,400.00-- on thread id 5
Press ENTER to exit.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
运行结果分析
混在一起不方便理解,将上面的结果中同类信息分类后整理如下:
1. 初始化集合 CreateInitialValues
因为方法中使用了并行迭代函数Parallel.For
,所以有多个线程 id 出现。
txt
added 0 to source data on thread id 1
added 1 to source data on thread id 1
added 2 to source data on thread id 1
added 3 to source data on thread id 1
added 4 to source data on thread id 1
added 5 to source data on thread id 7
added 6 to source data on thread id 7
added 8 to source data on thread id 1
added 10 to source data on thread id 8
added 7 to source data on thread id 7
added 11 to source data on thread id 7
added 9 to source data on thread id 1
added 15 to source data on thread id 8
added 12 to source data on thread id 7
added 17 to source data on thread id 1
added 16 to source data on thread id 8
added 18 to source data on thread id 1
added 19 to source data on thread id 1
added 13 to source data on thread id 7
added 14 to source data on thread id 7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2. 管道 1:convertToDecimal
这是定义的第一个管道 (PipelineWorker
),将int
型转换成Decimal
型。
注意其执行顺序,因为是基于CreateInitialValues创建的结合创建的管道,所以顺序和加入到集合中的顺序是一致的。
另外线程也是单独的同一个线程。
txt
Decimal Converter sent 0 to next, on thread id 4
Decimal Converter sent 100 to next, on thread id 4
Decimal Converter sent 200 to next, on thread id 4
Decimal Converter sent 300 to next, on thread id 4
Decimal Converter sent 400 to next, on thread id 4
Decimal Converter sent 500 to next, on thread id 4
Decimal Converter sent 600 to next, on thread id 4
Decimal Converter sent 800 to next, on thread id 4
Decimal Converter sent 1000 to next, on thread id 4
Decimal Converter sent 700 to next, on thread id 4
Decimal Converter sent 1100 to next, on thread id 4
Decimal Converter sent 900 to next, on thread id 4
Decimal Converter sent 1500 to next, on thread id 4
Decimal Converter sent 1200 to next, on thread id 4
Decimal Converter sent 1700 to next, on thread id 4
Decimal Converter sent 1600 to next, on thread id 4
Decimal Converter sent 1800 to next, on thread id 4
Decimal Converter sent 1900 to next, on thread id 4
Decimal Converter sent 1300 to next, on thread id 4
Decimal Converter sent 1400 to next, on thread id 4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
3. 管道 2:stringifyNumber
定义的第二个管道,将Decimal
格式化成金额。
同样的顺序同上一步的顺序一致,线程是单独的同一个线程。
txt
Console Formatter sent --$0.00-- to next, on thread id 6
Console Formatter sent --$100.00-- to next, on thread id 6
Console Formatter sent --$200.00-- to next, on thread id 6
Console Formatter sent --$300.00-- to next, on thread id 6
Console Formatter sent --$400.00-- to next, on thread id 6
Console Formatter sent --$500.00-- to next, on thread id 6
Console Formatter sent --$600.00-- to next, on thread id 6
Console Formatter sent --$800.00-- to next, on thread id 6
Console Formatter sent --$1,000.00-- to next, on thread id 6
Console Formatter sent --$700.00-- to next, on thread id 6
Console Formatter sent --$1,100.00-- to next, on thread id 6
Console Formatter sent --$900.00-- to next, on thread id 6
Console Formatter sent --$1,500.00-- to next, on thread id 6
Console Formatter sent --$1,200.00-- to next, on thread id 6
Console Formatter sent --$1,700.00-- to next, on thread id 6
Console Formatter sent --$1,600.00-- to next, on thread id 6
Console Formatter sent --$1,800.00-- to next, on thread id 6
Console Formatter sent --$1,900.00-- to next, on thread id 6
Console Formatter sent --$1,300.00-- to next, on thread id 6
Console Formatter sent --$1,400.00-- to next, on thread id 6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
4. 管道 3:outputResultToConsole
定义的第三个管道,打印格式化后的结果。
同样的顺序也是一致的,线程是单独的同一个线程。
txt
The final result is --$0.00-- on thread id 5
The final result is --$100.00-- on thread id 5
The final result is --$200.00-- on thread id 5
The final result is --$300.00-- on thread id 5
The final result is --$400.00-- on thread id 5
The final result is --$500.00-- on thread id 5
The final result is --$600.00-- on thread id 5
The final result is --$800.00-- on thread id 5
The final result is --$1,000.00-- on thread id 5
The final result is --$700.00-- on thread id 5
The final result is --$1,100.00-- on thread id 5
The final result is --$900.00-- on thread id 5
The final result is --$1,500.00-- on thread id 5
The final result is --$1,200.00-- on thread id 5
The final result is --$1,700.00-- on thread id 5
The final result is --$1,600.00-- on thread id 5
The final result is --$1,800.00-- on thread id 5
The final result is --$1,900.00-- on thread id 5
The final result is --$1,300.00-- on thread id 5
The final result is --$1,400.00-- on thread id 5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20