Skip to content

Spring MQ 使用多线程消费

🏷️ Spring

如何使用 Spring 创建消费者可以参考 使用 Springboot 开发 MQ 消费服务

开启多线程很简单,只需要调整 latch 的参数即可。
private CountDownLatch latch = new CountDownLatch(20);
上面的 20 就是指最多开启 20 个线程消费,超过了则会等待。
没有起作用 MQ 的Unacked数量一直都是 1,而不是想象中的 20.
理解错了。CountDownLatch只是用来通知消息已经收到了的,并不会开启多线程。实现多线程是需要设置消费者的数量。

java
@Bean(name = "container-hello")
SimpleMessageListenerContainer container(
        ConnectionFactory connectionFactory,
        @Qualifier("listenerAdapter-hello") MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setConcurrentConsumers(10);
    container.setPrefetchCount(2);
    container.setMessageListener(listenerAdapter);
    return container;
}

container.setConcurrentConsumers(10); 就是设置了 10 个消费者。每个消费者都是独立的线程。

container.setPrefetchCount(2); 则是设置了 MQ 投递过来的消息数量。