Skip to content

C# 多线程 10-并行编程模式 04-使用 PLINQ 实现 Map/Reduce 模式

🏷️ 《C# 多线程》

Map/Reduce 模式

Map/Reduce 是另一个重要的并行编程模式。它既适用于小型程序,也适用于由多台服务器共同完成的大规模计算场景。

该模式的含义是:有两个特殊的函数要应用于你的数据。

  1. Map 函数

    接收一组键/值列表的初始数据,并产生另一组键/值序列,将初始数据转换为适合的格式以便进行下一步处理。

  2. Reduce 函数

    使用 Map 函数的结果,并将其转换为我们真正需要的尽可能小的数据集。

示例代码

csharp
class Program
{
    /// <summary>
    /// Characters on which each line of text is split into word tokens.
    /// </summary>
    static readonly char[] delimiters = { ' ', ',', ';', '\"', '.' };

    /// <summary>
    /// Single HTTP client shared by every download. HttpClient is designed to be
    /// reused; creating one per request wastes sockets.
    /// </summary>
    static readonly HttpClient httpClient = new HttpClient();

    /// <summary>
    /// Implements the Map/Reduce pattern with PLINQ: downloads several books,
    /// computes word statistics for each of them in parallel and prints the
    /// combined report to the console.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        // Book title -> URL of the plain-text edition.
        var booksList = new Dictionary<string, string>()
        {
            ["Moby Dick; Or, The Whale by Herman Melville"] = "http://www.gutenberg.org/cache/epub/2701/pg2701.txt",
            ["The Adventures of Tom Sawyer by Mark Twain"] = "http://www.gutenberg.org/cache/epub/74/pg74.txt",
            ["Treasure Islan by Robert Louis Stevenson"] = "http://www.gutenberg.org/cache/epub/120/pg120.txt",
            ["The Picture of Dorian Gray by Oscar Wilde"] = "http://www.gutenberg.org/cache/epub/174/pg174.txt",
        };

        // Fetch the stop-word list once; the set is only read afterwards,
        // so sharing it between the parallel workers is safe.
        HashSet<string> stopwords = DownloadStopWordsAsync().GetAwaiter().GetResult();

        var output = new StringBuilder();
        // StringBuilder is not thread-safe, so every append made from the
        // parallel loop below has to happen under this lock.
        var outputLock = new object();

        // Process the books in parallel. The order in which the per-book
        // reports end up in the output depends on which worker finishes first.
        Parallel.ForEach(booksList, book => {
            // Download the book text (blocks the worker until it is available).
            var bookContent = DownloadBookAsync(book.Value).GetAwaiter().GetResult();
            // Compute the statistics report for this book.
            string result = ProcessBookAsync(bookContent, book.Key, stopwords).GetAwaiter().GetResult();
            // Append the report atomically so reports from different workers
            // cannot interleave or corrupt the builder.
            lock (outputLock)
            {
                output.Append(result);
                output.AppendLine();
            }
        });

        Console.Write(output.ToString());
        Console.ReadLine();
    }

    /// <summary>
    /// Builds a textual report for one book: the ten most frequent words that
    /// are not stop words, followed by the number of distinct tokens.
    /// </summary>
    /// <remarks>
    /// The method contains no awaits, so the work runs synchronously on the
    /// calling thread and the returned task is already completed. The
    /// "Unique Words used" figure counts every distinct lower-cased token,
    /// including stop words and the empty token produced by adjacent delimiters.
    /// </remarks>
    /// <param name="bookContent">Full text of the book.</param>
    /// <param name="title">Title printed in the report header.</param>
    /// <param name="stopwords">Words excluded from the top-ten list.</param>
    /// <returns>A task whose result is the formatted report.</returns>
    async static Task<string> ProcessBookAsync(string bookContent, string title, HashSet<string> stopwords)
    {
        using (var reader = new StringReader(bookContent))
        {
            var query = reader.EnumLines() // lazily enumerate the lines of the text
                .AsParallel() // switch to PLINQ
                .SelectMany(line => line.Split(delimiters)) // tokenize every line
                .MapReduce( // custom Map/Reduce extension method
                    // Invariant lower-casing keeps tokens comparable with the
                    // stop-word list regardless of the current culture.
                    word => new[] { word.ToLowerInvariant() },
                    key => key,
                    g => new[] { new { Word = g.Key, Count = g.Count() } }
                )
                .ToList();

            // Drop empty tokens and stop words, most frequent words first.
            var words = query
                .Where(element => !string.IsNullOrEmpty(element.Word) && !stopwords.Contains(element.Word))
                .OrderByDescending(element => element.Count);

            var sb = new StringBuilder();

            sb.AppendLine($"'{title}' book stats");
            sb.AppendLine($"Top ten words used in this book:");
            // Top 10 words.
            foreach (var w in words.Take(10))
            {
                sb.AppendLine($"Word: '{w.Word}', times used: '{w.Count}'");
            }

            sb.AppendLine($"Unique Words used: {query.Count}");

            return sb.ToString();
        }
    }

    /// <summary>
    /// Downloads the text found at <paramref name="bookUrl"/>.
    /// </summary>
    /// <param name="bookUrl">URL of the book's plain-text file.</param>
    /// <returns>A task whose result is the response body as a string.</returns>
    async static Task<string> DownloadBookAsync(string bookUrl)
    {
        return await httpClient.GetStringAsync(bookUrl);
    }

    /// <summary>
    /// Downloads a JSON stop-word dictionary and returns its English ("en") entry.
    /// </summary>
    /// <returns>
    /// A task whose result is the set of English stop words, or an empty set
    /// when the list cannot be downloaded or parsed (best effort: the statistics
    /// are then computed without stop-word filtering).
    /// </returns>
    async static Task<HashSet<string>> DownloadStopWordsAsync()
    {
        string url = "https://raw.githubusercontent.com/6/stopwords/master/stopwords-all.json";

        try
        {
            var content = await httpClient.GetStringAsync(url);
            var words = JsonConvert.DeserializeObject<Dictionary<string, string[]>>(content);
            return new HashSet<string>(words["en"]);
        }
        catch (Exception ex)
        {
            // Deliberate fallback, but report the reason instead of hiding it.
            Console.Error.WriteLine($"Stop words could not be loaded, continuing without them: {ex.Message}");
            return new HashSet<string>();
        }
    }
}

/// <summary>
/// Extension methods used by the Map/Reduce sample.
/// </summary>
static class Extensions
{
    /// <summary>
    /// Runs a Map/Reduce pipeline on a parallel query: every source element is
    /// mapped to zero or more intermediate values, the intermediate values are
    /// grouped by key, and every group is reduced to zero or more results.
    /// </summary>
    /// <typeparam name="TSource">Type of the source elements.</typeparam>
    /// <typeparam name="TMapped">Type of the intermediate values produced by <paramref name="map"/>.</typeparam>
    /// <typeparam name="TKey">Type of the grouping key.</typeparam>
    /// <typeparam name="TResult">Type of the results produced by <paramref name="reduce"/>.</typeparam>
    /// <param name="source">Parallel query to process.</param>
    /// <param name="map">Projects one source element into a sequence of intermediate values.</param>
    /// <param name="keySelector">Extracts the grouping key from an intermediate value.</param>
    /// <param name="reduce">Turns one group of intermediate values into a sequence of results.</param>
    /// <returns>A parallel query that yields the results of all groups.</returns>
    public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
        this ParallelQuery<TSource> source,
        Func<TSource, IEnumerable<TMapped>> map,
        Func<TMapped, TKey> keySelector,
        Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce
        )
    {
        // Map: flatten the per-element projections into one sequence.
        ParallelQuery<TMapped> mappedItems = ParallelEnumerable.SelectMany(source, map);

        // Shuffle: bucket the intermediate values by key.
        ParallelQuery<IGrouping<TKey, TMapped>> buckets = ParallelEnumerable.GroupBy(mappedItems, keySelector);

        // Reduce: flatten the per-group results into the final sequence.
        return ParallelEnumerable.SelectMany(buckets, reduce);
    }

    /// <summary>
    /// Lazily enumerates the remaining lines of <paramref name="reader"/>, one
    /// <c>ReadLine</c> call per element, stopping when the reader returns null.
    /// </summary>
    /// <param name="reader">Reader to pull lines from.</param>
    /// <returns>The lines in the order they are read.</returns>
    public static IEnumerable<string> EnumLines(this StringReader reader)
    {
        string current;
        while ((current = reader.ReadLine()) != null)
        {
            yield return current;
        }
    }
}

打印结果

txt
'The Adventures of Tom Sawyer by Mark Twain' book stats
Top ten words used in this book:
Word: '?', times used: '61'
Word: '??', times used: '19'
Word: '???', times used: '5'
Word: 'p?', times used: '3'
Word: '}?', times used: '2'
Word: '=', times used: '2'
Word: '0?', times used: '2'
Word: '??{', times used: '2'
Word: 't?', times used: '2'
Word: '#', times used: '2'
Unique Words used: 4150

'Treasure Islan by Robert Louis Stevenson' book stats
Top ten words used in this book:
Word: 'man', times used: '227'
Word: 'captain', times used: '205'
Word: 'silver', times used: '194'
Word: 'doctor', times used: '151'
Word: 'time', times used: '130'
Word: 'good', times used: '123'
Word: 'hand', times used: '119'
Word: 'long', times used: '114'
Word: 'back', times used: '106'
Word: 'cried', times used: '103'
Unique Words used: 7452

'The Picture of Dorian Gray by Oscar Wilde' book stats
Top ten words used in this book:
Word: 'dorian', times used: '390'
Word: 'lord', times used: '247'
Word: 'henry', times used: '220'
Word: 'life', times used: '216'
Word: 'gray', times used: '176'
Word: 'man', times used: '168'
Word: 'harry', times used: '141'
Word: 'basil', times used: '135'
Word: 'things', times used: '124'
Word: 'thing', times used: '118'
Unique Words used: 8226

'Moby Dick; Or, The Whale by Herman Melville' book stats
Top ten words used in this book:
Word: '?', times used: '230'
Word: '??', times used: '71'
Word: '???', times used: '38'
Word: '????', times used: '10'
Word: '', times used: '8'
Word: 'e?', times used: '5'
Word: '?a', times used: '5'
Word: '<', times used: '5'
Word: '\', times used: '5'
Word: '{', times used: '5'
Unique Words used: 13003