最新版本的 Spring Data 中引入的更显着的特性之一是对 Elasticsearch 的响应式支持。自 Spring Data Moore 以来,我们可以利用响应式模板和存储库。它建立在基于 Spring WebClient 的完全反应式 Elasticsearch REST 客户端之上。还值得一提的是对响应式 Querydsl 的支持,可以通过 ReactiveQueryPredicateExecutor 将其包含到您的应用程序中
在我之前的一篇文章 Elasticsearch with Spring Boot 中,我已经向您展示了如何使用 Spring Data Repositories 与 Elasticsearch API 同步集成。使用标准和响应式 Spring Data Repositories 之间没有太大区别。我将重点向您展示在上一篇文章中使用的示例应用程序中的这些差异。因此,在阅读本文之前,值得阅读我之前的文章。让我们继续构建 Spring Boot 响应式 Elasticsearch 示例。
依赖
我正在使用带有 JDK 11 的最新稳定版 Spring Boot。
代码语言:html复制<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/>
</parent>
<properties>
<java.version>11</java.version>
</properties>
我们需要包含 Spring WebFlux 和 Spring Data Elasticsearch starters。我们还将使用 Actuator 来公开健康检查,以及一些用于自动化测试的库,如 Spring Test 和 Testcontainers
代码语言:html复制<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.12.2</version>
<scope>test</scope>
</dependency>
</dependencies>
启用响应式存储库
在开始使用响应式 Spring Data 存储库之前,我们应该通过使用 @EnableReactiveElasticsearchRepositories 注释主类或配置类来启用它。
代码语言:java复制@SpringBootApplication
@EnableReactiveElasticsearchRepositories
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class, args);
}
@Bean
@ConditionalOnProperty("initial-import.enabled")
public SampleDataSet dataSet() {
return new SampleDataSet();
}
}
构建响应式 Elasticsearch 存储库
Spring Data Elasticsearch 带有三个支持响应式操作的接口:ReactiveRepository、添加保存/更新操作的 ReactiveCrudRepository 和提供一些排序方法的 ReactiveSortingRepository。用法与之前相同——我们只需要创建自己的存储库来扩展上面列出的接口之一。我们还可以按照 Spring Data 查询命名约定添加一些自定义查找方法。与所有其他 Spring 响应式项目类似,Spring Data Elasticsearch Repositories 支持构建在 Project Reactor 之上。
代码语言:java复制@Repository
public interface EmployeeRepository extends ReactiveCrudRepository<Employee, Long> {
Flux<Employee> findByOrganizationName(String name);
Flux<Employee> findByName(String name);
}
这是我们的模型类:
代码语言:java复制@Document(indexName = "sample", type = "employee")
public class Employee {
@Id
private Long id;
@Field(type = FieldType.Object)
private Organization organization;
@Field(type = FieldType.Object)
private Department department;
private String name;
private int age;
private String position;
}
创建控制器
代码语言:java复制@RestController
@RequestMapping("/employees")
public class EmployeeController {
@Autowired
EmployeeRepository repository;
@PostMapping
public Mono<Employee> add(@RequestBody Employee employee) {
return repository.save(employee);
}
@GetMapping("/{name}")
public Flux<Employee> findByName(@PathVariable("name") String name) {
return repository.findByName(name);
}
@GetMapping
public Flux<Employee> findAll() {
return repository.findAll();
}
@GetMapping("/organization/{organizationName}")
public Flux<Employee> findByOrganizationName(@PathVariable("organizationName") String organizationName) {
return repository.findByOrganizationName(organizationName);
}
}
运行 Spring Boot 应用程序
出于测试目的,我们需要一个在开发模式下运行的单节点 Elasticsearch 实例。像往常一样,我们将使用 Docker 容器。这是启动 Docker 容器并在端口 9200 上公开它的命令
代码语言:shell复制$ docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" elasticsearch:6.6.2
我的 Docker 机器在虚拟地址 192.168.99.100 上可用,所以我必须在 Spring Boot 配置文件中覆盖 Elasticsearch 地址。因为 Elasticsearch 响应式存储库使用 ReactiveElasticsearchClient,我们必须将属性 spring.data.elasticsearch.client.reactive.endpoints 设置为 192.168.99.100:9200。 Actuator 仍然使用同步 REST 客户端来检测 healthcheck 中的 Elasticsearch 状态,因此我们还需要覆盖 spring.elasticsearch.rest.uris 属性中的默认地址。
代码语言:yaml复制spring:
application:
name: sample-spring-elasticsearch
data:
elasticsearch:
client:
reactive:
endpoints: 192.168.99.100:9200
elasticsearch:
rest:
uris: http://192.168.99.100:9200
测试 Spring Boot 反应式 Elasticserach 支持
与同步存储库一样,我们使用 Testcontainers 进行 JUnit 测试。唯一的区别是我们在验证测试结果时需要阻塞一个存储库方法。
代码语言:java复制@RunWith(SpringRunner.class)
@SpringBootTest
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class EmployeeRepositoryTest {
@ClassRule
public static ElasticsearchContainer container = new ElasticsearchContainer();
@Autowired
EmployeeRepository repository;
@BeforeClass
public static void before() {
System.setProperty("spring.data.elasticsearch.client.reactive.endpoints", container.getContainerIpAddress() ":" container.getMappedPort(9200));
}
@Test
public void testAdd() {
Employee employee = new Employee();
employee.setId(1L);
employee.setName("John Smith");
employee.setAge(33);
employee.setPosition("Developer");
employee.setDepartment(new Department(1L, "TestD"));
employee.setOrganization(new Organization(1L, "TestO", "Test Street No. 1"));
Mono<Employee> employeeSaved = repository.save(employee);
Assert.assertNotNull(employeeSaved.block());
}
@Test
public void testFindAll() {
Flux<Employee> employees = repository.findAll();
Assert.assertTrue(employees.count().block() > 0);
}
@Test
public void testFindByOrganization() {
Flux<Employee> employees = repository.findByOrganizationName("TestO");
Assert.assertTrue(employees.count().block() > 0);
}
@Test
public void testFindByName() {
Flux<Employee> employees = repository.findByName("John Smith");
Assert.assertTrue(employees.count().block() > 0);
}
}
源代码
对于当前示例,我使用与具有同步存储库的示例相同的存储库。我为此创建了一个新的反应分支。这里是 GitHub 仓库地址 https://github.com/piomin/sample-spring-elasticsearch/tree/reactive。它说明了如何构建 Spring Boot 反应式 Elasticsearch 应用程序。