Skip to content

.NET Core 实战 [No.199] 多个 Task 同时操作 ConcurrentBag 集合

🏷️ 《.NET Core 实战》

ConcurrentBag<T> 特点:泛型集合无序线程安全

  • 泛型集合

    • Add 方法添加元素
    • TryTake 方法取出元素然后从集合中删除该元素
    • TryPeek 方法取出元素但不会从集合中删除该元素
    • IsEmpty 属性表示集合是否是空集合
  • 无序

    从集合中取出元素的顺序和放入的顺序 可能 不一致。

  • 线程安全

    ConcurrentBag<T> 的优点就是可以在多个线程上都可以访问,而且是线程安全的。
    后面的几个示例就是为了验证这一点,并且另外使用 List<T>Queue<T> 来做对比。

示例

使用三个线程同时操作同一个集合,一个线程用来添加元素到集合,另外两个取出元素打印到窗口并从集合中删除该元素。

1. ConcurrentBag<T>

csharp
ConcurrentBag<int> bag = new ConcurrentBag<int>();
bool isOver = false;

Task t1 = Task.Run(() => {
    for (int i = 0; i < 20; i++)
    {
        Console.WriteLine($"即将添加元素:{i}");
        bag.Add(i);
    }
    isOver = true;
});

Task t2 = Task.Run(() => {
    while (true)
    {
        if (isOver && bag.IsEmpty)
        {
            break;
        }
        if (bag.TryTake(out int item))
        {
            Console.WriteLine($"T2 已取出元素:{item}");
        }
    }
});

Task t3 = Task.Run(() => {
    while (true)
    {
        if (isOver && bag.IsEmpty)
        {
            break;
        }
        if (bag.TryTake(out int item))
        {
            Console.WriteLine($"T3 已取出元素:{item}");
        }
    }
});

Task.WaitAll(t1, t2, t3);

由于 ConcurrentBag<T> 线程安全的特性,多线程同时操作时,无需额外的处理即可正常的执行。

由下图可以看出,每个元素被正确的添加一次和取出一次

2. List<T>

这里改成使用 List<T> 实现相同的功能。由于列表没有类似 TryTake 方法的功能,这里使用下标 0 来获取收个元素,然后使用 Remove 方法删除。

注意:列表中如果有重复的元素,多线程中使用 Remove 方法会导致删除本不应被删除的列表项。

csharp
List<int> list = new List<int>();
bool isOver = false;

Task t1 = Task.Run(() => {
    for (int i = 0; i < 20; i++)
    {
        Console.WriteLine($"即将添加元素:{i}");
        list.Add(i);
    }
    isOver = true;
});

Task t2 = Task.Run(() => {
    while (true)
    {
        if (isOver && list.Count <= 0)
        {
            break;
        }
        if (list.Count > 0)
        {
            var item = list[0];
            Console.WriteLine($"T2 已取出元素:{item}");
            list.Remove(item);
        }
    }
});

Task t3 = Task.Run(() => {
    while (true)
    {
        if (isOver && list.Count <= 0)
        {
            break;
        }
        if (list.Count > 0)
        {
            var item = list[0];
            Console.WriteLine($"T3 已取出元素:{item}");
            list.Remove(item);
        }
    }
});

Task.WaitAll(t1, t2, t3);

从下面结果的截图可以看出,同一个元素很容易被两个线程同时取出

3. Queue<T>

“先进先出”的队列(Queue<T>)类型和 ConcurrentBag<T> 比较像,有类似的 EnqueueTryDequeue 方法。

csharp
Queue<int> queue = new Queue<int>();
bool isOver = false;

Task t1 = Task.Run(() => {
    for (int i = 0; i < 20; i++)
    {
        Console.WriteLine($"即将添加元素:{i}");
        queue.Enqueue(i);
    }
    isOver = true;
});

Task t2 = Task.Run(() => {
    while (true)
    {
        if (isOver && queue.Count <= 0)
        {
            break;
        }
        if (queue.TryDequeue(out int item))
        {
            Console.WriteLine($"T2 已取出元素:{item}");
        }
    }
});

Task t3 = Task.Run(() => {
    while (true)
    {
        if (isOver && queue.Count <= 0)
        {
            break;
        }
        if (queue.TryDequeue(out int item))
        {
            Console.WriteLine($"T3 已取出元素:{item}");
        }
    }
});

Task.WaitAll(t1, t2, t3);

从下面的运行结果截图可以看出,Queue<T> 的表现相当奇怪,队列中应该是没有值的情况下 TryDequeue 方法的返回值仍然会 True 。而且不仅两个线程会打印重复的数值,同一个线程也会打印重复的数值

有时运行还会引发如下异常:

csharp
System.ArgumentException
 HResult=0x80070057
 Message=Source array was not long enough. Check the source index, length, and the array's lower bounds.
Arg_ParamName_Name
 Source=System.Private.CoreLib
 StackTrace:
  at System.Array.Copy(Array sourceArray, Int32 sourceIndex, Array destinationArray, Int32 destinationIndex, Int32 length, Boolean reliable)
  at System.Collections.Generic.Queue`1.SetCapacity(Int32 capacity)
  at System.Collections.Generic.Queue`1.Enqueue(T item)
  at QueueDemo.Program.<>c__DisplayClass0_0.<Main>b__0() in C:\Users\Administrator\source\repos\ConcurrentBagDemo\QueueDemo\Program.cs:line 18
  at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)

那么,如果我们需要在多线程中使用“先进先出”的队列或者“先进后出”的堆栈应该怎么实现呢?
C# 为我们提供了线程安全版的队列 ConcurrentQueue<T> 和堆栈 ConcurrentStack<T> 类型。这两个类型都在 System.Collections.Concurrent 命名空间下面。


参考:《.NET Core 实战:手把手教你掌握 380 个精彩案例》 -- 周家安 著