Skip to content

C# 多线程 06-使用并发集合 02-使用 ConcurrentQueue 实现异步处理

🏷️ 《C# 多线程》

使用 ConcurrentQueue 实现异步处理

csharp
/// <summary>
/// Entry point: runs the ConcurrentQueue producer/consumer demo to completion,
/// then keeps the console window open until the user presses Enter.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    // Block the main thread until the asynchronous demo has finished.
    RunProgram().Wait();

    // Keep the console open so the output can be read.
    Console.ReadLine();
}

/// <summary>
/// Runs the demo: one producer posts work items to a shared
/// <see cref="ConcurrentQueue{T}"/> while four processors consume them concurrently.
/// </summary>
/// <remarks>
/// Once the producer has finished, cancellation is scheduled two seconds later to give
/// the processors a grace period for the items still queued. The method then waits for
/// every processor to exit, so no task is left running (or faulted and unobserved)
/// after it returns.
/// </remarks>
/// <returns>A task that completes when the producer and all processors have finished.</returns>
static async Task RunProgram()
{
    var taskQueue = new ConcurrentQueue<CustomTask>();

    using (var cts = new CancellationTokenSource())
    {
        // Snapshot the token so the lambdas below do not touch the source itself.
        CancellationToken token = cts.Token;

        // Start producing work items in the background.
        var taskSource = Task.Run(() => TaskProducer(taskQueue));

        // Start 4 processors that poll the queue.
        Task[] processors = new Task[4];
        for (int i = 1; i <= 4; i++)
        {
            string processorId = i.ToString();
            processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue, $"Processor {processorId}", token));
        }

        // Wait until every work item has been posted.
        await taskSource;

        // Request cancellation after a 2 second grace period so the processors
        // have time to pick up the remaining items.
        cts.CancelAfter(TimeSpan.FromSeconds(2));

        // Wait for the processors to observe the cancellation and exit; this also
        // surfaces any exception they may have thrown.
        await Task.WhenAll(processors);
    }
}

/// <summary>
/// Produces 20 work items with ids 1 through 20, waiting 50 ms before posting each one.
/// </summary>
/// <param name="queue">Queue that receives the created work items.</param>
/// <returns>A task that completes once all work items have been posted.</returns>
static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
{
    int nextId = 1;
    while (nextId <= 20)
    {
        // Pause before posting each item.
        await Task.Delay(50);

        var posted = new CustomTask { Id = nextId };
        queue.Enqueue(posted);
        Console.WriteLine($"Task {posted.Id} has been posted");

        nextId++;
    }
}

/// <summary>
/// Consumes work items from the queue until cancellation is requested.
/// </summary>
/// <remarks>
/// After an initial random delay, the processor repeatedly tries to dequeue one item,
/// reports it if one was available, and then waits a random delay. The cancellation
/// token is checked only after that delay, so at least one dequeue attempt is always made.
/// </remarks>
/// <param name="queue">Queue to take work items from.</param>
/// <param name="name">Display name of this processor, used in the console output.</param>
/// <param name="token">Token that signals the processor to stop polling.</param>
/// <returns>A task that completes once cancellation has been observed.</returns>
static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
{
    // Initial random delay before the first dequeue attempt.
    await GetRandomDelay();

    while (true)
    {
        CustomTask item;
        if (queue.TryDequeue(out item))
        {
            Console.WriteLine($"Task {item.Id} has been processed by {name}");
        }

        // Wait before the next attempt, whether or not an item was found.
        await GetRandomDelay();

        if (token.IsCancellationRequested)
        {
            break;
        }
    }
}

// Single generator shared by all callers. Creating a new Random seeded with
// DateTime.Now.Millisecond on every call allows only 1000 distinct seeds and gives
// identical sequences to calls made within the same millisecond.
private static readonly Random s_delayRandom = new Random();

// Random is not thread-safe, so access from concurrent processors is serialized.
private static readonly object s_delayRandomLock = new object();

/// <summary>
/// Creates a delay task with a random duration between 1 and 499 milliseconds.
/// </summary>
/// <returns>A task that completes after the randomly chosen delay.</returns>
static Task GetRandomDelay()
{
    int delay;
    lock (s_delayRandomLock)
    {
        // The upper bound of Next is exclusive: the result is in [1, 499].
        delay = s_delayRandom.Next(1, 500);
    }

    return Task.Delay(delay);
}

/// <summary>
/// A work item passed through the queue from the producer to the processors.
/// </summary>
private class CustomTask
{
    /// <summary>
    /// Identifier of the work item.
    /// </summary>
    public int Id { get; set; }
}

运行结果（由于延迟时间是随机的，每次运行的输出顺序可能不同）

txt
Task 1 has been posted
Task 1 has been processed by Processor 3
Task 2 has been posted
Task 3 has been posted
Task 2 has been processed by Processor 3
Task 3 has been processed by Processor 2
Task 4 has been posted
Task 5 has been posted
Task 6 has been posted
Task 7 has been posted
Task 4 has been processed by Processor 3
Task 7 has been processed by Processor 4
Task 5 has been processed by Processor 2
Task 6 has been processed by Processor 1
Task 8 has been posted
Task 9 has been posted
Task 9 has been processed by Processor 2
Task 8 has been processed by Processor 1
Task 10 has been posted
Task 10 has been processed by Processor 1
Task 11 has been posted
Task 12 has been posted
Task 11 has been processed by Processor 4
Task 12 has been processed by Processor 1
Task 13 has been posted
Task 14 has been posted
Task 13 has been processed by Processor 2
Task 14 has been processed by Processor 3
Task 15 has been posted
Task 15 has been processed by Processor 4
Task 16 has been posted
Task 16 has been processed by Processor 2
Task 17 has been posted
Task 18 has been posted
Task 17 has been processed by Processor 3
Task 18 has been processed by Processor 3
Task 19 has been posted
Task 19 has been processed by Processor 1
Task 20 has been posted
Task 20 has been processed by Processor 4