在 R2DBC 官网(http://r2dbc.io/) 上,对 R2DBC 有一句话的介绍:
The Reactive Relational Database Connectivity (R2DBC) project brings reactive programming APIs to relational databases.
R2DBC 的含义是 Reactive Relational Database Connectivity,它是一个使用响应式驱动程序集成关系数据库的孵化器。它是在响应式编程的基础上使用关系数据访问技术。
R2DBC 最初是一项实验和概念验证,旨在将 SQL 数据库集成到使用响应式编程模型的系统中。JDBC 使用的是阻塞式 API,而 R2DBC 允许开发者使用无阻塞 API 访问关系数据库,因为 R2DBC 包含 Reactive Streams 规范。从官网上还能看到 R2DBC 支持的响应式框架包括:Reactor、RxJava、Smallrye Mutiny。
R2DBC 目前是一个开放的规范,它为驱动程序供应商实现和客户端使用建立了一个服务提供者接口(SPI)。
另外,R2DBC 是由 Spring 官方团队提出的规范,除了驱动实现外还提供了 R2DBC 连接池和 R2DBC 代理。
目前 R2DBC 已经支持的驱动实现包括:
- cloud-spanner-r2dbc - driver for Google Cloud Spanner.
- jasync-sql - R2DBC wrapper for Java & Kotlin Async Database Driver for MySQL and PostgreSQL (written in Kotlin).
- r2dbc-h2 - native driver implemented for H2 as a test database.
- r2dbc-mariadb - native driver implemented for MariaDB.
- r2dbc-mssql - native driver implemented for Microsoft SQL Server.
- r2dbc-mysql - native driver implemented for MySQL.
- r2dbc-postgres - native driver implemented for PostgreSQL.
二. R2DBC 使用
在 Gradle 中配置 Spring Boot 以及 R2DBC 相关依赖的库:
代码语言:javascript复制 implementation "io.r2dbc:r2dbc-h2:0.8.4.RELEASE"
implementation "com.h2database:h2:1.4.200"
implementation "org.springframework.data:spring-data-r2dbc:1.0.0.RELEASE"
implementation "org.springframework.boot:spring-boot-starter-actuator:2.3.5.RELEASE"
implementation "org.springframework.boot:spring-boot-starter-data-r2dbc:2.3.5.RELEASE"
implementation "org.springframework.boot:spring-boot-starter-webflux:2.3.5.RELEASE"
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:2.3.5.RELEASE"
implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.1.0"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.9"
连接数据库
我们注册和配置 ConnectionFactoryInitializer bean,并通过 ConnectionFactory 来初始化数据库:
代码语言:javascript复制@Configuration
@EnableR2dbcRepositories
open class AppConfiguration {
......
@Bean
open fun initializer(@Qualifier("connectionFactory") connectionFactory: ConnectionFactory): ConnectionFactoryInitializer {
val initializer = ConnectionFactoryInitializer()
initializer.setConnectionFactory(connectionFactory)
val populator = CompositeDatabasePopulator()
populator.addPopulators(ResourceDatabasePopulator(ClassPathResource("schema.sql")))
populator.addPopulators(ResourceDatabasePopulator(ClassPathResource("data.sql")))
initializer.setDatabasePopulator(populator)
return initializer
}
}
这种初始化的支持是由 Spring Boot R2DBC 自动配置的,通过 schema.sql 以及 data.sql 配置到 ConnectionFactory。
基于 routing function 模式创建接口
WebFlux 提供了2种开发模式,一种是传统的基于注解的开发模式,使用 Controller 注解进行开发。另一种是 routing function 模式,使用函数式的编程风格。
routing function 模式主要使用 HandlerFunction 和 RouterFunction。
- HandlerFunction 表示一个函数,该函数为路由到它们的请求生成响应。
- RouterFunction 可以替代 @RequestMapping 注释。 我们可以使用它将请求路由到处理程序函数。
他们就像使用带注解的 Controller 一样,只不过 http method 是通过响应式来构建的。
coRouter() 允许使用 Kotlin DSL 以及 Coroutines 轻松创建 RouterFunction。例如:
代码语言:javascript复制@Configuration
@EnableR2dbcRepositories
open class AppConfiguration {
@Bean
open fun userRoute(userHandler: UserHandler) = coRouter {
GET("/users", userHandler::findAll)
GET("/users/search", userHandler::search)
GET("/users/{id}", userHandler::findUser)
POST("/users", userHandler::addUser)
PUT("/users/{id}", userHandler::updateUser)
DELETE("/users/{id}", userHandler::deleteUser)
}
......
}
创建 HandlerFunctions
UserHandler 是它们的 HandlerFunction 的集合,Handler 有点类似于 Service:
代码语言:javascript复制@Component
class UserHandler {
private val logger = LoggerFactory.getLogger(UserHandler::class.java)
@Autowired
lateinit var service: UserService
suspend fun findAll(request: ServerRequest): ServerResponse {
val users = service.findAll()
return ServerResponse.ok().json().bodyAndAwait(users)
}
suspend fun search(request: ServerRequest): ServerResponse {
val criterias = request.queryParams()
return when {
criterias.isEmpty() -> ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Search must have query params"))
criterias.contains("name") -> {
val criteriaValue = criterias.getFirst("name")
if (criteriaValue.isNullOrBlank()) {
ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Incorrect search criteria value"))
} else {
ServerResponse.ok().json().bodyAndAwait(service.findByName(criteriaValue))
}
}
criterias.contains("email") -> {
val criteriaValue = criterias.getFirst("email")
if (criteriaValue.isNullOrBlank()) {
ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Incorrect search criteria value"))
} else {
ServerResponse.ok().json().bodyAndAwait(service.findByEmail(criteriaValue))
}
}
else -> ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Incorrect search criteria"))
}
}
suspend fun findUser(request: ServerRequest): ServerResponse {
val id = request.pathVariable("id").toLongOrNull()
return if (id == null) {
ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("`id` must be numeric"))
} else {
val user = service.findById(id)
if (user == null) ServerResponse.notFound().buildAndAwait()
else ServerResponse.ok().json().bodyValueAndAwait(user)
}
}
suspend fun addUser(request: ServerRequest): ServerResponse {
val newUser = try {
request.bodyToMono<UserDTO>().awaitFirstOrNull()
} catch (e: Exception) {
logger.error("Decoding body error", e)
null
}
return if (newUser == null) {
ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Invalid body"))
} else {
val user = service.addUser(newUser)
if (user == null) ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).json().bodyValueAndAwait(ErrorMessage("Internal error"))
else ServerResponse.status(HttpStatus.CREATED).json().bodyValueAndAwait(user)
}
}
suspend fun updateUser(request: ServerRequest): ServerResponse {
val id = request.pathVariable("id").toLongOrNull()
return if (id == null) {
ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("`id` must be numeric"))
} else {
val updateUser = try {
request.bodyToMono<UserDTO>().awaitFirstOrNull()
} catch (e: Exception) {
logger.error("Decoding body error", e)
null
}
if (updateUser == null) {
ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Invalid body"))
} else {
val user = service.updateUser(id, updateUser)
if (user == null) ServerResponse.status(HttpStatus.NOT_FOUND).json().bodyValueAndAwait(ErrorMessage("Resource $id not found"))
else ServerResponse.status(HttpStatus.OK).json().bodyValueAndAwait(user)
}
}
}
suspend fun deleteUser(request: ServerRequest): ServerResponse {
val id = request.pathVariable("id").toLongOrNull()
return if (id == null) {
ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("`id` must be numeric"))
} else {
if (service.deleteUser(id)) ServerResponse.noContent().buildAndAwait()
else ServerResponse.status(HttpStatus.NOT_FOUND).json().bodyValueAndAwait(ErrorMessage("Resource $id not found"))
}
}
}
每个 HandlerFunction 函数返回的 ServerResponse 提供了对 Http 响应的访问,可以使用 build 方法来创建。 Builder 构建器可以设置响应代码,响应标题或正文。
创建 Service
UserHandler 通过 UserService 来实现具体的业务。
代码语言:javascript复制@Service
class UserService {
@Autowired
private lateinit var userRepository: UserRepository
suspend fun findAll() = userRepository.findAll().asFlow()
suspend fun findById(id: Long) = userRepository.findById(id).awaitFirstOrNull()
suspend fun findByName(name: String) = userRepository.findByName(name).asFlow()
suspend fun findByEmail(email: String) = userRepository.findByEmail(email).asFlow()
suspend fun addUser(user: UserDTO) = userRepository.save(user.toModel()).awaitFirstOrNull()
suspend fun updateUser(id: Long, userDTO: UserDTO): User? {
val user = findById(id)
return if (user != null)
userRepository.save(userDTO.toModel(id = id)).awaitFirstOrNull()
else null
}
suspend fun deleteUser(id: Long): Boolean {
val user = findById(id)
return if (user != null) {
userRepository.delete(user).awaitFirstOrNull()
true
} else false
}
}
UserService 的 findAll()、findByName()、findByEmail() 返回的是 Flow<User> 对象。 这是由于 Spring Data R2DBC 的 Coroutines 扩展了响应式的基础架构,因此可以将 UserService 的方法定义为 suspend 函数并将 Flux 结果转换成 Kotlin 的 Flow 类型。
创建 Repository
而 UserService 会调用 Repository 来跟数据库打交道。在创建 Repository 之前,我们先创建实体类 User:
代码语言:javascript复制@Table("users")
data class User(
@Id
val id: Long? = null,
val name: String,
val password: String,
val email: String,
)
User 类具有唯一的标识符和一些字段。有了实体类之后,我们可以创建一个合适的 Repository,如下所示:
代码语言:javascript复制interface UserRepository : ReactiveCrudRepository<User, Long> {
@Query("SELECT u.* FROM users u WHERE u.name = :name")
fun findByName(name: String): Flux<User>
@Query("SELECT u.* FROM users u WHERE u.email = :email")
fun findByEmail(email: String): Flux<User>
}
需要注意的是,在使用了 R2DBC 之后,就没有 ORM 了,取而代之的是响应式的方式。
运行效果
展示用户列表
用户列表.jpeg
搜索用户
搜索用户.jpeg
三. 小结
本文介绍了 R2DBC 的背景,随后介绍了 WebFlux 的 routing function 模式,以及使用 RouterFunction和HandlerFunction 创建路由以处理请求并生成响应。
当 WebFlux 和 R2DBC 配置使用时,所创建的程序每一层都是通过异步处理的数据。