➜ Old React website
Chung Cheuk Hang MichaelJava Web Developer
Spring Boot 項目使用 Swagger/OpenAPI用 Snyk 掃瞄 Java 項目

RabbitMQ 基礎

Continued from previous post:
Spring 項目中訂閱 RabbitMQ

Table of contents

1 RabbitMQ 簡介

可以睇返呢篇:Spring 項目中訂閱 RabbitMQ

2 啟動 RabbitMQ

我地可以用 Docker Desktop,然後利用 RabbitMQ 既官方 Docker image 執行 Docker container。
Command 可以喺呢篇搵到:Docker 基本操作 - RabbitMQ

3 動手玩

3.1 建立 virtual host

RabbitMQ 支援 multi-tenant,我地可以用 virtual host 黎 group 起相關既 resources。
  1. 去 Admin > Virtual Hosts。
  2. 打開「Add a new virtual host」。
  3. 輸入 Name,例如「rabbitmq-basics」。
  4. 撳「Add virtual host」。
  5. Refresh 頁面。
  6. 右上角 > 揀新建立既 virtual host。
完成之後,我地就可以喺呢個新既 virtual host 度玩啦!

3.2 測試 queue

3.2.1 建立 queue

  1. 去 Queues and Streams。
  2. 打開「Add a new queue」。
  3. 我地可以揀 Virtual host、Type,亦可以由得佢用默認值。
  4. 輸入 Name,例如 my-queue
  5. Durability 由得佢用默認既 Durable 就得。
  6. Arguments 可以唔輸入。
  7. 撳「Add queue」。
完成之後,會見到有一個 queue:
  • Name:my-queue
  • Type:classic
  • Features:
    • D 係 durable。
    • Args 裡面有 x-queue-type: classic
  • State:idle
  • Messages:0

3.2.2 直接發送訊息到 queue

  1. 撳「my-queue」。
  2. 打開「Publish message」。
  3. Delivery mode 由得佢用默認既 1 - Non-persistent 就得。
  4. Headers、Properties 可以唔輸入。
  5. Payload 係我地想發送既 message body,可以輸入 foo
  6. Payload encoding 由得佢用默認既 String (default) 就得。
  7. 撳「Publish message」。
  8. 如果發送成功,佢會彈出灰色底既 popup,寫住「Message published.」,撳「Close」就得。

3.2.3 接收訊息

  1. 打開「Get messages」。
  2. 將 Ack mode 改成 Automatic ack
  3. Encoding 由得佢用默認既 Auto string / base64 就得。
  4. Messages 影響接收既訊息數量,可以改成 10,就算多過實際存在既訊息數量都冇問題。
  5. 撳「Get Message(s)」。
之後佢會顯示我地既「foo」訊息:
  • Exchange:(AMQP default)
  • Routing key:my-queue
  • Redelivered:(白色代表 false,requeue 既訊息先會係 true
  • Properties
    • delivery_mode:1
    • headers:N/A
  • Payload:foo
如果我地再撳「Get Message(s)」,佢就會彈出灰色底既 popup,寫住「Queue is empty」,咁係因為先前既訊息已經 acknowledge 左。

3.2.4 Requeue 訊息

  1. 重複上面既發送、接收步驟。
    • 喺接收既步驟,Ack mode 揀「Nack message requeue true」或者「Reject requeue true」。
結果:
  1. 第一次接收訊息既時候,我地會見到 Redelivered 係「○」(false),因為呢個訊息係第一次接收。
  2. 第二次接收訊息既時候,我地會見到 Redelivered 係「●」(true),因為呢個訊息喺第一次接收之後被 requeue,而家接收既係第二次傳遞同一個訊息。

3.3 測試 direct exchange

3.3.1 建立 direct exchange

  1. 去 Exchanges。
  2. 打開「Add a new exchange」。
  3. 輸入 Name,例如 my-direct-exchange
  4. Type 揀「direct」。
  5. 我地可以揀 Virtual host、Durability、Auto delete、Internal,亦可以由得佢用默認值。
  6. Arguments 可以唔輸入。
  7. 撳「Add exchange」。

3.3.2 建立 queue

  1. 可以參考返上面建立 queue 既步驟。
    • Name 可以輸入 my-direct-queue

3.3.3 建立 binding

  1. 撳「my-direct-queue」。
  2. 打開「Bindings (1)」。
  3. From exchange 輸入 my-direct-exchange
  4. Routing key 輸入 foo
  5. Arguments 可以唔輸入。
  6. 撳「Bind」。

3.3.4 發送、接收訊息

  1. 去「my-direct-exchange」度發送訊息。
    1. Routing key 輸入 foo(要同 binding key 一樣)。
    2. Headers、Properties 可以唔輸入。
    3. 輸入 Payload,例如 my message content
  2. 去「my-direct-queue」度接收訊息。
    1. 會收到 1 個訊息。
如果我地發送既時候唔用 foo 作為 routing key(唔輸入,或者輸入另一個值),佢就會彈出黃色底既 popup,寫住「Message published, but not routed.」,咁係因為冇一個 binding 既 binding key 係 match 到呢個 routing key。

3.4 測試 fanout exchange

3.4.1 建立 fanout exchange

  1. 去 Exchanges。
  2. 打開「Add a new exchange」。
  3. 輸入 Name,例如 my-fanout-exchange
  4. Type 揀「fanout」。
  5. 我地可以揀 Virtual host、Durability、Auto delete、Internal,亦可以由得佢用默認值。
  6. Arguments 可以唔輸入。
  7. 撳「Add exchange」。

3.4.2 建立 queue

  1. 可以參考返上面建立 queue 既步驟。
    • Name 可以輸入 my-fanout-queue

3.4.3 建立 binding

  1. 撳「my-fanout-queue」。
  2. 打開「Bindings (1)」。
  3. From exchange 輸入 my-fanout-exchange
  4. Routing key 輸入 foo,或者唔輸入都得。
  5. Arguments 可以唔輸入。
  6. 撳「Bind」。

3.4.4 發送、接收訊息

  1. 去「my-fanout-exchange」度發送訊息。
    1. Routing key、Headers、Properties 可以唔輸入。
    2. 輸入 Payload,例如 my message content
  2. 去「my-fanout-queue」度接收訊息。
    1. 會收到 1 個訊息。
我地發送既時候用咩作為 routing key(輸入 foo、唔輸入,或者輸入另一個值),都一樣可以發送到訊息,咁係因為 fanout exchange 唔會理 routing key 或者 binding key。

3.5 測試 headers exchange

3.5.1 建立 headers exchange

  1. 去 Exchanges。
  2. 打開「Add a new exchange」。
  3. 輸入 Name,例如 my-headers-exchange
  4. Type 揀「headers」。
  5. 我地可以揀 Virtual host、Durability、Auto delete、Internal,亦可以由得佢用默認值。
  6. Arguments 可以唔輸入。
  7. 撳「Add exchange」。

3.5.2 建立 queue

  1. 可以參考返上面建立 queue 既步驟。
    • Name 可以輸入 my-headers-queue

3.5.3 建立 binding

  1. 撳「my-headers-queue」。
  2. 打開「Bindings (1)」。
  3. From exchange 輸入 my-headers-exchange
  4. Routing key 唔洗輸入。
  5. Arguments 輸入:
    • h1 = fooString
    • h2 = barString
  6. 撳「Bind」。

3.5.4 發送、接收訊息

  1. 去「my-headers-exchange」度發送訊息。
    1. Routing key、Properties 可以唔輸入。
    2. Headers 輸入:
      • h1 = fooString
      • h2 = barString
    3. 輸入 Payload,例如 my message content
  2. 去「my-headers-queue」度接收訊息。
    1. 會收到 1 個訊息。
如果我地發送既時候,headers 一定要 match 曬 binding 既 arguments,只要有任何一個唔 match,佢都會彈出黃色底既 popup,寫住「Message published, but not routed.」,咁係因為冇一個 binding 既 arguments 係 match 到呢個訊息既 headers。

3.6 測試 topic exchange

3.6.1 建立 topic exchange

  1. 去 Exchanges。
  2. 打開「Add a new exchange」。
  3. 輸入 Name,例如 my-topic-exchange
  4. Type 揀「topic」。
  5. 我地可以揀 Virtual host、Durability、Auto delete、Internal,亦可以由得佢用默認值。
  6. Arguments 可以唔輸入。
  7. 撳「Add exchange」。

3.6.2 建立 queue

  1. 可以參考返上面建立 queue 既步驟。
    • Name 可以輸入 my-topic-queue

3.6.3 建立 binding

  1. 撳「my-topic-queue」。
  2. 打開「Bindings (1)」。
  3. From exchange 輸入 my-topic-exchange
  4. Routing key 輸入 foo.#
  5. Arguments 可以唔輸入。
  6. 撳「Bind」。

3.6.4 發送、接收訊息

  1. 去「my-topic-exchange」度發送訊息。
    1. Routing key 輸入 foo.123(要係一個 match 到 foo.# 接受既值)。
    2. Headers、Properties 可以唔輸入。
    3. 輸入 Payload,例如 my message content
  2. 去「my-topic-queue」度接收訊息。
    1. 會收到 1 個訊息。
如果我地發送既時候唔用 match 到個 binding key 既值作為 routing key(唔輸入,或者輸入唔 match 既值),佢就會彈出黃色底既 popup,寫住「Message published, but not routed.」,咁係因為冇一個 binding 既 binding key 係 match 到呢個 routing key。
foo.# binding key 接受既 routing key 例子:
  • foofoo.foo..foo.#foo.1foo.afoo.abc
foo.# binding key 唔接受既 routing key 例子:
  • 唔輸入
  • bar.bar.123

3.7 測試 dead letter exchange

3.7.1 建立 business exchange

  • Name:biz-exchange
  • Type:direct

3.7.2 建立 business queue

  • Name:biz-queue
  • Arguments
    • x-dead-letter-exchangemy-dlx
    • x-dead-letter-routing-keymy-dlkey

3.7.3 建立 business binding

  • Exchange:biz-exchange
  • Queue:biz-queue
  • Binding key:biz-key

3.7.4 建立 dead letter exchange

  • Name:my-dlx
  • Type:direct

3.7.5 建立 dead letter queue

  • Name:my-dlq
  • Arguments(可選)
    • x-dead-letter-exchangebiz-exchangeString
    • x-dead-letter-routing-keybiz-keyString
    • x-message-ttl10000Number)(等於 10 秒)

3.7.6 建立 dead letter binding

  • Exchange:my-dlx
  • Queue:my-dlq
  • Binding key:my-dlkey

3.7.7 發送、接收訊息

  1. 去「biz-exchange」度發送訊息。
    1. Routing key 輸入 biz-key
    2. Headers、Properties 可以唔輸入。
    3. 輸入 Payload,例如 my message content
  2. 去「biz-queue」度接收訊息。
    1. Ack mode 揀「Reject requeue false」。
    2. 會收到 1 個訊息。
  3. 去「my-dlq」度接收訊息。
    1. Ack mode 揀「Nack message requeue true」。
    2. 會收到 1 個訊息。
如果 dead letter queue 有設定度可選既 arguments:
  1. 等幾秒(message TTL 係 10 秒)。
  2. 去「biz-queue」度接收訊息。
    1. Ack mode 揀「Nack message requeue true」。
    2. 會收到 1 個訊息。
解釋:
  • 上面既例子可以達到 retry dead letter queue 裡面既訊息既效果。
    • 當訊息喺「biz-queue」度接收失敗,被轉移到「my-dlx」然後再到「my-dlq」之後,經過一段時間,當 message TTL 過期既時候,訊息又會自動被轉移到「biz-exchange」然後再到「biz-queue」。

4 筆記

4.1 Exchange 選項

選項解釋
Durable個 exchange 會喺 RabbitMQ 重新啟動之後都繼續存在。
Auto-delete當最後一個 binding 被解除,RabbitMQ 就會自動刪除呢個 exchange。
Direct typeBinding key 會係 exact match。
Fanout type唔需要 binding key,就算提供左都冇用,所有 bind 左既 queues 都必定會收到所有訊息。
Headers type用特定既 key-value arguments 黎 match。
Topic typeBinding key 可以係 exact match,亦可以運用 wildcard characters——* 代表 wildcard 單字;# 代表 wildcard 任意長度(0N)既文字。

4.2 Queue 選項

選項解釋
Durable個 queue 會喺 RabbitMQ 重新啟動之後都繼續存在。
Auto-delete當最後一個 consumer cancel 或者 disconnect 左,RabbitMQ 就會自動刪除呢個 queue。
Exclusive當佢所屬既 connection close 左,RabbitMQ 就會自動刪除呢個 queue。
Message TTL(x-message-ttl arg)Queue level 既 time-to-live,一個訊息如果超過呢個時間就會 expire,consumers 唔會收到已經 expire 左既訊息。如果有 dead letter exchange 既 args,當訊息喺 head 位置,RabbitMQ 就會將佢轉移去 dead letter exchange。
Auto-expire(x-expires arg)數值係 millisecond 數字(5000 等於 5 秒)。當冇任何 consumer 連接住呢個 queue(一建立 queue 既時候都算),RabbitMQ 就會開始計時,一到左時間,RabbitMQ 就會自動刪除呢個 queue。
Single active consumer(x-single-active-consumer arg)如果我地既訊息一定要按入 queue 既先後次序處理,如果想保持住多個相同 consumer 既 app instances,咁我地可以用 single active consumer 選項,咁所有訊息都只會傳遞到同一個 consumer,保證處理訊息既先後次序。
Dead letter exchange(x-dead-letter-exchange如果傳遞失敗,RabbitMQ 可能會將佢轉移到呢個 dead letter exchange。
Dead letter routing key(x-dead-letter-routing-keyRabbitMQ 將訊息轉移到 dead letter exchange 既時候會用既 routing key。

4.3 Binding 選項

選項解釋
Binding key(或者 routing key)提供 filter 訊息既作用。一個訊息如果要由 exchange X1 去到 queue Q1,呢個訊息既 routing key 一定要 match 到 Q1X1 之間既 binding 既 binding key。我地應該根據 exchange type 去輸入符合我地需要既 binding key。
Arguments用喺 headers exchange 既 bindings 度,只有 match 到曬所有 arguments 既值既訊息先會 route 到去個 queue 度。

4.4 訊息發送選項

選項解釋
Delivery modePersistent(1)會先寫落 persistent storage(disk),如果 RabbitMQ crash 左既話都會喺返度。Non-persistent(2)就唔會寫落 disk。

4.5 訊息接收選項

選項解釋
Reject係 AMQP 既標準,用黎拒絕單一訊息。
NackNegative acknowledgement 係 RabbitMQ 獨有既功能,可以同時拒絕對多個訊息。

5 參考資料