Table of contents
1 Kafka 簡介
Kafka 係一個 event broker/event-streaming broker,除左好似 message brokers(MQ)咁喺 microservices 之間提供左 asynchronous communication 既可能性之外,隨住 event-driven microservice architecture 既興起,events 仲可以作為一個機構既數據既 single source of truth。
Kafka 既設計應用左 2
種 messaging 模式:
模式 | 描述(以 Kafka 術語) |
---|
Publish-subscribe | 每個訊息都會 broadcast 去所有 consumer groups。 |
Message queue | 每個訊息喺每個 consumer group 都只會有 1 個 consumer 接收到(用 partitions 實現 competing consumers)。 |
Kafka 強大之處:
- 高性能、高吞吐量。
- 支援 broker horizontal scaling。
- 支援 producer、consumer horizontal scaling。
- 一個 topic 可以分成多個 partitions。
- Scale up 同一個 consumer microservice 既 instances 去組成一個 consumer group。
- 同一個 consumer group 裡面既各個 consumer 都會被分配同一個 topic 既
N
個 partitions。
- 支援 consumer 重新啟動之後可以即時接收返斷線果陣 producer 所產生但未被接收既訊息。
- 所有訊息都會被保存,如果將來諗到有新既業務用途,可以由一開始既 offset 讀起。
註:
- Horizontal scaling 即係我地可以 deploy 一模一樣既 microservice/app 而得到多個 running instances。
- 咁做既目的:
- 避免有啲 instances 行行下 crash 左,影響業務。
- 透過多個 instances 做 parallel processing 黎增加 throughput。
- 呢啲 instances 之間唔需要知道其他 instances 既存在,亦唔需要有任何溝通。
1.1 Offset 概念
Kafka 係 immutable、append-only 既 message/event store,佢係利用 offset values 黎紀錄 consumer groups 既訊息接收進度。
- Kafka 既訊息寫入左落 Kafka broker 之後就唔可以更改或者刪除。
- Kafka 既 offsets 係 per partition + consumer group。
- 即係每個 consumer group 喺每個 partition 都可以有一個 offset value。
- 個 offset 係用黎紀錄低個 consumer group(regardless of 邊個 consumer instance)已經讀到個 partition 既邊一個訊息。
- 一般情況下,consumer group 下次就會由 offset 開始讀起(由第一個未讀既訊息開始讀起)。
- 更新 offsets 係 consumers 既責任,讀完訊息就應該更新 offsets。
2 安裝 Kafka
2.1 本地(免安裝)
⚠️ 建議將個 .tgz
裡面既 binary 檔案解壓縮去 C:/kafka
,因為如果條 path 太長,行 command 果陣可能會有 error。
2.2 Docker image
我地可以用 Docker Desktop,然後利用 RabbitMQ 既官方 Docker image 執行 Docker container。
3 Commands
Windows 版既 CLI 喺 bin/windows
folder,啲 scripts 既檔案副檔名係 .bat
。
如果想模擬到有多部機器同時運作,即係 horizontal scaling 既效果,穩陣啲可以用 Docker containers(保證獨立運行),而簡單啲既就可以複製成個 Kafka binaries 既 folder(不過其實複製啲 properties 檔已經足夠,因為啲數據檔案默認係儲存喺 /tmp
(Windows 版就係 C:/tmp
),而唔係儲存喺 Kafka 安裝檔既 folders 裡面)。
3.1 ZooKeeper
⚠️ 如果用 KRaft mode 啟動 Kafka 就唔需要 ZooKeeper。
3.1.1 啟動
zookeeper-server-start.sh config/zookeeper.properties
3.2 Kafka broker
想要模擬多部 broker 機器同時運作,就需要修改每個 broker 既 Kafka binaries 裡面既 config/server.properties
:
1# 唔同既 broker 要 assign 唔同既 ID
2broker.id=9
3
4# 唔同既 broker 要 assign 唔同既 port
5listeners=PLAINTEXT://:9999
6
7# 唔同既 broker 要 assign 唔同既 log directory
8log.dirs=/tmp/kafka-logs-9
我地會起 2
個 brokers:
Broker | broker.id | listeners 既 port | log.dirs |
---|
1 | 111 | 9991 | /tmp/kafka-logs-1 |
2 | 222 | 9992 | /tmp/kafka-logs-2 |
註:
- Windows 既話都可以用上面既 log dir,啲 Kafka 數據會去左
C:/tmp
。
- 因為有啲
--describe
既 commands 會顯示數字(例如列出所有 topics 會顯示 leaders、replicas、in-sync replicas),果啲數字其實係 broker ID。
- 所以為左容易啲分辨,broker ID 可以用長啲既數字(只可以用純數字)。
3.2.1 啟動
kafka-server-start.sh config/server.properties
3.2.2 停止
想要停止 broker 既話,一定要 gracefully shutdown broker。
可以撳 Ctrl+C
,佢會顯示 shut down completed
。
3.3 Topic
3.3.1 建立新 topic
我地會起 1
個 topic,而呢個 topic 會有 2
個 partitions。
我地可以喺個 ZooKeeper 度執行以下既 command:
kafka-topics.sh --bootstrap-server localhost:9991,localhost:9992 --create --topic new-topic-name --partitions 2
3.3.2 列出所有 topics
kafka-topics.sh --bootstrap-server localhost:9991,localhost:9992 --list
kafka-topics.sh --bootstrap-server localhost:9991,localhost:9992 --describe
3.4 Kafka producer
Kafka 既 producer 係有 retry 機制,默認既 producer 配置下係會無限 retry 發送訊息,而必須注意既係,喺默認既 producer 配置下,1
個 connection 最多可以有 5
個等待發送既訊息。而如果有發送失敗既情況,例如發送第一個訊息出去果陣失敗左,但係發送排喺後面既訊息成功左既話,呢個情況下就會令最終條 MQ 裡面既訊息既次序同本來唔同。所以有啲配置係好重要,一定要設定。
config/producer.properties
:
max.in.flight.requests.per.connection=1
參考資料:
3.4.1 發送訊息
Kafka 既訊息係 key-value pair。
如果訊息需要順次序,有 2
個方法,視乎情況而用:
場景 | 方法 |
---|
所有訊息都要順次序。 | 每個 topic 只有 1 個 partition,而每個 consumer group 亦只需要 1 個 consumer。 |
只需要相同業務 ID(例如用戶 ID)既訊息順次序。 | 每個 topic 可以有多個 partitions,但係每個訊息都要提供業務 ID(例如用戶 ID)作為 key,用黎計算應該落入邊個 partition。因為 producer partitioner 係用 hashing algorithm 黎計算,所以相同既 key 一定會落入相同既 partition。 |
用下面既 command 就可以發送訊息:
kafka-console-producer.sh --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --property parse.key=true --property key.separator="|"
之後輸入訊息內容,例如 1|foo
、2|bar
,撳 Enter
掣可以立即發送訊息。想退出既話可以撳 Ctrl+C
。
我地可以喺 producer 度不停發送相同 key 既訊息,又夾雜下唔同 key 既訊息,睇下有冇出現同一個 key 既訊息畀每個 consumer group 裡面唔同既 consumers 接收既情況,但正常係唔會出現,因為 Kafka 保證相同 key 既訊息只會由每個 consumer group 裡面既同一個 consumer 接收,咁就可以做到 message ordering。
註:
key.separator
默認係 tab。
--property parse.key=true --property key.separator="|"
係用黎話畀 producer 知我地既訊息既 key、value 用 |
黎分隔。
- 如果唔加上呢
2
個配置,咁訊息既 key 就會係 null
,而 producer partitioner 會用 round-robin 既方式將訊息發送去某個 partition。
- 如果唔用
parse.key
以及 key.separator
,producer 發送既訊息就會冇 key,咁就有可能會因為 sticky partitioning strategy 而去左同一個 partition 度,所以最好都係有 key。
3.4.2 查詢所有訊息
kafka-console-consumer.sh --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --from-beginning
3.5 Kafka consumer
我地會起 2
個 consumer groups,各有 2
個 consumers,總共 4
個 consumers。
每個 consumer group 都會收到一樣的訊息,而同一個 consumer group 裡面既每個 consumer 都會收到 Kafka assign 畀佢對應既 partition 既訊息。
Consumer | Group ID |
---|
1 | group1 |
2 | group1 |
3 | group2 |
4 | group2 |
3.5.1 接收訊息
執行 4
次以下既 command:
- Consumer
1
用 --consumer-property group.id=group1
。
- Consumer
2
用 --consumer-property group.id=group1
。
- Consumer
3
用 --consumer-property group.id=group2
。
- Consumer
4
用 --consumer-property group.id=group2
。
kafka-console-consumer.sh --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group1
3.5.2 列出 consumer groups 以及 offsets
kafka-consumer-groups.sh --bootstrap-server localhost:9991,localhost:9992 --describe --all-groups
4 測試
先複製好所有安裝檔既 folders,然後跟次序啟用以下既 components:
- ZooKeeper
- Broker 1
- Broker 2
- Producer 1
- Producer 2
- Consumer 1 (group 1)
- Consumer 2 (group 1)
- Consumer 3 (group 2)
- Consumer 4 (group 2)
4.1 相同數量既 partitions、consumers
將以下既 command prompt script 保存做 start-all.bat
,然後執行:
1@ECHO OFF
2
3PUSHD zookeeper
4START bin\windows\zookeeper-server-start.bat config\zookeeper.properties
5POPD
6
7TIMEOUT /NOBREAK /T 8
8
9PUSHD broker-1
10START bin\windows\kafka-server-start.bat config\server.properties
11POPD
12
13PUSHD broker-2
14START bin\windows\kafka-server-start.bat config\server.properties
15POPD
16
17TIMEOUT /NOBREAK /T 8
18
19PUSHD zookeeper
20CALL bin\windows\kafka-topics.bat --bootstrap-server localhost:9991,localhost:9992 --create --topic new-topic-name --partitions 2
21CALL bin\windows\kafka-topics.bat --bootstrap-server localhost:9991,localhost:9992 --list
22CALL bin\windows\kafka-topics.bat --bootstrap-server localhost:9991,localhost:9992 --describe
23POPD
24
25START "Producer 1" producer-1\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --property parse.key=true --property key.separator="|"
26START "Producer 2" producer-2\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --property parse.key=true --property key.separator="|"
27
28START "Group 1 - Consumer 1" consumer-1\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group1
29START "Group 1 - Consumer 2" consumer-2\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group1
30START "Group 2 - Consumer 1" consumer-3\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group2
31START "Group 2 - Consumer 2" consumer-4\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9991,localhost:9992 --topic new-topic-name --consumer-property group.id=group2
32
33TIMEOUT /NOBREAK /T 8
34
35PUSHD zookeeper
36bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9991,localhost:9992 --describe --all-groups
37POPD
38
39PAUSE
之後發送訊息:
- Producer 1 發送
1|message should go to partition A
。
- Producer 1 發送
2|message should go to partition B
。
- Producer 2 發送
1|message should go to partition A (producer 2)
。
- Producer 2 發送
2|message should go to partition B (producer 2)
。
結果:
- 唔會有一個 consumer 又收到
partition A
既訊息,又收到 partition B
既訊息。
解釋:
- 因為每個 consumer group 裡面既 consumer 數量同 partition 數量一樣,所以我地見到訊息 ID 係
1
既訊息只會去同一個 consumer,而訊息 ID 係 2
既訊息就會去另一個 consumer。
4.2 Partitions 多過 consumers(只維持一段時間)
關於呢個測試:
- 承接上面既測試,我地可以停止某一個 consumer,令到佢既 consumer group 得返
1
個 consumer。
- 因為 partitions 有
2
個,但係呢個 consumer group 只有 1
個 consumer,咁之後既所有訊息發送去呢個 consumer group 既時候都只會發送到呢 1
個 consumer 度。
- 不過,過一排之後,當呢個 consumer group 回復返係
2
個 consumers,咁之後既所有訊息發送去呢個 consumer group 既時候又會再次平均分配到呢 2
個 consumers 度。
具體既測試步驟:
- 承接上面既測試,我地而家有
2
個 consumer groups,各有 2
個 consumers,總共有 4
個 consumers。
- 喺 consumer
3
(group 2)既 command prompt window 度撳 Ctrl+C
。
- 喺任意既一個 producer 發送:
1|message should go to partition A (testing single consumer in group)
。
- 喺任意既一個 producer 發送:
2|message should go to partition B (testing single consumer in group)
。
- 檢查 consumer
4
(group 2)既 command prompt window,應該會見到佢收到曬 2
個訊息。
- 喺 consumer
3
(group 2)既 command prompt window 度執行返啟動既 command。
- 喺任意既一個 producer 發送:
1|message should go to partition A (testing two consumers in group again)
。
- 喺任意既一個 producer 發送:
2|message should go to partition B (testing two consumers in group again)
。
- 檢查 consumer
3
、4
(group 2)既 command prompt windows,應該會見到佢地各自都收到 1
個訊息。
- 檢查 consumer
1
、2
(group 1)既 command prompt windows,應該會見到佢地各自都收到 2
個訊息。
- 收開
partition A
訊息既 consumer 會收到呢個測試新增既 2
個 partition A
訊息。
- 收開
partition B
訊息既 consumer 會收到呢個測試新增既 2
個 partition B
訊息。
註:
- 就算 consumer
3
回復運作,佢都唔一定會收返同最開始相同既 partition 既訊息,而係可能會同 consumer 4
交換左。
- 不過唔需要擔心,因為相同 partition 既訊息都仲係保持到 message ordering。
5 參考資料