Chung Cheuk Hang Michael, Java Web Developer

Subscribing to Azure Service Bus in a Spring project


1 Introduction to Azure Service Bus

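Microsoft's Azure Service Bus (ASB) is a cloud message broker service that lets microservices communicate with each other asynchronously. It is similar to RabbitMQ and Kafka.
The publishing microservice sends text messages to ASB, which stores them temporarily; a microservice subscribed to ASB receives them later. Once the subscriber has successfully received a message, ASB removes it.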

1.1 Messaging entity concepts

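ASB has the following messaging entities:

| Messaging entity | Description |
| --- | --- |
| Queue | A queue that messages can be sent to. Any number of publishers and any number of subscribers can use the same queue to send and receive messages. |
| Topic, subscriptions | A topic is a target that messages are sent to; any number of publishers can send to the same topic. Subscriptions belong to a topic and can be thought of as queues. When the topic receives a message, every one of its subscriptions receives a copy. Publishers only send to the topic and are unaware of the subscriptions. A topic can have any number of subscriptions. Depending on the need, several subscribers can share one subscription, which is equivalent to a consumer group: they are competing consumers, and each message is received by only one of the subscribing microservices. Alternatively, each subscriber can use its own subscription, in which case every subscribing microservice receives the same message. |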
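
The difference between sharing a subscription and owning one can be illustrated with a small in-memory model. This is only a sketch of the semantics described in the table, not the Azure SDK: the class `ToyTopic` and the subscription names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// In-memory toy model of a topic with subscriptions (hypothetical, not the Azure SDK).
class ToyTopic {
    // Each subscription behaves like its own queue.
    private final Map<String, Queue<String>> subscriptions = new LinkedHashMap<>();

    void addSubscription(String name) {
        subscriptions.putIfAbsent(name, new ArrayDeque<>());
    }

    // The publisher only knows the topic; every subscription gets a copy.
    void publish(String message) {
        for (Queue<String> queue : subscriptions.values()) {
            queue.add(message);
        }
    }

    // Takes the next message from one subscription. The message is removed once
    // received, so a competing consumer on the same subscription will not see it.
    // Returns null when the subscription is empty or unknown.
    String receive(String subscription) {
        Queue<String> queue = subscriptions.get(subscription);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.addSubscription("orders");
        topic.addSubscription("audit");
        topic.publish("hello");

        // Two consumers compete on "orders": only the first one gets the message.
        System.out.println(topic.receive("orders")); // hello
        System.out.println(topic.receive("orders")); // null
        // A consumer with its own subscription still gets its copy.
        System.out.println(topic.receive("audit"));  // hello
    }
}
```

In Spring Cloud Stream terms, the shared subscription corresponds to several application instances using the same `group` on a binding.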

2 Hands-on

2.1 Maven dependencies

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.2</version>
    </parent>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2022.0.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- No version is declared here: it has to be managed elsewhere,
             e.g. by importing the spring-cloud-azure-dependencies BOM (not shown in the original). -->
        <dependency>
            <groupId>com.azure.spring</groupId>
            <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>

2.2 Java code

2.2.1 The new functional approach

Project structure:
  • src/main/java
    • /code
      • /config
        • MessageConfig.java
      • /controller
        • SampleController.java
MessageConfig.java
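    package code.config;

    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;
    import java.util.function.Consumer;
    import java.util.function.Supplier;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.RandomStringUtils;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Slf4j
    @Configuration
    public class MessageConfig {

        // Sends a message every second
        @Bean
        public Supplier<String> supplyBinding() {
            return () -> {
                final String dateTime = LocalDateTime.now(ZoneId.of("Hongkong")).format(DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"));
                final String message = String.format("Test msg - %s (%s)", RandomStringUtils.randomAlphanumeric(8), dateTime);
                log.info("Sending message: {}", message);
                return message;
            };
        }

        // Logs every message received from the subscription
        @Bean
        public Consumer<String> consumeBinding() {
            return message -> {
                log.info("Successfully received message: {}", message);
            };
        }
    }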
SampleController.java
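    package code.controller;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;

    @Slf4j
    @RestController
    public class SampleController {

        @Autowired
        StreamBridge streamBridge;

        // Sends the path variable to the output binding on demand
        @GetMapping("/test/{value}")
        public ResponseEntity<String> testSend(@PathVariable("value") String value) {
            log.info("Sending value {} to topic", value);
            streamBridge.send("supplyBinding-out-0", value);
            return ResponseEntity.ok("ok");
        }
    }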

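2.2.2 The legacy annotation approach

Note: @EnableBinding, @Input, @Output and @StreamListener were deprecated in Spring Cloud Stream 3.1 and removed in 4.0, so the code in this subsection only compiles against a Spring Cloud Stream release older than the one pulled in by the dependencies in section 2.1.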

Project structure:
  • src/main/java
    • /code
      • /config
        • MessageBinding.java
        • MessageConfig.java
      • /controller
        • SampleController.java
MessageBinding.java
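    package code.config;

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;

    public interface MessageBinding {

        @Output("supplyBinding-out-0")
        MessageChannel supplyBinding();

        @Input("consumeBinding-in-0")
        SubscribableChannel consumeBinding();
    }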
MessageConfig.java
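    package code.config;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;

    @Slf4j
    @EnableBinding(MessageBinding.class)
    public class MessageConfig {

        @Autowired
        MessageBinding messageBinding;

        // Logs every message received on the input channel
        @StreamListener("consumeBinding-in-0")
        public void consumeBinding(String msg) {
            log.info("Successfully received message: {}", msg);
        }
    }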
SampleController.java
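    package code.controller;

    import code.config.MessageBinding;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.http.ResponseEntity;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;

    @Slf4j
    @RestController
    @EnableBinding(MessageBinding.class)
    public class SampleController {

        @Autowired
        MessageBinding messageBinding;

        @GetMapping("/test/{value}")
        public ResponseEntity<String> testSend(@PathVariable("value") String value) throws Exception {
            log.info("Sending message: {}", value);
            publish(value);
            return ResponseEntity.ok("ok");
        }

        private void publish(String msg) throws Exception {

            log.info("Start publishing message: {}", msg);

            try {
                // send() returns false when the message could not be sent within the 30-second timeout
                if (!messageBinding.supplyBinding().send(MessageBuilder.withPayload(msg).build(), 30_000L)) {
                    log.error("Failed to publish message. send() returns false.");
                    throw new Exception("Failed to publish message.");
                }

                log.info("Successfully published message.");
            } catch (Exception e) {
                log.error("Failed to publish message.", e);
                throw e;
            }
        }
    }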

2.3 Application configuration

    spring:
      cloud:
        function: # new functional approach
          definition: supplyBinding;consumeBinding # Note: supplyBinding sends a message every second

        stream:
    #      default-binder: testBinder1 # default binder; makes every binding use this binder's configuration
          output-bindings: supplyBinding-out-0 # pre-creates producer bindings and binders at app startup; very useful when only StreamBridge is used

          binders:
            testBinder1:
              type: servicebus
              environment:
                spring.cloud.azure.servicebus.connection-string: Endpoint=sb://xxxxxx.servicebus.windows.net/;SharedAccessKeyName=my-test-topic-policy;SharedAccessKey=xxxxxx;EntityPath=my-test-topic
            testBinder2:
              type: servicebus
              environment:
                spring.cloud.azure.servicebus.connection-string: Endpoint=sb://xxxxxx.servicebus.windows.net/;SharedAccessKeyName=my-test-topic-policy;SharedAccessKey=xxxxxx;EntityPath=my-test-topic

          bindings:
            supplyBinding-out-0:
              destination: my-test-topic
              binder: testBinder1
            consumeBinding-in-0:
              destination: my-test-topic
              group: my-test-subscription
              binder: testBinder2

          servicebus:
            bindings:
              supplyBinding-out-0:
                producer:
                  entity-type: topic
              consumeBinding-in-0:
                consumer:
                  entity-type: topic
                  auto-complete: true
    #              requeue-rejected: true
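
The binding names in this configuration follow the Spring Cloud Stream naming convention `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The sketch below shows how the names used above can be derived from the function definition. The class `BindingNames` and its methods are hypothetical helpers written for illustration; they are not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the functional binding naming convention.
class BindingNames {
    enum Kind { SUPPLIER, CONSUMER, FUNCTION }

    // Splits a spring.cloud.function.definition value on ';'.
    static List<String> parseDefinition(String definition) {
        List<String> result = new ArrayList<>();
        for (String part : definition.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // A Supplier only has an output, a Consumer only has an input,
    // and a Function has both. Index 0 is assumed (single input/output).
    static List<String> bindingNames(String functionName, Kind kind) {
        List<String> names = new ArrayList<>();
        if (kind != Kind.SUPPLIER) {
            names.add(functionName + "-in-0");
        }
        if (kind != Kind.CONSUMER) {
            names.add(functionName + "-out-0");
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(parseDefinition("supplyBinding;consumeBinding")); // [supplyBinding, consumeBinding]
        System.out.println(bindingNames("supplyBinding", Kind.SUPPLIER));    // [supplyBinding-out-0]
        System.out.println(bindingNames("consumeBinding", Kind.CONSUMER));   // [consumeBinding-in-0]
    }
}
```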

3 Known issues

3.1 Bug triggered by a message wrapped in square brackets

When we send a message with either of the following Java statements:

    streamBridge.send("supply-out-0", "[hello world]");
    streamBridge.send("supply-out-0", (Object) "[hello world]", MediaType.TEXT_PLAIN);

the following exception is thrown:

    java.lang.IllegalStateException: Failed to convert. Possible bug as the conversion probably shouldn't have been attampted here
        at org.springframework.cloud.function.json.JacksonMapper.doFromJson(JacksonMapper.java:70)
        at org.springframework.cloud.function.json.JsonMapper.fromJson(JsonMapper.java:94)

So keep in mind that the Spring framework may attempt to convert payloads that come in certain formats.
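
One plausible reading of the stack trace, and it is only an assumption, is that a string wrapped in square brackets resembles a JSON array, so a JSON conversion is attempted and then fails because the content is not valid JSON. The sketch below shows such a look-alike check in plain Java. `JsonLookalike.looksLikeJson` is a hypothetical helper and is not Spring's actual implementation.

```java
// Hypothetical illustration of a "looks like JSON" heuristic; not Spring code.
class JsonLookalike {

    // Treats text as JSON-like when it is wrapped in [] or {}.
    // It does not validate the content in between.
    static boolean looksLikeJson(String payload) {
        String s = payload.trim();
        return (s.startsWith("[") && s.endsWith("]"))
            || (s.startsWith("{") && s.endsWith("}"));
    }

    public static void main(String[] args) {
        System.out.println(looksLikeJson("[hello world]")); // true, although it is not valid JSON
        System.out.println(looksLikeJson("hello world"));   // false
    }
}
```

A check like this can be run before sending to flag payloads that might be mistaken for JSON.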

3.2 Legacy configuration problem when upgrading to Spring Cloud Stream 4.0.2

Spring Cloud Stream 4.x is the line that goes with Spring Boot 3.x and Spring Cloud 2022.x.
The following configuration comes from the Microsoft documentation:

    spring:
      cloud:
        stream:
          function:
            definition: supplyBinding;consumeBinding

According to Spring team developers Oleg Zhurakousky and Soby Chacko, this property has been deprecated for a long time (it was deprecated shortly after it was introduced) and has now been removed. Its replacement is:

    spring:
      cloud:
        function:
          definition: supplyBinding;consumeBinding

Otherwise, after upgrading spring-cloud-stream to 4.0.2, the following WARN log appears, and the Spring application can neither receive nor send messages.

    Multiple functional beans were found [consume, supply], thus can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it.

Class: BeanFactoryAwareFunctionRegistry (from spring-cloud-function-context 4.0.2)
Note:
  • spring-cloud-stream 4.0.1 does not have this problem; it only appears in spring-cloud-stream 4.0.2.
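
When many configuration sources have to be checked, the rename can be applied mechanically. The following is a minimal sketch that works on a flat key/value map; the class `FunctionDefinitionMigration` is a hypothetical helper, and loading the map from YAML or properties files is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that moves the removed property to its replacement.
class FunctionDefinitionMigration {
    static final String LEGACY_KEY = "spring.cloud.stream.function.definition";
    static final String CURRENT_KEY = "spring.cloud.function.definition";

    // Returns a copy of the properties with the legacy key removed.
    // Its value is moved to the current key unless the current key is already set.
    static Map<String, String> migrate(Map<String, String> props) {
        Map<String, String> result = new LinkedHashMap<>(props);
        String legacyValue = result.remove(LEGACY_KEY);
        if (legacyValue != null) {
            result.putIfAbsent(CURRENT_KEY, legacyValue);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> old = new LinkedHashMap<>();
        old.put(LEGACY_KEY, "supplyBinding;consumeBinding");
        // Prints {spring.cloud.function.definition=supplyBinding;consumeBinding}
        System.out.println(migrate(old));
    }
}
```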

4 Unit test

ControllerTest
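    package code;

    import static org.junit.jupiter.api.Assertions.assertNotNull;
    import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
    import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.log;
    import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

    import code.controller.SampleController;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.web.servlet.MockMvc;

    // Requires spring-boot-starter-test (test scope). With Spring Boot 3.x that means JUnit 5,
    // so the JUnit 4 @RunWith(SpringRunner.class) annotation is not needed.
    @SpringBootTest
    @AutoConfigureMockMvc
    public class ControllerTest {

        @Autowired MockMvc mvc;

        @Autowired SampleController controller;
        // MessageService is not shown in this article; import it from wherever your project defines it.
        @Autowired MessageService service;

        @Test
        public void test_context() {
            assertNotNull(controller);
            assertNotNull(service);
        }

        @Test
        public void test_getHttp() throws Exception {
            mvc.perform(get("/test/dummy"))
                    .andDo(log())
                    .andExpect(status().isOk());
        }
    }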
src/test/resources/application.yml
    spring:
      cloud:
        stream:
          binders:
            testBinder1:
              type: servicebus
              environment:
                # Use a dummy connection string. EntityPath is not needed (if present, it must match the binding's destination).
                spring.cloud.azure.servicebus.connection-string: Endpoint=sb://dummy.servicebus.windows.net/;SharedAccessKeyName=dummy;SharedAccessKey=dummy
            testBinder2:
              type: servicebus
              environment:
                # Use a dummy connection string. EntityPath is not needed (if present, it must match the binding's destination).
                spring.cloud.azure.servicebus.connection-string: Endpoint=sb://dummy.servicebus.windows.net/;SharedAccessKeyName=dummy;SharedAccessKey=dummy

          bindings:
            supplyBinding-out-0:
              destination: my-test-topic
              binder: testBinder1
            consumeBinding-in-0:
              destination: my-test-topic
              group: my-test-subscription
              binder: testBinder2

          servicebus:
            bindings:
              supplyBinding-out-0:
                producer:
                  requeue-rejected: true
                  entity-type: topic
              consumeBinding-in-0:
                consumer:
                  requeue-rejected: true
                  auto-complete: true
                  entity-type: topic
