Spring Cloud Stream 整合 RocketMQ

2021/5/24

Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。

Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:Binder 和 Binding。

Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。

比如 Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。

Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

整合 RocketMQ

Spring Boot 项目添加依赖

<spring.boot.version>2.2.5.RELEASE</spring.boot.version>
<spring.alibaba.cloud.version>2.2.1.RELEASE</spring.alibaba.cloud.version>


<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>${spring.alibaba.cloud.version}</version>
</dependency>

添加配置

application.yml 中添加配置

spring:
  cloud:
    stream:
      bindings:
        # output 相当于生产者
        output:
          # 目的地。这里使用 RocketMQ Topic
          destination: test111
          content-type: application/json

        # input 相当于消费者
        input:
          # 目的地。这里使用 RocketMQ Topic
          destination: test111
          content-type: application/json
          # rocket mq 的消费者必须要有 group,不然启动失败
          group: test

      rocketmq:
        binder:
          # rocket mq 地址
          name-server: 127.0.0.1:9876
          # enable-msg-trace: true

发送与接收消息

Source

public interface Source {

	String OUTPUT = "output";
	String INPUT = "input";

	@Output(Source.OUTPUT)
	MessageChannel output();

	@Input(Source.INPUT)
	SubscribableChannel input();

}

发送消息

@Autowired
private Source source;

@Override
public void send(String msg) {
	Message<String> message = MessageBuilder.withPayload(msg)
			.build();
	source.output().send(message);
}

接收消息

@StreamListener(value = Source.INPUT)
public void receiveInput(String receiveMsg) {
	System.out.println("input receive: " + receiveMsg);
}

启动

启动类上添加注解 @EnableBinding({Source.class})

gitee


https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ

https://xie.infoq.cn/article/13407cedcc683163cfce19afc