佳佳的博客
Menu
首页
《.NET Core 实战》 [No.200] 跨线程访问 BlockingCollection 集合
Posted by
佳佳
on 2020-03-19
IT
C#
.NET Core
《.NET Core 实战》
读书笔记
<!-- # 《.NET Core 实战》 [No.200] 跨线程访问 BlockingCollection 集合 --> <!-- dotnet-core-blocking-collection --> `BlockingCollection<T>` 类似于 `ConcurrentBag<T>` ,也是一个用于多线程访问的集合类,但是功能上要强大很多。 - `BlockingCollection<T>` 本身实现了类似于消息队列(*MQ*)的**生产者-消费者模式**。 - 可以设置集合的**容量上限**。 只能在创建实例时设置。 通过 *BoundedCapacity* 只读属性可以获取其容量上限。 若未指定,则默认值为 *int.MaxValue*,此时 *BoundedCapacity* 属性值为 *-1*。 - 集合为空时移除(*Take*)操作会被阻塞,集合满时新增(*Add*)操作会被阻塞。 *消费者*可以阻塞处理,直到*生产者*新增。 而当*消费者*消费过慢导致集合堆积达到容量上限时,会阻塞新增操作,直到*消费者*消费后释放了容量空间。 如果不想新增和移除处理被阻塞,`BlockingCollection<T>` 也提供了 *TryAdd* 和 *TryTake* 方法。 生产者可以调用 *CompleteAdding* 方法来标记生产已结束,此时消费者可以根据 *IsCompleted* 来判断是否所有的项都已被消费完毕。 - 封装实现了 `IProducerConsumerCollection<T>` 的任何集合类型。 在[上一篇博客][1]中介绍了 `ConcurrentBag<T>`,并在最后提到了 `ConcurrentQueue<T>` 和 `ConcurrentStack<T>` 类,这三种类型都实现了 `IProducerConsumerCollection<T>` 接口。 可在创建实例时指定封装的集合类型,默认为 `ConcurrentQueue<T>` 类型。 ```csharp BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000); ``` - 支持使用 `CancellationToken` 取消 *Take*、*Add* 等操作。 所有 `BlockingCollection<T>` 的操作都有带 `CancellationToken` 参数的重载。 取消时会触发 `OperationCanceledException` 异常,需要时可以手动捕捉该异常来响应取消请求。 下面是示例代码同[上一篇博客][1]一样,只是改成使用 `BlockingCollection<T>` 来实现。 由于 `BlockingCollection<T>` 实现了 `IDisposable` 接口,所以推荐使用 `using` 的方式创建实例。 ```csharp using (BlockingCollection<int> bc = new BlockingCollection<int>(10)) { Task t1 = Task.Run(() => { for (int i = 0; i < 20; i++) { Console.WriteLine($"即将添加元素:{i}"); bc.Add(i); } bc.CompleteAdding(); }); Task t2 = Task.Run(() => { while (true) { if (bc.TryTake(out int item)) { Console.WriteLine($"T2已取出元素:{item}"); } if (bc.IsCompleted) break; } }); Task t3 = Task.Run(() => { while (true) { if (bc.TryTake(out int item)) { Console.WriteLine($"T3已取出元素:{item}"); } if (bc.IsCompleted) break; } }); Task.WaitAll(t1, t2, t3); t1.Dispose(); t2.Dispose(); t3.Dispose(); } ``` 执行结果如下:  上面的示例中移除集合项使用的是 *TryTake* 方法,没有利用到 *Take* 方法的阻塞效果。 改成使用 *Take* 方法后代码如下。 ```csharp using (BlockingCollection<int> bc = new BlockingCollection<int>(10)) { Task t1 = Task.Run(() => { for (int i = 0; i < 20; i++) { Console.WriteLine($"即将添加元素:{i}"); bc.Add(i); } bc.CompleteAdding(); }); Task t2 = Task.Run(() => { while (true) { var item = bc.Take(); Console.WriteLine($"T2已取出元素:{item}"); if (bc.IsCompleted) break; } }); Task t3 = Task.Run(() => { while (true) { var item = bc.Take(); Console.WriteLine($"T2已取出元素:{item}"); if (bc.IsCompleted) break; } }); Task.WaitAll(t1, t2, t3); t1.Dispose(); t2.Dispose(); t3.Dispose(); } ``` 上面的代码有时可以正常执行,有时会引发如下异常: ```csharp System.InvalidOperationException HResult=0x80131509 Message=The collection argument is empty and has been marked as complete with regards to additions. Source=System.Collections.Concurrent StackTrace: at System.Collections.Concurrent.BlockingCollection`1.Take() at BlockingCollectionDemo.TakeDemo.<>c__DisplayClass0_0.<Main>b__2() in C:\Users\Administrator\source\repos\ConcurrentBagDemo\BlockingCollectionDemo\TakeDemo.cs:line 36 at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state) ``` 这是由于集合调用了 *CompleteAdding* 方法,且集合已经全部被消费结束后,仍然还有消费者调用 *Take* 方法时,就会引发该异常。 若要使用 *Take* 方法最好是加上 *try/catch* 处理,以免程序崩溃。 <!-- 链接 --> [1]: https://www.liujiajia.me/2020/3/18/dotnet-core-concurrent-bag (《.NET Core 实战》 [No.199] 多个 Task 同时操作 ConcurrentBag 集合) [2]: https://docs.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview (BlockingCollection 概述) --- > 购买本书 => [《.NET Core实战:手把手教你掌握380个精彩案例》][10] -- *周家安* 著 --- [10]:https://union-click.jd.com/jdc?e=&p=AyIGZRhaEwAQBFUZXBIyEgRSEl0QCxc3EUQDS10iXhBeGlcJDBkNXg9JHU4YDk5ER1xOGRNLGEEcVV8BXURFUFdfC0RVU1JRUy1OVxUBFQ5THlIQMm1AEkRdb11GZyNTK0BBZwYIbylWcHILWStaJQITBlYbXB0LFQJlK1sSMkBpja3tzaejG4Gx1MCKhTdUK1sRCxQBVxtTEQIQBlwrXBULIloNXwZBXUReEStrJQEiN2UbaxYyUGlUG1kUBhcGUBILQgUXDlMeUkBVRlUBS10XBkIABhkJRzIQBlQfUg%3D%3D (《.NET Core实战:手把手教你掌握380个精彩案例》)
版权声明:原创文章,未经允许不得转载。
https://www.liujiajia.me/2020/3/19/dotnet-core-blocking-collection
“Buy me a nongfu spring”
« 《.NET Core 实战》 [No.201~207] 元组
《.NET Core 实战》 [No.199] 多个 Task 同时操作 ConcurrentBag 集合 »
昵称
*
电子邮箱
*
回复内容
*
(回复审核后才会显示)
提交
目录
AUTHOR
刘佳佳
江苏 - 苏州
软件工程师
梦嘉集团