Table of contents
1 RabbitMQ 簡介
RabbitMQ 係一個強大既 message queue(MQ)/message broker 工具,可以畀我地用 messaging 既方式取代 synchronous HTTP 黎達到 microservices 之間既 asynchronous communication。
負責發布訊息既微服務會將文字訊息發送到 RabbitMQ,然後 RabbitMQ 會暫時儲存住呢啲訊息,而訂閱左呢個 RabbitMQ 既微服務稍後就會收到訊息。當訂閱方成功接收到訊息,RabbitMQ 就會移除呢啲已經傳送既訊息。
其他 MQ 產品有:Azure Service Bus、Amazon SQS、ActiveMQ、RocketMQ。
1.1 Messaging entities 概念
首先,當我地喺 RabbitMQ 創建 messaging entities,我地一定要將佢地創建喺某一個 virtual host(亦叫 vhost)裡面,而 virtual host 係用黎 group 起有關既 messaging entities,類似係一個 folder 既概念。
RabbitMQ 有以下既 messaging entities:
Messaging entity | 描述 |
---|
Exchange | Exchange 係一個訊息發送對象。 |
Queue | 一條隊列,可以係一個訊息發送對象。任意數量既發布方、任意數量既訂閱方都可以使用同一條隊列收發訊息。Queue 並唔隸屬於某一個 exchange,而係獨立既存在,而我地要透過 binding 黎將 queue 連繫到 exchange。 |
1.2 Exchange 種類
Exchange 種類 | 描述 |
---|
Fanout | 所有 bind 左既 queues 都收到訊息,用作 broadcasting。類似 Azure Service Bus 既 topic、subscriptions。Queue 依然可以用 routing key bind 落 fanout exchange,但佢會無視 routing key 照發送畀所有 bind 左既 queues。 |
Direct | Queue 可以用特定既 routing key(例如 backup.order 、backup.product )黎 bind 落 direct exchange,之後就會收到符合呢個 routing key 既訊息。 |
Topic | 同 direct exchange 一樣,但佢既 bindings 既 routing key 可以用 * 代表 exactly 一個任何文字,或者 # 黎代表任意長度(0 至 N )既任何文字,但要注意佢地唔似 RegEx,我地必須用 . 黎分隔佢地以及其他 routing key 既文字部分,例如 backup.# 。如果個 binding 既 routing key 只係一個 # 符號,咁個 queue 就會收到黎自個 topic exchange 既所有訊息。 |
Headers | Queue binding 可以有多個特定既 key-value argument match 要求,之後就會收到符合呢啲 key-value arguments 既訊息。 |
1.3 Queue 種類
Queue 種類 | 描述 |
---|
Classic queue | 一般既 queue。 |
Quorum queue | 係一種新型既 queue 種類,針對需要高數據安全以及可用性既場景,比起 classic mirrored queue 可以應付既錯誤場景更多、更全面。 |
Stream | 用 append-only log 既 implementation,類似 Kafka。 |
1.4 Dead lettering
Spring Cloud Stream RabbitMQ Binder 既 dead letter 流程:
- Spring Cloud Stream 會執行我地定義既
Consumer
function 去處理訊息。
- 我地定義既
Consumer
function 會 throw exception。
- Spring Cloud Stream 見到 exception 就會 retry 我地既
Consumer
function,直至總處理次數(第一次 + retries)達到 maxAttempts
既上限。
- 如果全部 retries 都失敗,最終 Spring Cloud Stream 會將呢個訊息連同 stacktrace 作為一個新訊息發送去 dead letter exchange,而原來既訊息就會消失。
2 動手寫
2.1 Maven dependencies
1<parent>
2 <groupId>org.springframework.boot</groupId>
3 <artifactId>spring-boot-starter-parent</artifactId>
4 <version>3.4.2</version>
5</parent>
6
7<dependencyManagement>
8 <dependencies>
9 <dependency>
10 <groupId>org.springframework.cloud</groupId>
11 <artifactId>spring-cloud-dependencies</artifactId>
12 <version>2024.0.0</version>
13 <type>pom</type>
14 <scope>import</scope>
15 </dependency>
16 </dependencies>
17</dependencyManagement>
18
19<dependencies>
20 <dependency>
21 <groupId>org.springframework.boot</groupId>
22 <artifactId>spring-boot-starter-web</artifactId>
23 </dependency>
24
25 <dependency>
26 <groupId>org.springframework.cloud</groupId>
27 <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
28 </dependency>
29
30 <dependency>
31 <groupId>org.apache.commons</groupId>
32 <artifactId>commons-lang3</artifactId>
33 </dependency>
34
35 <dependency>
36 <groupId>org.projectlombok</groupId>
37 <artifactId>lombok</artifactId>
38 <scope>provided</scope>
39 </dependency>
40</dependencies>
2.2 Java code
Project structure:
src/main/java
/code
/config
MyConsumer.java
MyProducer.java
/controller
MyProducer.java
:
1@Slf4j
2@Configuration
3public class MyProducer {
4
5 @Bean
6 public Supplier<String> demoProduce() {
7 return () -> {
8 final String dateTime = LocalDateTime.now(ZoneId.of("Hongkong")).format(ofPattern("uuuu-MM-dd HH:mm:ss"));
9 final String msg = format("Test msg - %s (%s)", RandomStringUtils.secure().nextAlphanumeric(8), dateTime);
10
11 log.info("Sending: {}", msg);
12
13 return msg;
14 };
15 }
16}
MyConsumer.java
:
1@Slf4j
2@Configuration
3public class MyConsumer {
4
5 @Bean
6 public Consumer<String> demoConsume() {
7 return msg -> {
8 log.info("Received message: {}", msg);
9 };
10 }
11}
DemoController.java
:
1@Slf4j
2@RestController
3public class DemoController {
4
5 @Autowired
6 StreamBridge streamBridge;
7
8 @PostMapping("send")
9 public void send(@RequestParam(name = "msg") String msg) {
10 streamBridge.send("demoProduce-out-0", msg);
11 log.info("Sent: {}", msg);
12 }
13}
2.3 Application 配置
1spring:
2 rabbitmq:
3 host: localhost
4 port: 5672
5 username: rmq
6 password: rmq
7# virtual-host: "/"
8
9---
10
11spring:
12 cloud:
13 function:
14 definition: demoProduce;demoConsume # 注意:demoProduce 會每秒發送訊息
15
16 stream:
17 default-binder: rabbit # 默認既 binder,令每個 binding 都用呢個 binder 配置
18
19 bindings:
20 demoProduce-out-0:
21 destination: demo-exchange
22# binder: rabbit
23 demoConsume-in-0:
24 destination: demo-exchange
25 group: demo-queue
26# binder: rabbit
27 consumer:
28 maxAttempts: 5
29
30 rabbit:
31 bindings:
32 demoProduce-out-0:
33 producer:
34 exchangeType: fanout
35 demoConsume-in-0:
36 consumer:
37 exchangeType: fanout
38 autoBindDlq: true
39 deadLetterExchange: demo-dlq-exchange
40 dlqDeadLetterExchange: demo-exchange
41 dlqTtl: 10000
2.3.1 配置詳解
spring.cloud.stream.bindings.<binding>.consumer
:
配置 | 描述 |
---|
maxAttempts | 默認係 3 。如果個 Consumer function 處理個訊息既時候 throw 左 exception,咁個 app 就會自動啟用 retry 機制,而呢個配置可以用黎控制最多既處理次數(包括第一次 + retries)。 |
spring.cloud.stream.rabbit.bindings.<binding>.producer
:
配置 | 描述 |
---|
exchangeType | 個 destination (demo-exchange )既 RabbitMQ exchange 既 exchange type,默認係 topic 。如果已經個 exchange 已經存在,咁呢個配置就一定要 match,否則會有 error。 |
spring.cloud.stream.rabbit.bindings.<binding>.consumer
:
配置 | 描述 |
---|
exchangeType | 個 destination (demo-exchange )既 RabbitMQ exchange 既 exchange type,默認係 topic 。如果已經個 exchange 已經存在,咁呢個配置就一定要 match,否則會有 error。 |
autoBindDlq | 默認 false ,改成 true 就可以自動創建 dead letter exchange、dead letter queue 以及 binding。如果喺 exchange 以及 queue 已經創建之後先啟用呢個功能,佢唔會自動幫個 group 條 queue 加返 x-dead-letter-exchange 以及 x-dead-letter-routing-key 既 args,但咁唔會影響 dead letter 既功能,因為 Spring Cloud Stream 係靠 application side 而唔係 broker side 既 requeue 方式。 |
deadLetterExchange | 個 group 條 queue(demo-queue )既 x-dead-letter-exchange arg。 |
dlqDeadLetterExchange | 個 dead letter queue 既 x-dead-letter-exchange arg。 |
dlqTtl | 個 dead letter queue 既 x-message-ttl arg,單位係 milliseconds,用黎控制 dead letter queue 既 messages 既 TTL,超過左就會 republish 到 x-dead-letter-exchange arg 既 exchange。 |
註:
- 個
group
條 queue(demo-queue
)既名會用 <destination>.<group>
既格式,即係 demo-exchange.demo-queue
。
- 個 dead letter queue 既名會用
<destination>.<group>.dlq
既格式,即係 demo-exchange.demo-queue.dlq
。
- 個
group
條 queue(demo-queue
)既 x-dead-letter-routing-key
arg 會用 <destination>.<group>
既格式,即係 demo-exchange.demo-queue
。
- 所有既 queues 一旦創建左,就算我地更改 application 配置,都冇辦法更改到佢地既 args。
- 咁係因為 RabbitMQ 本身唔畀更改 args,我地只能刪除左個 exchange、queue 或者 binding 再用新既配置重建。
- 因為處理訊息失敗而將訊息 republish 去 dead letter exchange 呢個過程係發生喺 application side 而唔係 broker side,所以即使一開始創建
demo-queue
既時候冇啟用到 dead letter 功能,之後用 autoBindDlq
啟用返都係可以實現到 dead letter 功能。
- Dead letter queue 喺
x-message-ttl
既時限過左之後自動將訊息搬到 x-dead-letter-exchange
呢個 exchange 既過程係發生喺 broker side。
3 測試
3.1 自動創建 exchange、queue、binding
運行個 microservice 之後,我地可以喺 RabbitMQ 既 management 介面見到多左以下既野:
- 新增 fanout exchange
demo-exchange
- 新增 durable queue
demo-exchange.demo-queue
- 建立
demo-exchange
同 demo-exchange.demo-queue
之間既 binding,並冇 routing key
- 新增 direct exchange
demo-dlq-exchange
- 新增 durable queue
demo-exchange.demo-queue.dlq
- 建立
demo-dlq-exchange
同 demo-exchange.demo-queue.dlq
之間既 binding,而 routing key 係 demo-exchange.demo-queue
3.2 接收自動發送訊息
用左 spring.cloud.function.definition
註冊既 Supplier
functions 會自動每秒執行,即係每秒都會有一個新訊息發送到 RabbitMQ,所以如果我地既 microservice 有齊發送、接收訊息既功能,我地就可以喺 log 度睇到收發既情況。
期望測試結果:
- 個 microservice 每秒都會發送一個新訊息。
- 個 microservice 發送完訊息之後就即時收到返相同既訊息。
3.3 接收手動發送訊息
利用我地寫既 REST HTTP API 發送訊息:
curl localhost:8080/send?msg=foo
期望測試結果:
- 個 microservice 會發送訊息
foo
,然後好快就收到呢個訊息。
- 如果不斷執行呢句 command,個 microservice 就會不斷發送、接收相同既訊息好多次。
- 我地會見到個 microservice 只係用單一 thread(
demo-exchange.demo-queue-1
)去接收黎自同一條 queue 既訊息。
- 喺其他時間,個 microservice 會接收到佢每秒自動發送既訊息。
3.4 Competing consumers
我地可以喺 port 8080
、8081
、8082
運行 3
個 instances。
- 只有一個 instance 註冊曬
Consumer
、Supplier
functions。
- 另外兩個 instances 只註冊
Consumer
function。
運行第一個 instance:
java -jar Spring-Cloud-Stream-RabbitMQ-Demo-1.0.0.jar
開個新既 console window,運行第二個 instance:
java -jar -Dserver.port=8081 -Dspring.cloud.function.definition=demoConsume Spring-Cloud-Stream-RabbitMQ-Demo-1.0.0.jar
開個新既 console window,運行第三個 instance:
java -jar -Dserver.port=8082 -Dspring.cloud.function.definition=demoConsume Spring-Cloud-Stream-RabbitMQ-Demo-1.0.0.jar
註:
- 我地用
-D
去定義或者覆蓋 server.port
做 8081
、8082
。
- 我地用
-D
去定義或者覆蓋 spring.cloud.function.definition
做 demoConsume
。
期望測試結果:
- 第一個 instance 每秒都會發送一個新訊息。
- 每個 instance 都係 exactly 每
3
秒收到一個訊息(唔係平均)。
- 每個 instance 收到既訊息都唔一樣。
解釋:
- RabbitMQ 默認既訊息分配策略係 round-robin。喺我地既情況,訊息會順序派發到 consumer instance
1
➜ 2
➜ 3
➜ 1
➜ 2
➜ 3
,如此類推。
3.5 Dead letter 功能
要測試 dead letter 功能,首先我地要喺 microservice 既 Consumer
function 裡面 hardcode 一個 logic,如果個訊息內容係 dlq-test
開頭,就會 throw exception。
修改 MyConsumer.java
:
1@Slf4j
2@Configuration
3public class MyConsumer {
4
5 @Bean
6 public Consumer<String> demoConsume() {
7 return msg -> {
8 log.info("Received message: {}", msg);
9
10 // 用 StreamBridge 既方式測試 dead letter
11 if (msg.startsWith("dlq-test")) {
12 throw new RuntimeException("Exception simulation for message: " + msg);
13 }
14 };
15 }
16}
利用我地寫既 REST HTTP API 發送訊息:
curl localhost:8080/send?msg=dlq-test-1
期望測試結果:
- 個 microservice 會發送訊息
dlq-test-1
。
- 個 microservice 會接收到所有訊息,而當佢處理
dlq-test
開頭既訊息就會一直失敗,而 log 會顯示佢喺大約 15
秒內接收左個訊息總共 5
次,喺第 5
次既時候就會 log exception。
- Retry 既默認 backoff 係
1
秒,而每個下一次既 retry 會用 2
倍既 backoff multiplier。
- 我地配置左
maxAttempts
係 5
,唔計第一次,就有 4
次 retries,總共既時間就係 1
+ 2
+ 4
+ 8
= 15
秒。
- Throw 左 exception 之後,就會見到 dead letter queue 裡面多左訊息
dlq-test-1
。
- Throw 左 exception 之後等大約
10
秒(dead letter queue 既 x-message-ttl
arg),又會再接收到訊息 dlq-test-1
,為數 5
次。
- 因為我地既 code 寫死左一定會 throw exception,而 dead letter queue 喺
10
秒之後又會將個訊息搬返去 demo-queue
,所以呢個情況只會不斷重複。
- 而喺現實情況下,當然係只會 retry 到個 exception 既 root cause 解決左,例如某個 upstream service 死左之後重啟返。
- 我地會見到個 microservice 只係用單一 thread 去接收黎自同一條 queue 既訊息。
- Thread 名:
demo-exchange.demo-queue-1
- 喺其他時間,個 microservice 會接收到佢每秒自動發送既訊息。
4 已知問題
4.1 方括號包圍 message 導致既 bug
5 Debug RabbitMQ
我地可以喺需要 debug 既 virtual host 上面開啟 RabbitMQ 既 Firehose Tracer 功能。
- 每個 virtual host 本身都帶有一個
amq.rabbitmq.trace
既 topic exchange。我地可以 create 一個 queue,然後用 #
既 routing key 黎 bind 落去,咁呢個 queue 就會收到個 exchange 既所有 messages。
- 執行 command 為呢個 virtual host 開啟 Firehose Tracer 功能。
參考資料:
6 參考資料