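1 Introduction to Azure Service Bus
Microsoft's Azure Service Bus (ASB) is a cloud message broker service that lets separate microservices communicate asynchronously. It is comparable to RabbitMQ and Kafka.
A publishing microservice sends text messages to ASB, which stores them temporarily; the microservices subscribed to ASB receive them later. Once a subscriber has successfully received a message, ASB removes that delivered message.
1.1 Messaging entities
ASB has the following messaging entities: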
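Messaging entity | Description |
---|---|
Queue | A queue that serves as a message destination. Any number of publishers and any number of subscribers can send and receive messages through the same queue. |
Topic, subscriptions | A topic is a message destination to which any number of senders can publish. Subscriptions belong to a topic and can be thought of as queues. When the topic receives a message, every one of its subscriptions receives the same message. Publishers send only to the topic and are unaware of the subscriptions. A topic can have any number of subscriptions. Depending on need, several subscribers may share one subscription, which is equivalent to a consumer group: they are competing consumers, and each message is received by only one of the subscribing microservices. Alternatively, each subscriber uses its own subscription, in which case every subscribing microservice receives each message. |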
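The difference between sharing a subscription and owning one can be sketched with a toy in-memory model. This is only an illustration of the semantics described in the table; ToyTopic and its methods are hypothetical names and have nothing to do with the Azure SDK.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a topic with subscriptions (hypothetical, not the Azure SDK).
class ToyTopic {

    // Each subscription behaves like its own queue.
    private final Map<String, Queue<String>> subscriptions = new LinkedHashMap<>();

    void addSubscription(String name) {
        subscriptions.putIfAbsent(name, new ArrayDeque<>());
    }

    // The publisher only knows the topic; every subscription gets the message.
    void publish(String message) {
        for (Queue<String> queue : subscriptions.values()) {
            queue.add(message);
        }
    }

    // Receiving removes the message from that subscription, so other consumers
    // sharing the same subscription will not see it again.
    String receive(String subscription) {
        Queue<String> queue = subscriptions.get(subscription);
        return queue == null ? null : queue.poll();
    }
}

class ToyTopicDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.addSubscription("billing");
        topic.addSubscription("audit");
        topic.publish("order-1");

        // Two services sharing "billing" compete: only the first receive gets the message.
        System.out.println(topic.receive("billing")); // order-1
        System.out.println(topic.receive("billing")); // null
        // A service with its own subscription still gets the message.
        System.out.println(topic.receive("audit"));   // order-1
    }
}
```

In the configuration later in this post, the group of a consumer binding plays the role of the subscription name.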
2 Hands-on
2.1 Maven dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.2</version>
</parent>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2022.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- No version is given here: supply one explicitly, or import the
         com.azure.spring:spring-cloud-azure-dependencies BOM above -->
    <dependency>
        <groupId>com.azure.spring</groupId>
        <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>
2.2 Java code
2.2.1 New functional style
Project structure:
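MessageConfig.java:
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.function.Consumer;
import java.util.function.Supplier;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class MessageConfig {

    // Polled by the framework, so a message is sent every second
    @Bean
    public Supplier<String> supplyBinding() {
        return () -> {
            final String dateTime = LocalDateTime.now(ZoneId.of("Asia/Hong_Kong")).format(DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss"));
            final String message = String.format("Test msg - %s (%s)", RandomStringUtils.randomAlphanumeric(8), dateTime);
            log.info("Sending message: {}", message);
            return message;
        };
    }

    // Invoked for every message that arrives on the input binding
    @Bean
    public Consumer<String> consumeBinding() {
        return message -> {
            log.info("Successfully received message: {}", message);
        };
    }
}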
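Because the two beans above are plain java.util.function types, their logic can be exercised without a broker. The sketch below is a framework-free illustration under that assumption: FunctionalStyleDemo and its methods are hypothetical names, not part of Spring or of the project above. It plays the role of the binder by polling a Supplier and handing each value to a Consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, framework-free stand-in for what the binder does with the two beans.
class FunctionalStyleDemo {

    // Produces "<prefix>-1", "<prefix>-2", ... each time it is polled.
    static Supplier<String> numberedSupplier(String prefix) {
        int[] counter = {0};
        return () -> prefix + "-" + (++counter[0]);
    }

    // Stores every received message in the given list.
    static Consumer<String> collectingConsumer(List<String> sink) {
        return sink::add;
    }

    // Polls the supplier the given number of times and passes each value to the consumer.
    static void pump(Supplier<String> supplier, Consumer<String> consumer, int times) {
        for (int i = 0; i < times; i++) {
            consumer.accept(supplier.get());
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        pump(numberedSupplier("msg"), collectingConsumer(received), 3);
        System.out.println(received); // [msg-1, msg-2, msg-3]
    }
}
```

In the real application the broker sits between the two functions, so the consumer may run in a different process from the supplier.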
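SampleController.java:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class SampleController {

    @Autowired
    StreamBridge streamBridge;

    // Sends the path variable to the output binding on demand
    @GetMapping("/test/{value}")
    public ResponseEntity<String> testSend(@PathVariable("value") String value) {
        log.info("Sending value {} to topic", value);
        streamBridge.send("supplyBinding-out-0", value);
        return ResponseEntity.ok("ok");
    }
}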
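2.2.2 Legacy annotation style
Note: @EnableBinding, @Input, @Output and @StreamListener were deprecated in Spring Cloud Stream 3.1 and removed in 4.0, so this style only compiles against Spring Cloud Stream 3.x or earlier, not against the versions listed in section 2.1.
Project structure:
src/main/java
  /code
    /config
      MessageBinding.java
      MessageConfig.java
    /controller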
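MessageBinding.java:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MessageBinding {

    // Channel used to publish messages
    @Output("supplyBinding-out-0")
    MessageChannel supplyBinding();

    // Channel on which messages are received
    @Input("consumeBinding-in-0")
    SubscribableChannel consumeBinding();
}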
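MessageConfig.java:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@Slf4j
@EnableBinding(MessageBinding.class)
public class MessageConfig {

    @Autowired
    MessageBinding messageBinding;

    // Called for every message received on the input channel
    @StreamListener("consumeBinding-in-0")
    public void consumeBinding(String msg) {
        log.info("Successfully received message: {}", msg);
    }
}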
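SampleController.java:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@EnableBinding(MessageBinding.class)
public class SampleController {

    @Autowired
    MessageBinding messageBinding;

    @GetMapping("/test/{value}")
    public ResponseEntity<String> testSend(@PathVariable("value") String value) throws Exception {
        log.info("Sending message: {}", value);
        publish(value);
        return ResponseEntity.ok("ok");
    }

    private void publish(String msg) throws Exception {

        log.info("Start publishing message: {}", msg);

        try {
            // send() returns false if the message was not accepted within the 30-second timeout
            if (!messageBinding.supplyBinding().send(MessageBuilder.withPayload(msg).build(), 30_000L)) {
                log.error("Failed to publish message. send() returns false.");
                throw new Exception("Failed to publish message.");
            }

            log.info("Successfully published message.");
        } catch (Exception e) {
            log.error("Failed to publish message.", e);
            throw e;
        }
    }
}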
2.3 Application configuration
spring:
  cloud:
    function: # new functional style
      definition: supplyBinding;consumeBinding # note: supplyBinding sends a message every second

    stream:
#      default-binder: testBinder1 # default binder, so that every binding uses this binder's configuration
      output-bindings: supplyBinding-out-0 # pre-creates producer bindings and binders at app startup; very useful when only StreamBridge is used

      binders:
        testBinder1:
          type: servicebus
          environment:
            spring.cloud.azure.servicebus.connection-string: Endpoint=sb://xxxxxx.servicebus.windows.net/;SharedAccessKeyName=my-test-topic-policy;SharedAccessKey=xxxxxx;EntityPath=my-test-topic
        testBinder2:
          type: servicebus
          environment:
            spring.cloud.azure.servicebus.connection-string: Endpoint=sb://xxxxxx.servicebus.windows.net/;SharedAccessKeyName=my-test-topic-policy;SharedAccessKey=xxxxxx;EntityPath=my-test-topic

      bindings:
        supplyBinding-out-0:
          destination: my-test-topic
          binder: testBinder1
        consumeBinding-in-0:
          destination: my-test-topic
          group: my-test-subscription
          binder: testBinder2

      servicebus:
        bindings:
          supplyBinding-out-0:
            producer:
              entity-type: topic
          consumeBinding-in-0:
            consumer:
              entity-type: topic
              auto-complete: true
#              requeue-rejected: true
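The binding names used in this configuration follow the Spring Cloud Stream convention of function name, then -in- or -out-, then an index. As a rough illustration, the hypothetical helper below (BindingNames is not a Spring class) derives those names from the value of spring.cloud.function.definition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Spring class: it only mirrors the naming convention.
class BindingNames {

    // Name of the input binding for a function, e.g. consumeBinding-in-0.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding for a function, e.g. supplyBinding-out-0.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Splits a definition such as "supplyBinding;consumeBinding" into function names.
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(functionNames("supplyBinding;consumeBinding")); // [supplyBinding, consumeBinding]
        System.out.println(output("supplyBinding", 0)); // supplyBinding-out-0
        System.out.println(input("consumeBinding", 0)); // consumeBinding-in-0
    }
}
```

A Supplier only has an output binding and a Consumer only has an input binding, which is why the configuration lists exactly one binding for each bean.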
3 Known issues
3.1 Bug caused by messages wrapped in square brackets
When we send a message with either of the following lines of Java code:
streamBridge.send("supply-out-0", "[hello world]");
streamBridge.send("supply-out-0", (Object) "[hello world]", MediaType.TEXT_PLAIN);
the following exception is thrown:
java.lang.IllegalStateException: Failed to convert. Possible bug as the conversion probably shouldn't have been attampted here
at org.springframework.cloud.function.json.JacksonMapper.doFromJson(JacksonMapper.java:70)
at org.springframework.cloud.function.json.JsonMapper.fromJson(JsonMapper.java:94)
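So be aware that the Spring framework may attempt a conversion for payloads in certain formats. Here the stack trace shows the bracketed string being passed to the JSON mapper, even when the content type is set to text/plain.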
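One way to spot risky payloads before sending is a small guard like the one below. It is a sketch only: PayloadCheck and looksLikeJson are hypothetical names, and the rule it applies (a string that starts and ends with matching square or curly brackets) is an assumption inferred from the failing example, not a copy of Spring's own detection logic.

```java
// Hypothetical guard, not part of Spring: flags strings that look like a JSON array or object.
class PayloadCheck {

    // Assumption: payloads wrapped in [...] or {...} are the ones that may be treated as JSON.
    static boolean looksLikeJson(String payload) {
        if (payload == null) {
            return false;
        }
        String s = payload.trim();
        if (s.length() < 2) {
            return false;
        }
        char first = s.charAt(0);
        char last = s.charAt(s.length() - 1);
        return (first == '[' && last == ']') || (first == '{' && last == '}');
    }

    public static void main(String[] args) {
        System.out.println(looksLikeJson("[hello world]")); // true
        System.out.println(looksLikeJson("hello world"));   // false
    }
}
```

A caller could check a payload with this before handing it to StreamBridge and decide how to treat the flagged ones, for example by logging a warning.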
3.2 Legacy configuration problem when upgrading to Spring Cloud Stream 4.0.2
Spring Cloud Stream 4.x is meant for Spring Boot 3.x and Spring Cloud 2022.x.
The following configuration comes from Microsoft's documentation:
spring:
  cloud:
    stream:
      function:
        definition: supplyBinding;consumeBinding
According to Spring team developers Oleg Zhurakousky and Soby Chacko, this property was deprecated a long time ago, in fact shortly after it was introduced, and has now been removed. Its replacement is:
spring:
  cloud:
    function:
      definition: supplyBinding;consumeBinding
If the old property is kept, then after upgrading spring-cloud-stream to 4.0.2 the following WARN log appears and the Spring application can neither receive nor send messages:
Multiple functional beans were found [consume, supply], thus can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it.
Class: BeanFactoryAwareFunctionRegistry (from spring-cloud-function-context 4.0.2)
Note: spring-cloud-stream 4.0.1 does not have this problem; it first appears in spring-cloud-stream 4.0.2.
4 Unit test
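ControllerTest:
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.log;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.servlet.MockMvc;

// JUnit 5 is the default with Spring Boot 3, so @RunWith(SpringRunner.class) is not needed
@SpringBootTest
@AutoConfigureMockMvc
public class ControllerTest {

    @Autowired MockMvc mvc;

    @Autowired SampleController controller;
    // MessageService is not shown in this post
    @Autowired MessageService service;

    // Verifies that the application context starts and the beans are wired
    @Test
    public void test_context() {
        assertNotNull(controller);
        assertNotNull(service);
    }

    // Verifies that the endpoint responds with HTTP 200
    @Test
    public void test_getHttp() throws Exception {
        mvc.perform(get("/test/dummy"))
            .andDo(log())
            .andExpect(status().isOk());
    }
}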
src/test/resources/application.yml:
spring:
  cloud:
    stream:
      binders:
        testBinder1:
          type: servicebus
          environment:
            spring.cloud.azure.servicebus.connection-string: Endpoint=sb://dummy.servicebus.windows.net/;SharedAccessKeyName=dummy;SharedAccessKey=dummy # dummy connection string; EntityPath is not needed (if present, it must match the binding's destination)
        testBinder2:
          type: servicebus
          environment:
            spring.cloud.azure.servicebus.connection-string: Endpoint=sb://dummy.servicebus.windows.net/;SharedAccessKeyName=dummy;SharedAccessKey=dummy # dummy connection string; EntityPath is not needed (if present, it must match the binding's destination)

      bindings:
        supplyBinding-out-0:
          destination: my-test-topic
          binder: testBinder1
        consumeBinding-in-0:
          destination: my-test-topic
          group: my-test-subscription
          binder: testBinder2

      servicebus:
        bindings:
          supplyBinding-out-0:
            producer:
              requeue-rejected: true
              entity-type: topic
          consumeBinding-in-0:
            consumer:
              requeue-rejected: true
              auto-complete: true
              entity-type: topic