Skip to content

.NET Core RabbitMQ 消费者

🏷️ .NET Core RabbitMQ

大致的方案是在配置文件中配置消息队列及其消费的处理方法,然后创建订阅并通过反射执行配置的处理方法来消费。

connection(连接)整个应用程序共用一个,channel 则是每个队列共同一个 channel 则是每个消费者订阅单独使用一个。

下面是示例代码,仅供参考。

示例代码

  1. 新建 .NET Core 控制台工程 RabbitMQConsumers,目标框架为 .NET Core 2.1

  2. 安装所需程序包

    powershell
    Install-Package RabbitMQ.Client -Version 5.1.0
    Install-Package Newtonsoft.Json -Version 12.0.2
    Install-Package Microsoft.AspNetCore.All -Version 2.1.5
  3. 创建配置用的类 MQSetting.cs(MQ 设置)、ConsumerSetting.cs(消费设置)和 QueueSetting.cs(队列设置)。

    MQSetting.cs

    csharp
    /// <summary>
    /// MQ 设置
    /// </summary>
    class MQSetting
    {
        /// <summary>
        /// MQ 服务器地址
        /// </summary>
        public string HostName { get; set; }
    
        /// <summary>
        /// 用户名
        /// </summary>
        public string UserName { get; set; }
    
        /// <summary>
        /// 密码
        /// </summary>
        public string Password { get; set; }
    }

    ConsumerSetting.cs

    csharp
    /// <summary>
    /// 消费设置
    /// </summary>
    class ConsumerSetting
    {
        /// <summary>
        /// 程序集名称
        /// </summary>
        public string AssemblyName { get; set; }
    
        /// <summary>
        /// 类名
        /// </summary>
        public string ClassName { get; set; }
    
        /// <summary>
        /// 方法名
        /// </summary>
        public string MethodName { get; set; }
    
        /// <summary>
        /// 是否自动应答
        /// </summary>
        public bool AutoAck { get; set; } = true;
    
        /// <summary>
        /// 是否自动应答为 false 时,若发生异常,是否重新回队列。
        /// true:重回队列;false:直接丢弃;
        /// </summary>
        public bool Requeue { get; set; } = false;
    
        /// <summary>
        /// 消费者数量
        /// </summary>
        public int ConsumerCount { get; set; } = 1;
    
        /// <summary>
        /// 队列设置
        /// </summary>
        public QueueSetting QueueSetting { get; set; }
    }

    QueueSetting.cs

    csharp
    /// <summary>
    /// 队列设置
    /// </summary>
    class QueueSetting
    {
        /// <summary>
        /// 队列名称
        /// </summary>
        public string QueueName { get; set; }
    
        /// <summary>
        /// 持久化队列
        /// </summary>
        public bool Durable { get; set; } = true;
    
        /// <summary>
        /// 独占队列
        /// </summary>
        public bool Exclusive { get; set; } = false;
    
        /// <summary>
        /// 自动删除
        /// </summary>
        public bool AutoDelete { get; set; } = false;
    
        /// <summary>
        /// 参数
        /// </summary>
        public IDictionary<string, object> Arguments = null;
    }
  4. 创建消费启动接口及类 IRabbitMQConsumerApplication.cs RabbitMQConsumerApplication.cs

    IRabbitMQConsumerApplication.cs

    csharp
    interface IRabbitMQConsumerApplication
    {
        void Start();
    }

    RabbitMQConsumerApplication.cs

    csharp
    /// <summary>
    /// RabbitMQ 消费 App
    /// </summary>
    class RabbitMQConsumerApplication : IRabbitMQConsumerApplication
    {
        /// <summary>
        /// MQ 设置
        /// </summary>
        MQSetting _mqSetting;
    
        /// <summary>
        /// MQ 消费者设置
        /// </summary>
        List<ConsumerSetting> _consumerSettings;
    
        ILoggerFactory _loggerFactory;
    
        /// <summary>
        /// Logger
        /// </summary>
        ILogger _logger;
    
        /// <summary>
        /// 构造函数
        /// </summary>
        public RabbitMQConsumerApplication(IOptions<MQSetting> mqSetting, IOptions<List<ConsumerSetting>> consumerSettings, ILoggerFactory loggerFactory)
        {
            _mqSetting = mqSetting.Value;
            _consumerSettings = consumerSettings.Value;
            _loggerFactory = loggerFactory;
            _logger = loggerFactory.CreateLogger(nameof(RabbitMQConsumerApplication));
        }
    
        /// <summary>
        /// 启动 RabbitMQ 消费程序
        /// </summary>
        public void Start()
        {
            var factory = new ConnectionFactory()
            {
                HostName = _mqSetting.HostName,
                UserName = _mqSetting.UserName,
                Password = _mqSetting.Password
            };
    
            using (var connection = factory.CreateConnection())
            {
                foreach (var consumerSetting in _consumerSettings)
                {
                    for (int i = 0; i < consumerSetting.ConsumerCount; i++)
                    {
                        Task.Run(() =>
                        {
                            using (var channel = connection.CreateModel())
                            {
                                channel.QueueDeclare(
                                    queue: consumerSetting.QueueSetting.QueueName,
                                    durable: consumerSetting.QueueSetting.Durable,
                                    exclusive: consumerSetting.QueueSetting.Exclusive,
                                    autoDelete: consumerSetting.QueueSetting.AutoDelete,
                                    arguments: consumerSetting.QueueSetting.Arguments);
    
                                _logger.LogInformation($"StartConsumer : {JsonConvert.SerializeObject(consumerSetting)}");
    
                                var consumer = new EventingBasicConsumer(channel);
    
                                consumer.Received += (currentConsumer, ea) =>
                                {
                                    try
                                    {
                                        var body = ea.Body;
                                        var message = Encoding.UTF8.GetString(body);
    
                                        _logger.LogInformation($"mission start ( message: {message} )");
    
                                        var assembly = Assembly.Load(consumerSetting.AssemblyName);
                                        var type = assembly.GetType(consumerSetting.ClassName);
                                        var method = type.GetMethod(consumerSetting.MethodName);
    
                                        method.Invoke(Activator.CreateInstance(type, _loggerFactory), new object[] { message });
    
                                        _logger.LogInformation($"mission success");
    
                                        if (!consumerSetting.AutoAck)
                                        {
                                            channel.BasicAck(ea.DeliveryTag, false);
                                        }
                                    }
                                    catch (Exception ex)
                                    {
                                        _logger.LogError(ex, "mission fail");
                                        if (!consumerSetting.AutoAck)
                                        {
                                            channel.BasicReject(ea.DeliveryTag, consumerSetting.Requeue);
                                        }
                                    }
                                };
    
                                channel.BasicConsume(queue: consumerSetting.QueueSetting.QueueName,
                                                    autoAck: consumerSetting.AutoAck,
                                                    consumer: consumer);
    
                                _logger.LogInformation($"StartConsumer Successed");
    
                                Thread.Sleep(Timeout.Infinite);
                            }
                        });
                    }
                }
    
                Thread.Sleep(Timeout.Infinite);
            }
        }
    }
  5. 创建具体的消费类 HelloConsumer.cs

    csharp
    /// <summary>
    /// Hello 消费者
    /// </summary>
    class HelloConsumer
    {
        /// <summary>
        /// Logger
        /// </summary>
        ILogger _logger;
    
        public HelloConsumer(ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger(nameof(HelloConsumer));
        }
    
        /// <summary>
        /// 处理消息
        /// </summary>
        /// <param name="message"></param>
        public void HandleMessage(string message)
        {
            Console.WriteLine(" [x] {0}", message);
    
            _logger.LogInformation(message);
        }
    }
  6. 在配置文件中配置 MQ、队列及消费 appsettings.json

    json
    {
        "Logging": {
            "LogLevel": {
                "Default": "Debug"
            }
        },
        "MqSetting": {
            "HostName": "192.168.0.69",
            "UserName": "octopus",
            "Password": "octopus"
        },
        "MqConsumerSetting": [
            {
                "AssemblyName": "RabbitMQConsumers",
                "ClassName": "RabbitMQConsumers.HelloConsumer",
                "MethodName": "HandleMessage",
                "AutoAck": false,
                "Requeue": false,
                "ConsumerCount": 2,
                "QueueSetting": {
                    "Arguments": null,
                    "QueueName": "QUEUE_HELLO",
                    "Durable": true,
                    "Exclusive": false,
                    "AutoDelete": false
                }
            }
        ]
    }
  7. Program.cs 中读取配置并启动消费

    csharp
    class Program
    {
        static void Main(string[] args)
        {
            IServiceCollection services = new ServiceCollection();
    
            // 配置
            var configuration = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
    
            services.Configure<MQSetting>(configuration.GetSection("MqSetting"));
            services.Configure<List<ConsumerSetting>>(configuration.GetSection("MqConsumerSetting"));
            services.AddLogging((builder) => builder
                .AddConfiguration(configuration.GetSection("Logging"))
                .AddConsole());
    
            // 注入
            services.AddSingleton<IRabbitMQConsumerApplication, RabbitMQConsumerApplication>();
    
            // 构建容器
            IServiceProvider serviceProvider = services.BuildServiceProvider();
    
            // 启动 MQ 消费
            Task.Run(() => {
                serviceProvider.GetService<IRabbitMQConsumerApplication>().Start();
            });
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
    
            Environment.Exit(0);
        }
    }

参考文档

  1. .NET/C# Client API Guide
  2. RabbitMQ 入门教程 (十二):消息确认 Ack
  3. .NET Core 使用 RabbitMQ
  4. 基于 RabbitMQ.Client 组件实现 RabbitMQ 可复用的 ConnectionPool(连接池)
  5. LogLevel Enum
  6. .Net Core 中的日志组件 (Logging)
  7. 在.NET Core 控制台程序中使用依赖注入
  8. .net core 读取 json 格式的配置文件
  9. ASP.NET Core 源码学习之 Logging 3:Logger

2019/06/27 追记

之前关于 RabbitMQConsumerApplicationConsumerSetting.ConsumerCount 的使用有误,应该每个消费者在单独的线程中。(上面的代码已更新)

另外关于 EventingBasicConsumer 通过订阅消费队列的过程,跟我想象的有些出入。

本以为是 RabbitMQ 会将一个消息推送给订阅者,待其消费结束后再推送下一个;但实际上其过程是 RabbitMQ 会将当前队列中 noack 的所有消息通过某种方式(貌似不是轮询,而是将所有任务按照当前订阅者的数量分段)分配给当前的订阅者。

这就导致当 AutoAck 设置为 true 时可能会发生如下的情况:当队列中的消息已经推送到了消费者,但消费者在未处理结束时发生异常导致进程结束了(比如人为关闭、系统崩溃、停电等),此时已经推送到消费者的消费并不会重新回到队列(其在推送到消费者时就被从队列中删除了)。所以在将消费者配置为 AutoAck (自动应答)时需谨慎些。

虽然每个消费者同时可能接收到了很多消息,但是同一个消费者同一时间貌似只能处理一个,其他的任务处于等待执行的状态。在消费处理中增加 Thread.Sleep 可以很明显的看到这个效果。