使用 Spring Cloud Bus 在微服务之间传递消息示例

2023-04-18 15:41:11 浏览数 (2)

下面是一个完整的示例,演示如何使用 Spring Cloud Bus 在微服务之间传递消息。该示例包含两个微服务,一个是消息发送者,另一个是消息接收者。

首先,我们需要创建一个 Spring Boot 项目,并添加 Spring Cloud Bus 和 RabbitMQ 的依赖。在 pom.xml 文件中添加以下依赖:

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>

在创建的项目中,我们定义了一个消息类型 MyMessage,该类型包含一个字符串类型的 content 属性。

代码语言:javascript复制
javaCopy codepublic class MyMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    private String content;

    public MyMessage(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

然后,我们创建了一个消息发送者,该发送者使用 Spring Cloud Bus 发送消息。

代码语言:javascript复制
@RestController
public class MyController {

    @Autowired
    private MessageSender messageSender;

    @PostMapping("/send")
    public void sendMessage(@RequestBody MyMessage message) {
        messageSender.sendMessage(message);
    }
}

@Service
public class MessageSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class);

    @Autowired
    private MessageChannel output;

    public void sendMessage(MyMessage message) {
        LOGGER.info("Sending message: {}", message.getContent());
        output.send(MessageBuilder.withPayload(message).build());
    }
}

@Service
public class MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageListener.class);

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='MyMessage'")
    public void handleMessage(MyMessage message) {
        LOGGER.info("Received message: {}", message.getContent());
    }
}

在这个例子中,我们创建了一个 MyController 类,该类定义了一个发送消息的 API 接口,接收一个 MyMessage 类型的参数,将参数传递给 MessageSender 类的 sendMessage() 方法。

MessageSender 类定义了一个 sendMessage() 方法,该方法使用 Spring Cloud Stream 的 output 消息通道发送消息。

MessageListener 类定义了一个 handleMessage() 方法,该方法使用 @StreamListener 注解监听 Spring Cloud Stream 的 input 消息通道,并根据消息类型过滤消息。当有符合条件的消息到达时,handleMessage() 方法会被自动调用,处理接收到的消息。

在以上代码中,我们使用了 @Autowired 注解自动注入了 MessageSender 和 MessageListener 类,这是 Spring Boot 自带的依赖注入功能。

运行应用程序后,我们可以使用 Postman 工具或其他 HTTP 工具发送 HTTP POST 请求,将消息发送到消息发送者的 API 接口,如下所示:

代码语言:javascript复制
POST http://localhost:8080/send HTTP/1.1
Content-Type: application/json

{
    "content": "Hello, World!"
}

当消息到达时,消息接收者会打印消息内容,如下所示:

代码语言:javascript复制
2023-04-19 09:24:47.836  INFO 29740 --- [afka-listener-1] com.example.demo.M

0 人点赞