SpringBoot 整合 RabbitMQ 实现延迟消息

点赞是创作最好的鼓励!

环境说明

erlang 版本:21.3

RabbitMQ 版本:3.7.14

SpringBoot 版本:2.3.3.RELEASE

rabbitmq_delayed_message_exchange插件

下载地址:www.rabbitmq.com/community-p…

输入图片说明

rabbitmq-delayed-message-exchange v3.8版本适用于RabbitMQ3.7.X版本,插件要与RabbitMQ版本对应,不然使用延迟消息,会遇到各种版本不兼容的问题。

输入图片说明

下载插件后放在RabbitMQ安装目录下plugins目录,执行如下命令启动该插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码
The following plugins have been configured:
	rabbitmq_delayed_message_exchange
复制代码

启动插件后需要重启RabbitMQ使插件生效

重启RabbitMQ服务通过两个命令来实现:

rabbitmqctl stop:停止RabbitMQ
rabbitmq-server restart:重启RabbitMQ
    
注意点:rabbitmqctl是没有restart命令,所哟重启RabbitMQ需要执行以上两条命令
复制代码

集成RabbitMQ

在pom.xml文件中加入RabbitMQ依赖

<!--集成消息队列-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
复制代码

配置RabbitMQ连接信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
复制代码

定义ConnectionFactory和RabbitTemplate

package com.ozx.rabbitmqconsumer.config;

import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName: RabbitMqConfig
 * @Description: RabbitMQ相关配置
 * @Author Gxin
 * @Date 2021/6/24 16:06
 * @Version: 1.0
 **/
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }
}

复制代码

配置Queue、交换机、路由键等信息

package com.ozx.rabbitmqconsumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName: QueueConfig
 * @Description: 定义队列、路由键、交换机,路由键绑定交换机、交换机分派消息对应队列
 * @Author Gxin
 * @Date 2021/6/23 16:49
 * @Version: 2.0
 **/
@Configuration
public class QueueConfig {
    /**
     * 分派普通消息交换机
     */
    @Bean
    public TopicExchange topicExchange(){
       return  new TopicExchange("ordinary_exchange",true,false);
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue("ordinary_queue", true);
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(topicExchange()).with("ordinary_queue");
    }

    /**
     * 分派延迟消息交换机
     */
    @Bean
    public CustomExchange delayExchange(){
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("x-delayed-type","direct");
        return new CustomExchange("delay_exchange","x-delayed-message",true,false,paramMap);
    }

    @Bean
    public Queue delayQueue(){
        Queue delayQueue = new Queue("delay_queue", true);
        return delayQueue;
    }

    @Bean
    public Binding delayMessagebinding(){
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay_queue").noargs();
    }
}
复制代码

注意

延迟消息使用CustomExchange,而不是使用DirectExchange、TopicExchange,此外CustomExchange的类型必须是x-delayed-message

实现延迟消息

package com.ozx.rabbitmqconsumer.service.impl;

import com.ozx.rabbitmqconsumer.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ClassName: MessageServiceImpl
 * @Description: 生产者生产消息
 * @Author Gxin
 * @Date 2021/6/23 17:01
 * @Version: 2.0
 **/
@Service
@Slf4j
public class MessageServiceImpl implements MessageService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMsg(String queueName,String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息发送时间:"+sdf.format(new Date()));
        rabbitTemplate.convertAndSend("ordinary_exchange", queueName, msg);
    }

    /**
     * 实现延迟消息
     * @param queueName
     * @param message
     */
    @Override
    public void sendDelayMessage(String queueName, String message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("消息发送时间:{}",sdf.format(new Date()));
        rabbitTemplate.convertAndSend("delay_exchange", queueName, message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay",3000);
                return message;
            }
        });
    }
}
复制代码

注意

发送消息需添加一个Header请求头,x-delay设置延迟时间是3s

消息者消费消息

package com.ozx.rabbitmqconsumer.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ClassName: MessageReceiver
 * @Description: 消费者接收并处理消息
 * @Author Gxin
 * @Date 2021/6/23 16:49
 * @Version: 2.0
 **/
@Component
@Slf4j
public class MessageReceiver {
    @RabbitListener(queues = "ordinary_queue")
    public void receive(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("消息接收时间:{},接收到的消息:{}",sdf.format(new Date()),msg);
    }

    @RabbitListener(queues = "delay_queue")
    public void receiveDelayMessage(String message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("消息接收时间:{},接收到的消息:{}",sdf.format(new Date()),message);
    }
}
复制代码

Controller层

package com.ozx.rabbitmqconsumer.controller;

import com.ozx.rabbitmqconsumer.common.ApiRest;
import com.ozx.rabbitmqconsumer.common.BaseController;
import com.ozx.rabbitmqconsumer.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName: MessageController
 * @Description: 集成RabbitMQ实现延迟消息
 * @Author Gxin
 * @Date 2021/6/23 16:32
 * @Version: 2.0
 **/
@RestController
public class MessageController extends BaseController {

    @Autowired
    private MessageService messageService;

    @GetMapping("send")
    public ApiRest sendMessage(String queueName, String msg){
        messageService.sendMsg(queueName,msg);
        return this.success();
    }

    @GetMapping("delaySend")
    public ApiRest sendDelayMessage(String queueName,String message){
        messageService.sendDelayMessage(queueName, message);
        return this.success();
    }
}

复制代码

使用postman调试发送消息接口,结果如下

image-20210625160009544

控制台输出日志如下:

image-20210625160051494

实现延迟消息结果如下:

image-20210625160147518
image-20210625160210246

消息延迟3S后,被消息者接收并处理

原创文章,作者:睿达君,如若转载,请注明出处:https://zrrd.net.cn/2185.html

发表回复

登录后才能评论
咨询电话
联系电话:0451-81320577

地址:哈尔滨市松北区中小企业总部基地13F

微信咨询
微信咨询
QQ咨询
分享本页
返回顶部