
点赞是创作最好的鼓励!
环境说明
erlang 版本:21.3
RabbitMQ 版本:3.7.14
SpringBoot 版本:2.3.3.RELEASE
rabbitmq_delayed_message_exchange插件
下载地址:https://www.rabbitmq.com/community-plugins.html

rabbitmq-delayed-message-exchange v3.8版本适用于RabbitMQ3.7.X版本,插件要与RabbitMQ版本对应,不然使用延迟消息,会遇到各种版本不兼容的问题。

下载插件后放在RabbitMQ安装目录下plugins目录,执行如下命令启动该插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码
The following plugins have been configured:
rabbitmq_delayed_message_exchange
复制代码
启动插件后需要重启RabbitMQ使插件生效
重启RabbitMQ服务通过两个命令来实现:
rabbitmqctl stop:停止RabbitMQ
rabbitmq-server restart:重启RabbitMQ
注意点:rabbitmqctl没有restart命令,所以重启RabbitMQ需要依次执行以上两条命令
复制代码
集成RabbitMQ
在pom.xml文件中加入RabbitMQ依赖
<!-- Message queue integration: Spring Boot AMQP starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
配置RabbitMQ连接信息
# RabbitMQ connection settings; every key must be nested under spring.rabbitmq
# for Spring Boot to bind it.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
复制代码
定义ConnectionFactory和RabbitTemplate
package com.ozx.rabbitmqconsumer.config;
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @ClassName: RabbitMqConfig
 * @Description: RabbitMQ connection configuration. Binds the {@code spring.rabbitmq.*}
 *               properties onto the fields below and builds the {@link ConnectionFactory}
 *               and {@link RabbitTemplate} beans from them.
 * @Author Gxin
 * @Date 2021/6/24 16:06
 * @Version: 1.0
 **/
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

    /** Broker host name or address. */
    private String host;

    /** Broker port. */
    private int port;

    /**
     * Login user name.
     * NOTE(review): the configured key is {@code username}; this relies on relaxed binding
     * mapping it onto the {@code userName} property - confirm.
     */
    private String userName;

    /** Login password. */
    private String password;

    /**
     * Virtual host to connect to, taken from {@code spring.rabbitmq.virtual-host}.
     * Defaults to "/" so behaviour is unchanged when the property is not set.
     */
    private String virtualHost = "/";

    /**
     * Builds the caching connection factory from the bound connection settings.
     *
     * @return a connection factory with publisher confirms enabled
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
        factory.setUsername(userName);
        factory.setPassword(password);
        // Use the configured virtual host instead of a hard-coded "/".
        factory.setVirtualHost(virtualHost);
        // NOTE(review): setPublisherConfirms is deprecated in Spring AMQP 2.2+ in favour of
        // setPublisherConfirmType(...) - consider migrating.
        factory.setPublisherConfirms(true);
        return factory;
    }

    /**
     * Creates the template used to publish messages over {@link #connectionFactory()}.
     *
     * @return a RabbitTemplate backed by the connection factory bean
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }
}
复制代码
配置Queue、交换机、路由键等信息
package com.ozx.rabbitmqconsumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * @ClassName: QueueConfig
 * @Description: Declares the queues and exchanges used by the demo and binds each queue
 *               to its exchange with a routing key.
 * @Author Gxin
 * @Date 2021/6/23 16:49
 * @Version: 2.0
 **/
@Configuration
public class QueueConfig {

    /**
     * Exchange that dispatches ordinary (non-delayed) messages.
     *
     * @return the topic exchange named "ordinary_exchange"
     */
    @Bean
    public TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange("ordinary_exchange", durable, autoDelete);
    }

    /**
     * Queue that receives ordinary messages.
     *
     * @return the durable queue named "ordinary_queue"
     */
    @Bean
    public Queue queue() {
        return new Queue("ordinary_queue", true);
    }

    /**
     * Binds the ordinary queue to the ordinary exchange under routing key "ordinary_queue".
     *
     * @return the binding between {@link #queue()} and {@link #topicExchange()}
     */
    @Bean
    public Binding binding() {
        Queue ordinaryQueue = queue();
        TopicExchange ordinaryExchange = topicExchange();
        return BindingBuilder.bind(ordinaryQueue).to(ordinaryExchange).with("ordinary_queue");
    }

    /**
     * Exchange that dispatches delayed messages. It is declared with the custom type
     * "x-delayed-message" and the argument "x-delayed-type" set to "direct".
     *
     * @return the custom exchange named "delay_exchange"
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange("delay_exchange", "x-delayed-message", true, false, arguments);
    }

    /**
     * Queue that receives delayed messages.
     *
     * @return the durable queue named "delay_queue"
     */
    @Bean
    public Queue delayQueue() {
        return new Queue("delay_queue", true);
    }

    /**
     * Binds the delay queue to the delay exchange under routing key "delay_queue".
     *
     * @return the binding between {@link #delayQueue()} and {@link #delayExchange()}
     */
    @Bean
    public Binding delayMessagebinding() {
        Queue target = delayQueue();
        CustomExchange source = delayExchange();
        return BindingBuilder.bind(target).to(source).with("delay_queue").noargs();
    }
}
复制代码
注意
延迟消息使用CustomExchange,而不是使用DirectExchange、TopicExchange,此外CustomExchange的类型必须是x-delayed-message
实现延迟消息
package com.ozx.rabbitmqconsumer.service.impl;
import com.ozx.rabbitmqconsumer.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * @ClassName: MessageServiceImpl
 * @Description: Producer side: publishes ordinary and delayed messages through
 *               {@link RabbitTemplate}.
 * @Author Gxin
 * @Date 2021/6/23 17:01
 * @Version: 2.0
 **/
@Service
@Slf4j
public class MessageServiceImpl implements MessageService {

    /** Pattern used when logging the send time. */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Exchange that ordinary messages are published to. */
    private static final String ORDINARY_EXCHANGE = "ordinary_exchange";

    /** Exchange that delayed messages are published to. */
    private static final String DELAY_EXCHANGE = "delay_exchange";

    /** Message header that carries the delay. */
    private static final String DELAY_HEADER = "x-delay";

    /** Delay applied to delayed messages (3000, i.e. 3 seconds in milliseconds). */
    private static final int DELAY_MILLIS = 3000;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes an ordinary message to the ordinary exchange.
     *
     * @param queueName value passed to the template as the routing key
     * @param msg       message payload
     */
    @Override
    public void sendMsg(String queueName, String msg) {
        // Logged through the class logger (not System.out) to match the other log statements.
        log.debug("消息发送时间:{}", currentTimestamp());
        rabbitTemplate.convertAndSend(ORDINARY_EXCHANGE, queueName, msg);
    }

    /**
     * Publishes a delayed message to the delay exchange, setting the {@code x-delay}
     * header on the outgoing message.
     *
     * @param queueName value passed to the template as the routing key
     * @param message   message payload
     */
    @Override
    public void sendDelayMessage(String queueName, String message) {
        log.debug("消息发送时间:{}", currentTimestamp());
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE, queueName, message, amqpMessage -> {
            amqpMessage.getMessageProperties().setHeader(DELAY_HEADER, DELAY_MILLIS);
            return amqpMessage;
        });
    }

    /**
     * Formats the current time; a new formatter is created on each call because
     * SimpleDateFormat is not safe to share between threads.
     */
    private static String currentTimestamp() {
        return new SimpleDateFormat(TIMESTAMP_PATTERN).format(new Date());
    }
}
复制代码
注意
发送消息时需在消息头(Header)中添加x-delay来指定延迟时间,单位为毫秒;示例中设置为3000,即延迟3s
消费者消费消息
package com.ozx.rabbitmqconsumer.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * @ClassName: MessageReceiver
 * @Description: Consumer side: receives messages from the two queues and logs them.
 * @Author Gxin
 * @Date 2021/6/23 16:49
 * @Version: 2.0
 **/
@Component
@Slf4j
public class MessageReceiver {

    /**
     * Handles messages arriving on "ordinary_queue" by logging the receive time and payload.
     *
     * @param msg message payload
     */
    @RabbitListener(queues = "ordinary_queue")
    public void receive(String msg) {
        String receivedAt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        log.debug("消息接收时间:{},接收到的消息:{}", receivedAt, msg);
    }

    /**
     * Handles messages arriving on "delay_queue" by logging the receive time and payload.
     *
     * @param message message payload
     */
    @RabbitListener(queues = "delay_queue")
    public void receiveDelayMessage(String message) {
        String receivedAt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        log.debug("消息接收时间:{},接收到的消息:{}", receivedAt, message);
    }
}
复制代码
Controller层
package com.ozx.rabbitmqconsumer.controller;
import com.ozx.rabbitmqconsumer.common.ApiRest;
import com.ozx.rabbitmqconsumer.common.BaseController;
import com.ozx.rabbitmqconsumer.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @ClassName: MessageController
 * @Description: HTTP endpoints that trigger sending ordinary and delayed RabbitMQ messages.
 * @Author Gxin
 * @Date 2021/6/23 16:32
 * @Version: 2.0
 **/
@RestController
public class MessageController extends BaseController {

    @Autowired
    private MessageService messageService;

    /**
     * GET "send": forwards the request parameters to {@code MessageService.sendMsg}.
     *
     * @param queueName forwarded unchanged to the service
     * @param msg       message payload
     * @return the result of the inherited {@code success()} call
     */
    @GetMapping("send")
    public ApiRest sendMessage(String queueName, String msg) {
        messageService.sendMsg(queueName, msg);
        return success();
    }

    /**
     * GET "delaySend": forwards the request parameters to
     * {@code MessageService.sendDelayMessage}.
     *
     * @param queueName forwarded unchanged to the service
     * @param message   message payload
     * @return the result of the inherited {@code success()} call
     */
    @GetMapping("delaySend")
    public ApiRest sendDelayMessage(String queueName, String message) {
        messageService.sendDelayMessage(queueName, message);
        return success();
    }
}
复制代码
使用postman调试发送消息接口,结果如下

控制台输出日志如下:

实现延迟消息结果如下:


消息延迟3s后,被消费者接收并处理
原创文章,作者:睿达君,如若转载,请注明出处:https://zrrd.net.cn/2185.html