Skip to content

C# 多线程 07-使用 PLINQ 05-管理 PLINQ 查询中的数据分区

🏷️ 《C# 多线程》

不同的线程在处理奇数长度和偶数长度的字符串

示例代码

csharp
/// <summary>
/// 管理 PLINQ 查询中的数据分区
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var sw = Stopwatch.StartNew();
    var partitioner = new StringPartitioner(GetTypes());
    var parallelQuery = from t in partitioner.AsParallel()
                        //.WithDegreeOfParallelism(1)
                        select EmulateProcessing(t);

    parallelQuery.ForAll(PrintInfo);
    int count = parallelQuery.Count();
    sw.Stop();
    Console.WriteLine("------------------------------");
    Console.WriteLine($"Total items processed: {count}");
    Console.WriteLine($"Time elapsesd: {sw.Elapsed}");

    Console.ReadLine();
}

static void PrintInfo(string typeName)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(150));
    Console.WriteLine($"{typeName} type was printed on a thread id {Thread.CurrentThread.ManagedThreadId}");
}

static string EmulateProcessing(string typeName)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(150));
    Console.WriteLine($"{typeName} type was processed on a thread id {Thread.CurrentThread.ManagedThreadId}. Has {(typeName.Length % 2 == 0 ? "even" : "odd")} length.");
    return typeName;
}

static IEnumerable<string> GetTypes()
{
    var types = AppDomain.CurrentDomain.GetAssemblies().SelectMany(a => a.GetExportedTypes());
    return from type in types
            where type.Name.StartsWith("Web")
            select type.Name;
}

/// <summary>
/// 自定义的分区器
/// </summary>
public class StringPartitioner : Partitioner<string>
{
    private readonly IEnumerable<string> _data;

    public StringPartitioner(IEnumerable<string> data)
    {
        _data = data;
    }

    /// <summary>
    /// 重写为 false,声明只支持静态分区
    /// </summary>
    public override bool SupportsDynamicPartitions => false;

    /// <summary>
    /// 重载生成静态分区方法(长度为奇数和偶数的字符串,分别放在不同的分区)
    /// </summary>
    /// <param name="partitionCount"></param>
    /// <returns></returns>
    public override IList<IEnumerator<string>> GetPartitions(int partitionCount)
    {
        var result = new List<IEnumerator<string>>(partitionCount);

        for (int i = 1; i <= partitionCount; i++)
        {
            result.Add(CreateEnumerator(i, partitionCount));
        }

        return result;
    }
    
    private IEnumerator<string> CreateEnumerator(int partitionNumber, int partitionCount)
    {
        int evenPartitions = partitionCount / 2;
        bool isEven = partitionNumber % 2 == 0;
        int step = isEven ? evenPartitions : partitionCount - evenPartitions;

        int startIndex = partitionNumber / 2 + partitionNumber % 2;

        var q = _data
            .Where(v => !(v.Length % 2 == 0 ^ isEven) || partitionCount == 1)
            .Skip(startIndex - 1);
        return q
            .Where((x, i) => i % step == 0)
            .GetEnumerator();
    }
}

运行结果

WebExceptionStatus type was processed on a thread id 1. Has even length.
WebRequestMethods type was processed on a thread id 3. Has odd length.
WebClient type was processed on a thread id 5. Has odd length.
WebException type was processed on a thread id 4. Has even length.
WebExceptionStatus type was printed on a thread id 1
WebClient type was printed on a thread id 5
WebRequestMethods type was printed on a thread id 3
WebException type was printed on a thread id 4
WebProxy type was processed on a thread id 1. Has even length.
WebHeaderCollection type was processed on a thread id 3. Has odd length.
WebPermission type was processed on a thread id 5. Has odd length.
WebPermissionAttribute type was processed on a thread id 4. Has even length.
WebProxy type was printed on a thread id 1
WebHeaderCollection type was printed on a thread id 3
WebPermission type was printed on a thread id 5
WebPermissionAttribute type was printed on a thread id 4
WebUtility type was processed on a thread id 1. Has even length.
WebResponse type was processed on a thread id 3. Has odd length.
WebSocket type was processed on a thread id 5. Has odd length.
WebRequest type was processed on a thread id 4. Has even length.
WebUtility type was printed on a thread id 1
WebResponse type was printed on a thread id 3
WebSocket type was printed on a thread id 5
WebRequest type was printed on a thread id 4
WebSocketContext type was processed on a thread id 1. Has even length.
WebProxyScriptElement type was processed on a thread id 3. Has odd length.
WebRequestModuleElement type was processed on a thread id 5. Has odd length.
WebSocketCloseStatus type was processed on a thread id 4. Has even length.
WebSocketContext type was printed on a thread id 1
WebProxyScriptElement type was printed on a thread id 3
WebRequestModuleElement type was printed on a thread id 5
WebSocketCloseStatus type was printed on a thread id 4
WebSocketException type was processed on a thread id 1. Has even length.
WebRequestModuleElementCollection type was processed on a thread id 3. Has odd l
ength.
WebUtilityElement type was processed on a thread id 5. Has odd length.
WebSocketError type was processed on a thread id 4. Has even length.
WebSocketException type was printed on a thread id 1
WebRequestModuleElementCollection type was printed on a thread id 3
WebUtilityElement type was printed on a thread id 5
WebSocketError type was printed on a thread id 4
WebSocketReceiveResult type was processed on a thread id 1. Has even length.
WebSocketMessageType type was processed on a thread id 4. Has even length.
WebSocketReceiveResult type was printed on a thread id 1
WebSocketMessageType type was printed on a thread id 4
WebRequestModulesSection type was processed on a thread id 1. Has even length.
WebSocketState type was processed on a thread id 4. Has even length.
WebRequestModulesSection type was printed on a thread id 1
WebSocketState type was printed on a thread id 4
WebRequestMethods type was processed on a thread id 5. Has odd length.
WebException type was processed on a thread id 3. Has even length.
WebClient type was processed on a thread id 6. Has odd length.
WebExceptionStatus type was processed on a thread id 1. Has even length.
WebPermissionAttribute type was processed on a thread id 3. Has even length.
WebProxy type was processed on a thread id 1. Has even length.
WebHeaderCollection type was processed on a thread id 5. Has odd length.
WebPermission type was processed on a thread id 6. Has odd length.
WebUtility type was processed on a thread id 1. Has even length.
WebResponse type was processed on a thread id 5. Has odd length.
WebSocket type was processed on a thread id 6. Has odd length.
WebRequest type was processed on a thread id 3. Has even length.
WebRequestModuleElement type was processed on a thread id 6. Has odd length.
WebProxyScriptElement type was processed on a thread id 5. Has odd length.
WebSocketContext type was processed on a thread id 1. Has even length.
WebSocketCloseStatus type was processed on a thread id 3. Has even length.
WebSocketException type was processed on a thread id 1. Has even length.
WebSocketError type was processed on a thread id 3. Has even length.
WebRequestModuleElementCollection type was processed on a thread id 5. Has odd l
ength.
WebUtilityElement type was processed on a thread id 6. Has odd length.
WebSocketReceiveResult type was processed on a thread id 1. Has even length.
WebSocketMessageType type was processed on a thread id 3. Has even length.
WebSocketState type was processed on a thread id 3. Has even length.
WebRequestModulesSection type was processed on a thread id 1. Has even length.
------------------------------
Total items processed: 24
Time elapsesd: 00:00:03.1730246