熬夜整理的RabbitMQ知識點相當齊全的文章

作者: 修羅debug
版權聲明:本文為博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處鏈接和本聲明。


在如今微服務、分布式時代,不懂一點消息隊列、服務解耦、異步通信,都不好意思說自己做過Java高并發、分布式項目了;趁著放假,debug熬夜整理了消息中間件RabbitMQ相關的知識點,對于沒擼過這一技術棧的小伙伴而言可以說是福利了,而對于已經擼過的小伙伴而言不妨再過一遍,畢竟溫故而知新嘛!


以下為本文的目錄,先一睹為快:

  • 一、基本概念
  • 二、RabbitMQ底層架構
  • 三、如何在Spring Boot項目中使用
  • 四、福利奉上!


話不多說,咱們直接進入正題!

一、簡介

1.RabbitMQ,一款由Erlang語言開發的、基于AMQP(高級消息隊列協議)實現的開源消息代理軟件,俗稱“消息中間件”;

2.那何謂“AMQP”呢,解釋為中文:高級消息隊列協議,是應用層協議的一個開放標準,專門面向消息而設計的,基于此協議的客戶端與消息中間件可傳遞消息,并不受產品、開發語言等條件的限制;

3.據說這RabbitMQ最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現還挺不俗,具體特點有:

(1)可靠性:RabbitMQ使用一些機制來保證可靠性,如持久化、消費確認、發布確認;

(2)靈活的路由:在消息進入隊列之前,通過交換器Exchange 來路由消息的;對于典型的路由功能,RabbitMQ 已經提供了一些內置Exchange來實現;針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange

(3)消息集群:多個RabbitMQ 服務器可以組成一個集群,形成一個邏輯上的Broker(消息代理服務器);

(4)高可用:隊列可以在集群中的機器節點進行鏡像備份,當部分節點出問題時隊列仍然可用;

(5)多種協議支持;RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT等等;

(6)多語言客戶端:RabbitMQ 幾乎支持所有常用的開發語言,比如 Java、.NET、Ruby 等;

(7)Web控制臺管理界面:RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息代理服務器 Broker 中的方方面面(包括隊列、交換器、路由、消息消費情況、隊列綁定情況、消費者實例情況、使用賬戶等等)

(8)跟蹤機制:RabbitMQ 提供了消息跟蹤機制,在消息傳輸期間出現異常時使用者可以找出到底發生了什么事!

(9)插件機制:RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件,比如RabbitMQ自帶的“延遲隊列插件”就相當好用!


二、RabbitMQ底層架構與基本概念

1.如下圖所示為RabbitMQ底層的架構圖,消息在底層傳輸期間幾乎經歷了其中所涉及的每個組件!


2.下面介紹下這些組件的具體含義:

1Message:消息,就像萬物皆對象一樣,萬物皆可充當消息!因此消息可以理解為任意的事物、任意的數據;它由消息頭Header和消息體Body組成;

其中消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等;

2Publisher:消息的生產者,也是一個向交換器發布消息的客戶端應用程序;

3Exchange:交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列,簡而言之:即將消息路由給隊列;

4Binding:綁定,用于將消息隊列綁定、關聯到指定的交換器;即用于消息隊列和交換器之間的關聯;那如何將其關聯、綁定在一起呢,通過路由鍵RoutingKey實現;

即通過 路由鍵RoutingKey 將 交換器Exchange 和 消息隊列 Queue 連接起來的,從另外一個角度上看,也可以將交換器Exchange理解成一個由綁定構成的路由表。

5Queue:消息隊列,用于保存消息直到發送給消費者;可以理解為它是消息的容器;一個消息可投入一個或多個隊列;消息一直在隊列里面,直到消費者連接到這個隊列并將其取走 這個 消息才算真正走到了最后!

6Connection:網絡連接,比如一個 TCP 連接;

7Channel:信道,多路復用連接中的一條獨立的雙向數據流通道;信道是建立在真實TCP 連接之上的虛擬連接;AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成,因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接

8Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序;

9Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象,虛擬主機是共享相同的身份認證和加密環境的獨立服務器域;

10Broker:表示消息隊列服務器實體;


3. 交換器Exchange的類型: 交換器分發消息時根據類型的不同,分發策略也有所不同,目前共四種類型:direct、fanout、topic、headers ;headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:

1)基于Direct類型交換器的傳輸模式:消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致,交換器就將消息發到對應的隊列中,它是完全匹配、單播的模式;如下圖所示:














2)基于Fanout類型交換器的廣播分發模式:每個分發到 Fanout類型交換器的消息都會分到所有綁定的隊列上去,很像子網廣播,每臺子網內的主機都獲得了一份復制的消息,Fanout 類型轉發消息是最快的; 















3)基于Topic類型交換器的模式匹配分發模式:Topic交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上,它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開,它同樣也會識別兩個通配符:

   符號“#”和符號“*”,其中 # 匹配 0 個或多個單詞,而 * 匹配不多不少一個單詞,形象如下圖:














三、如何在Spring Boot項目中使用

  說一千道一百,最終還是得將其用到實際的項目并用實際的開發語言將它用起來,下面將基于Java Spring Boot介紹下如何在Spring Boot搭建的項目使用RabbitMQ,其實,很簡單,只要把握住上面RabbitMQ的系統架構圖即可!

1)消息(在Java中就是一實體類):

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BlogIndexMsg implements Serializable{
private Integer type;
private Integer blogId;
}


2)生產者(發送消息):

@Component
@Slf4j
public class BlogMqService {
@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private Environment env;

@Autowired
private ObjectMapper objectMapper;

//用于首頁微博/朋友圈的相關操作對應的mq服務
public void sendIndexBlogMsg(final BlogIndexMsg msg){
try {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq. exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("mq. routing.key.name"));
Message message= MessageBuilder.withBody(objectMapper.writeValueAsBytes(msg))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend(message);
}catch (Exception e){
log.error("用于首頁微博/朋友圈的相關操作對應的mq服務-發生異常:{} ",msg,e);
}
}
}


3)消費者:

@Component
@Slf4j
public class BlogMqListener {

@Autowired
private ObjectMapper objectMapper;

//消息監聽器
@RabbitListener(queues = {"${mq. queue.name}"})
public void consumeBlogIndexMsg(@Payload Message msg){
try {
BlogIndexMsg indexMsg=objectMapper.readValue(msg.getBody(),BlogIndexMsg.class);
log.info("消息監聽器監聽到的消息:{}",indexMsg);

//執行具體的業務邏輯代碼

}catch (Exception e){
log.error("消息監聽器-發生異常:",e);
}
}
}


4)消息隊列與交換器的綁定(即消息模型的構建):

@Configuration
public class RabbitConfig {

private static final Logger log= LoggerFactory.getLogger(RabbitConfig.class);

@Autowired
private Environment env;

//構建消息模型(消息綁定)
@Bean
public Queue blogIndexQueue(){
return new Queue(env.getProperty("mq.queue.name"),true);
}

@Bean
public TopicExchange blogIndexExchange(){
return new TopicExchange(env.getProperty("mq.exchange.name"),true,false);
}

@Bean
public Binding blogIndexBinding(){
return BindingBuilder.bind(blogIndexQueue()).to(blogIndexExchange()).with(env.getProperty("mq. routing.key.name"));
}
}


然后在代碼的某個地方調用 new BlogMqService().sendIndexBlogMsg()即可?。?!


四、福利奉上!

關于RabbitMQ更多詳細的介紹,諸位可以觀看debug錄制的 “RabbitMQ實戰視頻教程”,地址如下所示:https://www.fightjava.com/web/index/course/detail/4


在那里不僅僅介紹了本文所提及的所有知識點,還包括了:消費確認模式、高并發下消息限流、服務解耦、異步通信、死信隊列等技術干貨,課程目錄如下圖所示:


總結:

好了,本文就介紹到這里吧,快過春節了,提前祝大家新春快樂,身體健康,來年擼碼更硬?。?!

如果本文對你有幫助,請關注公眾號,并動動手指收藏、點贊、以及轉發哦?。?!