➜ Old React website
Chung Cheuk Hang MichaelJava Web Developer
Railway 免費 DB PaaS寫 Chrome extension 排序 Google Lens 搜尋結果

Spring 項目中訂閱 RabbitMQ

Table of contents

1 RabbitMQ 簡介

RabbitMQ 係一個強大既 message queue(MQ)/message broker 工具,可以畀我地用 messaging 既方式取代 synchronous HTTP 黎達到 microservices 之間既 asynchronous communication。
負責發布訊息既微服務會將文字訊息發送到 RabbitMQ,然後 RabbitMQ 會暫時儲存住呢啲訊息,而訂閱左呢個 RabbitMQ 既微服務稍後就會收到訊息。當訂閱方成功接收到訊息,RabbitMQ 就會移除呢啲已經傳送既訊息。
其他 MQ 產品有:Azure Service Bus、Amazon SQS、ActiveMQ、RocketMQ。

1.1 Messaging entities 概念

首先,當我地喺 RabbitMQ 創建 messaging entities,我地一定要將佢地創建喺某一個 virtual host(亦叫 vhost)裡面,而 virtual host 係用黎 group 起有關既 messaging entities,類似係一個 folder 既概念。
RabbitMQ 有以下既 messaging entities:
Messaging entity描述
ExchangeExchange 係一個訊息發送對象。
Queue一條隊列,可以係一個訊息發送對象。任意數量既發布方、任意數量既訂閱方都可以使用同一條隊列收發訊息。Queue 並唔隸屬於某一個 exchange,而係獨立既存在,而我地要透過 binding 黎將 queue 連繫到 exchange。

1.2 Exchange 種類

Exchange 種類描述
Fanout所有 bind 左既 queues 都收到訊息,用作 broadcasting。類似 Azure Service Bus 既 topic、subscriptions。Queue 依然可以用 routing key bind 落 fanout exchange,但佢會無視 routing key 照發送畀所有 bind 左既 queues。
DirectQueue 可以用特定既 routing key(例如 backup.orderbackup.product)黎 bind 落 direct exchange,之後就會收到符合呢個 routing key 既訊息。
Topic同 direct exchange 一樣,但佢既 bindings 既 routing key 可以用 * 代表 exactly 一個任何文字,或者 # 黎代表任意長度(0N)既任何文字,但要注意佢地唔似 RegEx,我地必須用 . 黎分隔佢地以及其他 routing key 既文字部分,例如 backup.#。如果個 binding 既 routing key 只係一個 # 符號,咁個 queue 就會收到黎自個 topic exchange 既所有訊息。
HeadersQueue binding 可以有多個特定既 key-value argument match 要求,之後就會收到符合呢啲 key-value arguments 既訊息。

1.3 Queue 種類

Queue 種類描述
Classic queue一般既 queue。
Quorum queue係一種新型既 queue 種類,針對需要高數據安全以及可用性既場景,比起 classic mirrored queue 可以應付既錯誤場景更多、更全面。
Stream用 append-only log 既 implementation,類似 Kafka。

1.4 Dead lettering

Spring Cloud Stream RabbitMQ Binder 既 dead letter 流程:
  1. Spring Cloud Stream 會執行我地定義既 Consumer function 去處理訊息。
  2. 我地定義既 Consumer function 會 throw exception。
  3. Spring Cloud Stream 見到 exception 就會 retry 我地既 Consumer function,直至總處理次數(第一次 + retries)達到 maxAttempts 既上限。
  4. 如果全部 retries 都失敗,最終 Spring Cloud Stream 會將呢個訊息連同 stacktrace 作為一個新訊息發送去 dead letter exchange,而原來既訊息就會消失。

2 動手寫

2.1 Maven dependencies

1<parent> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-parent</artifactId> 4 <version>3.4.2</version> 5</parent> 6 7<dependencyManagement> 8 <dependencies> 9 <dependency> 10 <groupId>org.springframework.cloud</groupId> 11 <artifactId>spring-cloud-dependencies</artifactId> 12 <version>2024.0.0</version> 13 <type>pom</type> 14 <scope>import</scope> 15 </dependency> 16 </dependencies> 17</dependencyManagement> 18 19<dependencies> 20 <dependency> 21 <groupId>org.springframework.boot</groupId> 22 <artifactId>spring-boot-starter-web</artifactId> 23 </dependency> 24 25 <dependency> 26 <groupId>org.springframework.cloud</groupId> 27 <artifactId>spring-cloud-stream-binder-rabbit</artifactId> 28 </dependency> 29 30 <dependency> 31 <groupId>org.apache.commons</groupId> 32 <artifactId>commons-lang3</artifactId> 33 </dependency> 34 35 <dependency> 36 <groupId>org.projectlombok</groupId> 37 <artifactId>lombok</artifactId> 38 <scope>provided</scope> 39 </dependency> 40</dependencies>

2.2 Java code

Project structure:
  • src/main/java
    • /code
      • /config
        • MyConsumer.java
        • MyProducer.java
      • /controller
        • DemoController.java
MyProducer.java
1@Slf4j 2@Configuration 3public class MyProducer { 4 5 @Bean 6 public Supplier<String> demoProduce() { 7 return () -> { 8 final String dateTime = LocalDateTime.now(ZoneId.of("Hongkong")).format(ofPattern("uuuu-MM-dd HH:mm:ss")); 9 final String msg = format("Test msg - %s (%s)", RandomStringUtils.secure().nextAlphanumeric(8), dateTime); 10 11 log.info("Sending: {}", msg); 12 13 return msg; 14 }; 15 } 16}
MyConsumer.java
1@Slf4j 2@Configuration 3public class MyConsumer { 4 5 @Bean 6 public Consumer<String> demoConsume() { 7 return msg -> { 8 log.info("Received message: {}", msg); 9 }; 10 } 11}
DemoController.java
1@Slf4j 2@RestController 3public class DemoController { 4 5 @Autowired 6 StreamBridge streamBridge; 7 8 @PostMapping("send") 9 public void send(@RequestParam(name = "msg") String msg) { 10 streamBridge.send("demoProduce-out-0", msg); 11 log.info("Sent: {}", msg); 12 } 13}

2.3 Application 配置

1spring: 2 rabbitmq: 3 host: localhost 4 port: 5672 5 username: rmq 6 password: rmq 7# virtual-host: "/" 8 9--- 10 11spring: 12 cloud: 13 function: 14 definition: demoProduce;demoConsume # 注意:demoProduce 會每秒發送訊息 15 16 stream: 17 default-binder: rabbit # 默認既 binder,令每個 binding 都用呢個 binder 配置 18 19 bindings: 20 demoProduce-out-0: 21 destination: demo-exchange 22# binder: rabbit 23 demoConsume-in-0: 24 destination: demo-exchange 25 group: demo-queue 26# binder: rabbit 27 consumer: 28 maxAttempts: 5 29 30 rabbit: 31 bindings: 32 demoProduce-out-0: 33 producer: 34 exchangeType: fanout 35 demoConsume-in-0: 36 consumer: 37 exchangeType: fanout 38 autoBindDlq: true 39 deadLetterExchange: demo-dlq-exchange 40 dlqDeadLetterExchange: demo-exchange 41 dlqTtl: 10000

2.3.1 配置詳解

spring.cloud.stream.bindings.<binding>.consumer
配置描述
maxAttempts默認係 3。如果個 Consumer function 處理個訊息既時候 throw 左 exception,咁個 app 就會自動啟用 retry 機制,而呢個配置可以用黎控制最多既處理次數(包括第一次 + retries)。
spring.cloud.stream.rabbit.bindings.<binding>.producer
配置描述
exchangeTypedestinationdemo-exchange)既 RabbitMQ exchange 既 exchange type,默認係 topic。如果已經個 exchange 已經存在,咁呢個配置就一定要 match,否則會有 error。
spring.cloud.stream.rabbit.bindings.<binding>.consumer
配置描述
exchangeTypedestinationdemo-exchange)既 RabbitMQ exchange 既 exchange type,默認係 topic。如果已經個 exchange 已經存在,咁呢個配置就一定要 match,否則會有 error。
autoBindDlq默認 false,改成 true 就可以自動創建 dead letter exchange、dead letter queue 以及 binding。如果喺 exchange 以及 queue 已經創建之後先啟用呢個功能,佢唔會自動幫個 group 條 queue 加返 x-dead-letter-exchange 以及 x-dead-letter-routing-key 既 args,但咁唔會影響 dead letter 既功能,因為 Spring Cloud Stream 係靠 application side 而唔係 broker side 既 requeue 方式。
deadLetterExchangegroup 條 queue(demo-queue)既 x-dead-letter-exchange arg。
dlqDeadLetterExchange個 dead letter queue 既 x-dead-letter-exchange arg。
dlqTtl個 dead letter queue 既 x-message-ttl arg,單位係 milliseconds,用黎控制 dead letter queue 既 messages 既 TTL,超過左就會 republish 到 x-dead-letter-exchange arg 既 exchange。
註:
  • group 條 queue(demo-queue)既名會用 <destination>.<group> 既格式,即係 demo-exchange.demo-queue
  • 個 dead letter queue 既名會用 <destination>.<group>.dlq 既格式,即係 demo-exchange.demo-queue.dlq
  • group 條 queue(demo-queue)既 x-dead-letter-routing-key arg 會用 <destination>.<group> 既格式,即係 demo-exchange.demo-queue
  • 所有既 queues 一旦創建左,就算我地更改 application 配置,都冇辦法更改到佢地既 args。
    • 咁係因為 RabbitMQ 本身唔畀更改 args,我地只能刪除左個 exchange、queue 或者 binding 再用新既配置重建。
    • 因為處理訊息失敗而將訊息 republish 去 dead letter exchange 呢個過程係發生喺 application side 而唔係 broker side,所以即使一開始創建 demo-queue 既時候冇啟用到 dead letter 功能,之後用 autoBindDlq 啟用返都係可以實現到 dead letter 功能。
  • Dead letter queue 喺 x-message-ttl 既時限過左之後自動將訊息搬到 x-dead-letter-exchange 呢個 exchange 既過程係發生喺 broker side。

3 測試

3.1 自動創建 exchange、queue、binding

運行個 microservice 之後,我地可以喺 RabbitMQ 既 management 介面見到多左以下既野:
  • 新增 fanout exchange demo-exchange
  • 新增 durable queue demo-exchange.demo-queue
  • 建立 demo-exchangedemo-exchange.demo-queue 之間既 binding,並冇 routing key
  • 新增 direct exchange demo-dlq-exchange
  • 新增 durable queue demo-exchange.demo-queue.dlq
  • 建立 demo-dlq-exchangedemo-exchange.demo-queue.dlq 之間既 binding,而 routing key 係 demo-exchange.demo-queue

3.2 接收自動發送訊息

用左 spring.cloud.function.definition 註冊既 Supplier functions 會自動每秒執行,即係每秒都會有一個新訊息發送到 RabbitMQ,所以如果我地既 microservice 有齊發送、接收訊息既功能,我地就可以喺 log 度睇到收發既情況。
期望測試結果:
  • 個 microservice 每秒都會發送一個新訊息。
  • 個 microservice 發送完訊息之後就即時收到返相同既訊息。

3.3 接收手動發送訊息

利用我地寫既 REST HTTP API 發送訊息:
curl localhost:8080/send?msg=foo
期望測試結果:
  • 個 microservice 會發送訊息 foo,然後好快就收到呢個訊息。
  • 如果不斷執行呢句 command,個 microservice 就會不斷發送、接收相同既訊息好多次。
  • 我地會見到個 microservice 只係用單一 thread(demo-exchange.demo-queue-1)去接收黎自同一條 queue 既訊息。
    • 喺其他時間,個 microservice 會接收到佢每秒自動發送既訊息。

3.4 Competing consumers

我地可以喺 port 808080818082 運行 3 個 instances。
  • 只有一個 instance 註冊曬 ConsumerSupplier functions。
  • 另外兩個 instances 只註冊 Consumer function。
運行第一個 instance:
java -jar Spring-Cloud-Stream-RabbitMQ-Demo-1.0.0.jar
開個新既 console window,運行第二個 instance:
java -jar -Dserver.port=8081 -Dspring.cloud.function.definition=demoConsume Spring-Cloud-Stream-RabbitMQ-Demo-1.0.0.jar
開個新既 console window,運行第三個 instance:
java -jar -Dserver.port=8082 -Dspring.cloud.function.definition=demoConsume Spring-Cloud-Stream-RabbitMQ-Demo-1.0.0.jar
註:
  • 我地用 -D 去定義或者覆蓋 server.port80818082
  • 我地用 -D 去定義或者覆蓋 spring.cloud.function.definitiondemoConsume
期望測試結果:
  • 第一個 instance 每秒都會發送一個新訊息。
  • 每個 instance 都係 exactly 每 3 秒收到一個訊息(唔係平均)。
  • 每個 instance 收到既訊息都唔一樣。
解釋:
  • RabbitMQ 默認既訊息分配策略係 round-robin。喺我地既情況,訊息會順序派發到 consumer instance 123123,如此類推。

3.5 Dead letter 功能

要測試 dead letter 功能,首先我地要喺 microservice 既 Consumer function 裡面 hardcode 一個 logic,如果個訊息內容係 dlq-test 開頭,就會 throw exception。
修改 MyConsumer.java
1@Slf4j 2@Configuration 3public class MyConsumer { 4 5 @Bean 6 public Consumer<String> demoConsume() { 7 return msg -> { 8 log.info("Received message: {}", msg); 9 10 // 用 StreamBridge 既方式測試 dead letter 11 if (msg.startsWith("dlq-test")) { 12 throw new RuntimeException("Exception simulation for message: " + msg); 13 } 14 }; 15 } 16}
利用我地寫既 REST HTTP API 發送訊息:
curl localhost:8080/send?msg=dlq-test-1
期望測試結果:
  • 個 microservice 會發送訊息 dlq-test-1
  • 個 microservice 會接收到所有訊息,而當佢處理 dlq-test 開頭既訊息就會一直失敗,而 log 會顯示佢喺大約 15 秒內接收左個訊息總共 5 次,喺第 5 次既時候就會 log exception。
    • Retry 既默認 backoff 係 1 秒,而每個下一次既 retry 會用 2 倍既 backoff multiplier。
    • 我地配置左 maxAttempts5,唔計第一次,就有 4 次 retries,總共既時間就係 1 + 2 + 4 + 8 = 15 秒。
  • Throw 左 exception 之後,就會見到 dead letter queue 裡面多左訊息 dlq-test-1
  • Throw 左 exception 之後等大約 10 秒(dead letter queue 既 x-message-ttl arg),又會再接收到訊息 dlq-test-1,為數 5 次。
    • 因為我地既 code 寫死左一定會 throw exception,而 dead letter queue 喺 10 秒之後又會將個訊息搬返去 demo-queue,所以呢個情況只會不斷重複。
    • 而喺現實情況下,當然係只會 retry 到個 exception 既 root cause 解決左,例如某個 upstream service 死左之後重啟返。
  • 我地會見到個 microservice 只係用單一 thread 去接收黎自同一條 queue 既訊息。
    • Thread 名:demo-exchange.demo-queue-1
    • 喺其他時間,個 microservice 會接收到佢每秒自動發送既訊息。

4 已知問題

4.1 方括號包圍 message 導致既 bug

5 Debug RabbitMQ

我地可以喺需要 debug 既 virtual host 上面開啟 RabbitMQ 既 Firehose Tracer 功能。
  1. 每個 virtual host 本身都帶有一個 amq.rabbitmq.trace 既 topic exchange。我地可以 create 一個 queue,然後用 # 既 routing key 黎 bind 落去,咁呢個 queue 就會收到個 exchange 既所有 messages。
  2. 執行 command 為呢個 virtual host 開啟 Firehose Tracer 功能。
    • rabbitmqctl trace_on -p "<vhost>"
參考資料:

6 參考資料

Next posts:
RabbitMQ 基礎