技術干貨實戰(1)- RabbitMQ死信與延遲隊列的區別與實現
作者:
修羅debug
版權聲明:本文為博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處鏈接和本聲明。
摘要:對于消息中間件RabbitMQ,想必各位小伙伴并不陌生,其廣泛應用程度不言而喻,此前我們也在許多課程以及諸多專欄文章中介紹了它的應用,其應用場景也是相當廣泛的,像什么消息異步通信、服務模塊解耦、高并發流量削峰、訂單超時未支付自動失效等等都是實際項目中最為常見的場景。本文我們將重點介紹并實現RabbitMQ的死信與延時隊列,并將兩者做一個簡單的對比!
內容:對于RabbitMQ的死信隊列,此前我們在“Java秒殺系統”這一技術專欄中已經有重點介紹過了,在那里我們是將其應用于 “訂單超時未支付自動失效”這一業務場景中,簡而言之,“死信隊列”是一種特殊的“隊列”,跟普通的隊列相比,具有“延遲處理任務”的特性。
而在消息中間件RabbitMQ的架構組件中,也存在著跟“死信隊列”在功能特性方面幾乎相同的組件,那就是“延遲隊列/延時隊列”,同樣也具有“延遲、延時處理任務”的功效!
當然啦,這兩者還是有一丟丟區別的,最直觀的當然是名字上啦,從名字上你就可以看出來兩者的“處事風格”是不一樣的,具體體現在:
一、創建上的差異:
(1)RabbitMQ的死信隊列DeadQueue是由“死信交換機DLX”+“死信路由DLK”組成的,當然,可能還會有“TTL”,而DLX和DLK又可以綁定指向真正的隊列RealQueue,這個隊列RealQueue便是“消費者”真正監聽的對象.
(2)而RabbitMQ的延遲/延時隊列DelayedQueue 則是由普通的隊列來創建即可,唯一不同的地方在于其綁定的交換機為自定義的交換機,即“CustomExchange”,在創建該交換機時只需要指定其消息的類型為 “x-delayed-message”即可.“消費者”真正監聽的隊列也是它本人,即DelayedQueue
(畫外音:從這一點上看,延遲/延時隊列的創建相對而言簡單一些?。?/span>
二、功能特性上的差異:
(1)死信隊列在實際應用時雖然可以實現“延時、延遲處理任務”的功效,但進入死信中的消息卻依然保留了隊列的特性,即“FIFO” ~ 先進先出,而不管先后進入隊列中消息的TTL的值. 即假設先后進入死信的消息為A、B、C,各自的TTL分別為:10s、3s、5s,理論上TTL先后到達的順序是:B、C、A,然后從死信出來,最終被路由到真正的隊列中,即消息被消費的先后順序應該為:B、C、A,然而現實卻是殘酷的,其最終消費的消息的順序為:A、B、C,即“消息是怎么進去的,就怎么出來”,保留了所謂的FIFO特性.
(2)或許是因為死信有這種缺陷,所以RabbitMQ提供了另一種組件,即“延遲隊列”,它可以很完美的解決上面死信出現的問題,即最終消費的消息的順序為:B、C、A,我們將在下面用實際的代碼進行實戰實現與演練.
三、插件安裝上的差異:
(1)死信不需要額外的插件
(2)但是延遲隊列在實際項目使用時卻需要在Mq Server中安裝一個插件,它的名字叫做:“rabbitmq_delayed_message_exchange”,其安裝過程可以參考鏈接:https://www.cnblogs.com/isunsine/p/11572457.html 里面就提供了Windows環境和Linux環境下的插件的安裝過程(很簡單,只需要不到3步的步驟.)
四、代碼的實戰實現~RabbitMQ的死信隊列
說了這么多,想必有些小伙伴有點不耐煩了,下面我將采用實際的代碼對上面所介紹的幾點區別進行實現與演練(代碼都是基于Spring Boot2.0搭建的項目環境實現與測試的)
(1)首先,我們需要創建死信隊列以及真正的隊列,并實現相關的綁定:
//構建訂單超時未支付的死信隊列消息模型
@Bean
public Queue successKillDeadQueue(){
Map<String, Object> argsMap= Maps.newHashMap();
argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}
//基本交換機
@Bean
public TopicExchange successKillDeadProdExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}
//創建基本交換機+基本路由 -> 死信隊列 的綁定
@Bean
public Binding successKillDeadProdBinding(){
return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}
//真正的隊列
@Bean
public Queue successKillRealQueue(){
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}
//死信交換機
@Bean
public TopicExchange successKillDeadExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}
//死信交換機+死信路由->真正隊列 的綁定
@Bean
public Binding successKillDeadBinding(){
return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}
(2)將項目運行起來,登錄RabbitMQ的后端控制臺,可以看到成功創建了相應的死信隊列和真正的隊列等組件,如下圖所示:
(3)緊接著,我們在Controller中建立一個請求方法,用于接收前端請求過來的消息,并將該消息附以TTL值,塞入死信隊列中,如下所示:
//死信隊列-生產者
@RequestMapping(value = "dead/msg/send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDQMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
Message realMsg=MessageBuilder.withBody(msg.getBytes("UTF-8")).build();
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"), env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"), realMsg, message -> {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//TODO:動態設置TTL
mp.setExpiration(String.valueOf(ttl));
log.info("死信隊列生產者-發出消息:{} TTL:{}",msg,ttl);
return message;
});
}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}
(4)最后是寫一個Spring Bean類充當消費者,在其中監聽“實際隊列”的消息:
@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(@Payload byte[] msg){
try {
log.info("死信隊列-監聽者-接收消息:{}",new String(msg,"UTF-8"));
}catch (Exception e){
log.error("死信隊列-監聽者-發生異常:",e.fillInStackTrace());
}
}
最后,我們進入測試環節,打開Postman,前后輸入3次不同的請求信息,其中各自的TTL也不盡相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最終在Console控制臺等待,你會發現消費者監聽的消息的順序為:A、B、C,而不是C、B、A,如下圖所示:
五、代碼的實戰實現~RabbitMQ的延遲/延時隊列
很明顯,由于死信存在的這個缺陷,故而其在上面的應用場景中是不太適用的!即死信隊列在 消息的TTL不一致,且后入死信的消息TTL小于前入的消息TTL的應用場景中是不適用的,而像“訂單超時未支付”的應用場景,因為大家都一樣,都是固定的30min或者 1h,故而這種場景,死信是相當適合的
因此,為了解決實際項目中“TTL不一致且不固定”的應用場景,我們需要搬上“延遲/延時隊列”(當然啦,Redisson的延遲/延遲隊列也是可以實現的?。?,下面我們用代碼加以實現!
(1)首先是創建“延遲/延時隊列”等相關的組件,如下所示:
//TODO:RabbitMQ延遲隊列
@Bean
public Queue delayQueue(){
return QueueBuilder.durable(env.getProperty("mq.kill.delay.queue")).build();
}
@Bean
public CustomExchange delayExchange(){
Map<String,Object> map=Maps.newHashMap();
map.put("x-delayed-type","direct");
return new CustomExchange(env.getProperty("mq.kill.delay.exchange"),"x-delayed-message",true,false,map);
}
@Bean
public Binding delayBinding(){
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(env.getProperty("mq.kill.delay.routingKey")).noargs();
}
(2)其生產者發送消息的代碼我們仍然是放在一個Controller的請求方法中,如下所示:
//延遲隊列-生產者
@RequestMapping(value = "delay/msg/send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDelayMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
String info=msg;
Message realMsg=MessageBuilder.withBody(info.getBytes("UTF-8")).build();
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.delay.exchange"),env.getProperty("mq.kill.delay.routingKey"),
realMsg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader("x-delay",ttl);
log.info("延遲隊列生產者-發出消息:{} TTL:{}",msg,ttl);
return message;
}
});
}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}
(3)最后是用于監聽延遲隊列中消息的消費者的代碼,如下所示:
/**
* 延時隊列-消息監聽器-消費者
* @Author:debug (SteadyJack)
* @Link: weixin-> debug0868 qq-> 1948831260
**/
@Component
public class DelayQueueMqListener {
private static final Logger log= LoggerFactory.getLogger(DelayQueueMqListener.class);
//消息監聽
@RabbitListener(queues = {"${mq.kill.delay.queue}"})
public void consumeMsg(@Payload byte[] msg){
try {
String info=new String(msg,"UTF-8");
log.info("延時隊列監聽到消息:{} ",info);
}catch (Exception e){
log.error("延時隊列-消息監聽器-消費者-消息監聽-發生異常:",e.fillInStackTrace());
}
}
}
(4)將項目跑起來,可以看到RabbitMQ的后端控制臺已經建立了該隊列,如下圖所示:
(5)最后,我們打開postman,前后輸入3次不同的請求信息,其中各自的TTL也不盡相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最終在Console控制臺等待,你會發現消費者監聽的消息的順序為:C、B、A,而不是A、B、C,,即按照消息的TTL來決定消費的先后順序,如下圖所示:
從該運行結果上看,會發現這才是我們真正想要的結果,即按照時間TTL的大小來決定消息被消費的先后順序,而且,你可以看出消費時的時間跟發出的時間剛好差 TTL !
在文章的最后的,我們簡單總結一下本文所講的內容,即主要介紹、對比并實戰了RabbitMQ中兩款具有“延時、延遲處理任務”功效的組件,即“死信隊列”和“延遲隊列”,其差異性主要體現在:創建上的不同、功能特性的不同、插件安裝上的不同等方面。
總體來說,如果是想追求消息傳輸的穩定性、可靠性且TTL是固定的話,那么建議選擇“死信隊列”,因為消息從一開始就在隊列中待著,等到TTL一到才被路由到真正的隊列!而“延遲隊列”則不同,即發送出去的消息需要等待 TTL 的時間才進入“延遲隊列”,如果在等待的期間,Mq Server 宕機了,那很可能消息就丟失了…..
好了,本文我們就介紹到這里了,最后打個小廣告,Debug最近上新了一門新課:Java分布式中間件大匯聚實戰第一季 (SpringBoot2.0+點贊系統+面試),課程所介紹的內容正是基于企業級項目真實的應用案例為出發點,來實戰各種典型的主流技術棧,目前課程還處于優惠期,原價是129,目前仍然是59.9而已哦(下個月就要漲價嘍…)
課程觀看:https://www.fightjava.com/web/index/course/detail/15 !
其他相關的技術,感興趣的小伙伴可以關注底部Debug的技術公眾號,或者加Debug的微信,拉你進“微信版”的真正技術交流群!一起學習、共同成長!
補充
1、若想學習其他的技術干貨,可以前往Debug自建的技術社區進行學習觀看,包括技術專欄、博客和課程哦:https://www.fightjava.com/
2、關注一下Debug的技術微信公眾號,最新的技術文章、課程以及技術專欄將會第一時間在公眾號發布哦!