
Subscribing to Kafka in a Spring Project

Continued from the previous post:
Learning Kafka through the CLI


1 Kafka overview

1.1 Dead lettering

Dead letters in Kafka:
  • Kafka has no native support for dead letters; unlike RabbitMQ, it does not move messages automatically based on DLX arguments.
  • As with RabbitMQ, the dead-letter handling is done by Spring Cloud Stream, i.e. it is implemented on the application side rather than the broker side.
  • To implement dead-letter functionality in Kafka, a dedicated new topic is needed.
The dead-letter flow of the Spring Cloud Stream Kafka Binder (a sketch of step 4 follows this list):
  1. Spring Cloud Stream invokes the Consumer function we defined to process a message.
  2. Our Consumer function throws an exception.
  3. On seeing the exception, Spring Cloud Stream retries our Consumer function until the total number of attempts (the first one plus the retries) reaches the maxAttempts limit.
  4. If all retries fail, Spring Cloud Stream finally sends the message, together with the stacktrace, as a new message to the dead letter topic. Because Kafka messages are immutable, the original message still sits in its original partition of its own topic.
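To make step 4 concrete, here is a minimal sketch, not the binder's actual implementation, of what republishing a failed record to the dead letter topic amounts to using the plain Kafka producer API. The header name is an illustrative assumption; the binder attaches its own diagnostic headers.

package code.sketch;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DlqRepublishSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // The failed message's payload is copied as-is; the original record
            // stays untouched in its own partition of demo-topic.
            byte[] failedPayload = "key(5) - Test msg - ...".getBytes(StandardCharsets.UTF_8);

            ProducerRecord<byte[], byte[]> dlqRecord =
                new ProducerRecord<>("demo-topic.dlq", failedPayload);

            // Hypothetical header name, for illustration only.
            dlqRecord.headers().add("x-exception-stacktrace",
                "java.lang.RuntimeException: ...".getBytes(StandardCharsets.UTF_8));

            producer.send(dlqRecord);
        }
    }
}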

2 Hands-on

Project structure:
  • src/main
    • /java
      • /code
        • /config
          • MyConsumer.java
          • MyProducer.java
        • /controller
          • DemoController.java
    • /resources
      • application.yml
      • application-consumer.yml
      • application-producer.yml

2.1 Maven dependencies

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>3.4.2</version>
</parent>

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>2024.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>

  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
  </dependency>

  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
  </dependency>
  <dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.18.0</version>
  </dependency>

  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
  </dependency>
</dependencies>

2.2 Java code

  • Kafka hashes the message key to spread messages almost evenly across a topic's partitions (see the sketch below).
  • We will use keys 1 to 10 to test Kafka's partitioning.
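As a rough illustration, here is a minimal sketch, assuming the Kafka client's built-in partitioner for keyed records and no custom partitioner, of how a key maps to a partition:

package code.sketch;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionSketch {

    public static void main(String[] args) {
        int numPartitions = 2; // demo-topic will have 2 partitions

        // For records with a key, the built-in partitioner computes
        // murmur2(keyBytes), made non-negative, modulo the partition count.
        for (int key = 1; key <= 10; key++) {
            byte[] keyBytes = String.valueOf(key).getBytes(StandardCharsets.UTF_8);
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.printf("key(%d) -> partition %d%n", key, partition);
        }
    }
}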
MyProducer.java
package code.config;

import static java.lang.String.format;
import static java.time.format.DateTimeFormatter.ofPattern;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.function.Supplier;

import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Configuration
public class MyProducer {

    int key = 0;

    @Bean
    public Supplier<Message<String>> demoProduce() {
        return () -> {
            final String dateTime = LocalDateTime.now(ZoneId.of("Hongkong"))
                .format(ofPattern("uuuu-MM-dd HH:mm:ss"));

            // cycle the key through 1..10
            key = key % 10 + 1;

            final String msg =
                format("key(%s) - Test msg - %s (%s)",
                    key, RandomStringUtils.secure().nextAlphanumeric(8), dateTime);

            log.info("Sending: {}", msg);

            return MessageBuilder.withPayload(msg)
                .setHeader(KafkaHeaders.KEY, String.valueOf(key).getBytes())
                .build();
        };
    }
}
MyConsumer.java
package code.config;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Configuration
public class MyConsumer {

    @Value("${server.port}")
    Long serverPort;

    @Bean
    public Consumer<String> demoConsume() {
        return msg -> {
            log.info("Received message: {}", msg);

            try {
                // append each received message to a per-instance log file
                FileUtils.writeStringToFile(
                    new File("consumer-" + serverPort + ".log"), msg + "\n", StandardCharsets.UTF_8, true);
            } catch (Exception e) {
                log.error("Failed to write file.", e);
            }
        };
    }
}
DemoController.java
package code.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@RestController
public class DemoController {

    @Autowired
    StreamBridge streamBridge;

    @PostMapping("send")
    public void send(@RequestParam(name = "msg") String msg) {
        streamBridge.send("demoProduce-out-0", msg);
        log.info("Sent: {}", msg);
    }
}
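With the producer profile running on port 8080 (see section 3.1), this endpoint can be exercised as below; note that the mapping is @PostMapping, so the request must be a POST (the message text here is just an example):

curl -X POST "localhost:8080/send?msg=hello-kafka"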

2.3 Application configuration

application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092

---

spring:
  cloud:
    stream:
      default-binder: kafka # the default binder, so every binding uses this binder configuration

      bindings:
        demoProduce-out-0:
          destination: demo-topic
#          binder: kafka
          producer:
            partitionCount: 2
        demoConsume-in-0:
          destination: demo-topic
          group: demo-group
#          binder: kafka
          consumer:
            maxAttempts: 5 # application-side number of attempts (including the first one); default is 3

      kafka:
        bindings:
          demoConsume-in-0:
            consumer:
              startOffset: earliest # controls whether to read from the beginning or the end
              resetOffsets: false # whether an existing consumer group may re-read from the beginning or the end
              enableDlq: true
              dlqName: demo-topic.dlq
        binder:
          autoCreateTopics: true
          autoAddPartitions: true
application-consumer.yml
spring:
  cloud:
    function:
      definition: demoConsume
application-producer.yml
spring:
  cloud:
    function:
      definition: demoProduce # note: demoProduce sends a message every second
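The one-second cadence comes from the default poller that drives Supplier beans. If a different rate is wanted, a sketch like the following should work, assuming Spring Boot's spring.integration.poller properties (e.g. added to application-producer.yml):

spring:
  integration:
    poller:
      fixed-delay: 5000 # hypothetical tweak: one message every 5 seconds instead of every second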

3 Testing

3.1 Automatic topic creation

Run the microservice with the producer profile (port 8080):
java -jar app.jar --server.port=8080 --spring.profiles.active=producer
After the producer microservice has started, run the following command to list the topics:
kafka-topics.sh --bootstrap-server localhost:9092 --describe
We can see a new topic called demo-topic; because of the partitionCount setting in our Spring Cloud Stream binding, it has 2 partitions:
Topic: demo-topic	TopicId: gR0PRImvRoG69zs3cm7KyA	PartitionCount: 2	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: demo-topic	Partition: 0	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr: 
	Topic: demo-topic	Partition: 1	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr: 

3.2 Automatic message sending

The Supplier functions that the producer microservice registered through spring.cloud.function.definition run automatically every second, so a new message is sent to Kafka every second.
Below is its log:
10:52:30.005 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(1) - Test msg - XLxZYlIT (2025-02-03 10:52:30)
10:52:31.021 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(2) - Test msg - wvqL9sf8 (2025-02-03 10:52:31)
10:52:32.036 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(3) - Test msg - Noeo6OhO (2025-02-03 10:52:32)
10:52:33.048 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(4) - Test msg - vdpdWCCq (2025-02-03 10:52:33)
10:52:34.058 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(5) - Test msg - whKgJaMQ (2025-02-03 10:52:34)
10:52:35.064 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(6) - Test msg - Wd93y0eN (2025-02-03 10:52:35)
10:52:36.093 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(7) - Test msg - v1c8tq11 (2025-02-03 10:52:36)
10:52:37.102 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(8) - Test msg - UnAhyGjX (2025-02-03 10:52:37)
10:52:38.139 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(9) - Test msg - TDNZsJ87 (2025-02-03 10:52:38)
10:52:39.154 [scheduling-1] INFO code.config.MyProducer (MyProducer.java:39) - Sending: key(10) - Test msg - OtLwkeLr (2025-02-03 10:52:39)

3.3 Receiving messages

3.3.1 A single consumer

Run the microservice with the consumer profile (port 8081):
java -jar app.jar --server.port=8081 --spring.profiles.active=consumer
After the consumer microservice has started,
  • We will find a new topic in Kafka called __consumer_offsets, with 50 partitions.
  • We can observe the received messages in its log; it receives the messages with keys 1 to 10.
  • Because some messages were already sitting in Kafka unconsumed before the consumer started, once the consumer is up it starts receiving from the offset recorded by Kafka (in this case, the very beginning). Below is an excerpt of the log:
10:54:10.692 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(1) - Test msg - XLxZYlIT (2025-02-03 10:52:30)
10:54:10.693 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(3) - Test msg - Noeo6OhO (2025-02-03 10:52:32)
10:54:10.694 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(4) - Test msg - vdpdWCCq (2025-02-03 10:52:33)
10:54:10.695 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(7) - Test msg - v1c8tq11 (2025-02-03 10:52:36)
10:54:10.696 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(8) - Test msg - UnAhyGjX (2025-02-03 10:52:37)
10:54:10.697 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(9) - Test msg - TDNZsJ87 (2025-02-03 10:52:38)

...

10:54:10.749 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(2) - Test msg - wvqL9sf8 (2025-02-03 10:52:31)
10:54:10.750 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(5) - Test msg - whKgJaMQ (2025-02-03 10:52:34)
10:54:10.751 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(6) - Test msg - Wd93y0eN (2025-02-03 10:52:35)
10:54:10.752 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:24) - Received message: key(10) - Test msg - OtLwkeLr (2025-02-03 10:52:39)
Explanation:
  • Although the log above is ordered by time, we can see that the order in which this consumer receives messages differs from the order in which the producer sent them.
  • That is because we created 2 partitions, and every message the producer sends is automatically assigned to one of the 2 partitions based on the hash of its key.
  • Kafka only guarantees that, within each partition, messages are received in the same order in which they were sent.
    • To avoid a send-time error disturbing the order of messages stored on the broker, it is best to set max.in.flight.requests.per.connection to 1 (see the config sketch at the end of this subsection).
Partition | Order in which the consumer received messages
A         | key(1) ➡️ key(3) ➡️ key(4) ➡️ key(7) ➡️ key(8) ➡️ key(9)
B         | key(2) ➡️ key(5) ➡️ key(6) ➡️ key(10)
Once this consumer, the only one in the consumer group, has caught up with all messages, and because the Spring Cloud Stream Supplier sends one message per second, we will see the consumer receive a message every second, in key order from 1 to 10.
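As mentioned above, a sketch of pinning max.in.flight.requests.per.connection through the binder's per-binding producer configuration might look like this (an assumption on our part; this demo did not need it):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          demoProduce-out-0:
            producer:
              configuration:
                max.in.flight.requests.per.connection: 1 # preserve send order even when sends are retried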

3.3.2 One consumer group, two consumers

Run one more instance with the consumer profile (port 8082):
java -jar app.jar --server.port=8082 --spring.profiles.active=consumer
Below is the log of consumer instance 1 (port 8081); we can see that it received the messages with keys 1, 3, 4, 7, 8 and 9:
21:45:40.670 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(1) - Test msg - tjqmaqf4 (2025-02-02 21:45:40)
21:45:42.677 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(3) - Test msg - MjQ9H9nY (2025-02-02 21:45:42)
21:45:43.689 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(4) - Test msg - 4MLd6FV3 (2025-02-02 21:45:43)
21:45:46.756 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(7) - Test msg - TwywpJtF (2025-02-02 21:45:46)
21:45:47.730 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(8) - Test msg - dShVwokA (2025-02-02 21:45:47)
21:45:48.735 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(9) - Test msg - acu28o6v (2025-02-02 21:45:48)
Below is the log of consumer instance 2 (port 8082); we can see that it received the messages with keys 2, 5, 6 and 10:
21:45:41.665 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(2) - Test msg - MWCgFZJT (2025-02-02 21:45:41)
21:45:44.709 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(5) - Test msg - hHyYxzW8 (2025-02-02 21:45:44)
21:45:45.709 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(6) - Test msg - yuWXNCF3 (2025-02-02 21:45:45)
21:45:49.747 [KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1] INFO code.config.MyConsumer (MyConsumer.java:17) - Received message: key(10) - Test msg - KEOHIXvQ (2025-02-02 21:45:49)
Explanation:
  • Kafka assigned the 2 partitions to the 2 consumer instances in consumer group demo-group, forming a one-to-one mapping.
  • The messages received by the 2 consumer instances, and the order in which they were received, are the same as what we saw before.
    • That is because Kafka's default key hashing algorithm is deterministic, with no randomness involved.
  • This is how Kafka uses partitions to achieve the competing-consumers effect within each consumer group (the command below shows the assignment).
    • Every message in a partition can be received by only one consumer within each consumer group.
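To verify the one-to-one partition assignment, the consumer group can be inspected with the Kafka CLI from the previous post; the output lists, per partition, which CONSUMER-ID currently owns it:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group demo-group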

3.3.3 Two consumer groups

Run one more instance with the consumer profile (port 8083), but this time change the consumer group to demo-group-2:
java -jar app.jar --server.port=8083 --spring.profiles.active=consumer --spring.cloud.stream.bindings.demoConsume-in-0.group=demo-group-2
We can then run yet another instance belonging to consumer group demo-group-2 (port 8084):
java -jar app.jar --server.port=8084 --spring.profiles.active=consumer --spring.cloud.stream.bindings.demoConsume-in-0.group=demo-group-2
After they have started and run for a while, we will find:
  • The first time the new consumer group demo-group-2 receives messages, its consumers receive every message the topic has had since its creation, i.e. all the messages that the consumers of the earlier consumer group demo-group have already received.
  • As before, Kafka automatically assigns the topic's partitions to the different consumer instances.
    • When demo-group-2 has only 1 consumer, that consumer receives the messages of all of the topic's partitions, no matter how many partitions the topic has.
    • When demo-group-2 has 2 consumers and the topic has 2 partitions, the mapping is one-to-one, and each consumer receives the messages of one of the topic's partitions.
  • Consumer groups demo-group and demo-group-2 both keep receiving the same messages (regardless of which consumer instance within each group receives them); the command below shows each group tracking its offsets independently.
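The progress of both groups can be compared at once; each group keeps its own current offset per partition, which is why they receive the same messages independently:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups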

3.3.4 Dead letter functionality

To test the dead-letter functionality, we first hardcode some logic into the microservice's Consumer function: if the message content starts with dlq-test or key(5), it throws an exception.
The modified MyConsumer.java:
package code.config;

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Configuration
public class MyConsumer {

    @Bean
    public Consumer<String> demoConsume() {
        return msg -> {
            log.info("Received message: {}", msg);

            // test dead lettering with both StreamBridge and the automatically sent messages
            if (msg.startsWith("dlq-test") || msg.startsWith("key(5)")) {
                throw new RuntimeException("Exception simulation for message: " + msg);
            }
        };
    }
}
Test against a fresh Kafka broker instance, then run one producer and one consumer.
Send a message through the REST HTTP API we wrote (the endpoint is a POST mapping, so curl needs -X POST):
curl -X POST "localhost:8080/send?msg=dlq-test-1"
Expected test results:
  • After the producer starts, it automatically creates the topic demo-topic with 2 partitions.
  • After the consumer starts, it automatically creates the topics:
    • Topic __consumer_offsets, with 50 partitions.
    • Topic demo-topic.dlq, with 2 partitions, the same number as demo-topic.
  • The producer (port 8080) sends the message dlq-test-1.
  • The microservice receives every message, but keeps failing whenever it processes a message starting with dlq-test or key(5); the log shows it receiving such a message 5 times in total within roughly 15 seconds, and it logs the exception on the 5th attempt.
    • The default retry backoff is 1 second, and each subsequent retry multiplies the backoff by 2.
    • We configured maxAttempts as 5; excluding the first attempt, that is 4 retries, so the total time is 1 + 2 + 4 + 8 = 15 seconds (see the config sketch after this list).
  • After the exception is thrown, we can see the message appear in the dead letter topic.
  • We can see that the microservice uses only a single thread to receive messages from the same topic.
    • Thread name: KafkaConsumerDestination{consumerDestinationName='demo-topic', partitions=2, dlqName='demo-topic.dlq'}.container-0-C-1
    • In between the retries, the microservice keeps receiving the messages the producer sends automatically every second.
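For reference, the retry timing above is governed by these binding-level consumer properties; the values shown are the defaults, except maxAttempts, which we overrode:

spring:
  cloud:
    stream:
      bindings:
        demoConsume-in-0:
          consumer:
            maxAttempts: 5               # total attempts, including the first one
            backOffInitialInterval: 1000 # first retry waits 1 second
            backOffMultiplier: 2.0       # each subsequent wait doubles
            backOffMaxInterval: 10000    # individual waits are capped at 10 seconds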
We can use the following command to read all messages of a topic. First, the topic demo-topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-topic --from-beginning
Then the dead letter topic demo-topic.dlq:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-topic.dlq --from-beginning
With this approach, we will see that:
  • Both topics demo-topic and demo-topic.dlq contain the key(5) and dlq-test-1 messages.
  • Even when the consumer throws an exception while receiving a message, Kafka messages never disappear from the topic.

3.3.5 Reset offset functionality

Consumer group | startOffset setting | resetOffsets setting | Test result
Existing | earliest or latest | false | The consumer reads from the current offset.
Existing | earliest | true | The consumer ignores the current offset and reads from the very beginning; after catching up with all messages it updates the current offset.
Existing | latest | true | The consumer ignores the current offset and reads from the end (the log end offset); as new messages arrive, it updates the current offset after receiving them.
New | earliest | true or false | When the consumer group is created, no current offset is set; the consumer reads from the very beginning, and after catching up with all messages it updates the current offset.
New | latest | true or false | When the consumer group is created, the current offset is set to the log end offset, and the consumer reads from the end (the log end offset).
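Besides the binder's resetOffsets property, an existing group's offsets can also be reset from the CLI (the group's consumers must all be stopped first), for example back to the very beginning:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group demo-group --topic demo-topic --reset-offsets --to-earliest --execute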

4 References