Spring Cloud Stream 介绍
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
比如 Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。
Binding: 包括 Input Binding 和 Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
整合 RocketMQ
Spring Boot 项目添加依赖
<spring.boot.version>2.2.5.RELEASE</spring.boot.version>
<spring.alibaba.cloud.version>2.2.1.RELEASE</spring.alibaba.cloud.version>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>${spring.alibaba.cloud.version}</version>
</dependency>
添加配置
application.yml 中添加配置
spring:
cloud:
stream:
bindings:
# output 相当于生产者
output:
# 目的地。这里使用 RocketMQ Topic
destination: test111
content-type: application/json
# input 相当于消费者
input:
# 目的地。这里使用 RocketMQ Topic
destination: test111
content-type: application/json
# rocket mq 的消费者必须要有 group,不然启动失败
group: test
rocketmq:
binder:
# rocket mq 地址
name-server: 127.0.0.1:9876
# enable-msg-trace: true
发送与接收消息
Source
public interface Source {
String OUTPUT = "output";
String INPUT = "input";
@Output(Source.OUTPUT)
MessageChannel output();
@Input(Source.INPUT)
SubscribableChannel input();
}
发送消息
@Autowired
private Source source;
@Override
public void send(String msg) {
Message<String> message = MessageBuilder.withPayload(msg)
.build();
source.output().send(message);
}
接收消息
@StreamListener(value = Source.INPUT)
public void receiveInput(String receiveMsg) {
System.out.println("input receive: " + receiveMsg);
}
启动
启动类上添加注解 @EnableBinding({Source.class})
https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ