获取 RabbitMQ 的消息数量

在这篇博客的基础上做了少许修改。具体代码如下:

private final RabbitTemplate rabbitTemplate;

try (Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
     Channel channel = connection.createChannel(false);) {
    // Declare the topic exchange "amp.topic" (durable = true, autoDelete = false, no extra arguments)
    channel.exchangeDeclare("amp.topic", "topic", true, false, null);
    // Passive declare of QUEUE_NAME; the returned DeclareOk is what carries the message count
    AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(QUEUE_NAME);
    // Number of messages in the queue, as reported by the declare-ok reply
    int queueCount = declareOk.getMessageCount();
    return queueCount;
}
SpringBoot 批量消费 RabbitMQ 消息

尝试使用 BatchMessagingMessageListenerAdapter 来实现批量消费时报了如下错误:

org.springframework.amqp.UncategorizedAmqpException: java.lang.NullPointerException

调试发现是执行到 MessagingMessageListenerAdapter.invokeHandler 时,this.handlerAdapter 值为空导致的。

RabbitMQ 延时消息

使用 rabbitmq-delayed-message-exchange 插件来实现延时消息功能。

安装延时消息插件:rabbitmq-delayed-message-exchange

官方插件现在只支持 3.8.x 及以上的版本,不支持我现在使用的 3.5.7 版本,所以只能使用网上找到的文章中提供的插件下载地址来下载。

具体步骤如下(摘自这篇博客):

.NET Core RabbitMQ 消费者

大致的方案是在配置文件中配置消息队列及其消费的处理方法,然后创建订阅并通过反射执行配置的处理方法来消费。

connection(连接)整个应用程序共用一个,channel 则是每个消费者订阅单独使用一个。

下面是示例代码,仅供参考。

示例代码

  1. 新建 .NET Core 控制台工程 RabbitMQConsumers,目标框架为 .NET Core 2.1

  2. 安装所需程序包

    Install-Package RabbitMQ.Client -Version 5.1.0
    Install-Package Newtonsoft.Json -Version 12.0.2
    Install-Package Microsoft.AspNetCore.All -Version 2.1.5
    
  3. 创建配置用的类 MQSetting.cs(MQ 设置)、ConsumerSetting.cs(消费设置)和 QueueSetting.cs(队列设置)。

    MQSetting.cs

    /// <summary>
    /// MQ settings: the values used to open the RabbitMQ connection.
    /// </summary>
    class MQSetting
    {
        /// <summary>
        /// MQ server address (host name or IP).
        /// </summary>
        public string HostName { get; set; }
    
        /// <summary>
        /// User name.
        /// </summary>
        public string UserName { get; set; }
    
        /// <summary>
        /// Password.
        /// </summary>
        public string Password { get; set; }
    }
    

    ConsumerSetting.cs

    /// <summary>
    /// Consumer settings: which handler method consumes which queue, and how.
    /// </summary>
    class ConsumerSetting
    {
        /// <summary>
        /// Name of the assembly that contains the handler class.
        /// </summary>
        public string AssemblyName { get; set; }
    
        /// <summary>
        /// Name of the handler class.
        /// </summary>
        public string ClassName { get; set; }
    
        /// <summary>
        /// Name of the handler method.
        /// </summary>
        public string MethodName { get; set; }
    
        /// <summary>
        /// Whether messages are acknowledged automatically. Defaults to true.
        /// </summary>
        public bool AutoAck { get; set; } = true;
    
        /// <summary>
        /// When <see cref="AutoAck"/> is false and handling throws, whether the message is requeued.
        /// true: put the message back on the queue; false: discard it. Defaults to false.
        /// </summary>
        public bool Requeue { get; set; } = false;
    
        /// <summary>
        /// Number of consumers to start for the queue. Defaults to 1.
        /// </summary>
        public int ConsumerCount { get; set; } = 1;
    
        /// <summary>
        /// Settings of the queue to consume.
        /// </summary>
        public QueueSetting QueueSetting { get; set; }
    }
    

    QueueSetting.cs

    /// <summary>
    /// Queue settings: the values passed when the queue is declared.
    /// </summary>
    class QueueSetting
    {
        /// <summary>
        /// Queue name.
        /// </summary>
        public string QueueName { get; set; }

        /// <summary>
        /// Whether the queue is durable. Defaults to true.
        /// </summary>
        public bool Durable { get; set; } = true;

        /// <summary>
        /// Whether the queue is exclusive. Defaults to false.
        /// </summary>
        public bool Exclusive { get; set; } = false;

        /// <summary>
        /// Whether the queue is deleted automatically. Defaults to false.
        /// </summary>
        public bool AutoDelete { get; set; } = false;

        /// <summary>
        /// Additional queue arguments. Defaults to null (no arguments).
        /// </summary>
        /// <remarks>
        /// Exposed as a property like every other setting; the configuration
        /// binder only populates public properties, so a public field here
        /// would never receive the configured "Arguments" value.
        /// </remarks>
        public IDictionary<string, object> Arguments { get; set; } = null;
    }
    
  4. 创建消费启动接口及类 IRabbitMQConsumerApplication.cs 和 RabbitMQConsumerApplication.cs

    IRabbitMQConsumerApplication.cs

    /// <summary>
    /// Contract of the RabbitMQ consumer application.
    /// </summary>
    interface IRabbitMQConsumerApplication
    {
        /// <summary>
        /// Starts the RabbitMQ consumer program.
        /// </summary>
        void Start();
    }
    

    RabbitMQConsumerApplication.cs

    /// <summary>
    /// RabbitMQ consumer application: opens one shared connection and starts the
    /// consumers described by the configured <see cref="ConsumerSetting"/> entries,
    /// each on its own channel.
    /// </summary>
    class RabbitMQConsumerApplication : IRabbitMQConsumerApplication
    {
        /// <summary>
        /// MQ connection settings.
        /// </summary>
        private readonly MQSetting _mqSetting;

        /// <summary>
        /// MQ consumer settings.
        /// </summary>
        private readonly List<ConsumerSetting> _consumerSettings;

        /// <summary>
        /// Logger factory handed to every handler instance that is created.
        /// </summary>
        private readonly ILoggerFactory _loggerFactory;

        /// <summary>
        /// Logger of this application.
        /// </summary>
        private readonly ILogger _logger;

        /// <summary>
        /// Creates the application from the bound configuration.
        /// </summary>
        /// <param name="mqSetting">MQ connection settings.</param>
        /// <param name="consumerSettings">One entry per queue/handler pair to consume.</param>
        /// <param name="loggerFactory">Factory used for this class' logger and passed to handler constructors.</param>
        public RabbitMQConsumerApplication(IOptions<MQSetting> mqSetting, IOptions<List<ConsumerSetting>> consumerSettings, ILoggerFactory loggerFactory)
        {
            _mqSetting = mqSetting.Value;
            _consumerSettings = consumerSettings.Value;
            _loggerFactory = loggerFactory;
            _logger = loggerFactory.CreateLogger(nameof(RabbitMQConsumerApplication));
        }

        /// <summary>
        /// Starts the RabbitMQ consumer program. Opens the connection, starts
        /// <see cref="ConsumerSetting.ConsumerCount"/> consumers per setting and then
        /// blocks the calling thread indefinitely so the connection stays open.
        /// </summary>
        /// <remarks>
        /// A consumer that cannot be started is logged and skipped; the remaining
        /// consumers are still started.
        /// </remarks>
        public void Start()
        {
            var factory = new ConnectionFactory()
            {
                HostName = _mqSetting.HostName,
                UserName = _mqSetting.UserName,
                Password = _mqSetting.Password
            };

            // Channels are kept here so they stay open while this method blocks,
            // instead of parking one thread-pool thread per consumer.
            var channels = new List<IModel>();

            using (var connection = factory.CreateConnection())
            {
                try
                {
                    foreach (var consumerSetting in _consumerSettings)
                    {
                        for (int i = 0; i < consumerSetting.ConsumerCount; i++)
                        {
                            try
                            {
                                channels.Add(StartConsumer(connection, consumerSetting));
                            }
                            catch (Exception ex)
                            {
                                // Report the failure instead of losing it in an unobserved task.
                                _logger.LogError(ex, "StartConsumer failed ( queue: {QueueName} )", consumerSetting.QueueSetting?.QueueName);
                            }
                        }
                    }

                    Thread.Sleep(Timeout.Infinite);
                }
                finally
                {
                    foreach (var channel in channels)
                    {
                        channel.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Declares the configured queue on a new channel and subscribes one consumer to it.
        /// </summary>
        /// <param name="connection">The shared connection the channel is created on.</param>
        /// <param name="consumerSetting">The queue and handler configuration.</param>
        /// <returns>The open channel that carries the subscription.</returns>
        private IModel StartConsumer(IConnection connection, ConsumerSetting consumerSetting)
        {
            // Resolve the handler before subscribing: a misconfigured class or method
            // name fails here, once, instead of failing on every delivered message.
            var handler = CreateHandler(consumerSetting);

            var channel = connection.CreateModel();
            try
            {
                channel.QueueDeclare(
                    queue: consumerSetting.QueueSetting.QueueName,
                    durable: consumerSetting.QueueSetting.Durable,
                    exclusive: consumerSetting.QueueSetting.Exclusive,
                    autoDelete: consumerSetting.QueueSetting.AutoDelete,
                    arguments: consumerSetting.QueueSetting.Arguments);

                _logger.LogInformation($"StartConsumer : {JsonConvert.SerializeObject(consumerSetting)}");

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (currentConsumer, ea) =>
                {
                    try
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);

                        _logger.LogInformation($"mission start ( message: {message} )");

                        handler(message);

                        _logger.LogInformation($"mission success");

                        // With manual acknowledgement, confirm only after the handler succeeded.
                        if (!consumerSetting.AutoAck)
                        {
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "mission fail");
                        // With manual acknowledgement, reject and requeue or discard as configured.
                        if (!consumerSetting.AutoAck)
                        {
                            channel.BasicReject(ea.DeliveryTag, consumerSetting.Requeue);
                        }
                    }
                };

                channel.BasicConsume(queue: consumerSetting.QueueSetting.QueueName,
                                    autoAck: consumerSetting.AutoAck,
                                    consumer: consumer);

                _logger.LogInformation($"StartConsumer Successed");

                return channel;
            }
            catch
            {
                // Do not leak the channel when declaring or subscribing fails.
                channel.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Resolves the configured handler method via reflection.
        /// </summary>
        /// <param name="consumerSetting">Supplies the assembly, class and method names.</param>
        /// <returns>
        /// A delegate that creates a new handler instance (constructed with the logger factory)
        /// and invokes the configured method with the message text.
        /// </returns>
        /// <exception cref="InvalidOperationException">The configured class or method does not exist.</exception>
        private Action<string> CreateHandler(ConsumerSetting consumerSetting)
        {
            var assembly = Assembly.Load(consumerSetting.AssemblyName);

            var type = assembly.GetType(consumerSetting.ClassName);
            if (type == null)
            {
                throw new InvalidOperationException(
                    $"Class '{consumerSetting.ClassName}' was not found in assembly '{consumerSetting.AssemblyName}'.");
            }

            var method = type.GetMethod(consumerSetting.MethodName);
            if (method == null)
            {
                throw new InvalidOperationException(
                    $"Method '{consumerSetting.MethodName}' was not found on class '{consumerSetting.ClassName}'.");
            }

            // A fresh handler instance per message, as before.
            return message => method.Invoke(Activator.CreateInstance(type, _loggerFactory), new object[] { message });
        }
    }
    
  5. 创建具体的消费类 HelloConsumer.cs

    /// <summary>
    /// Sample consumer that echoes each received message to the console and the log.
    /// </summary>
    class HelloConsumer
    {
        /// <summary>
        /// Logger the received messages are written to.
        /// </summary>
        ILogger _log;

        /// <summary>
        /// Creates the consumer and its logger.
        /// </summary>
        /// <param name="loggerFactory">Factory used to create the logger named after this class.</param>
        public HelloConsumer(ILoggerFactory loggerFactory)
        {
            var categoryName = nameof(HelloConsumer);
            this._log = loggerFactory.CreateLogger(categoryName);
        }

        /// <summary>
        /// Handles one message: prints it to the console, then logs it at information level.
        /// </summary>
        /// <param name="message">The message text.</param>
        public void HandleMessage(string message)
        {
            Console.Out.WriteLine(" [x] {0}", message);

            this._log.LogInformation(message);
        }
    }
    
  6. 在配置文件中配置 MQ、队列及消费 appsettings.json

    {
        "Logging": {
            "LogLevel": {
                "Default": "Debug"
            }
        },
        "MqSetting": {
            "HostName": "192.168.0.69",
            "UserName": "octopus",
            "Password": "octopus"
        },
        "MqConsumerSetting": [
            {
                "AssemblyName": "RabbitMQConsumers",
                "ClassName": "RabbitMQConsumers.HelloConsumer",
                "MethodName": "HandleMessage",
                "AutoAck": false,
                "Requeue": false,
                "ConsumerCount": 2,
                "QueueSetting": {
                    "Arguments": null,
                    "QueueName": "QUEUE_HELLO",
                    "Durable": true,
                    "Exclusive": false,
                    "AutoDelete": false
                }
            }
        ]
    }
    
  7. Program.cs 中读取配置并启动消费

    /// <summary>
    /// Console host: reads appsettings.json, wires up the services and starts the MQ consumers.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Builds the service container, starts the consumer application in the
        /// background and exits the process once the user presses enter.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            IServiceCollection services = new ServiceCollection();

            // Configuration
            var configuration = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();

            services.Configure<MQSetting>(configuration.GetSection("MqSetting"));
            services.Configure<List<ConsumerSetting>>(configuration.GetSection("MqConsumerSetting"));
            services.AddLogging((builder) => builder
                .AddConfiguration(configuration.GetSection("Logging"))
                .AddConsole());

            // Registration
            services.AddSingleton<IRabbitMQConsumerApplication, RabbitMQConsumerApplication>();

            // Build the container
            IServiceProvider serviceProvider = services.BuildServiceProvider();

            // Start MQ consumption in the background; Start() blocks, so it must not run on the main thread
            Task.Run(() =>
            {
                try
                {
                    serviceProvider.GetService<IRabbitMQConsumerApplication>().Start();
                }
                catch (Exception ex)
                {
                    // The task is never awaited, so without this a startup failure
                    // (for example an unreachable broker) would go completely unnoticed.
                    Console.Error.WriteLine(" MQ consumer application stopped: {0}", ex);
                }
            });

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();

            Environment.Exit(0);
        }
    }
    
RabbitMQ Exchanges 类型

RabbitMQ 支持 4 种类型的 Exchange

合理使用 Exchange 可以降低代码和队列之间的耦合度。

1. Default Exchange

默认 Exchange 的名称为空字符串(""),且已被预创建。

.NET 中使用 RabbitMQ

  1. 新建一个发送消息的控制台工程 FirstRabbitMQSend

    点击查看代码
    using RabbitMQ.Client;
    using System;
    using System.Text;
    
    namespace FirstRabbitMQSend
    {
        /// <summary>
        /// Sample publisher: declares the exchange and queue, binds them and publishes one message.
        /// </summary>
        class Program
        {
            // Exchange name
            const string EXCHANGE_NAME = "hello.exchange";
            // Queue name (also used as the routing key)
            const string QUEUE_NAME = "hello.queue";

            /// <summary>
            /// Entry point. Publishes "Hello, World!" to the exchange.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Define the connection factory
                var factory = new ConnectionFactory() {
                    HostName ="localhost",
                    UserName = "liujj",
                    Password = "AAA@111",
                };
                // Connect to RabbitMQ
                using (var connection = factory.CreateConnection())
                {
                    // Open a channel
                    using (var channel  = connection.CreateModel())
                    {
                        // Declare the exchange
                        channel.ExchangeDeclare(
                            exchange: EXCHANGE_NAME,
                            type: "direct",
                            durable: true);
                        // Declare the queue
                        channel.QueueDeclare(
                            queue: QUEUE_NAME,
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);
                        // Bind the queue to the exchange, using the queue name as routing key
                        channel.QueueBind(
                            queue: QUEUE_NAME,
                            exchange: EXCHANGE_NAME,
                            routingKey: QUEUE_NAME);
                        // Create the message
                        string message = "Hello, World!";
                        var body = Encoding.UTF8.GetBytes(message);
                        // Publish the message
                        channel.BasicPublish(
                            exchange: EXCHANGE_NAME,
                            routingKey: QUEUE_NAME,
                            basicProperties: null,
                            body: body);
                        Console.WriteLine("set {0}", message);
                    }
                }
            }
        }
    }
    
  2. 新建一个接收的控制台工程 FirstRabbitMQReceive

    点击查看代码
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    
    namespace FirstRabbitMQReceive
    {
        /// <summary>
        /// Sample receiver: declares the exchange and queue, binds them and prints every received message.
        /// </summary>
        class Program
        {
            // Exchange name
            const string EXCHANGE_NAME = "hello.exchange";
            // Queue name (also used as the routing key)
            const string QUEUE_NAME = "hello.queue";

            /// <summary>
            /// Entry point. Subscribes to the queue and prints messages in an endless loop.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Define the connection factory
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    UserName = "liujj",
                    Password = "AAA@111",
                };
                // Connect to RabbitMQ
                using (var connection = factory.CreateConnection())
                {
                    // Open a channel
                    using (var channel = connection.CreateModel())
                    {
                        // Declare the exchange
                        channel.ExchangeDeclare(
                            exchange: EXCHANGE_NAME,
                            type: "direct",
                            durable: true);
                        // Declare the queue
                        channel.QueueDeclare(
                            queue: QUEUE_NAME,
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);
                        // Bind the queue to the exchange, using the queue name as routing key
                        channel.QueueBind(
                            queue: QUEUE_NAME,
                            exchange: EXCHANGE_NAME,
                            routingKey: QUEUE_NAME);
                        // Create the consumer.
                        // NOTE(review): QueueingBasicConsumer and the noAck parameter name belong to
                        // older RabbitMQ.Client releases; newer releases use EventingBasicConsumer
                        // and autoAck - confirm against the client version this project references.
                        var consumer = new QueueingBasicConsumer(channel);
                        // Subscribe the consumer; noAck: true means no manual acknowledgement is sent
                        channel.BasicConsume(
                            queue: QUEUE_NAME,
                            noAck: true,
                            consumer: consumer);
                        Console.WriteLine("waiting for message.");
                        while (true)
                        {
                            // Consume the next message. A direct cast fails with a clear
                            // InvalidCastException on an unexpected item, where "as" would
                            // yield null and fail later with a NullReferenceException.
                            var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine("Received {0}", message);
                        }
                    }
                }
            }
        }
    }