springboot集成elasticsearch7实现全文检索及分页
elasticsearch系列文章前面已经更新过三篇(https://blog.csdn.net/lsqingfeng/category_10219329.html)(建议先看下这三篇文章),分别讲解了elasticsearch7.2的安装,和springboot的集成以及简单的使用。前面都是通过demo的方式,主要讲解了他的一下简单使用,最近已经将es成功应用到了项目当中,本篇文章主要系统的介绍一下到底在项目中该如何使用es.
一. 概述
为什么选择elasticsearch? 我想能看到这篇文章的人应该都了解过了。 因为当你已经开始想知道springboot如何集成es的时候,说明你已经过了了解elasticsearch的阶段。 简单的说,就是在数据量很大的情况下,elasticsearch通过内部的“倒叙索引”使其查询性能更佳。接下来就是要面对怎么查的问题。一般情况下,我们都会使用数据库作为项目中的数据存储单元,得益于数据库对于事务的超强控制能力。而现在我们要引入es, es本身其实也是一个数据存储单元,只不过由于他的内部数据结构和数据库的结构不一致,使其具备了查询高效的能力。我们在学习的时候完全可以把它当做一个查询速度很快的数据库。
那么我们要想从es中查询数据,es中必须得有数据,而往往我们的数据都是存储在数据库中, 所以查询的第一个就是将数据中的数据同步到es中(也有一些应用单独的使用es存储数据)。关于数据库和es的同步方案,网上有很多。最简单的一种,你往数据库存一条,同时就往es里存一条。你往数据库修改一条,同时也修改一下es,这种方式的有点就是操作简单,只需要在调用mysql的dao的同事,调用一下es的增删改查即可。缺点是要处理二者的同步性问题,比如往mysql插入成功,往es插入失败的情况的处理。第二种方式,我们可以使用定时任务,定时任务每隔一段时间从mysql中把数据全量读出来,然后往es中同步一次,这种方式的优点时对于原来的业务代码没有任何侵入,缺点也很明显,就是定时任务的通用缺点,实时性差,并且每次全量同步,可能导致查询压力大。还有一种方式是通过数据库的binlog, binlog大家应该比较了解,做数据库主从的同步都是通过binlog实现的。而mysql和es之间也可以通过binlog进行同步,这样不需要对代码做任何的修改,有兴趣的可以研究一下阿里的canal,就是专门做binlog同步的。
在我的项目中,经过一顿思考,决定使用两种方式,实时同步(增量同步)和定时任务(全量同步)的方式。当有对应数据发生增改删的操作,实时向es中同步。同时使用定时任务,每隔一段时间全量同步一次。
二. 准备工作
- 准备好自己的springboot项目,这里不会详细介绍springboot相关内容
- 在服务器上或者自己的虚拟机上安装好elasticsearch7.2 (全部代码均采用这个版本)
- 在服务器或者自己的虚拟机上安装好elasticsearch-head插件,也是7.2
- 在服务器或者自己的虚拟机上安装好kibana7.2
关于2.3.4这三项内容的安装请参考文章:
https://cloud.tencent.com/developer/article/1911266
如果下载软件慢的,可以使用国内镜像加速下载:https://www.newbe.pro/tags/Mirrors/
一定要确保上述服务安装成功:有head的页面,和kibana的页面。
三. 集成
集成的方式主要有两种,一种是使用es提供的原生客户端,一种是使用springboot-data提供的客户端spring-data-elasticsearch, 像一般我们使用redis,一般都会选择使用spring-data-redis, spring已经封装好了的一些工具方法,使用起来很方便。但是这里我要介绍的是原生客户端的方案。主要原因就是spring-data支持的es版本太低,虽然近期spring-data-es已经更新可以支持比较新的版本的es,但是同时对于springboot的版本要求也比较高。我项目中所使用spring-boot版本较低,而对应支持的es版本更低,所以我直接集成es的原生客户端。具体原因可以参看:https://blog.csdn.net/lsqingfeng/article/details/106398077。也可自行查看spring-data-es官网了解es和springboot版本对应关系做选择。
关于集成,这里使用的es中提供的 HighLevelRestClient,高级别客户端,这也是官方推荐的,另外es7以上,已经不推荐使用TransportClient了,es7也取消了type的概念。
集成方式,引入jar包,添加配置即可
- pom.xml
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
这里 的版本号定义的是7.2.0
代码语言:javascript复制 <elasticsearch.version>7.2.0</elasticsearch.version>
截止到目前,就已经集成完毕,接下来就可以在代码中使用客户端操作es了。
四. 自定义注解创建索引
关于这块的内容参见:https://cloud.tencent.com/developer/article/1911164
这里简单说一下,索引就相当于是表结构,es本身也是存储数据的,既然是存储,就需要定一个结构,比如有哪些字段,每个字段是什么类型。但是痛点是如果我们定义的这个结构如果比较复杂,那么用原生的方法代码会很多,很麻烦,所以我们可以自己定义一套注解,加入到实体类上,这样就可以根据实体类,让es自己去创建索引,很方便。就类似于以前hibernate,可以根据我们写的实体类自动生成表。
关于注解,这里也给出以下,在之前文章基础上做了些改动,主要就是加入了EsId注解,可以将制定字段作为es的id,如果不加这个,es默认id是自动生成的,有了这个,那么我们可以让mysql的id直接作为es的id,方便更新。
代码语言:javascript复制package xx.xxx.xxx.es.annotation;
import java.lang.annotation.*;
/**
* Es 文档注解,用于做索引实体映射
* 作用在类上
* @author ls
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Inherited
public @interface Document {
/**
* index : 索引名称
* @return
*/
String index();
/**
* 类型名称
* @return
*/
String type();
}
代码语言:javascript复制package xx.xxx.xxx.es.annotation;
import java.lang.annotation.*;
/**
* 用于标识使用 该字段作为ES数据中的id
* @author sh.Liu
* @create: 2020-07-22
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface EsId {
}
代码语言:javascript复制package xx.xxx.xxx.es.annotation;
import xx.xxx.xxx.es.enums.AnalyzerType;
import xx.xxx.xxx.es.enums.FieldType;
import java.lang.annotation.*;
/**
* 作用在字段上,用于定义类型,映射关系
* @author ls
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface Field {
FieldType type() default FieldType.TEXT;
/**
* 指定分词器
* @return
*/
AnalyzerType analyzer() default AnalyzerType.STANDARD;
}
两个枚举:
代码语言:javascript复制package xx.xxx.xxx.es.enums;
import lombok.Getter;
/**
* @className: AnalyzerType
* @description:
* @author: sh.Liu
* @create: 2020-05-27 11:37
*/
@Getter
public enum AnalyzerType {
NO("不使用分词"),
/**
* 标准分词,默认分词器
*/
STANDARD("standard"),
/**
* ik_smart:会做最粗粒度的拆分;已被分出的词语将不会再次被其它词语占有
*/
IK_SMART("ik_smart"),
/**
* ik_max_word :会将文本做最细粒度的拆分;尽可能多的拆分出词语
*/
IK_MAX_WORD("ik_max_word")
;
private String type;
AnalyzerType(String type){
this.type = type;
}
}
代码语言:javascript复制package xx.xxx.xxx.es.enums;
import lombok.Getter;
/**
* es 类型参看
* https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
*/
@Getter
public enum FieldType {
/**
* text
*/
TEXT("text"),
KEYWORD("keyword"),
INTEGER("integer"),
DOUBLE("double"),
DATE("date"),
/**
* 单条数据
*/
OBJECT("object"),
/**
* 嵌套数组
*/
NESTED("nested"),
;
FieldType(String type){
this.type = type;
}
private String type;
}
这里直接给出我封装的es工具类,如果有不成熟的地方,也请大家多多指教。
代码语言:javascript复制package com.cestc.common.es;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import xx.xxx.xxx.es.annotation.Document;
import xx.xxx.xxx.es.annotation.EsId;
import xx.xxx.xxx.es.enums.FieldType;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
/**
* @className: EsUtil
* @description: es 操作工具类;
* 这里均采用同步调用的方式
* @author: sh.Liu
* @create: 2020-05-25 09:41
*/
@Component
@Slf4j
public class ElasticsearchUtil {
@Resource
private RestHighLevelClient restHighLevelClient;
/**
* 创建索引(默认分片数为5和副本数为1)
* @param clazz 根据实体自动映射es索引
* @throws IOException
*/
public boolean createIndex(Class clazz) throws Exception {
Document declaredAnnotation = (Document)clazz.getDeclaredAnnotation(Document.class);
if(declaredAnnotation == null){
throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", clazz.getName()));
}
String indexName = declaredAnnotation.index();
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(Settings.builder()
// 设置分片数为3, 副本为2
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 2)
);
request.mapping(generateBuilder(clazz));
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
// 指示是否所有节点都已确认请求
boolean acknowledged = response.isAcknowledged();
// 指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
boolean shardsAcknowledged = response.isShardsAcknowledged();
if (acknowledged || shardsAcknowledged) {
log.info("创建索引成功!索引名称为{}", indexName);
return true;
}
return false;
}
/**
* 创建索引(默认分片数为5和副本数为1)
* @param clazz 根据实体自动映射es索引
* @throws IOException
*/
public boolean createIndexIfNotExist(Class clazz) throws Exception {
Document declaredAnnotation = (Document)clazz.getDeclaredAnnotation(Document.class);
if(declaredAnnotation == null){
throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", clazz.getName()));
}
String indexName = declaredAnnotation.index();
boolean indexExists = isIndexExists(indexName);
if (!indexExists) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(Settings.builder()
// 设置分片数为3, 副本为2
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 2)
);
request.mapping(generateBuilder(clazz));
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
// 指示是否所有节点都已确认请求
boolean acknowledged = response.isAcknowledged();
// 指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
boolean shardsAcknowledged = response.isShardsAcknowledged();
if (acknowledged || shardsAcknowledged) {
log.info("创建索引成功!索引名称为{}", indexName);
return true;
}
}
return false;
}
/**
* 更新索引(默认分片数为5和副本数为1):
* 只能给索引上添加一些不存在的字段
* 已经存在的映射不能改
*
* @param clazz 根据实体自动映射es索引
* @throws IOException
*/
public boolean updateIndex(Class clazz) throws Exception {
Document declaredAnnotation = (Document )clazz.getDeclaredAnnotation(Document.class);
if(declaredAnnotation == null){
throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", clazz.getName()));
}
String indexName = declaredAnnotation.index();
PutMappingRequest request = new PutMappingRequest(indexName);
request.source(generateBuilder(clazz));
AcknowledgedResponse response = restHighLevelClient.indices().putMapping(request, RequestOptions.DEFAULT);
// 指示是否所有节点都已确认请求
boolean acknowledged = response.isAcknowledged();
if (acknowledged ) {
log.info("更新索引索引成功!索引名称为{}", indexName);
return true;
}
return false;
}
/**
* 删除索引
* @param indexName
* @return
*/
public boolean delIndex(String indexName){
boolean acknowledged = false;
try {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
AcknowledgedResponse delete = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
acknowledged = delete.isAcknowledged();
} catch (IOException e) {
e.printStackTrace();
}
return acknowledged;
}
/**
* 判断索引是否存在
* @param indexName
* @return
*/
public boolean isIndexExists(String indexName){
boolean exists = false;
try {
GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
getIndexRequest.humanReadable(true);
exists = restHighLevelClient.indices().exists(getIndexRequest,RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
return exists;
}
/**
* 添加单条数据
* 提供多种方式:
* 1. json
* 2. map
* Map<String, Object> jsonMap = new HashMap<>();
* jsonMap.put("user", "kimchy");
* jsonMap.put("postDate", new Date());
* jsonMap.put("message", "trying out Elasticsearch");
* IndexRequest indexRequest = new IndexRequest("posts")
* .id("1").source(jsonMap);
* 3. builder
* XContentBuilder builder = XContentFactory.jsonBuilder();
* builder.startObject();
* {
* builder.field("user", "kimchy");
* builder.timeField("postDate", new Date());
* builder.field("message", "trying out Elasticsearch");
* }
* builder.endObject();
* IndexRequest indexRequest = new IndexRequest("posts")
* .id("1").source(builder);
* 4. source:
* IndexRequest indexRequest = new IndexRequest("posts")
* .id("1")
* .source("user", "kimchy",
* "postDate", new Date(),
* "message", "trying out Elasticsearch");
*
* 报错: Validation Failed: 1: type is missing;
* 加入两个jar包解决
*
* 提供新增或修改的功能
*
* @return
*/
public IndexResponse index(Object o) throws Exception {
Document declaredAnnotation = (Document )o.getClass().getDeclaredAnnotation(Document.class);
if(declaredAnnotation == null){
throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", o.getClass().getName()));
}
String indexName = declaredAnnotation.index();
IndexRequest request = new IndexRequest(indexName);
Field fieldByAnnotation = getFieldByAnnotation(o, EsId.class);
if (fieldByAnnotation != null) {
fieldByAnnotation.setAccessible(true);
try {
Object id = fieldByAnnotation.get(o);
request =request.id(id.toString());
} catch (IllegalAccessException e) {
log.error("获取id字段出错:{}", e);
}
}
String userJson = JSON.toJSONString(o);
request.source(userJson, XContentType.JSON);
IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
return indexResponse;
}
/**
* 根据id查询
* @return
*/
public String queryById(String indexName, String id) throws IOException {
GetRequest getRequest = new GetRequest(indexName, id);
// getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
String jsonStr = getResponse.getSourceAsString();
return jsonStr;
}
/**
* 查询封装返回json字符串
* @param indexName
* @param searchSourceBuilder
* @return
* @throws IOException
*/
public String search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.scroll(TimeValue.timeValueMinutes(1L));
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
SearchHits hits = searchResponse.getHits();
JSONArray jsonArray = new JSONArray();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
JSONObject jsonObject = JSON.parseObject(sourceAsString);
jsonArray.add(jsonObject);
}
log.info("返回总数为:" hits.getTotalHits());
return jsonArray.toJSONString();
}
/**
* 查询封装,带分页
* @param searchSourceBuilder
* @param pageNum
* @param pageSize
* @param s
* @param <T>
* @return
* @throws IOException
*/
public <T> PageInfo<T> search(SearchSourceBuilder searchSourceBuilder, int pageNum, int pageSize, Class<T> s) throws Exception {
Document declaredAnnotation = (Document )s.getDeclaredAnnotation(Document.class);
if(declaredAnnotation == null){
throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", s.getName()));
}
String indexName = declaredAnnotation.index();
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
JSONArray jsonArray = new JSONArray();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
JSONObject jsonObject = JSON.parseObject(sourceAsString);
jsonArray.add(jsonObject);
}
log.info("返回总数为:" hits.getTotalHits());
int total = (int)hits.getTotalHits().value;
// 封装分页
List<T> list = jsonArray.toJavaList(s);
PageInfo<T> page = new PageInfo<>();
page.setList(list);
page.setPageNum(pageNum);
page.setPageSize(pageSize);
page.setTotal(total);
page.setPages(total== 0 ? 0: (total%pageSize == 0 ? total / pageSize : (total / pageSize) 1));
page.setHasNextPage(page.getPageNum() < page.getPages());
return page;
}
/**
* 查询封装,返回集合
* @param searchSourceBuilder
* @param s
* @param <T>
* @return
* @throws IOException
*/
public <T> List<T> search(SearchSourceBuilder searchSourceBuilder, Class<T> s) throws Exception {
Document declaredAnnotation = (Document )s.getDeclaredAnnotation(Document.class);
if(declaredAnnotation == null){
throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", s.getName()));
}
String indexName = declaredAnnotation.index();
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.scroll(TimeValue.timeValueMinutes(1L));
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
SearchHits hits = searchResponse.getHits();
JSONArray jsonArray = new JSONArray();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
JSONObject jsonObject = JSON.parseObject(sourceAsString);
jsonArray.add(jsonObject);
}
// 封装分页
List<T> list = jsonArray.toJavaList(s);
return list;
}
/**
* 批量插入文档
* 文档存在 则插入
* 文档不存在 则更新
* @param list
* @return
*/
public <T> boolean batchSaveOrUpdate(List<T> list) throws Exception {
Object o1 = list.get(0);
Document declaredAnnotation = (Document )o1.getClass().getDeclaredAnnotation(Document.class);
if(declaredAnnotation == null){
throw new Exception(String.format("class name: %s can not find Annotation [@Document], please check", o1.getClass().getName()));
}
String indexName = declaredAnnotation.index();
BulkRequest request = new BulkRequest(indexName);
for (Object o : list) {
String jsonStr = JSON.toJSONString(o);
IndexRequest indexReq = new IndexRequest().source(jsonStr, XContentType.JSON);
Field fieldByAnnotation = getFieldByAnnotation(o, EsId.class);
if (fieldByAnnotation != null) {
fieldByAnnotation.setAccessible(true);
try {
Object id = fieldByAnnotation.get(o);
indexReq = indexReq.id(id.toString());
} catch (IllegalAccessException e) {
log.error("获取id字段出错:{}", e);
}
}
request.add(indexReq);
}
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
for(BulkItemResponse bulkItemResponse : bulkResponse){
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
IndexResponse indexResponse = (IndexResponse) itemResponse;
log.info("单条返回结果:{}", indexResponse);
if(bulkItemResponse.isFailed()){
log.error("es 返回错误{}",bulkItemResponse.getFailureMessage());
return false;
}
}
return true;
}
/**
* 删除文档
* @param indexName: 索引名称
* @param docId: 文档id
*/
public boolean deleteDoc(String indexName, String docId) throws IOException {
DeleteRequest request = new DeleteRequest(indexName, docId);
DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
// 解析response
String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
log.info("删除失败,原因为 {}", reason);
}
}
return true;
}
/**
* 根据json类型更新文档
* @param indexName
* @param docId
* @param o
* @return
* @throws IOException
*/
public boolean updateDoc(String indexName, String docId, Object o) throws IOException {
UpdateRequest request = new UpdateRequest(indexName, docId);
request.doc(JSON.toJSONString(o), XContentType.JSON);
UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
String index = updateResponse.getIndex();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
return true;
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
return true;
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
}
return false;
}
/**
* 根据Map类型更新文档
* @param indexName
* @param docId
* @param map
* @return
* @throws IOException
*/
public boolean updateDoc(String indexName, String docId, Map<String, Object> map) throws IOException {
UpdateRequest request = new UpdateRequest(indexName, docId);
request.doc(map);
UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
String index = updateResponse.getIndex();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
return true;
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
return true;
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
}
return false;
}
public XContentBuilder generateBuilder(Class clazz) throws IOException {
// 获取索引名称及类型
Document doc = (Document) clazz.getAnnotation(Document.class);
System.out.println(doc.index());
System.out.println(doc.type());
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder.startObject("properties");
Field[] declaredFields = clazz.getDeclaredFields();
for (Field f : declaredFields) {
if (f.isAnnotationPresent(com.cestc.common.es.annotation.Field.class)) {
// 获取注解
com.cestc.common.es.annotation.Field declaredAnnotation = f.getDeclaredAnnotation(com.cestc.common.es.annotation.Field.class);
// 如果嵌套对象:
/**
* {
* "mappings": {
* "properties": {
* "region": {
* "type": "keyword"
* },
* "manager": {
* "properties": {
* "age": { "type": "integer" },
* "name": {
* "properties": {
* "first": { "type": "text" },
* "last": { "type": "text" }
* }
* }
* }
* }
* }
* }
* }
*/
if (declaredAnnotation.type() == FieldType.OBJECT) {
// 获取当前类的对象-- Action
Class<?> type = f.getType();
Field[] df2 = type.getDeclaredFields();
builder.startObject(f.getName());
builder.startObject("properties");
// 遍历该对象中的所有属性
for (Field f2 : df2) {
if (f2.isAnnotationPresent(com.cestc.common.es.annotation.Field.class)) {
// 获取注解
com.cestc.common.es.annotation.Field declaredAnnotation2 = f2.getDeclaredAnnotation(com.cestc.common.es.annotation.Field.class);
builder.startObject(f2.getName());
builder.field("type", declaredAnnotation2.type().getType());
// keyword不需要分词
if (declaredAnnotation2.type() == FieldType.TEXT) {
builder.field("analyzer", declaredAnnotation2.analyzer().getType());
}
if (declaredAnnotation2.type() == FieldType.DATE) {
builder.field("format", "yyyy-MM-dd HH:mm:ss");
}
builder.endObject();
}
}
builder.endObject();
builder.endObject();
}else{
builder.startObject(f.getName());
builder.field("type", declaredAnnotation.type().getType());
// keyword不需要分词
if (declaredAnnotation.type() == FieldType.TEXT) {
builder.field("analyzer", declaredAnnotation.analyzer().getType());
}
if (declaredAnnotation.type() == FieldType.DATE) {
builder.field("format", "yyyy-MM-dd HH:mm:ss");
}
builder.endObject();
}
}
}
// 对应property
builder.endObject();
builder.endObject();
return builder;
}
public static Field getFieldByAnnotation(Object o ,Class annotationClass){
Field[] declaredFields = o.getClass().getDeclaredFields();
if (declaredFields != null && declaredFields.length >0) {
for(Field f : declaredFields){
if (f.isAnnotationPresent(annotationClass)) {
return f;
}
}
}
return null;
}
}
这里会使用反射通过上面标注的注解来使用:比如我有一个实体,根据这个实体自动创建索引的方式如下; 首先在实体上加入我们自定义的注解,来设置索引名称,字段的类型,分词器是什么。这里字符串类型在es中有两种,一是KEY_WORD,不分词, 二是TEXT:会分词
代码语言:javascript复制package xx.xxx.xxx.es.pojo;
import xx.xxx.xxx.es.annotation.Document;
import xx.xxx.xxx.es.annotation.EsId;
import xx.xxx.xxx.es.annotation.Field;
import xx.xxx.xxx.es.enums.AnalyzerType;
import xx.xxx.xxx.es.enums.FieldType;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.math.BigDecimal;
/**
* @className: EsEvent
* @description:
* @author: sh.Liu
* @date: 2020-07-22 10:51
*/
@Data
@Document(index = "es_event", type = "")
public class EsEvent {
private static final long serialVersionUID=1L;
@EsId
@Field(type = FieldType.KEYWORD)
private Integer eventId;
/**
* 唯一标识码
*/
@Field(type = FieldType.KEYWORD)
private String uniqueCode;
/**
* 任务号
*/
@Field(type = FieldType.KEYWORD)
private String eventCode;
/**
* 事件来源编号
*/
@Field(type = FieldType.INTEGER)
private Integer eventSrcId;
/**
* 事件来源名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String eventSrcName;
/**
* 来源分组
*/
@Field(type = FieldType.KEYWORD)
private String srcGroupCode;
/**
* 事件大类编码
*/
@Field(type = FieldType.KEYWORD)
private String eventTypeCode;
/**
* 事件大类名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String eventTypeName;
/**
* 事件小类编码
*/
@Field(type = FieldType.KEYWORD)
private String eventSubtypeCode;
/**
* 事件小类名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String eventSubtypeName;
/**
* 重要程度
*/
@Field(type = FieldType.INTEGER)
private Integer eventGradeId;
/**
* 重要程度名称
*/
@Field(type = FieldType.KEYWORD)
private String eventGradeName;
/**
*紧急程度标识
*/
@Field(type = FieldType.INTEGER)
private Integer eventUrgencyId;
/**
*紧急程度名称
*/
@Field(type = FieldType.KEYWORD)
private String eventUrgencyName;
/**
*事件级别标识
*/
@Field(type = FieldType.INTEGER)
private Integer eventLevelId;
/**
*事件级别名称
*/
@Field(type = FieldType.KEYWORD)
private String eventLevelName;
/**
*事件升级标志
*/
@Field(type = FieldType.INTEGER)
private Integer eventUpgradeFlag;
/**
*处置级别标识
*/
@Field(type = FieldType.INTEGER)
private Integer dealLevelId;
/**
*处置级别标识
*/
@Field(type = FieldType.KEYWORD)
private String dealLevelName;
/**
*公众上报人名称
*/
@Field(type = FieldType.TEXT , analyzer = AnalyzerType.IK_SMART)
private String publicReporterName;
/**
*公众上报人身份证号
*/
@Field(type = FieldType.KEYWORD)
private String publicReporterIdcard;
/**
*公众上报人联系方式
*/
@Field(type = FieldType.KEYWORD)
private String publicReporterTel;
/**
* 事件描述
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String eventDesc;
/**
* 地址
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String address;
/**
* 地区名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String areaRegionName;
/**
* 地区编码
*/
@Field(type = FieldType.KEYWORD)
private String areaRegionCode;
/**
* 社区名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String commRegionName;
/**
* 区编码
*/
@Field(type = FieldType.KEYWORD)
private String commRegionCode;
/**
* 街道名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String streetRegionName;
/**
* 街道编码
*/
@Field(type = FieldType.KEYWORD)
private String streetRegionCode;
/**
* 社区名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String regionName;
/**
* 社区编码
*/
@Field(type = FieldType.KEYWORD)
private String regionCode;
/**
* 经度
*/
private BigDecimal coordX;
/**
* 纬度
*/
private BigDecimal coordY;
/**
*坐标系
*/
private String mapcoordinate;
/**
*网格员标识
*/
private Integer griderId;
/**
*网格员标识
*/
private String griderName;
/**
*网格员电话
*/
private String griderPhone;
/**
*核实状态标识
*/
@Field(type = FieldType.INTEGER)
private Integer verifyStateId;
/**
*核查状态标识
*/
@Field(type = FieldType.INTEGER)
private Integer checkStateId;
/**
*事件建立时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String createTime;
/**
*流程结束时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String endTime;
/**
*延期天数
*/
private Float postponedDays;
/**
*延期标志
*/
private Integer postponedFlag;
/**
*受理时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String acceptTime;
/**
*立案时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String establishTime;
/**
*调度时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String dispatchTime;
/**
*流程开始时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String procStartTime;
/**
*流程结束时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String procEndTime;
/**
*流程截止时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String procDeadLine;
/**
*流程警告时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String procWarningTime;
/**
*处置开始时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String funcBeginTime;
/**
*处置完成时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String funcFinishTime;
/**
*自处置标识
*/
private Integer gridDealFlag;
/**
*跨网格标志
*/
private Integer overGridFlag;
/**
*是否督办
*/
@Field(type = FieldType.INTEGER)
private Integer pressFlag;
/**
*是否催办
*/
@Field(type = FieldType.INTEGER)
private Integer hurryFlag;
/**
*超期标志
*/
@Field(type = FieldType.INTEGER)
private Integer overtimeFlag;
/**
*活动属性
*/
@Field(type = FieldType.INTEGER)
private Integer actPropertyId;
/**
*活动属性名称
*/
@Field(type = FieldType.KEYWORD)
private String actPropertyName;
/**
*流程实例标识
*/
@Field(type = FieldType.KEYWORD)
private String procInstId;
/**
*流程定义标识
*/
@Field(type = FieldType.KEYWORD)
private String procDefId;
/**
*事件状态
*/
@Field(type = FieldType.INTEGER)
private Integer eventStateId;
/**
* 上一操作项
*/
@Field(type = FieldType.KEYWORD)
private String preActionName;
/**
* 登记人Id
*/
@Field(type = FieldType.INTEGER)
private Integer registerId;
/**
* 登记人姓名
*/
@Field(analyzer = AnalyzerType.IK_SMART)
private String registerName;
/**
* 回访标识:0-未回访 1-已回访 2-无法回访
*/
@Field(type = FieldType.INTEGER)
private Integer visitorStateId;
/**
* 删除标识
*/
@Field(type = FieldType.INTEGER)
private Integer deleteFlag;
/**
* 删除用户标识
*/
@Field(type = FieldType.INTEGER)
private Integer deleteUserId;
/**
* 删除时间
*/
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String deleteTime;
/**
* 是否下发督查
* 0:否,1:是
*/
@Field(type = FieldType.INTEGER)
private Integer overseerFlag;
@Field(type = FieldType.DATE)
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String updateTime;
@Field(type = FieldType.OBJECT)
private EsAct act;
}
act结构:
代码语言:javascript复制package xx.xxx.xxx.es.pojo;
import xx.xxx.xxx.es.annotation.Field;
import xx.xxx.xxx.es.ednums.AnalyzerType;
import xx.xxx.xxx.es.enums.FieldType;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
/**
* @className: EsAct
* @description: es act表
* @author: sh.Liu
* @date: 2020-07-22 13:18
*/
@Data
public class EsAct implements Serializable {
private static final long serialVersionUID = 1L;
@Field(type = FieldType.INTEGER)
private Integer actId;
/**
* 任务标识
*/
@Field(type = FieldType.KEYWORD)
private String taskId;
/**
* 流程定义标识
*/
@Field(type = FieldType.KEYWORD)
private String procDefId;
/**
* 流程实例标识
*/
@Field(type = FieldType.KEYWORD)
private String procInstId;
/**
* 子流程实例标识
*/
@Field(type = FieldType.KEYWORD)
private String subInstId;
/**
* 节点定义标识
*/
@Field(type = FieldType.KEYWORD)
private String nodeId;
/**
* 节点定义名称
*/
@Field(type = FieldType.KEYWORD)
private String nodeName;
/**
* 业务主键标识
*/
@Field(type = FieldType.KEYWORD)
private String bizId;
/**
* 参与者标识
*/
@Field(type = FieldType.KEYWORD)
private String partId;
/**
* 参与者姓名
*/
@Field(type = FieldType.TEXT)
private String partName;
/**
* 部门id
*/
@Field(type = FieldType.KEYWORD)
private String unitId;
/**
* 部门名称
*/
@Field(type = FieldType.TEXT)
private String unitName;
/**
* 角色标识
*/
@Field(type = FieldType.KEYWORD)
private String roleId;
/**
* 角色名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String roleName;
/**
* 上一活动标识
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String preActId;
/**
* 上一活动参与者标识
*/
private String prePartId;
/**
* 上一活动参与者名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String prePartName;
/**
* 上一活动定义标识
*/
private String preNodeId;
/**
* 上一活动定义名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String preNodeName;
/**
* 上一活动意见
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String preOpinion;
/**
* 上一活动操作项名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String preActionName;
/**
* 上一活动操作项显示名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String preActionLabel;
/**
* 创建时间
*/
@Field(type = FieldType.DATE)
private String createTime;
/**
* 截止时间
*/
@Field(type = FieldType.DATE)
private String deadLine;
/**
* 警告时间
*/
@Field(type = FieldType.DATE)
private String warningTime;
/**
* 结束时间
*/
@Field(type = FieldType.DATE)
private String endTime;
/**
* 活动红绿灯
*/
@Field(type = FieldType.INTEGER)
private Integer actTimeStateId;
/**
* 活动时限
*/
@Field(type = FieldType.DOUBLE)
private BigDecimal timeLimit;
/**
* 计时单位
*/
@Field(type = FieldType.INTEGER)
private Integer timeUnit;
/**
* 活动时限分钟
*/
@Field(type = FieldType.INTEGER)
private Integer timeLimitM;
/**
* 已用时
*/
@Field(type = FieldType.INTEGER)
private Integer actUsedTime;
/**
* 剩余时
*/
@Field(type = FieldType.INTEGER)
private Integer actRemainTime;
/**
* 活动时限信息
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String actLimitInfo;
/**
* 活动已用时间字符串
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String actUsedTimeChar;
/**
* 活动剩余时间字符串
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String actRemainTimeChar;
/**
* 累计计时标志
*/
@Field(type = FieldType.INTEGER)
private Integer timeAddUpFlag;
/**
* 暂停前节点用时
*/
@Field(type = FieldType.INTEGER)
private Integer actUsedTimeBeforeStop;
/**
* 恢复计时时间
*/
@Field(type = FieldType.DATE)
private String actRestart;
/**
* 已读标志
*/
@Field(type = FieldType.INTEGER)
private Integer readFlag;
/**
* 已读时间
*/
@Field(type = FieldType.DATE)
private String readTime;
/**
* 批转意见
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String transOpinion;
/**
* 操作项名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String actionName;
/**
* 操作项显示名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String actionLabel;
/**
* 活动属性id
*/
@Field(type = FieldType.INTEGER)
private Integer actPropertyId;
/**
* 活动属性名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String actPropertyName;
/**
* 抄送参与者
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String ccPart;
/**
* 抄送参与者名称
*/
@Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String ccPartName;
/**
* 抄送标志
*/
@Field(type = FieldType.INTEGER)
private Integer ccFlag;
}
创建索引方式:
代码语言:javascript复制@RequestMapping("createIndex")
public String createIndex() {
try {
esUtil.createIndexIfNotExist(EsEvent.class);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}
这样就会根据创建一个叫做 es_event的索引(实体注解上配置)同时EsEvent 和 EsAct是一对一的依赖方式。
通过head插件可以看到索引结构(部分):
五. 分页查询封装:
我的项目中需要使用es做一个分页查询,查询条件通过前端传过来,所以我们需要做一下动态拼接,同时还有排序功能。我们将查询条件和分页条件都封装到一个VO中。
代码语言:javascript复制/**
* 综合查询列表
*
* @return
*/
@PostMapping("/comprehensive/list")
public Result<Result.ResultPage<ComprehensiveQueryEventVO>> listComprehensiveQuery(@RequestBody ComprehensiveQueryEventVO comprehensiveQueryEventVO) {
PageInfo<ComprehensiveQueryEventVO> returnPageInfo = null;
try {
returnPageInfo = esService.pageComprehensiveQuery(comprehensiveQueryEventVO);
} catch (Exception e) {
log.error("es 综合查询异常,开始使用数据库做综合查询,错误为 :{}", e);
// es异常,使用数据库查询
PageHelper.startPage(comprehensiveQueryEventVO.getPageNum(), comprehensiveQueryEventVO.getPageSize());
List<ComprehensiveQueryEventVO> comprehensiveQueryEventVOS = eventInfoService.listComprehensiveQuery(comprehensiveQueryEventVO);
PageInfo<ComprehensiveQueryEventVO> queryPageInfo = new PageInfo<>(comprehensiveQueryEventVOS);
returnPageInfo = new PageInfo<>();
// 此处统一VO,此处使用可忽略
Type type = new TypeToken<List<ComprehensiveQueryEventVO>>() {
}.getType();
ModelMapper modelMapper = new ModelMapper();
modelMapper.map(queryPageInfo, returnPageInfo);
returnPageInfo.setList(new ModelMapper().map(queryPageInfo.getList(), type));
}
Result.ResultPage<ComprehensiveQueryEventVO> resultPage = Result.ResultPage.build(returnPageInfo);
Result<Result.ResultPage<ComprehensiveQueryEventVO>> result = Result.success(resultPage);
return result;
}
代码语言:javascript复制package com.cestc.cooperative.pojo.vo;
import com.alibaba.excel.annotation.ExcelProperty;
import com.alibaba.excel.annotation.write.style.ColumnWidth;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* 综合查询对应javabean
*
* @author wanggang
* @date 2020/1/3
*/
@Data
@ColumnWidth(30)
public class ComprehensiveQueryEventVO implements Serializable {
@ExcelProperty("序号")
private Integer orderNum;
/**
* 任务号
*/
@ExcelProperty("事件编号")
private String eventCode;
/**
* 事件大类名称
*/
@ExcelProperty("类型")
private String eventTypeName;
/**
* 事件大类编码
*/
@ExcelProperty("事件大类编码")
private String eventTypeCode;
/**
* 事件小类编码
*/
@ExcelProperty("事件小类编码")
private String eventSubtypeCode;
/**
* 事件小类名称
*/
@ExcelProperty("事件小类名称")
private String eventSubtypeName;
/**
* 地区名称
*/
@ExcelProperty("地区名称")
private String areaRegionName;
/**
* 地区编码
*/
private String areaRegionCode;
/**
* 社区名称
*/
@ExcelProperty("社区名称")
private String commRegionName;
/**
* 社区编码
*/
private String commRegionCode;
/**
* 街道名称
*/
@ExcelProperty("街道名称")
private String streetRegionName;
/**
* 街道编码
*/
private String streetRegionCode;
/**
* 社区名称
*/
@ExcelProperty("区域")
private String regionName;
/**
* 社区编码
*/
private String regionCode;
/**
* 地址描述
*/
@ExcelProperty("地址")
private String address;
/**
* 重要程度标识
*/
@ExcelProperty("等级标识")
private Integer eventGradeId;
/**
* 重要程度名称
*/
@ExcelProperty("等级")
private String eventGradeName;
/**
* 活动状态标识
*/
private Integer actTimeStateId;
/**
* 活动状态名称
*/
@ExcelProperty("计时状态")
private String actTimeStateName;
/**
* 是否督办
*/
@ExcelProperty("是否督办标志位")
private Integer pressFlag;
@ExcelProperty("督办")
private String pressFlagName;
/**
* 是否督查
*/
@ExcelProperty("是否督查标志位")
private Integer overseerFlag;
@ExcelProperty("督查")
private String overseerFlagName;
/**
* 流程开始时间
*/
@ExcelProperty("上报时间")
// @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String procStartTime;
/**
* 流程截止时间
*/
@ExcelProperty("截止时间")
// @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String procDeadLine;
/**
* 事件描述
*/
@ExcelProperty("事件描述")
private String eventDesc;
/**
* 事件状态
*/
@ExcelProperty("事件状态Id")
private Integer eventStateId;
@ExcelProperty("事件状态")
private String eventStateName;
/**
* 事件来源标识
*/
@ExcelProperty("事件来源标识")
private Integer eventSrcId;
/**
* 事件来源名称
*/
@ExcelProperty("事件来源")
private String eventSrcName;
/**
* 流程结束时间
*/
@ExcelProperty("办结时间")
// @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String procEndTime;
/**
* 超时时长
*/
@ExcelProperty("超时时长")
private String timeoutDuration;
/**
* 回访时间
*/
@ExcelProperty("回访时间")
// @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String visitorTime;
/**
* 满意度
*/
@ExcelProperty("满意度")
private String resultLabel;
/**
* 上报开始时间
*/
@ExcelProperty("上报开始时间")
// @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String reportStartTime;
/**
* 上报结束时间
*/
@ExcelProperty("上报结束时间")
// @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String reportEndTime;
/**
* 截止开始时间
*/
@ExcelProperty("截止开始时间")
// @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String deadLineStartTime;
/**
* 截止结束时间
*/
@ExcelProperty("截止结束时间")
// @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
private String deadLineEndTime;
/**
* 上报人电话
*/
@ExcelProperty("上报人")
private String publicReporterName;
@ExcelProperty("联系电话")
private String phone;
/**
* 超时状态(流程红绿灯)
*/
private Integer procTimeStateId;
/**
* 超时状态对应名称
*/
private String procTimeStateName;
/**
* 导出字段List
*/
private List<String> exportColumnList;
/**
* 关键字
*/
private String keyword;
/**
* 活动截止时间
*/
private String deadLine;
private Integer pageNum;
private Integer pageSize;
/**
* 排序字段
*/
private String orderField;
/**
* 排序方式
*/
private String orderSort;
/**
* 事件Id
*/
private Integer eventId;
}
分页查询方法:
代码语言:javascript复制@Override
public PageInfo<ComprehensiveQueryEventVO> pageComprehensiveQuery(ComprehensiveQueryEventVO comprehensiveQueryEventVO) throws Exception {
SearchSourceBuilder searchSourceBuilder = getSearchSourceBuilder(comprehensiveQueryEventVO);
PageInfo page = esUtil.search(searchSourceBuilder, comprehensiveQueryEventVO.getPageNum(), comprehensiveQueryEventVO.getPageSize(), EsEvent.class);
// 转换
List<ComprehensiveQueryEventVO> list = convertList(page.getList());
page.setList(list);
return page;
}
/**
* 拼接综合查询 查询条件
* @param comprehensiveQueryEventVO
* @return
*/
private SearchSourceBuilder getSearchSourceBuilder(ComprehensiveQueryEventVO comprehensiveQueryEventVO){
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if (comprehensiveQueryEventVO.getPageNum() == null) {
comprehensiveQueryEventVO.setPageNum(1);
}
if (comprehensiveQueryEventVO.getPageSize() == null) {
comprehensiveQueryEventVO.setPageSize(10000);
}
sourceBuilder.from((comprehensiveQueryEventVO.getPageNum()-1)*comprehensiveQueryEventVO.getPageSize());
sourceBuilder.size(comprehensiveQueryEventVO.getPageSize());
// 上报时间拼接时/分/秒
if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getReportStartTime())) {
comprehensiveQueryEventVO.setReportStartTime(comprehensiveQueryEventVO.getReportStartTime() " 00:00:00");
}
if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getReportEndTime())) {
comprehensiveQueryEventVO.setReportEndTime(comprehensiveQueryEventVO.getReportEndTime() " 23:59:59");
}
// 截止时间拼接时/分/秒
if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getDeadLineStartTime())) {
comprehensiveQueryEventVO.setDeadLineStartTime(comprehensiveQueryEventVO.getDeadLineStartTime() " 00:00:00");
}
if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getDeadLineEndTime())) {
comprehensiveQueryEventVO.setDeadLineEndTime(comprehensiveQueryEventVO.getDeadLineEndTime() " 23:59:59");
}
// 符合条件查询
BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
// event_state_id != 0 and event_state_id != 11 and delete_flag != 1
boolBuilder.mustNot(QueryBuilders.termQuery("eventStateId", 0));
boolBuilder.mustNot(QueryBuilders.termQuery("eventStateId", 11));
boolBuilder.mustNot(QueryBuilders.termQuery("delateFlag", 1));
// 动态条件----keyword
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getKeyword())) {
boolBuilder.must(QueryBuilders.queryStringQuery(comprehensiveQueryEventVO.getKeyword()));
}
// 拼接动态查询条件
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventCode())) {
boolBuilder.must(QueryBuilders.termQuery("eventCode", comprehensiveQueryEventVO.getEventCode()));
}
if (comprehensiveQueryEventVO.getEventGradeId() != null) {
boolBuilder.must(QueryBuilders.termQuery("eventGradeId", comprehensiveQueryEventVO.getEventGradeId()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventGradeName())) {
boolBuilder.must(QueryBuilders.termQuery("eventGradeNam", comprehensiveQueryEventVO.getEventGradeName()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventTypeName())) {
boolBuilder.must(QueryBuilders.termQuery("eventTypeName", comprehensiveQueryEventVO.getEventTypeName()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventTypeCode())) {
boolBuilder.must(QueryBuilders.termQuery("eventTypeCode", comprehensiveQueryEventVO.getEventTypeCode()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventSubtypeName())) {
boolBuilder.must(QueryBuilders.termQuery("eventSubtypeName", comprehensiveQueryEventVO.getEventSubtypeName()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventSubtypeCode())) {
boolBuilder.must(QueryBuilders.termQuery("eventSubtypeCode", comprehensiveQueryEventVO.getEventSubtypeCode()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getAreaRegionName())) {
boolBuilder.must(QueryBuilders.termQuery("areaRegionName", comprehensiveQueryEventVO.getAreaRegionName()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getAreaRegionCode())) {
boolBuilder.must(QueryBuilders.termQuery("areaRegionCode", comprehensiveQueryEventVO.getAreaRegionCode()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getCommRegionName())) {
boolBuilder.must(QueryBuilders.termQuery("commRegionName", comprehensiveQueryEventVO.getCommRegionName()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getCommRegionCode())) {
boolBuilder.must(QueryBuilders.termQuery("commRegionCode", comprehensiveQueryEventVO.getCommRegionCode()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getStreetRegionName())) {
boolBuilder.must(QueryBuilders.termQuery("streetRegionName", comprehensiveQueryEventVO.getStreetRegionName()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getStreetRegionCode())) {
boolBuilder.must(QueryBuilders.termQuery("streetRegionCode", comprehensiveQueryEventVO.getStreetRegionName()));
}
if (comprehensiveQueryEventVO.getEventSrcId() != null) {
boolBuilder.must(QueryBuilders.termQuery("eventSrcId", comprehensiveQueryEventVO.getEventSrcId()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventSrcName())) {
boolBuilder.must(QueryBuilders.termQuery("eventSrcName", comprehensiveQueryEventVO.getEventSrcName()));
}
if (comprehensiveQueryEventVO.getProcTimeStateId() != null) {
boolBuilder.must(QueryBuilders.termQuery("procTimeStateId", comprehensiveQueryEventVO.getProcTimeStateId()));
}
if (comprehensiveQueryEventVO.getPressFlag() != null) {
boolBuilder.must(QueryBuilders.termQuery("pressFlag", comprehensiveQueryEventVO.getPressFlag()));
}
if (comprehensiveQueryEventVO.getOverseerFlag() != null) {
boolBuilder.must(QueryBuilders.termQuery("overseerFlag", comprehensiveQueryEventVO.getOverseerFlag()));
}
if (comprehensiveQueryEventVO.getEventStateId() != null) {
boolBuilder.must(QueryBuilders.termQuery("eventStateId", comprehensiveQueryEventVO.getEventStateId()));
}
if (comprehensiveQueryEventVO.getActTimeStateId() != null) {
boolBuilder.must(QueryBuilders.termQuery("act.actTimeStateId", comprehensiveQueryEventVO.getActTimeStateId()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getReportStartTime())) {
boolBuilder.must(QueryBuilders.rangeQuery("procStartTime").gte(comprehensiveQueryEventVO.getReportStartTime()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getReportEndTime())) {
boolBuilder.must(QueryBuilders.rangeQuery("procStartTime").lte(comprehensiveQueryEventVO.getReportEndTime()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getDeadLineStartTime())) {
boolBuilder.must(QueryBuilders.rangeQuery("act.deadLine").gte(comprehensiveQueryEventVO.getDeadLineStartTime()));
}
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getDeadLineEndTime())) {
boolBuilder.must(QueryBuilders.rangeQuery("act.deadLine").lte(comprehensiveQueryEventVO.getDeadLineEndTime()));
}
sourceBuilder.query(boolBuilder);
// 排序:
if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getOrderField()) && StringUtils.isNotEmpty(comprehensiveQueryEventVO.getOrderSort())) {
String esOrderField = null;
if (EventInfoServiceImpl.QueryFieldEnum.PROC_START_TIME.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
esOrderField = "procStartTime";
} else if (EventInfoServiceImpl.QueryFieldEnum.PROC_DEAD_LINE.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
esOrderField = "act.deadLine";
} else if (EventInfoServiceImpl.QueryFieldEnum.PROC_END_TIME.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
esOrderField = "procEndTime";
} else if (EventInfoServiceImpl.QueryFieldEnum.VISITOR_TIME.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
esOrderField = "visitor.visitorTime";
}
FieldSortBuilder fieldSortBuilder = new FieldSortBuilder(esOrderField);
fieldSortBuilder = fieldSortBuilder.order("orderDesc".equals(comprehensiveQueryEventVO.getOrderSort()) ? SortOrder.DESC : SortOrder.ASC);
sourceBuilder.sort(fieldSortBuilder);
}
return sourceBuilder;
}
六. 数据同步
基本的方法都有了,下面就是数据同步了,我们需要在所有的对数据库有增删改操作的地方,同时去操作es,这一部分可以分散在代码的各个角落,所以推荐大家通过aop去处理,去定义对应的切面,在切面中统一处理。我这里利用了消息队列,主要原因是想做成异步,因为有些操作比较本身已经比较耗时了,所以不想在操作后,还去等待es的处理结果,所以直接扔到队列里,由队列异步处理:
发送到队列代码:
代码语言:javascript复制// 同步es
mqService.sendEs(SendEsVO.builder().eventId(Integer.valueOf(actionVO.getBizId())).build());
代码语言:javascript复制@Override
public void sendEs(SendEsVO sendEsVO) {
logger.info("发送mq请求【同步ES】===》" JSONObject.toJSONString(sendEsVO));
rabbitTemplate.convertAndSend(MqBaseConst.MQ_EXCHANGE_ES,
MqBaseConst.MQ_KEY_ES, JSONObject.toJSONString(sendEsVO));
}
队列消费者代码:
代码语言:javascript复制@Component
@Slf4j
public class EsMqListener {
private final EsService esService;
public EsMqListener(EsService esService) {
this.esService = esService;
}
@RabbitHandler
@RabbitListener(queues = MqBaseConst.MQ_QUEUE_ES)
public void receiveGatewayMsg(String msg){
log.info("【es】消息系统收到消息,内容为 {}",msg);
if (StringUtils.isNoneEmpty(msg)) {
SendEsVO sendEsVO = JSON.parseObject(msg, SendEsVO.class);
// 判断操作类型,目前都是 单条的saveOrUpdate 操作
try {
esService.saveOrUpdate(sendEsVO.getEventId());
} catch (Exception e) {
log.error("eventId: [{}], 同步到es失败,错误为:{}", sendEsVO.getEventId(), e);
//todo: 失败后的处理
return;
}
log.info("eventId: [{}], 同步到es成功", sendEsVO.getEventId());
}
}
}
代码语言:javascript复制@Override
public boolean saveOrUpdate(Integer eventId) throws Exception {
// 1. 如果索引不存在则创建
boolean re = esUtil.createIndexIfNotExist(EsEvent.class);
// 如果返回true, 代表该索引不存在,已经创建了,那么此时应该做一次全量同步
if (re) {
batchSaveOrUpdate();
return true;
}
// 2. 执行到此处,说明索引存在,那么做单条数据同步
EventDO eventDO = eventService.getById(eventId);
EsEvent event = ModelMapperUtil.map(eventDO, EsEvent.class, "yyyy-MM-dd HH:mm:ss");
// 查询act
CooActDO cooActDO = actService.getOne(new LambdaQueryWrapper<CooActDO>().eq(CooActDO::getBizId, event.getEventId()));
if (cooActDO != null) {
EsAct act = ModelMapperUtil.map(cooActDO, EsAct.class, "yyyy-MM-dd HH:mm:ss");
event.setAct(act);
}
IndexResponse indexResponse = esUtil.index(event);
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
log.info("eventId {}, 在es中对应的数据创建成功");
return true;
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
log.info("eventId {}, 在es中对应的数据更新成功");
return true;
}
return false;
}
同时定时任务做全量同步:
代码语言:javascript复制/**
* @className: MysqlToEsTaskServiceImpl
* @description: mysql数据同步到es的定时任务
* @author: sh.Liu
* @date: 2020-07-24 16:28
*/
@Service
@Slf4j
public class MysqlToEsTaskServiceImpl implements MysqlToEsTaskService {
private final EsService esService;
public MysqlToEsTaskServiceImpl(EsService esService) {
this.esService = esService;
}
@Override
public void syncEventData() {
log.info("将 mysql【事件数据】 全量同步到 elasticsearch 定时任务开始执行。。。");
try {
esService.batchSaveOrUpdate();
} catch (Exception e) {
log.error("将 mysql【事件数据】 全量同步到 elasticsearch 定时任务执行失败,{}", e);
}
log.info("将 mysql【事件数据】 全量同步到 elasticsearch 定时任务执行结束。。。");
}
}
全量同步代码;
代码语言:javascript复制/**
* mysql数据库和es全量同步
* @return
* @throws Exception
*/
@Override
public boolean batchSaveOrUpdate() throws Exception {
// 1. 如果索引不存在则创建
esUtil.createIndexIfNotExist(EsEvent.class);
// 2. 全量同步
List<EsEvent> esEventList = new ArrayList<>();
// 查询所有es数据,不包含草稿箱 eventStateId != 0
List<EventDO> list = eventService.list(
new LambdaQueryWrapper<EventDO>().ne(EventDO::getEventStateId,EventStateEnum.EVENTSTATE_DRAFT.getCode()));
if (list != null && list.size() > 0) {
for (EventDO eventDO : list) {
EsEvent event = ModelMapperUtil.map(eventDO, EsEvent.class, "yyyy-MM-dd HH:mm:ss");
// 查询act
CooActDO cooActDO = actService.getOne(new LambdaQueryWrapper<CooActDO>().eq(CooActDO::getBizId, event.getEventId()));
if (cooActDO != null) {
EsAct act = ModelMapperUtil.map(cooActDO, EsAct.class, "yyyy-MM-dd HH:mm:ss");
event.setAct(act);
}
esEventList.add(event);
}
}
boolean b = esUtil.batchSaveOrUpdate(esEventList);
return b;
}
然后将上述方法配置到xxl-job中即可。