➜ Old React website
Chung Cheuk Hang MichaelJava Web Developer
JPA/Hibernate 使用方式(一)HTML 網頁顯示 WebP 圖片

透過 CLI 學習 Kafka

Table of contents

1 Kafka 簡介

Kafka 係一個 event broker/event-streaming broker,除左好似 message brokers(MQ)咁喺 microservices 之間提供左 asynchronous communication 既可能性之外,隨住 event-driven microservice architecture 既興起,events 仲可以作為一個機構既數據既 single source of truth。
Kafka 既設計應用左 2 種 messaging 模式:
模式描述(以 Kafka 術語)
Publish-subscribe每個訊息都會 broadcast 去所有 consumer groups。
Message queue每個訊息喺每個 consumer group 都只會有 1 個 consumer 接收到(用 partitions 實現 competing consumers)。
Kafka 強大之處:
  • 高性能、高吞吐量。
  • 支援 broker horizontal scaling。
  • 支援 producer、consumer horizontal scaling。
    • 一個 topic 可以分成多個 partitions。
    • Scale up 同一個 consumer microservice 既 instances 去組成一個 consumer group。
    • 同一個 consumer group 裡面既各個 consumer 都會被分配同一個 topic 既 N 個 partitions。
  • 支援 consumer 重新啟動之後可以即時接收返斷線果陣 producer 所產生但未被接收既訊息。
  • 所有訊息都會被保存,如果將來諗到有新既業務用途,可以由一開始既 offset 讀起。
註:
  • Horizontal scaling 即係我地可以 deploy 一模一樣既 microservice/app 而得到多個 running instances。
    • 咁做既目的:
      1. 避免有啲 instances 行行下 crash 左,影響業務。
      2. 透過多個 instances 做 parallel processing 黎增加 throughput。
    • 呢啲 instances 之間唔需要知道其他 instances 既存在,亦唔需要有任何溝通。

1.1 Offset 概念

Kafka 係 immutable、append-only 既 message/event store,佢係利用 offset values 黎紀錄 consumer groups 既訊息接收進度。
  • Kafka 既訊息寫入左落 Kafka broker 之後就唔可以更改或者刪除。
  • Kafka 既 offsets 係 per partition + consumer group。
    • 即係每個 consumer group 喺每個 partition 都可以有一個 offset value。
    • 個 offset 係用黎紀錄低個 consumer group(regardless of 邊個 consumer instance)已經讀到個 partition 既邊一個訊息。
    • 一般情況下,consumer group 下次就會由 offset 開始讀起(由第一個未讀既訊息開始讀起)。
  • 更新 offsets 係 consumers 既責任,讀完訊息就應該更新 offsets。

2 安裝 Kafka

2.1 本地(免安裝)

⚠️ 建議將個 .tgz 裡面既 binary 檔案解壓縮去 C:/kafka,因為如果條 path 太長,行 command 果陣可能會有 error。

2.2 Docker image

我地可以用 Docker Desktop,然後利用 RabbitMQ 既官方 Docker image 執行 Docker container。
Command 可以喺呢篇搵到:Docker 基本操作 - Kafka

3 Commands

Windows 版既 CLI 喺 bin/windows folder,啲 scripts 既檔案副檔名係 .bat
如果想模擬到有多部機器同時運作,即係 horizontal scaling 既效果,穩陣啲可以用 Docker containers(保證獨立運行),而簡單啲既就可以複製成個 Kafka binaries 既 folder(不過其實複製啲 properties 檔已經足夠,因為啲數據檔案默認係儲存喺 /tmp(Windows 版就係 C:/tmp),而唔係儲存喺 Kafka 安裝檔既 folders 裡面)。

3.1 ZooKeeper

⚠️ 如果用 KRaft mode 啟動 Kafka 就唔需要 ZooKeeper。

3.1.1 啟動

zookeeper-server-start.sh config/zookeeper.properties

3.2 Kafka broker

想要模擬多部 broker 機器同時運作,就需要修改每個 broker 既 Kafka binaries 裡面既 config/server.properties
1# 唔同既 broker 要 assign 唔同既 ID 2broker.id=9 3 4# 唔同既 broker 要 assign 唔同既 port 5listeners=PLAINTEXT://:9999 6 7# 唔同既 broker 要 assign 唔同既 log directory 8log.dirs=/tmp/kafka-logs-9
我地會起 2個 brokers:
Brokerbroker.idlisteners 既 portlog.dirs
11119991/tmp/kafka-logs-1
22229992/tmp/kafka-logs-2
註:
  • Windows 既話都可以用上面既 log dir,啲 Kafka 數據會去左 C:/tmp
  • 因為有啲 --describe 既 commands 會顯示數字(例如列出所有 topics 會顯示 leaders、replicas、in-sync replicas),果啲數字其實係 broker ID。
    • 所以為左容易啲分辨,broker ID 可以用長啲既數字(只可以用純數字)。

3.2.1 啟動

kafka-server-start.sh config/server.properties

3.2.2 停止

想要停止 broker 既話,一定要 gracefully shutdown broker。
可以撳 Ctrl+C,佢會顯示 shut down completed

3.3 Topic

3.3.1 建立新 topic

我地會起 1 個 topic,而呢個 topic 會有 2 個 partitions。
我地可以喺個 ZooKeeper 度執行以下既 command:
kafka-topics.sh --bootstrap-server localhost:9991,localhost:9992 --create --topic new-topic-name --partitions 2

3.3.2 列出所有 topics

kafka-topics.sh --bootstrap-server localhost:9991,localhost:9992 --list kafka-topics.sh --bootstrap-server localhost:9991,localhost:9992 --describe

3.4 Kafka producer

Kafka 既 producer 係有 retry 機制,默認既 producer 配置下係會無限 retry 發送訊息,而必須注意既係,喺默認既 producer 配置下,1 個 connection 最多可以有 5 個等待發送既訊息。而如果有發送失敗既情況,例如發送第一個訊息出去果陣失敗左,但係發送排喺後面既訊息成功左既話,呢個情況下就會令最終條 MQ 裡面既訊息既次序同本來唔同。所以有啲配置係好重要,一定要設定。
config/producer.properties
max.in.flight.requests.per.connection=1
參考資料:

3.4.1 發送訊息

Kafka 既訊息係 key-value pair。
如果訊息需要順次序,有 2 個方法,視乎情況而用:
場景方法
所有訊息都要順次序。每個 topic 只有 1 個 partition,而每個 consumer group 亦只需要 1 個 consumer。
只需要相同業務 ID(例如用戶 ID)既訊息順次序。每個 topic 可以有多個 partitions,但係每個訊息都要提供業務 ID(例如用戶 ID)作為 key,用黎計算應該落入邊個 partition。因為 producer partitioner 係用 hashing algorithm 黎計算,所以相同既 key 一定會落入相同既 partition。
用下面既 command 就可以發送訊息:
kafka-console-producer.sh --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --property parse.key=true --property key.separator="|"
之後輸入訊息內容,例如 1|foo2|bar,撳 Enter 掣可以立即發送訊息。想退出既話可以撳 Ctrl+C
我地可以喺 producer 度不停發送相同 key 既訊息,又夾雜下唔同 key 既訊息,睇下有冇出現同一個 key 既訊息畀每個 consumer group 裡面唔同既 consumers 接收既情況,但正常係唔會出現,因為 Kafka 保證相同 key 既訊息只會由每個 consumer group 裡面既同一個 consumer 接收,咁就可以做到 message ordering。
註:
  • key.separator 默認係 tab。
  • --property parse.key=true --property key.separator="|" 係用黎話畀 producer 知我地既訊息既 key、value 用 | 黎分隔。
    • 如果唔加上呢 2 個配置,咁訊息既 key 就會係 null,而 producer partitioner 會用 round-robin 既方式將訊息發送去某個 partition。
  • 如果唔用 parse.key 以及 key.separator,producer 發送既訊息就會冇 key,咁就有可能會因為 sticky partitioning strategy 而去左同一個 partition 度,所以最好都係有 key。

3.4.2 查詢所有訊息

kafka-console-consumer.sh --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --from-beginning

3.5 Kafka consumer

我地會起 2 個 consumer groups,各有 2 個 consumers,總共 4 個 consumers。
每個 consumer group 都會收到一樣的訊息,而同一個 consumer group 裡面既每個 consumer 都會收到 Kafka assign 畀佢對應既 partition 既訊息。
ConsumerGroup ID
1group1
2group1
3group2
4group2

3.5.1 接收訊息

執行 4 次以下既 command:
  • Consumer 1--consumer-property group.id=group1
  • Consumer 2--consumer-property group.id=group1
  • Consumer 3--consumer-property group.id=group2
  • Consumer 4--consumer-property group.id=group2
kafka-console-consumer.sh --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group1

3.5.2 列出 consumer groups 以及 offsets

kafka-consumer-groups.sh --bootstrap-server localhost:9991,localhost:9992 --describe --all-groups

4 測試

先複製好所有安裝檔既 folders,然後跟次序啟用以下既 components:
  1. ZooKeeper
  2. Broker 1
  3. Broker 2
  4. Producer 1
  5. Producer 2
  6. Consumer 1 (group 1)
  7. Consumer 2 (group 1)
  8. Consumer 3 (group 2)
  9. Consumer 4 (group 2)

4.1 相同數量既 partitions、consumers

將以下既 command prompt script 保存做 start-all.bat,然後執行:
1@ECHO OFF 2 3PUSHD zookeeper 4START bin\windows\zookeeper-server-start.bat config\zookeeper.properties 5POPD 6 7TIMEOUT /NOBREAK /T 8 8 9PUSHD broker-1 10START bin\windows\kafka-server-start.bat config\server.properties 11POPD 12 13PUSHD broker-2 14START bin\windows\kafka-server-start.bat config\server.properties 15POPD 16 17TIMEOUT /NOBREAK /T 8 18 19PUSHD zookeeper 20CALL bin\windows\kafka-topics.bat --bootstrap-server localhost:9991,localhost:9992 --create --topic new-topic-name --partitions 2 21CALL bin\windows\kafka-topics.bat --bootstrap-server localhost:9991,localhost:9992 --list 22CALL bin\windows\kafka-topics.bat --bootstrap-server localhost:9991,localhost:9992 --describe 23POPD 24 25START "Producer 1" producer-1\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --property parse.key=true --property key.separator="|" 26START "Producer 2" producer-2\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --property parse.key=true --property key.separator="|" 27 28START "Group 1 - Consumer 1" consumer-1\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group1 29START "Group 1 - Consumer 2" consumer-2\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group1 30START "Group 2 - Consumer 1" consumer-3\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group2 31START "Group 2 - Consumer 2" consumer-4\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group2 32 33TIMEOUT /NOBREAK /T 8 34 35PUSHD zookeeper 36bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9991,localhost:9992 --describe --all-groups 37POPD 38 39PAUSE
之後發送訊息:
  1. Producer 1 發送 1|message should go to partition A
  2. Producer 1 發送 2|message should go to partition B
  3. Producer 2 發送 1|message should go to partition A (producer 2)
  4. Producer 2 發送 2|message should go to partition B (producer 2)
結果:
  • 唔會有一個 consumer 又收到 partition A 既訊息,又收到 partition B 既訊息。
解釋:
  • 因為每個 consumer group 裡面既 consumer 數量同 partition 數量一樣,所以我地見到訊息 ID 係 1 既訊息只會去同一個 consumer,而訊息 ID 係 2 既訊息就會去另一個 consumer。

4.2 Partitions 多過 consumers(只維持一段時間)

關於呢個測試:
  • 承接上面既測試,我地可以停止某一個 consumer,令到佢既 consumer group 得返 1 個 consumer。
  • 因為 partitions 有 2 個,但係呢個 consumer group 只有 1 個 consumer,咁之後既所有訊息發送去呢個 consumer group 既時候都只會發送到呢 1 個 consumer 度。
  • 不過,過一排之後,當呢個 consumer group 回復返係 2 個 consumers,咁之後既所有訊息發送去呢個 consumer group 既時候又會再次平均分配到呢 2 個 consumers 度。
具體既測試步驟:
  1. 承接上面既測試,我地而家有 2 個 consumer groups,各有 2 個 consumers,總共有 4 個 consumers。
  2. 喺 consumer 3(group 2)既 command prompt window 度撳 Ctrl+C
  3. 喺任意既一個 producer 發送:1|message should go to partition A (testing single consumer in group)
  4. 喺任意既一個 producer 發送:2|message should go to partition B (testing single consumer in group)
  5. 檢查 consumer 4(group 2)既 command prompt window,應該會見到佢收到曬 2 個訊息。
  6. 喺 consumer 3(group 2)既 command prompt window 度執行返啟動既 command。
  7. 喺任意既一個 producer 發送:1|message should go to partition A (testing two consumers in group again)
  8. 喺任意既一個 producer 發送:2|message should go to partition B (testing two consumers in group again)
  9. 檢查 consumer 34(group 2)既 command prompt windows,應該會見到佢地各自都收到 1 個訊息。
  10. 檢查 consumer 12(group 1)既 command prompt windows,應該會見到佢地各自都收到 2 個訊息。
    • 收開 partition A 訊息既 consumer 會收到呢個測試新增既 2partition A 訊息。
    • 收開 partition B 訊息既 consumer 會收到呢個測試新增既 2partition B 訊息。
註:
  • 就算 consumer 3 回復運作,佢都唔一定會收返同最開始相同既 partition 既訊息,而係可能會同 consumer 4 交換左。
    • 不過唔需要擔心,因為相同 partition 既訊息都仲係保持到 message ordering。

5 參考資料