佳佳的博客
Menu
首页
[.NET Core] RabbitMQ 消费者
Posted by
佳佳
on 2019-06-24
IT
RabbitMQ
.NET Core
<!-- # [.NET Core] RabbitMQ 消费者 --> <!-- dnc-rabbitmq-consumer --> 大致的方案是在配置文件中配置消息队列及其消费的处理方法,然后创建订阅并通过反射执行配置的处理方法来消费。 *connection*(连接)整个应用程序共用一个,~~*channel* 则是每个队列共同一个~~ *channel* 则是每个消费者订阅单独使用一个。 下面是示例代码,仅供参考。 ## 示例代码 1. 新建 .NET Core 控制台工程 *RabbitMQConsumers*,目标框架为 *.NET Core 2.1*。 2. 安装所需程序包 ```powershell Install-Package RabbitMQ.Client -Version 5.1.0 Install-Package Newtonsoft.Json -Version 12.0.2 Install-Package Microsoft.AspNetCore.All -Version 2.1.5 ``` 3. 创建配置用的类 *MQSetting.cs*(MQ设置)、*ConsumerSetting.cs*(消费设置)和 *QueueSetting.cs*(队列设置)。 *MQSetting.cs* ```csharp /// <summary> /// MQ设置 /// </summary> class MQSetting { /// <summary> /// MQ服务器地址 /// </summary> public string HostName { get; set; } /// <summary> /// 用户名 /// </summary> public string UserName { get; set; } /// <summary> /// 密码 /// </summary> public string Password { get; set; } } ``` *ConsumerSetting.cs* ```csharp /// <summary> /// 消费设置 /// </summary> class ConsumerSetting { /// <summary> /// 程序集名称 /// </summary> public string AssemblyName { get; set; } /// <summary> /// 类名 /// </summary> public string ClassName { get; set; } /// <summary> /// 方法名 /// </summary> public string MethodName { get; set; } /// <summary> /// 是否自动应答 /// </summary> public bool AutoAck { get; set; } = true; /// <summary> /// 是否自动应答为 false 时,若发生异常,是否重新回队列。 /// true:重回队列;false:直接丢弃; /// </summary> public bool Requeue { get; set; } = false; /// <summary> /// 消费者数量 /// </summary> public int ConsumerCount { get; set; } = 1; /// <summary> /// 队列设置 /// </summary> public QueueSetting QueueSetting { get; set; } } ``` *QueueSetting.cs* ```csharp /// <summary> /// 队列设置 /// </summary> class QueueSetting { /// <summary> /// 队列名称 /// </summary> public string QueueName { get; set; } /// <summary> /// 持久化队列 /// </summary> public bool Durable { get; set; } = true; /// <summary> /// 独占队列 /// </summary> public bool Exclusive { get; set; } = false; /// <summary> /// 自动删除 /// </summary> public bool AutoDelete { get; set; } = false; /// <summary> /// 参数 /// </summary> public IDictionary<string, object> Arguments = null; } ``` 4. 创建消费启动接口及类 *IRabbitMQConsumerApplication.cs* *RabbitMQConsumerApplication.cs* *IRabbitMQConsumerApplication.cs* ```csharp interface IRabbitMQConsumerApplication { void Start(); } ``` *RabbitMQConsumerApplication.cs* ```csharp /// <summary> /// RabbitMQ消费App /// </summary> class RabbitMQConsumerApplication : IRabbitMQConsumerApplication { /// <summary> /// MQ设置 /// </summary> MQSetting _mqSetting; /// <summary> /// MQ消费者设置 /// </summary> List<ConsumerSetting> _consumerSettings; ILoggerFactory _loggerFactory; /// <summary> /// Logger /// </summary> ILogger _logger; /// <summary> /// 构造函数 /// </summary> public RabbitMQConsumerApplication(IOptions<MQSetting> mqSetting, IOptions<List<ConsumerSetting>> consumerSettings, ILoggerFactory loggerFactory) { _mqSetting = mqSetting.Value; _consumerSettings = consumerSettings.Value; _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger(nameof(RabbitMQConsumerApplication)); } /// <summary> /// 启动RabbitMQ消费程序 /// </summary> public void Start() { var factory = new ConnectionFactory() { HostName = _mqSetting.HostName, UserName = _mqSetting.UserName, Password = _mqSetting.Password }; using (var connection = factory.CreateConnection()) { foreach (var consumerSetting in _consumerSettings) { for (int i = 0; i < consumerSetting.ConsumerCount; i++) { Task.Run(() => { using (var channel = connection.CreateModel()) { channel.QueueDeclare( queue: consumerSetting.QueueSetting.QueueName, durable: consumerSetting.QueueSetting.Durable, exclusive: consumerSetting.QueueSetting.Exclusive, autoDelete: consumerSetting.QueueSetting.AutoDelete, arguments: consumerSetting.QueueSetting.Arguments); _logger.LogInformation($"StartConsumer : {JsonConvert.SerializeObject(consumerSetting)}"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (currentConsumer, ea) => { try { var body = ea.Body; var message = Encoding.UTF8.GetString(body); _logger.LogInformation($"mission start ( message: {message} )"); var assembly = Assembly.Load(consumerSetting.AssemblyName); var type = assembly.GetType(consumerSetting.ClassName); var method = type.GetMethod(consumerSetting.MethodName); method.Invoke(Activator.CreateInstance(type, _loggerFactory), new object[] { message }); _logger.LogInformation($"mission success"); if (!consumerSetting.AutoAck) { channel.BasicAck(ea.DeliveryTag, false); } } catch (Exception ex) { _logger.LogError(ex, "mission fail"); if (!consumerSetting.AutoAck) { channel.BasicReject(ea.DeliveryTag, consumerSetting.Requeue); } } }; channel.BasicConsume(queue: consumerSetting.QueueSetting.QueueName, autoAck: consumerSetting.AutoAck, consumer: consumer); _logger.LogInformation($"StartConsumer Successed"); Thread.Sleep(Timeout.Infinite); } }); } } Thread.Sleep(Timeout.Infinite); } } } ``` 5. 创建具体的消费类 *HelloConsumer.cs* ```csharp /// <summary> /// Hello消费者 /// </summary> class HelloConsumer { /// <summary> /// Logger /// </summary> ILogger _logger; public HelloConsumer(ILoggerFactory loggerFactory) { _logger = loggerFactory.CreateLogger(nameof(HelloConsumer)); } /// <summary> /// 处理消息 /// </summary> /// <param name="message"></param> public void HandleMessage(string message) { Console.WriteLine(" [x] {0}", message); _logger.LogInformation(message); } } ``` 6. 在配置文件中配置MQ、队列及消费 *appsettings.json* ```json { "Logging": { "LogLevel": { "Default": "Debug" } }, "MqSetting": { "HostName": "192.168.0.69", "UserName": "octopus", "Password": "octopus" }, "MqConsumerSetting": [ { "AssemblyName": "RabbitMQConsumers", "ClassName": "RabbitMQConsumers.HelloConsumer", "MethodName": "HandleMessage", "AutoAck": false, "Requeue": false, "ConsumerCount": 2, "QueueSetting": { "Arguments": null, "QueueName": "QUEUE_HELLO", "Durable": true, "Exclusive": false, "AutoDelete": false } } ] } ``` 7. 在 *Program.cs* 中读取配置并启动消费 ```csharp class Program { static void Main(string[] args) { IServiceCollection services = new ServiceCollection(); // 配置 var configuration = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build(); services.Configure<MQSetting>(configuration.GetSection("MqSetting")); services.Configure<List<ConsumerSetting>>(configuration.GetSection("MqConsumerSetting")); services.AddLogging((builder) => builder .AddConfiguration(configuration.GetSection("Logging")) .AddConsole()); // 注入 services.AddSingleton<IRabbitMQConsumerApplication, RabbitMQConsumerApplication>(); // 构建容器 IServiceProvider serviceProvider = services.BuildServiceProvider(); // 启动MQ消费 Task.Run(() => { serviceProvider.GetService<IRabbitMQConsumerApplication>().Start(); }); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.Exit(0); } } ``` ## 参考文档 1. [.NET/C# Client API Guide](https://www.rabbitmq.com/dotnet-api-guide.html) 2. [RabbitMQ入门教程(十二):消息确认Ack](https://blog.csdn.net/vbirdbest/article/details/78699913) 3. [.NET Core 使用RabbitMQ](https://www.cnblogs.com/stulzq/p/7551819.html) 4. [基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)](https://www.e-learn.cn/content/net/377274) 5. [LogLevel Enum](https://docs.microsoft.com/en-us/dotnet/api/microsoft.extensions.logging.loglevel?view=aspnetcore-2.1) 6. [.Net Core中的日志组件(Logging)](https://www.cnblogs.com/MicroHeart/p/9341286.html) 7. [在.NET Core控制台程序中使用依赖注入](https://www.cnblogs.com/dudu/p/5552293.html) 8. [.net core读取json格式的配置文件](https://www.cnblogs.com/dotnet261010/p/10172961.html) 9. [ASP.NET Core 源码学习之 Logging 3:Logger](https://www.cnblogs.com/RainingNight/p/asp-net-core-logging-logger.html) ## 2019/06/27 追记 之前关于 *RabbitMQConsumerApplication* 中 *ConsumerSetting.ConsumerCount* 的使用有误,应该每个消费者在单独的线程中。(上面的代码已更新) 另外关于 `EventingBasicConsumer` 通过订阅消费队列的过程,跟我想象的有些出入。 本以为是 *RabbitMQ* 会将一个消息推送给订阅者,待其消费结束后再推送下一个;但实际上其过程是 *RabbitMQ* 会将当前队列中 *noack* 的所有消息通过某种方式(貌似不是轮询,而是将所有任务按照当前订阅者的数量分段)分配给当前的订阅者。 这就导致当 *AutoAck* 设置为 `true` 时可能会发生如下的情况:当队列中的消息已经推送到了消费者,但消费者在未处理结束时发生异常导致进程结束了(比如人为关闭、系统崩溃、停电等),此时已经推送到消费者的消费并不会重新回到队列(其在推送到消费者时就被从队列中删除了)。所以在将消费者配置为 *AutoAck* (自动应答) 时需谨慎些。 虽然每个消费者同时可能接收到了很多消息,但是同一个消费者同一时间貌似只能处理一个,其他的任务处于等待执行的状态。在消费处理中增加 `Thread.Sleep` 可以很明显的看到这个效果。
版权声明:原创文章,未经允许不得转载。
https://www.liujiajia.me/2019/6/24/dnc-rabbitmq-consumer
“Buy me a nongfu spring”
« Git 开发流程
如何清空(Truncate)带外键(FK)的表? »
昵称
*
电子邮箱
*
回复内容
*
(回复审核后才会显示)
提交
目录
AUTHOR
刘佳佳
江苏 - 苏州
软件工程师