1 Introduction to Kafka
1.1 Dead lettering
Dead letters in Kafka:
- Kafka itself has no native support for dead letters; unlike RabbitMQ, it will not automatically move messages according to DLX arguments.
- As with RabbitMQ, dead-letter handling is done by Spring Cloud Stream, i.e. it is implemented on the application side rather than the broker side.
- To implement dead-letter functionality in Kafka, a dedicated new topic is needed.
The Spring Cloud Stream Kafka Binder dead-letter flow:
- Spring Cloud Stream invokes the Consumer function we define to process a message.
- Our Consumer function throws an exception.
- On seeing the exception, Spring Cloud Stream retries our Consumer function until the total number of processing attempts (the first attempt plus the retries) reaches the maxAttempts limit.
- If all retries fail, Spring Cloud Stream finally sends the message, together with the stacktrace, as a new message to the dead letter topic. Because Kafka messages are immutable, the original message still remains in the original partition of its own topic.
2 Hands-on
Project structure:
src/main
  /java
    /code
      /config
        MyConsumer.java
        MyProducer.java
      /controller
        DemoController.java
  /resources
    application.yml
    application-consumer.yml
    application-producer.yml
2.1 Maven dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.4.2</version>
</parent>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2024.0.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.18.0</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>
2.2 Java code
- Kafka hashes each message key to distribute messages almost perfectly evenly across a topic's partitions.
- We will use keys 1 to 10 to test Kafka's partitioning; a sketch of the underlying key-to-partition computation follows below.
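For keyed records without an explicit partition, the Kafka client's default partitioner hashes the serialized key bytes with murmur2 and takes the result modulo the partition count. Below is a minimal sketch (an illustration only, assuming the key is serialized as plain string bytes, which is what our producer does) that previews which partition each of our keys 1 to 10 would land in:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionPreview {

    public static void main(String[] args) {
        final int partitionCount = 2; // matches the partitionCount in our binding config

        for (int key = 1; key <= 10; key++) {
            // Our producer sends the key as the plain string bytes, e.g. "1".getBytes()
            final byte[] keyBytes = String.valueOf(key).getBytes(StandardCharsets.UTF_8);

            // Default partitioner behaviour for keyed records: murmur2 hash of the
            // key bytes, forced non-negative, modulo the number of partitions.
            final int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % partitionCount;

            System.out.printf("key(%d) -> partition %d%n", key, partition);
        }
    }
}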
MyProducer.java:
@Slf4j
@Configuration
public class MyProducer {

    int key = 0;

    @Bean
    public Supplier<Message<String>> demoProduce() {
        return () -> {
            final String dateTime = LocalDateTime.now(ZoneId.of("Hongkong"))
                .format(ofPattern("uuuu-MM-dd HH:mm:ss"));

            key = key % 10 + 1;

            final String msg =
                format("key(%s) - Test msg - %s (%s)",
                    key, RandomStringUtils.secure().nextAlphanumeric(8), dateTime);

            log.info("Sending: {}", msg);

            return MessageBuilder.withPayload(msg)
                .setHeader(KafkaHeaders.KEY, String.valueOf(key).getBytes())
                .build();
        };
    }
}
MyConsumer.java:
@Slf4j
@Configuration
public class MyConsumer {

    @Value("${server.port}")
    Long serverPort;

    @Bean
    public Consumer<String> demoConsume() {
        return msg -> {
            log.info("Received message: {}", msg);

            try {
                FileUtils.writeStringToFile(
                    new File("consumer-" + serverPort + ".log"), msg + "\n", StandardCharsets.UTF_8, true);
            } catch (Exception e) {
                log.error("Failed to write file.", e);
            }
        };
    }
}
DemoController.java:
@Slf4j
@RestController
public class DemoController {

    @Autowired
    StreamBridge streamBridge;

    @PostMapping("send")
    public void send(@RequestParam(name = "msg") String msg) {
        streamBridge.send("demoProduce-out-0", msg);
        log.info("Sent: {}", msg);
    }
}
2.3 Application configuration
application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092

---

spring:
  cloud:
    stream:
      default-binder: kafka # The default binder, so that every binding uses this binder configuration

      bindings:
        demoProduce-out-0:
          destination: demo-topic
#          binder: kafka
          producer:
            partitionCount: 2
        demoConsume-in-0:
          destination: demo-topic
          group: demo-group
#          binder: kafka
          consumer:
            maxAttempts: 5 # Application-side number of processing attempts (including the first); the default is 3

      kafka:
        bindings:
          demoConsume-in-0:
            consumer:
              startOffset: earliest # Whether to start reading from the beginning or the end
              resetOffsets: false # Whether an existing consumer group may re-read from the beginning or the end
              enableDlq: true
              dlqName: demo-topic.dlq
        binder:
          autoCreateTopics: true
          autoAddPartitions: true
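The retry timing we will observe in section 3.3.4 comes from the consumer binding's backoff settings, which can be tuned per binding. A sketch that simply spells out the documented defaults (not required for this demo):

spring:
  cloud:
    stream:
      bindings:
        demoConsume-in-0:
          consumer:
            backOffInitialInterval: 1000 # Backoff before the first retry, in ms (default 1000)
            backOffMultiplier: 2.0 # Each subsequent backoff is multiplied by this factor (default 2.0)
            backOffMaxInterval: 10000 # Ceiling for the backoff, in ms (default 10000)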
application-consumer.yml:
spring:
  cloud:
    function:
      definition: demoConsume
application-producer.yml:
spring:
  cloud:
    function:
      definition: demoProduce # Note: demoProduce sends a message every second
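The every-second rhythm comes from the default polling interval Spring Cloud Stream applies to Supplier beans. It is configurable; a sketch using the global poller property (this merely restates the 1000 ms default):

spring:
  cloud:
    stream:
      poller:
        fixed-delay: 1000 # How often the Supplier is polled, in ms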
3 Testing
3.1 Automatic topic creation
Run this microservice with the producer profile (port 8080):
java -jar app.jar --server.port=8080 --spring.profiles.active=producer
After the producer microservice has started, run the following command to list the topics:
kafka-topics.sh --bootstrap-server localhost:9092 --describe
We can see a new topic named demo-topic; because of the partitionCount setting on our Spring Cloud Stream binding, it has 2 partitions:
Topic: demo-topic TopicId: gR0PRImvRoG69zs3cm7KyA PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: demo-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
Topic: demo-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
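This auto-creation comes from the autoCreateTopics binder setting. If it were disabled, the equivalent manual creation would look roughly like this (a sketch for a local single-broker setup):

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic demo-topic --partitions 2 --replication-factor 1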
3.2 Automatic message sending
The Supplier functions the producer microservice registers via spring.cloud.function.definition run automatically every second, i.e. a new message is sent to Kafka every second. Its log:
10:52:30.005 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(1) - Test msg - XLxZYlIT (2025-02-03 10:52:30)
10:52:31.021 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(2) - Test msg - wvqL9sf8 (2025-02-03 10:52:31)
10:52:32.036 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(3) - Test msg - Noeo6OhO (2025-02-03 10:52:32)
10:52:33.048 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(4) - Test msg - vdpdWCCq (2025-02-03 10:52:33)
10:52:34.058 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(5) - Test msg - whKgJaMQ (2025-02-03 10:52:34)
10:52:35.064 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(6) - Test msg - Wd93y0eN (2025-02-03 10:52:35)
10:52:36.093 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(7) - Test msg - v1c8tq11 (2025-02-03 10:52:36)
10:52:37.102 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(8) - Test msg - UnAhyGjX (2025-02-03 10:52:37)
10:52:38.139 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(9) - Test msg - TDNZsJ87 (2025-02-03 10:52:38)
10:52:39.154 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(10) - Test msg - OtLwkeLr (2025-02-03 10:52:39)
3.3 Receiving messages
3.3.1 A single consumer
Run this microservice with the consumer profile (port 8081):
java -jar app.jar --server.port=8081 --spring.profiles.active=consumer
After the consumer microservice has started:
- We find a new topic in Kafka named __consumer_offsets, with 50 partitions.
- Its log shows how messages are received; we can see it receives the messages with keys 1 to 10.
- Because some messages were already sitting in Kafka, unconsumed, before the consumer started, the consumer starts receiving from the offset Kafka has recorded (in this case, the very beginning). An excerpt of the log:
10:54:10.692 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(1) - Test msg - XLxZYlIT (2025-02-03 10:52:30)
10:54:10.693 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(3) - Test msg - Noeo6OhO (2025-02-03 10:52:32)
10:54:10.694 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(4) - Test msg - vdpdWCCq (2025-02-03 10:52:33)
10:54:10.695 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(7) - Test msg - v1c8tq11 (2025-02-03 10:52:36)
10:54:10.696 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(8) - Test msg - UnAhyGjX (2025-02-03 10:52:37)
10:54:10.697 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(9) - Test msg - TDNZsJ87 (2025-02-03 10:52:38)

...

10:54:10.749 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(2) - Test msg - wvqL9sf8 (2025-02-03 10:52:31)
10:54:10.750 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(5) - Test msg - whKgJaMQ (2025-02-03 10:52:34)
10:54:10.751 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(6) - Test msg - Wd93y0eN (2025-02-03 10:52:35)
10:54:10.752 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(10) - Test msg - OtLwkeLr (2025-02-03 10:52:39)
Explanation:
- Although the log above is sorted by time, we can see that the order in which this consumer receives messages differs from the order in which the producer sent them.
- That is because we created 2 partitions, and every message the producer sends is automatically assigned to one of these 2 partitions based on the hash of its key.
- Kafka only guarantees that the messages within each individual partition are received in the same order they were sent.
- To prevent a send-time error from disturbing the order of the messages stored on the broker, it is best to set max.in.flight.requests.per.connection to 1 (a configuration sketch follows the table below).
| Partition | Order in which the consumer receives messages |
| --- | --- |
| A | key(1) ➡️ key(3) ➡️ key(4) ➡️ key(7) ➡️ key(8) ➡️ key(9) |
| B | key(2) ➡️ key(5) ➡️ key(6) ➡️ key(10) |
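As noted above, capping in-flight requests keeps broker-side ordering intact when a failed send is retried. A hedged sketch of passing this native producer setting through the Kafka binder (one of several places such properties can be set):

spring:
  cloud:
    stream:
      kafka:
        binder:
          producer-properties:
            max.in.flight.requests.per.connection: 1 # Preserve ordering across producer retries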
Once this consumer group's only consumer has caught up on all the backlog, and because the Spring Cloud Stream Supplier sends one message per second, we see the consumer receive one message every second, in key order from 1 to 10.
3.3.2 One consumer group, two consumers
Run another instance with the consumer profile (port 8082):
java -jar app.jar --server.port=8082 --spring.profiles.active=consumer
Below is the log of consumer instance 1 (port 8081); it received the messages with keys 1, 3, 4, 7, 8 and 9:
21:45:40.670 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(1) - Test msg - tjqmaqf4 (2025-02-02 21:45:40)
21:45:42.677 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(3) - Test msg - MjQ9H9nY (2025-02-02 21:45:42)
21:45:43.689 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(4) - Test msg - 4MLd6FV3 (2025-02-02 21:45:43)
21:45:46.756 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(7) - Test msg - TwywpJtF (2025-02-02 21:45:46)
21:45:47.730 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(8) - Test msg - dShVwokA (2025-02-02 21:45:47)
21:45:48.735 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(9) - Test msg - acu28o6v (2025-02-02 21:45:48)
Below is the log of consumer instance 2 (port 8082); it received the messages with keys 2, 5, 6 and 10:
21:45:41.665 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(2) - Test msg - MWCgFZJT (2025-02-02 21:45:41)
21:45:44.709 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(5) - Test msg - hHyYxzW8 (2025-02-02 21:45:44)
21:45:45.709 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(6) - Test msg - yuWXNCF3 (2025-02-02 21:45:45)
21:45:49.747 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(10) - Test msg - KEOHIXvQ (2025-02-02 21:45:49)
Explanation:
- Kafka assigned the 2 partitions to the 2 consumer instances in consumer group demo-group, a one-to-one mapping.
- The messages received by these 2 consumer instances, and the order they receive them in, are the same as what we saw before.
- That is because Kafka's default key hashing algorithm has no randomness in it.
- This is how Kafka uses partitions to achieve the competing-consumers effect within each consumer group.
- Each message in a partition is received by only one consumer per consumer group; the assignment can be verified as sketched below.
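The partition-to-consumer assignment (together with committed offsets and lag) can be inspected with the standard Kafka CLI, for example:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group demo-group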
3.3.3 Two consumer groups
Run another instance with the consumer profile (port 8083), but this time change the consumer group to demo-group-2:
java -jar app.jar --server.port=8083 --spring.profiles.active=consumer --spring.cloud.stream.bindings.demoConsume-in-0.group=demo-group-2
Then, optionally, run yet another instance belonging to consumer group demo-group-2 (port 8084):
java -jar app.jar --server.port=8084 --spring.profiles.active=consumer --spring.cloud.stream.bindings.demoConsume-in-0.group=demo-group-2
After starting them and waiting a while, we observe:
- The first time the new consumer group demo-group-2 is used to receive messages, its consumers receive every message the topic has held since its creation, i.e. all the messages the consumers of the earlier group demo-group have already received.
- As before, Kafka automatically assigns the topic's partitions to the different consumer instances:
  - When demo-group-2 has only 1 consumer, that consumer receives the messages of all the topic's partitions, no matter how many partitions the topic has.
  - When demo-group-2 has 2 consumers and the topic has 2 partitions, the mapping is one-to-one, and each consumer receives the messages of one of the topic's partitions.
- Consumer groups demo-group and demo-group-2 both keep receiving the same messages (regardless of which consumer instance receives them).
3.3.4 Dead-letter functionality
To test the dead-letter functionality, we first hardcode some logic into the microservice's Consumer function: if the message content starts with dlq-test or key(5), it throws an exception.
Modify MyConsumer.java:
@Slf4j
@Configuration
public class MyConsumer {

    @Bean
    public Consumer<String> demoConsume() {
        return msg -> {
            log.info("Received message: {}", msg);

            // Test dead lettering via StreamBridge sends as well as the automatic sends
            if (msg.startsWith("dlq-test") || msg.startsWith("key(5)")) {
                throw new RuntimeException("Exception simulation for message: " + msg);
            }
        };
    }
}
Test against a fresh Kafka broker instance, then run one producer and one consumer.
Send a message via the REST HTTP API we wrote:
curl localhost:8080/send?msg=dlq-test-1
Expected test results:
- After the producer starts, it automatically creates the topic demo-topic with 2 partitions.
- After the consumer starts, it automatically creates these topics:
  - Topic __consumer_offsets, with 50 partitions.
  - Topic demo-topic.dlq, with 2 partitions, the same number as demo-topic.
- The producer (port 8080) sends the message dlq-test-1.
- The microservice receives all messages, but keeps failing whenever it processes a message starting with dlq-test or key(5); the log shows it receiving that message 5 times in total within roughly 15 seconds, logging the exception on the 5th attempt.
  - The default retry backoff is 1 second, and each subsequent retry applies a 2x backoff multiplier.
  - We configured maxAttempts as 5; excluding the first attempt, that is 4 retries, so the total time is 1 + 2 + 4 + 8 = 15 seconds (see the sketch after this list).
- After the exception is thrown, the message shows up in the dead letter topic.
- We can see that the microservice uses only a single thread to receive messages from the same topic.
  - Thread name: KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1.
- The rest of the time, the microservice receives the messages the producer automatically sends every second.
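A tiny sketch of that retry-delay arithmetic, assuming the defaults described above (1-second initial backoff, 2x multiplier, 10-second cap):

public class RetryDelayMath {

    public static void main(String[] args) {
        final int maxAttempts = 5; // total attempts, including the first delivery
        long backOffMs = 1_000;    // assumed default initial backoff (1 second)
        long totalMs = 0;

        // One backoff pause before each retry: maxAttempts - 1 pauses in total
        for (int retry = 1; retry < maxAttempts; retry++) {
            totalMs += backOffMs;
            backOffMs = Math.min(backOffMs * 2, 10_000); // assumed 2x multiplier, 10 s cap
        }

        System.out.println(totalMs + " ms"); // 1000 + 2000 + 4000 + 8000 = 15000 ms
    }
}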
We can use this command to read all the messages of a topic; first, the topic demo-topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-topic --from-beginning
Then the dead letter topic demo-topic.dlq:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-topic.dlq --from-beginning
Through this, we can see:
- Both topics demo-topic and demo-topic.dlq contain the key(5) and dlq-test-1 messages.
- Even though the consumer threw an exception while receiving a message, Kafka messages never disappear from their topic.
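The binder also attaches diagnostic details of the failure (such as the exception message and stacktrace) to the dead-lettered record as headers. With a reasonably recent Kafka CLI, these can be printed as well, e.g.:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-topic.dlq --from-beginning --property print.headers=true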
3.3.5 Reset offset functionality
| Consumer group | startOffset setting | resetOffsets setting | Test result |
| --- | --- | --- | --- |
| Existing | earliest or latest | false | The consumer reads from the current offset. |
| Existing | earliest | true | The consumer ignores the current offset and reads from the beginning; after it has consumed all the messages, the current offset is updated. |
| Existing | latest | true | The consumer ignores the current offset and reads from the end (the log end offset). When new messages arrive, the current offset is updated after they are consumed. |
| New | earliest | true or false | When the consumer group is created, no current offset is set; the consumer reads from the beginning, and the current offset is updated after all the messages are consumed. |
| New | latest | true or false | When the consumer group is created, the current offset is set to the log end offset, and the consumer reads from the end (the log end offset). |
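For comparison, the offsets of an existing consumer group can also be reset manually, outside the application, with the standard CLI (a sketch; the group must have no active members while the reset runs):

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group demo-group --topic demo-topic --reset-offsets --to-earliest --execute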
4 References