Skip to content

RabbitMQ 延时消息

使用 rabbitmq-delayed-message-exchange 插件来实现延时消息功能。

安装延时消息插件:rabbitmq-delayed-message-exchange

官方插件现在只支持 3.8.x. 及以上的版本,不支持我现在使用的 3.5.7 版本,所以只能根据网上找到的文章中提供的插件下载地址。

具体步骤如下(摘自这篇博客):

  1. 查找 rabbitmq 的安装目录

    bash
    whereis rabbitmq
  2. 定位到 plugins 目录

    bash
    cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/plugins
  3. 下载插件包

    bash
    wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
  4. 重命名文件

    下载的文件名前面多了一段文字,使用 mv 命令重命名文件。

    bash
    mv download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq_delayed_message_exchange-0.0.1.ez
  5. 启用插件

    bash
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    使用如下命令禁用插件:

    bash
    rabbitmq-plugins disable rabbitmq_delayed_message_exchange

生产者【Spring】

使用 RabbitTemplate 来实现延时消息的发送。通过添加 x-delay Header 来指定延迟的时间。

java
import cn.hutool.json.JSONUtil;
import com.mokasz.zhyx.zeus.customer.service.core.entity.LazyMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*;

import java.nio.charset.StandardCharsets;

@RestController
@RequestMapping("/mq/kf")
@RequiredArgsConstructor
public class KefuController {

    private final RabbitTemplate rabbitTemplate;

    @PostMapping
    public void add(@RequestBody LazyMessage message, @RequestParam(value = "delayedSeconds", defaultValue = "1") int delayedSeconds) {
        if (delayedSeconds < 0) delayedSeconds = 0;

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("x-delay", delayedSeconds * 1000);

        rabbitTemplate.send(
                "delayed-kefu-message",
                "kefu-message",
                new Message(JSONUtil.toJsonStr(message).getBytes(StandardCharsets.UTF_8), messageProperties)
        );
    }

}

消费者【Spring】

Listener

使用 CustomExchange 来创建延时消息 Exchange

注意

需要添加 x-delayed-type 参数,否则会报错。

另外一点需要注意的是,创建 Exchangebinding 时,需要指定 x-delay: * 通配参数。

java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;

/**
 * 延迟消息 监听器
 */
@Slf4j
@Component("listener-" + LazyMessageListener.QUEUE_NAME)
@RequiredArgsConstructor
public class LazyMessageListener {
    final static String QUEUE_NAME = "kefu-message";

    @Bean(name = "queue-" + QUEUE_NAME)
    Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

    @Bean(name = "exchange-" + QUEUE_NAME)
    CustomExchange exchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed-kefu-message", "x-delayed-message", true, false, args);
    }

    @Bean(name = "binding-" + QUEUE_NAME)
    Binding binding(@Qualifier("queue-" + QUEUE_NAME) Queue queue, @Qualifier("exchange-" + QUEUE_NAME) CustomExchange exchange) {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delay", "*");
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME).and(args);
    }

    @Bean(name = "container-" + QUEUE_NAME)
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             @Qualifier("listener-adapter-" + QUEUE_NAME) MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        container.setConcurrentConsumers(5);
        container.setPrefetchCount(200);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean(name = "listener-adapter-" + QUEUE_NAME)
    MessageListenerAdapter listenerAdapter(LazyMessageReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

Receiver

这只是个消费的示例。

java
import cn.hutool.json.JSONUtil;
import com.mokasz.zhyx.zeus.customer.service.core.entity.LazyMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.api.WxConsts;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.bean.kefu.WxMpKefuMessage;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

/**
 * 延迟消息 接收器
 */
@Slf4j
@Component("receiver-" + LazyMessageListener.QUEUE_NAME)
@RequiredArgsConstructor
public class LazyMessageReceiver {

    private final WxMpService wxService;

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(byte[] message) {
        receiveMessage(new String(message));
    }

    public void receiveMessage(String json) {
        try {
            log.info("接收消息({})", json);
            LazyMessage message = JSONUtil.toBean(json, LazyMessage.class);
            if (message == null) return;

            WxMpKefuMessage kefuMessage = new WxMpKefuMessage();
            kefuMessage.setToUser(message.getOpenId());
            kefuMessage.setMsgType(WxConsts.KefuMsgType.TEXT);
            kefuMessage.setTitle(message.getTitle());
            kefuMessage.setContent(message.getContent());

            String response = wxService
                    .switchoverTo(message.getAppid())
                    .getKefuService()
                    .sendKefuMessageWithResponse(kefuMessage);
            log.info("response : {}", response);
        } catch (Exception e) {
            log.error(String.format("系统发生异常(%s)(%s)", e.getMessage(), json), e);
        } finally {
            latch.countDown();
        }
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

附 1. 'x-delayed-type' must be an existing exchange type

创建 exchange 时报了如下错误:

'x-delayed-type' must be an existing exchange type

这个是由于缺少了 x-delayed-type 导致的,需要在 Arguments 中添加 x-delayed-type = direct

通过 Java 创建 exchange

java
@Bean(name = "exchange-" + QUEUE_NAME)
CustomExchange exchange() {
    HashMap<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("delayed-kefu-message", "x-delayed-message", true, false, args);
}

通过 Java 创建 binding

java
@Bean(name = "binding-" + QUEUE_NAME)
Binding binding(@Qualifier("queue-" + QUEUE_NAME) Queue queue, @Qualifier("exchange-" + QUEUE_NAME) CustomExchange exchange) {
    HashMap<String, Object> args = new HashMap<>();
    args.put("x-delay", "*");
    return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME).and(args);
}

注意: bingding 需要添加 x-delay 参数,否则消息不会转发到队列。

参考

  1. 实现 RabbitMQ 延时消息
  2. Delayed Message 插件实现 RabbitMQ 延迟队列
  3. rabbitmq / rabbitmq-delayed-message-exchange

Page Layout Max Width

Adjust the exact value of the page width of VitePress layout to adapt to different reading needs and screens.

Adjust the maximum width of the page layout
A ranged slider for user to choose and customize their desired width of the maximum width of the page layout can go.

Content Layout Max Width

Adjust the exact value of the document content width of VitePress layout to adapt to different reading needs and screens.

Adjust the maximum width of the content layout
A ranged slider for user to choose and customize their desired width of the maximum width of the content layout can go.