基于AbstractRoutingDataSource实现读写分离

2021-12-27 11:19:50 浏览数 (3)

一、概念

读写分离

所谓读写分离,本质上是数据库层面的查询和更新隔离,其实就是将数据库分为了主从库,一个主库用于写数据,多个从库完成读数据的操作,主从库之间通过某种机制进行数据的同步,是一种常见的数据库架构。

读写分离解决了什么问题?

大多数互联网业务,往往读多写少,这时候,数据库的读会首先成为数据库的瓶颈,这时,如果除了增加缓存属性,我们希望能够线性的提升数据库的读性能,消除读写锁冲突从而提升数据库的写性能,那么就可以使用读写分离架构。

也就是说,读写分离是为了解决数据库的查询性能瓶颈的。

二、AbstractRoutingDataSource源码分析

AbstractRoutingDataSource继承了AbstractDataSource并且实现了InitializingBean接口,我们看一下属性声明和几个核心的方法实现。

属性

代码语言:javascript复制
 @Nullable
  private Map<Object, Object> targetDataSources;//目标数据源key-value
  @Nullable
  private Object defaultTargetDataSource;//默认数据源

  private boolean lenientFallback = true;//在没有找到目标数据源是否宽松处理,使用默认

  private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();//jndi数据源

  @Nullable
  private Map<Object, DataSource> resolvedDataSources;//经过加工处理的数据源key-value

  @Nullable
  private DataSource resolvedDefaultDataSource;//经过加工处理的默认数据源

核心方法

1:afterPropertiesSet

代码语言:javascript复制
@Override
public void afterPropertiesSet() {
  if (this.targetDataSources == null) {
    throw new IllegalArgumentException("Property 'targetDataSources' is required");
  }
  this.resolvedDataSources = new HashMap<>(this.targetDataSources.size());
  this.targetDataSources.forEach((key, value) -> {
    Object lookupKey = resolveSpecifiedLookupKey(key);
    DataSource dataSource = resolveSpecifiedDataSource(value);
    this.resolvedDataSources.put(lookupKey, dataSource);
  });
  if (this.defaultTargetDataSource != null) {
    this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
  }
}

该方法是InitializingBean接口中定义,主要是做一些初始化工作,这里首先检查目标数据源是否为null,如果为null则抛异常终止,然后将目标数据源的key和value转换之后放到最终要使用的数据源中,最后将默认数据源转换之后赋值给最终要使用的默认数据源。

代码语言:javascript复制
protected Object resolveSpecifiedLookupKey(Object lookupKey) {  
    return lookupKey; 
}

resolveSpecifiedLookupKey方法是将自定义数据源的key转换,这里是直接使用原值。

代码语言:javascript复制
protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
	if (dataSource instanceof DataSource) {
		return (DataSource) dataSource;
	}
	else if (dataSource instanceof String) {
		return this.dataSourceLookup.getDataSource((String) dataSource);
	}
	else {
		throw new IllegalArgumentException(
				"Illegal data source value - only [javax.sql.DataSource] and String supported: "   dataSource);
	}
}

resolveSpecifiedDataSource方法是将用户传入数据源转换成待使用的数据源,如果入参是DataSource类型就直接返回,如果入参是String类型就从jndi数据源中获取执行数据源,如果入参是其他类型直接抛异常终止。

2:getConnection

代码语言:javascript复制
@Override
public Connection getConnection() throws SQLException {
	return determineTargetDataSource().getConnection();
}

此方法是DataSource接口中定义的获取连接的方法,也是最核心的一个方法,先调用determineTargetDataSource方法,决定使用哪一个数据源然后再获取相应数据源的连接。

代码语言:javascript复制
protected DataSource determineTargetDataSource() {
	Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
	Object lookupKey = determineCurrentLookupKey();
	DataSource dataSource = this.resolvedDataSources.get(lookupKey);
	if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
		dataSource = this.resolvedDefaultDataSource;
	}
	if (dataSource == null) {
		throw new IllegalStateException("Cannot determine target DataSource for lookup key ["   lookupKey   "]");
	}
	return dataSource;
}

该方法就是从数据源列表中决定使用哪个数据源,先获取目标数据源的key,然后从数据源map中获取相应的数据源,如果没有获取到数据源且数据源key为null或者可以宽松处理,就使用默认数据源。如果没有合适的数据源就抛异常终止,否则返回指定的数据源。

代码语言:javascript复制
protected DataSource determineTargetDataSource() {
	Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
	Object lookupKey = determineCurrentLookupKey();
	DataSource dataSource = this.resolvedDataSources.get(lookupKey);
	if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
		dataSource = this.resolvedDefaultDataSource;
	}
	if (dataSource == null) {
		throw new IllegalStateException("Cannot determine target DataSource for lookup key ["   lookupKey   "]");
	}
	return dataSource;
}

这是定义的一个抽象钩子方法,留个子类去实现,作用是使用哪个数据源对应的key,用于上边获取连接时使用哪一个数据源。

三、原理分析

工作原理

从上一小节的源码分析中我们可以看出,其实AbstractRoutingDataSource承担的是一个数据源代理角色,内部维护多个数据源,当业务和数据库交互时,ARDS会根据特定的需要选择合适的数据源进行操作,工作原理大致如下:

时序图

这里时序图中我们没有借用第三方持久层框架,使用spring自带的JdbcTemplate,Dao层查询和更新调用JdbcTemplate,然后JdbcTemplate使用动态数据源,在读写操作的时候动态数据源会根据需要选择合适的真实数据源进行操作,然后把结果返回给上层调用。

四、基于AbstractRoutingDataSource实现读写分离

了解了动态数据源AbstractRoutingDataSource的源码和原理,我们大致知道了具体如何使用,只需要在子类中实现自定义determineCurrentLookupKey方法,我们此篇的思路是在通过aop拦截Dao层数据库操作,然后根据需要在线程上下文中放入数据源的key,执行完操作后将数据源的key从线程上下文中移除。

1.配置数据源

配置两个数据源,一个读库一个写库:

代码语言:javascript复制
#写库
spring.write.datasource.driverClassName=com.mysql.jdbc.Driver
spring.write.datasource.url=jdbc:mysql://host:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=true
spring.write.datasource.username=aaa
spring.write.datasource.password=bbb
#--------------------------

#读库
spring.read.datasource.driverClassName=com.mysql.jdbc.Driver
spring.read.datasource.url=jdbc:mysql://host:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=true
spring.read.datasource.username=ccc
spring.read.datasource.password=ddd
#--------------------------

数据源属性配置:

代码语言:javascript复制
/**
 * 数据源属性配置
 *
 * @author typhoon
 * @Description TODO
 * @date 2019-09-15 12:14
 * @since V2.0.0
 */
@Configuration
public class DataSourcePropertiesConfig {

    @Primary
    @Bean("writeDataSourceProperties")
    @ConfigurationProperties("spring.write.datasource")
    public DataSourceProperties writeDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean("readDataSourceProperties")
    @ConfigurationProperties("spring.read.datasource")
    public DataSourceProperties readDataSourceProperties() {
        return new DataSourceProperties();
    }
}

2.自定义路由数据源

CustomRoutingDataSource

代码语言:javascript复制
@Slf4j
public class CustomRoutingDataSource extends AbstractRoutingDataSource {

    @Resource(name = "writeDataSourceProperties")
    private DataSourceProperties writeProperties;

    @Resource(name = "readDataSourceProperties")
    private DataSourceProperties readProperties;

    @Override
    public void afterPropertiesSet() {
        DataSource writeDataSource =
                writeProperties.initializeDataSourceBuilder().type(DruidDataSource.class).build();
        DataSource readDataSource =
                readProperties.initializeDataSourceBuilder().type(DruidDataSource.class).build();
        setDefaultTargetDataSource(writeDataSource);
        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put(WRITE_DATASOURCE, writeDataSource);
        dataSourceMap.put(READ_DATASOURCE, readDataSource);
        setTargetDataSources(dataSourceMap);
        super.afterPropertiesSet();
    }
    @Override
    protected Object determineCurrentLookupKey() {
        String key = DataSourceHolder.getDataSource();
        if (key == null) {
            // default datasource
            return WRITE_DATASOURCE;
        }
        log.info("CustomRoutingDataSource.determineCurrentLookupKey key={}",key);
        return key;
    }
}

afterPropertiesSet方法将读库和写库数据源填充到目标数据源中,determineCurrentLookupKey从线程上下文中获取数据源的key。

声明自定义路由数据源和JdbcTemplate:

代码语言:javascript复制
@Bean("customRoutingDataSource")
public CustomRoutingDataSource customRoutingDataSource() {
    return new CustomRoutingDataSource();
}
@Bean
public JdbcTemplate jdbcTemplate(@Qualifier("customRoutingDataSource") CustomRoutingDataSource customRoutingDataSource) {
    return new JdbcTemplate(customRoutingDataSource);
}

3.AOP设置和清除线程上下文数据源key

代码语言:javascript复制
@Component
@Aspect
@Slf4j
public class JdbcAspect {

//    private static final Pattern writePattern = Pattern.compile("^\s*(modify|update|delete|insert)");
    private static final Pattern readPattern = Pattern.compile("^\s*(select|get|query)");

    @Pointcut("execution(public * com.typhoon.skeleton.dao.dao.*.*(..))")
    public void jdbcPointCut() {}

    @Before("jdbcPointCut()")
    public void before(JoinPoint joinPoint) {
        //joinPoint.getSignature().getName();
        String signature = joinPoint.getSignature().getName();
        log.info("JdbcAspect.before signature={}",signature);
        Matcher matcher = readPattern.matcher(signature);
        if(matcher.find() && !TransactionSynchronizationManager.isSynchronizationActive()) {
            DataSourceHolder.putDataSource(DataSourceHolder.READ_DATASOURCE);
            return;
        }
        DataSourceHolder.putDataSource(DataSourceHolder.WRITE_DATASOURCE);
    }

    @After("jdbcPointCut()")
    public void after(JoinPoint joinPoint) {
        String signature = joinPoint.getSignature().toString();
        log.info("JdbcAspect.after signature={}",joinPoint.getSignature());

        DataSourceHolder.clearDataSource();
    }
}

AOP拦截Dao层的所有public方法,在方法执行之前判断如果是读操作且没有开启事务,那么就把读库key放入线程上下文,否则就把写库key放入线程上下文。在方法执行完之后清除线程上下文中的数据源key,避免污染其他业务。

4.测试验证

省略业务层代码,在请求处理层增加查询和新增数据接口:

代码语言:javascript复制
@RestController
@RequestMapping("user")
@Slf4j
public class UserController {
    @Autowired
    private UserManager userManager;
    @GetMapping("/{id}")
    public IResp<UserVO> queryUser(@PathVariable("id")Long id) {
        if(null == id || id <= 0L) {
            log.warn("UserController.queryUser param illegal;id={}",id);
            return IResp.getFailureResult(EntityError.ILLEGAL_ARGUMENT);
        }
        try {
            UserVO userVO =  this.userManager.queryById(id);
            return IResp.getSuccessResult(userVO);
        } catch (Exception e) {
            log.error("UserController.queryUser occur error;id={}",id,e);
            return IResp.getFailureResult(EntityError.SYSTEM_ERROR);
        }
    }
    @PostMapping("/insert")
    public IResp insert(@RequestBody UserDO userDO) {
        try {
            this.userManager.insert(userDO);
            return IResp.getSuccessResult(null);
        } catch (Exception e) {
            log.error("UserController.insert occur error;userDO={}",userDO,e);
            return IResp.getFailureResult(EntityError.SYSTEM_ERROR);
        }
    }
}

启动应用,先发送读请求:

代码语言:javascript复制
 curl http://localhost:8080/user/1

查询结果如下:

通过路由日志看到查询路由到了读库:

这样我们已经验证了动态数据源已经将查询路由到了读库。

在验证写操作之前,我们先看一下写库的数据:

然后发送新增数据请求:

代码语言:javascript复制
curl -l -H "Content-type: application/json" -X POST -d '{"name":"abc","sex":1,"age":1}'    http://localhost:8080/user/insert

操作结果如下:

新增数据成功,通过路由日志能够看出写操作路由到了写库:

查询写库数据,数据已经新增进来:

同样,我们验证了更新操作路由到了写库。

总结

本篇文章我们介绍了读写分离的基本概念和应用场景,也详细的分析了动态数据源AbstractRoutingDataSource的源码和工作原理,通过实例代码测试验证了基于动态数据源实现读写分离,当然这只是很简单的场景,对于读写分离的复杂场景(比如一主多从)动态数据源显得有点吃力,并且对于多数据源的健康检查和动态剔除更是完全不支持,现在市面比较流行的数据库中间件比如mycat和shardingJdbc这方面做得都还不错,但是引入会有相对比较大的人力成本和技术成本,对于相对小众并且流量不是太大的中小型应用,如果有读写分离的需求,完全可以基于AbstractRoutingDataSource来实现,只需要简单的扩展实现并且是spring自带的,接入成本低并且不存在兼容性问题。希望对大家有所帮助。

0 人点赞