Spring Cloud Bus 设计原理
本节将介绍Spring Cloud Bus的设计原理。理解原理有利于更好地基于Spring Cloud Bus来进行二次开发。
基于Spring Cloud Stream
Spring Cloud Bus是基于Spring Cloud Stream基础之上而做的封装。Spring Cloud Stream是Spring Cloud家族中一个构建消息驱动微服务的框架。
图16-3所示的是来自官方的Spring Cloud Stream应用模型。
在该应用模型中可以发现Spring Cloud Stream的几个核心概念。
1.Spring Cloud Stream Application
Application通过inputs或outputs来与SpringCloud Stream中的 Binder交互,通过配置来binding,而Spring Cloud Stream的 Binder负责与中间件交互。所以只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
2.Binder
Binder是Spring Cloud Stream 的一个抽象概念,是应用与消息中间件之间的黏合剂。目前Spring Cloud Stream实现了Kafka和 Rabbit等消息中间件的 Binder。
通过Binder,可以很方便地连接消息中间件,可以动态地改变消息的destinations(对应于Kaf-ka 的 topic,Rabbit 的exchanges),这些都可以通过外部配置项做到。通过配置,不需要修改一行代码,就能实现消息中间件的更换。
3.订阅/发布
消息的发布(Publish)和订阅(Subscribe)是事件驱动的经典模式,如图16-4所示。SpringCloud Stream的数据交互也是基于这个思想。生产者把消息通过某个topic广播出去(Spring CloudStream 中的destinations)。其他的微服务通过订阅特定topic来获取广播出来的消息,以触发业务的进行。
这种模式极大地降低了生产者与消费者之间的耦合。即使有新的应用引入,也不需要破坏当前系统的整体结构。
4.消费者分组
Spring Cloud Stream的意思基本与Kafka一致。为了防止同一个事件被重复消费,只要把这些应用放置于同一个“group”中,就能够保证消息只会被其中一个应用消费一次。
每个binding 都可以使用 spring.cloud.stream.bindings.<channelName>.group来指定分组的名称,如图16-5所示。
图16-5展示了Stream 的消费者分组设置,属性值分别设置为 spring.cloud.stream.bind-ings.<channelName>.group=hdfsWrite和 spring.cloud.stream.bindings.<channelName>.group=average.
5.持久化
消息事件的持久化是必不可少的。Spring Cloud Stream可以动态地选择一个消息队列是否需要持久化。
6.Binding
Binding 是通过配置把应用与Spring Cloud Stream的 Binder绑定在一起的,之后只需要修改Binding 的配置来达到动态修改topic、exchange、type等一系列信息,而不需要修改一行代码。
7.分区支持
Spring Cloud Stream支持在给定应用程序的多个实例之间对数据进行分区。在分区方案中,物理通信介质(如topic)被视为多个分区。
Spring Cloud Stream为统一实现分区处理用例提供了一个通用抽象。无论代理本身是自然分区(如Kafka)还是非自然分区(如RabbitMQ),都可以使用分区。
Spring Cloud Bus的编程模型
当微服务之间需要通信时,先将消息传递给消息总线,而其他微服务实现接收消息总线分发信息。Spring Cloud Bus提供了简化微服务发送和接收消息总线指令的能力。
1.AbstractBusEndpoint及其子类
通过这个接口来实现用户的访问,都需要继承AbstractBusEndpoint。
以下是AbstractBusEndpoint.java的核心代码。
代码语言:javascript复制package org.springframework.cloud.bus.endpoint;
import org.springframework.boot.actuate.endpoint.Endpoint;
import org.springframework.boot.actuate.endpoint.mvc.MvcEndpoint;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
public class AbstractBusEndpoint implements MvcEndpoint
private ApplicationEventPublisher context;
private BusEndpoint delegate;
private string appId;
public AbstractBusEndpoint(ApplicationEventPublisher context,String
appId,
BusEndpoint busEndpoint){
this.context =context;
this.apprd = appId;
this.delegate = busEndpoint;
}
protected string getInstanceId() {
return this.appld;
protected void publish(ApplicationEvent event)
context.publishEvent (event);
}
@override
public String getPath()
return "/" this.delegate.getld();
}
override
public boolean issensitive({
return this.delegate.isSensitive(;
}
@override
@Suppresswarnings("rawtypes")
public Class<?extends Endpoint> getEndpointType()f
return this.delegate.getClass();
}
}
最常用的AbstractBusEndpoint 的子类,莫过于EnvironmentBusEndpoint和RefreshBusEndpoint。
这两个类分别实现了/bus/env和/bus/refresh的HTTP接口。
以下是 EnvironmentBusEndpoint.java的源码。
代码语言:javascript复制package org.springframework.cloud.bus.endpoint;
import java.util.Map;
import org.springframework.cloud.bus.event.EnvironmentChangeRemote
ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.jmx.export.annotation.Managed0peration;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
@ManagedResource
public class EnvironmentBusEndpoint extends AbstractBusEndpoint {
public EnvironmentBusEndpoint (ApplicationEventPublisher context,
string id,
BusEndpoint delegate) {
super(context, id,delegate);
}
@RequestMapping(value = "env", method = RequestMethod.POST)
@ResponseBody
ManagedOperation
public void env(@RequestParam Map<String,String> params,
@RequestParam(value = "destination",required = false)
String destination){
publish(new EnvironmentChangeRemoteApplicationEvent(this,
getInstancerd(,
destination,params));
}
}
以下是RefreshBusEndpoint.java的源码。
代码语言:javascript复制package org.springframework.cloud.bus.endpoint;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.jmx.export.annotation.Managed0peration;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
@ManagedResource
public class RefreshBusEndpoint extends AbstractBusEndpoint
public RefreshBusEndpoint (ApplicationEventPublisher context,Stringid,
BusEndpoint delegate)f
super(context, id, delegate);
RequestMapping(value = "refresh",method = RequestMethod.POST)
ResponseBody
@Managedoperation
public void refresh(
@RequestParam(value = "destination", required = false)
String destination){
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(),
destination));
}
}
2.RemoteApplicationEvent及其子类
RemoteApplicationEvent用来定义被传输的消息事件。
以下是 RemoteApplicationEvent.java的源码。
代码语言:javascript复制package org.springframework.cloud.bus.event;
import java.util.UUID;
import org.springframework.context.ApplicationEvent;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@suppresswarnings( "serial")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,property = "type")
@JsonIgnoreProperties( "source")
public abstract class RemoteApplicationEvent extends ApplicationEvent(
private static final 0bject TRANSTENT_SOURCE= new object();
private final string originService;
private final String destinationService;
private final String id;
protected RemoteApplicationEvent({
//for serialization libs like jackson
this(TRANSIENT_SOURCE,null,null);
}
protected RemoteApplicationEvent(Object source,String origin
service,
String destinationService){
super(source);
this.originservice = originService;
if(destinationService -=null)
destinationService ="**";
)
1/ If the destinationService is not already a wildcard,match
everything that follows
// if there at most two path elements,and last element is not
a global wildcard already
if(!"**".equals(destinationService)){
if (StringUtils.countoccurrencesof(destinationService,":")
<= 1
&& !StringUtils.endsWithIgnoreCase (destination
Service,":**")) {
//All instances of the destination unless specifically
requested
destinationService = destinationService ":**;
}
this.destinationService = destinationService;
this.id= UUID.randomUUID().toString(;
protected RemoteApplicationEvent (Object source,String origin
Service){
this(source,originservice,null);
}
//省略 getter/setter方法
}
最常用的RemoteApplicationEvent的子类,莫过于 EnvironmentChangeRemoteApplicationEvent和RefreshRemoteApplicationEvent。
以下是 EnvironmentChangeRemoteApplicationEvent.java的源码。
代码语言:javascript复制package org.springframework.cloud.bus.event;
import java.util.Map;
@SuppressWarnings( "serial")
public class EnvironmentChangeRemoteApplicationEvent extends Remote
ApplicationEvent {
private final Map<String,String> values;
@SuppressWarnings("unused")
private EnvironmentChangeRemoteApplicationEvent() {
//for serializers
values = null;
public EnvironmentChangeRemoteApplicationEvent (Object source,String
originService,
String destinationService,Map<String,String> values)
super(source,originService,destinationService);
this.values =values;
//省略 getter/setter 方法
}
以下是 RefreshRemoteApplicationEvent.java的源码。
代码语言:javascript复制package org.springframework.cloud.bus.event;
@suppressWarnings("serial")
public class RefreshRemoteApplicationEvent extends RemoteApplication
Event {
SuppressWarnings ("unused")
private RefreshRemoteApplicationEvent( {
//for serializers
}
public RefreshRemoteApplicationEvent(Object source,String origin
service,
String destinationservice){
super(source, originService,destinationService);
}
}
3.ApplicationListener及其子类
ApplicationListener是用来处理消息事件的监听器,是Spring框架的核心接口。该接口只有一个方法。
代码语言:javascript复制public interface ApplicationListenerE extends ApplicationEvent> extends
EventListener {
★★
*Handle an application event.
* @param event the event to respond to
*/
void onApplicationEvent(E event) ;
}
Spring Cloud Bus中的监听器都需要实现该接口。EnvironmentChangeListener及RefreshListener是其中两个常用的实现类。
以下是 EnvironmentChangeListener.java的源码。
代码语言:javascript复制package org.springframework.cloud.bus.event;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.environment.EnvironmentManager;
import org.springframework.context.ApplicationListener;
public class EnvironmentChangeListener
implements ApplicationListener<EnvironmentChangeRemote
ApplicationEvent> {
private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);
@Autowired
private EnvironmentManager env;
aoverride
public void onApplicationEvent(EnvironmentChangeRemoteApplication
Event event){
Map<String,String> values = event.getValues();
log.info ("Received remote environment change request. Keys/
values to update "
values);
for (Map.Entry<String,String> entry : values.entrySet()){
env.setProperty(entry.getKey(,entry.getvalue());
}
}
}
以下是RefreshListener.java的源码。
代码语言:javascript复制package org.springframework.cloud.bus.event;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.context.refresh.ContextRefresher;
import org.springframework.context.ApplicationListener;
public class RefreshListener
implements ApplicationListener<RefreshRemoteApplicationEvent>{
private static Log log = LogFactory.getLog (RefreshListener.class);
private ContextRefresher contextRefresher;
public RefreshListener(ContextRefresher contextRefresher)
this.contextRefresher = contextRefresher;
@override
public void onApplicationEvent(RefreshRemoteApplicationEvent event)
Set<String> keys = contextRefresher.refresh();
log.info ("Received remote refresh request.Keys refreshed " keys);
}
}
本篇文章内容给大家讲解的是SpringCloudBus 设计原理
- 下篇文章给大家讲解的是如何集成 BuS;
- 觉得文章不错的朋友可以转发此文关注小编;
- 感谢大家的支持!
本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。