springboot集成elasticsearch7实现全文检索及分页

2021-12-06 09:59:39 浏览数 (1)

springboot集成elasticsearch7实现全文检索及分页

elasticsearch系列文章前面已经更新过三篇(https://blog.csdn.net/lsqingfeng/category_10219329.html)(建议先看下这三篇文章),分别讲解了elasticsearch7.2的安装,和springboot的集成以及简单的使用。前面都是通过demo的方式,主要讲解了他的一下简单使用,最近已经将es成功应用到了项目当中,本篇文章主要系统的介绍一下到底在项目中该如何使用es.

一. 概述

为什么选择elasticsearch? 我想能看到这篇文章的人应该都了解过了。 因为当你已经开始想知道springboot如何集成es的时候,说明你已经过了了解elasticsearch的阶段。 简单的说,就是在数据量很大的情况下,elasticsearch通过内部的“倒叙索引”使其查询性能更佳。接下来就是要面对怎么查的问题。一般情况下,我们都会使用数据库作为项目中的数据存储单元,得益于数据库对于事务的超强控制能力。而现在我们要引入es, es本身其实也是一个数据存储单元,只不过由于他的内部数据结构和数据库的结构不一致,使其具备了查询高效的能力。我们在学习的时候完全可以把它当做一个查询速度很快的数据库。

​ 那么我们要想从es中查询数据,es中必须得有数据,而往往我们的数据都是存储在数据库中, 所以查询的第一个就是将数据中的数据同步到es中(也有一些应用单独的使用es存储数据)。关于数据库和es的同步方案,网上有很多。最简单的一种,你往数据库存一条,同时就往es里存一条。你往数据库修改一条,同时也修改一下es,这种方式的有点就是操作简单,只需要在调用mysql的dao的同事,调用一下es的增删改查即可。缺点是要处理二者的同步性问题,比如往mysql插入成功,往es插入失败的情况的处理。第二种方式,我们可以使用定时任务,定时任务每隔一段时间从mysql中把数据全量读出来,然后往es中同步一次,这种方式的优点时对于原来的业务代码没有任何侵入,缺点也很明显,就是定时任务的通用缺点,实时性差,并且每次全量同步,可能导致查询压力大。还有一种方式是通过数据库的binlog, binlog大家应该比较了解,做数据库主从的同步都是通过binlog实现的。而mysql和es之间也可以通过binlog进行同步,这样不需要对代码做任何的修改,有兴趣的可以研究一下阿里的canal,就是专门做binlog同步的。

在我的项目中,经过一顿思考,决定使用两种方式,实时同步(增量同步)和定时任务(全量同步)的方式。当有对应数据发生增改删的操作,实时向es中同步。同时使用定时任务,每隔一段时间全量同步一次。

二. 准备工作
  1. 准备好自己的springboot项目,这里不会详细介绍springboot相关内容
  2. 在服务器上或者自己的虚拟机上安装好elasticsearch7.2 (全部代码均采用这个版本)
  3. 在服务器或者自己的虚拟机上安装好elasticsearch-head插件,也是7.2
  4. 在服务器或者自己的虚拟机上安装好kibana7.2

关于2.3.4这三项内容的安装请参考文章:

https://cloud.tencent.com/developer/article/1911266

如果下载软件慢的,可以使用国内镜像加速下载:https://www.newbe.pro/tags/Mirrors/

一定要确保上述服务安装成功:有head的页面,和kibana的页面。

三. 集成

集成的方式主要有两种,一种是使用es提供的原生客户端,一种是使用springboot-data提供的客户端spring-data-elasticsearch, 像一般我们使用redis,一般都会选择使用spring-data-redis, spring已经封装好了的一些工具方法,使用起来很方便。但是这里我要介绍的是原生客户端的方案。主要原因就是spring-data支持的es版本太低,虽然近期spring-data-es已经更新可以支持比较新的版本的es,但是同时对于springboot的版本要求也比较高。我项目中所使用spring-boot版本较低,而对应支持的es版本更低,所以我直接集成es的原生客户端。具体原因可以参看:https://blog.csdn.net/lsqingfeng/article/details/106398077。也可自行查看spring-data-es官网了解es和springboot版本对应关系做选择。

关于集成,这里使用的es中提供的 HighLevelRestClient,高级别客户端,这也是官方推荐的,另外es7以上,已经不推荐使用TransportClient了,es7也取消了type的概念。

集成方式,引入jar包,添加配置即可

  1. pom.xml
代码语言:javascript复制
 <dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-high-level-client</artifactId>
   <version>${elasticsearch.version}</version>
 </dependency>
 <dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-client</artifactId>
   <version>${elasticsearch.version}</version>
 </dependency>
 <dependency>
   <groupId>org.elasticsearch</groupId>
   <artifactId>elasticsearch</artifactId>
   <version>${elasticsearch.version}</version>
 </dependency>

这里 的版本号定义的是7.2.0

代码语言:javascript复制
  <elasticsearch.version>7.2.0</elasticsearch.version>

截止到目前,就已经集成完毕,接下来就可以在代码中使用客户端操作es了。

四. 自定义注解创建索引

关于这块的内容参见:https://cloud.tencent.com/developer/article/1911164

这里简单说一下,索引就相当于是表结构,es本身也是存储数据的,既然是存储,就需要定一个结构,比如有哪些字段,每个字段是什么类型。但是痛点是如果我们定义的这个结构如果比较复杂,那么用原生的方法代码会很多,很麻烦,所以我们可以自己定义一套注解,加入到实体类上,这样就可以根据实体类,让es自己去创建索引,很方便。就类似于以前hibernate,可以根据我们写的实体类自动生成表。

关于注解,这里也给出以下,在之前文章基础上做了些改动,主要就是加入了EsId注解,可以将制定字段作为es的id,如果不加这个,es默认id是自动生成的,有了这个,那么我们可以让mysql的id直接作为es的id,方便更新。

代码语言:javascript复制
package xx.xxx.xxx.es.annotation;

import java.lang.annotation.*;

/**
 * Es 文档注解,用于做索引实体映射
 * 作用在类上
 * @author ls
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Inherited
public @interface Document {

    /**
     * index : 索引名称
     * @return
     */
    String index();

    /**
     * 类型名称
     * @return
     */
    String type();

}
代码语言:javascript复制
package xx.xxx.xxx.es.annotation;

import java.lang.annotation.*;

/**
 * 用于标识使用 该字段作为ES数据中的id
 * @author sh.Liu
 * @create: 2020-07-22
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface EsId {
}
代码语言:javascript复制
package xx.xxx.xxx.es.annotation;


import xx.xxx.xxx.es.enums.AnalyzerType;
import xx.xxx.xxx.es.enums.FieldType;

import java.lang.annotation.*;

/**
 * 作用在字段上,用于定义类型,映射关系
 * @author ls
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface Field  {

    FieldType type() default FieldType.TEXT;

    /**
     * 指定分词器
     * @return
     */
    AnalyzerType analyzer() default AnalyzerType.STANDARD;
}

两个枚举:

代码语言:javascript复制
package xx.xxx.xxx.es.enums;

import lombok.Getter;

/**
 * @className: AnalyzerType
 * @description:
 * @author: sh.Liu
 * @create: 2020-05-27 11:37
 */
@Getter
public enum AnalyzerType {

    NO("不使用分词"),
    /**
     * 标准分词,默认分词器
     */
    STANDARD("standard"),

    /**
     * ik_smart:会做最粗粒度的拆分;已被分出的词语将不会再次被其它词语占有
     */
    IK_SMART("ik_smart"),

    /**
     * ik_max_word :会将文本做最细粒度的拆分;尽可能多的拆分出词语
     */
    IK_MAX_WORD("ik_max_word")

    ;

    private String type;

    AnalyzerType(String type){
        this.type = type;
    }
}
代码语言:javascript复制
package xx.xxx.xxx.es.enums;

import lombok.Getter;

/**
 * es 类型参看
 * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
 */
@Getter
public enum FieldType {
    /**
     * text
     */
    TEXT("text"),

    KEYWORD("keyword"),

    INTEGER("integer"),

    DOUBLE("double"),

    DATE("date"),

    /**
     * 单条数据
     */
    OBJECT("object"),

    /**
     * 嵌套数组
     */
    NESTED("nested"),


    ;


    FieldType(String type){
        this.type = type;
    }

    private String type;


}

这里直接给出我封装的es工具类,如果有不成熟的地方,也请大家多多指教。

代码语言:javascript复制
package com.cestc.common.es;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import xx.xxx.xxx.es.annotation.Document;
import xx.xxx.xxx.es.annotation.EsId;
import xx.xxx.xxx.es.enums.FieldType;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;

/**
 * @className: EsUtil
 * @description: es 操作工具类;
 *      这里均采用同步调用的方式
 * @author: sh.Liu
 * @create: 2020-05-25 09:41
 */
@Component
@Slf4j
public class ElasticsearchUtil {

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**
     * 创建索引(默认分片数为5和副本数为1)
     * @param clazz 根据实体自动映射es索引
     * @throws IOException
     */
    public boolean createIndex(Class clazz) throws Exception {
        Document declaredAnnotation = (Document)clazz.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", clazz.getName()));
        }
        String indexName = declaredAnnotation.index();
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.settings(Settings.builder()
                // 设置分片数为3, 副本为2
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 2)
        );
        request.mapping(generateBuilder(clazz));
        CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
        // 指示是否所有节点都已确认请求
        boolean acknowledged = response.isAcknowledged();
        // 指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
        boolean shardsAcknowledged = response.isShardsAcknowledged();
        if (acknowledged || shardsAcknowledged) {
            log.info("创建索引成功!索引名称为{}", indexName);
            return true;
        }
        return false;
    }

    /**
     * 创建索引(默认分片数为5和副本数为1)
     * @param clazz 根据实体自动映射es索引
     * @throws IOException
     */
    public boolean createIndexIfNotExist(Class clazz) throws Exception {
        Document declaredAnnotation = (Document)clazz.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", clazz.getName()));
        }
        String indexName = declaredAnnotation.index();

        boolean indexExists = isIndexExists(indexName);
        if (!indexExists) {
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            request.settings(Settings.builder()
                    // 设置分片数为3, 副本为2
                    .put("index.number_of_shards", 3)
                    .put("index.number_of_replicas", 2)
            );
            request.mapping(generateBuilder(clazz));
            CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            // 指示是否所有节点都已确认请求
            boolean acknowledged = response.isAcknowledged();
            // 指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
            boolean shardsAcknowledged = response.isShardsAcknowledged();
            if (acknowledged || shardsAcknowledged) {
                log.info("创建索引成功!索引名称为{}", indexName);
                return true;
            }
        }
        return false;
    }

    /**
     * 更新索引(默认分片数为5和副本数为1):
     * 只能给索引上添加一些不存在的字段
     * 已经存在的映射不能改
     *
     * @param clazz 根据实体自动映射es索引
     * @throws IOException
     */
    public boolean updateIndex(Class clazz) throws Exception {
        Document declaredAnnotation = (Document )clazz.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", clazz.getName()));
        }
        String indexName = declaredAnnotation.index();
        PutMappingRequest request = new PutMappingRequest(indexName);

        request.source(generateBuilder(clazz));
        AcknowledgedResponse response = restHighLevelClient.indices().putMapping(request, RequestOptions.DEFAULT);
        // 指示是否所有节点都已确认请求
        boolean acknowledged = response.isAcknowledged();

        if (acknowledged ) {
            log.info("更新索引索引成功!索引名称为{}", indexName);
            return true;
        }
        return false;
    }
    /**
     * 删除索引
     * @param indexName
     * @return
     */
    public boolean delIndex(String indexName){
        boolean acknowledged = false;
        try {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
            deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
            AcknowledgedResponse delete = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
            acknowledged = delete.isAcknowledged();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return acknowledged;
    }

    /**
     * 判断索引是否存在
     * @param indexName
     * @return
     */
    public boolean isIndexExists(String indexName){
        boolean exists = false;
        try {
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            getIndexRequest.humanReadable(true);
            exists = restHighLevelClient.indices().exists(getIndexRequest,RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return exists;
    }


    /**
     * 添加单条数据
     * 提供多种方式:
     *  1. json
     *  2. map
     *      Map<String, Object> jsonMap = new HashMap<>();
     *      jsonMap.put("user", "kimchy");
     *      jsonMap.put("postDate", new Date());
     *      jsonMap.put("message", "trying out Elasticsearch");
     *      IndexRequest indexRequest = new IndexRequest("posts")
     *          .id("1").source(jsonMap);
     *  3. builder
     *      XContentBuilder builder = XContentFactory.jsonBuilder();
     *      builder.startObject();
     *      {
     *          builder.field("user", "kimchy");
     *          builder.timeField("postDate", new Date());
     *          builder.field("message", "trying out Elasticsearch");
     *      }
     *      builder.endObject();
     *      IndexRequest indexRequest = new IndexRequest("posts")
     *      .id("1").source(builder);
     * 4. source:
     *      IndexRequest indexRequest = new IndexRequest("posts")
     *     .id("1")
     *     .source("user", "kimchy",
     *         "postDate", new Date(),
     *         "message", "trying out Elasticsearch");
     *
     *   报错:  Validation Failed: 1: type is missing;
     *      加入两个jar包解决
     *
     *   提供新增或修改的功能
     *
     * @return
     */
    public IndexResponse index(Object o) throws Exception {
        Document declaredAnnotation = (Document )o.getClass().getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", o.getClass().getName()));
        }
        String indexName = declaredAnnotation.index();

        IndexRequest request = new IndexRequest(indexName);
        Field fieldByAnnotation = getFieldByAnnotation(o, EsId.class);
        if (fieldByAnnotation != null) {
            fieldByAnnotation.setAccessible(true);
            try {
                Object id = fieldByAnnotation.get(o);
                request =request.id(id.toString());
            } catch (IllegalAccessException e) {
                log.error("获取id字段出错:{}", e);
            }
        }

        String userJson = JSON.toJSONString(o);
        request.source(userJson, XContentType.JSON);
        IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
        return indexResponse;
    }


    /**
     * 根据id查询
     * @return
     */
    public String queryById(String indexName, String id) throws IOException {
        GetRequest getRequest = new GetRequest(indexName, id);
        // getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

        GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        String jsonStr = getResponse.getSourceAsString();
        return jsonStr;
    }

    /**
     * 查询封装返回json字符串
     * @param indexName
     * @param searchSourceBuilder
     * @return
     * @throws IOException
     */
    public String search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(searchSourceBuilder);
        searchRequest.scroll(TimeValue.timeValueMinutes(1L));
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        SearchHits hits = searchResponse.getHits();
        JSONArray jsonArray = new JSONArray();
        for (SearchHit hit : hits) {
            String sourceAsString = hit.getSourceAsString();
            JSONObject jsonObject = JSON.parseObject(sourceAsString);
            jsonArray.add(jsonObject);
        }
        log.info("返回总数为:"   hits.getTotalHits());
        return jsonArray.toJSONString();
    }

    /**
     * 查询封装,带分页
     * @param searchSourceBuilder
     * @param pageNum
     * @param pageSize
     * @param s
     * @param <T>
     * @return
     * @throws IOException
     */
    public <T> PageInfo<T> search(SearchSourceBuilder searchSourceBuilder, int pageNum, int pageSize, Class<T> s) throws Exception {
        Document declaredAnnotation = (Document )s.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", s.getName()));
        }
        String indexName = declaredAnnotation.index();
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        SearchHits hits = searchResponse.getHits();
        JSONArray jsonArray = new JSONArray();
        for (SearchHit hit : hits) {
            String sourceAsString = hit.getSourceAsString();
            JSONObject jsonObject = JSON.parseObject(sourceAsString);
            jsonArray.add(jsonObject);
        }
        log.info("返回总数为:"   hits.getTotalHits());
        int total = (int)hits.getTotalHits().value;

        // 封装分页
        List<T> list = jsonArray.toJavaList(s);
        PageInfo<T> page = new PageInfo<>();
        page.setList(list);
        page.setPageNum(pageNum);
        page.setPageSize(pageSize);
        page.setTotal(total);
        page.setPages(total== 0 ? 0: (total%pageSize == 0 ? total / pageSize : (total / pageSize)   1));
        page.setHasNextPage(page.getPageNum() < page.getPages());
        return page;
    }

    /**
     * 查询封装,返回集合
     * @param searchSourceBuilder
     * @param s
     * @param <T>
     * @return
     * @throws IOException
     */
    public <T> List<T> search(SearchSourceBuilder searchSourceBuilder, Class<T> s) throws Exception {
        Document declaredAnnotation = (Document )s.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", s.getName()));
        }
        String indexName = declaredAnnotation.index();
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(searchSourceBuilder);
        searchRequest.scroll(TimeValue.timeValueMinutes(1L));
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        SearchHits hits = searchResponse.getHits();
        JSONArray jsonArray = new JSONArray();
        for (SearchHit hit : hits) {
            String sourceAsString = hit.getSourceAsString();
            JSONObject jsonObject = JSON.parseObject(sourceAsString);
            jsonArray.add(jsonObject);
        }
        // 封装分页
        List<T> list = jsonArray.toJavaList(s);
        return list;
    }


    /**
     * 批量插入文档
     * 文档存在 则插入
     * 文档不存在 则更新
     * @param list
     * @return
     */
    public <T> boolean batchSaveOrUpdate(List<T> list) throws Exception {
        Object o1 = list.get(0);
        Document declaredAnnotation = (Document )o1.getClass().getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [@Document], please check", o1.getClass().getName()));
        }
        String indexName = declaredAnnotation.index();

        BulkRequest request = new BulkRequest(indexName);
        for (Object o : list) {
            String jsonStr = JSON.toJSONString(o);
            IndexRequest indexReq = new IndexRequest().source(jsonStr, XContentType.JSON);

            Field fieldByAnnotation = getFieldByAnnotation(o, EsId.class);
            if (fieldByAnnotation != null) {
                fieldByAnnotation.setAccessible(true);
                try {
                    Object id = fieldByAnnotation.get(o);
                    indexReq = indexReq.id(id.toString());
                } catch (IllegalAccessException e) {
                    log.error("获取id字段出错:{}", e);
                }
            }
            request.add(indexReq);
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);

        for(BulkItemResponse bulkItemResponse : bulkResponse){
            DocWriteResponse itemResponse = bulkItemResponse.getResponse();
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            log.info("单条返回结果:{}", indexResponse);
            if(bulkItemResponse.isFailed()){
                log.error("es 返回错误{}",bulkItemResponse.getFailureMessage());
                return false;
            }
        }
        return true;
    }

    /**
     * 删除文档
     * @param indexName: 索引名称
     * @param docId:     文档id
     */
    public boolean deleteDoc(String indexName, String docId) throws IOException {
        DeleteRequest request = new DeleteRequest(indexName, docId);
        DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        // 解析response
        String index = deleteResponse.getIndex();
        String id = deleteResponse.getId();
        long version = deleteResponse.getVersion();
        ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure :
                    shardInfo.getFailures()) {
                String reason = failure.reason();
                log.info("删除失败,原因为 {}", reason);
            }
        }
        return true;
    }

    /**
     * 根据json类型更新文档
     * @param indexName
     * @param docId
     * @param o
     * @return
     * @throws IOException
     */
    public boolean updateDoc(String indexName, String docId, Object o) throws IOException {
        UpdateRequest request = new UpdateRequest(indexName, docId);
        request.doc(JSON.toJSONString(o), XContentType.JSON);
        UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        String index = updateResponse.getIndex();
        String id = updateResponse.getId();
        long version = updateResponse.getVersion();
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
            return true;
        } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            return true;
        } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
        } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {

        }
        return false;
    }

    /**
     * 根据Map类型更新文档
     * @param indexName
     * @param docId
     * @param map
     * @return
     * @throws IOException
     */
    public boolean updateDoc(String indexName, String docId, Map<String, Object> map) throws IOException {
        UpdateRequest request = new UpdateRequest(indexName, docId);
        request.doc(map);
        UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        String index = updateResponse.getIndex();
        String id = updateResponse.getId();
        long version = updateResponse.getVersion();
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
            return true;
        } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            return true;
        } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
        } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {

        }
        return false;
    }


    public XContentBuilder generateBuilder(Class clazz) throws IOException {
        // 获取索引名称及类型
        Document doc = (Document) clazz.getAnnotation(Document.class);
        System.out.println(doc.index());
        System.out.println(doc.type());

        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        builder.startObject("properties");
        Field[] declaredFields = clazz.getDeclaredFields();
        for (Field f : declaredFields) {
            if (f.isAnnotationPresent(com.cestc.common.es.annotation.Field.class)) {
                // 获取注解
                com.cestc.common.es.annotation.Field  declaredAnnotation = f.getDeclaredAnnotation(com.cestc.common.es.annotation.Field.class);

                // 如果嵌套对象:
                /**
                 * {
                 *   "mappings": {
                 *     "properties": {
                 *       "region": {
                 *         "type": "keyword"
                 *       },
                 *       "manager": {
                 *         "properties": {
                 *           "age":  { "type": "integer" },
                 *           "name": {
                 *             "properties": {
                 *               "first": { "type": "text" },
                 *               "last":  { "type": "text" }
                 *             }
                 *           }
                 *         }
                 *       }
                 *     }
                 *   }
                 * }
                 */
                if (declaredAnnotation.type() == FieldType.OBJECT) {
                    // 获取当前类的对象-- Action
                    Class<?> type = f.getType();
                    Field[] df2 = type.getDeclaredFields();
                    builder.startObject(f.getName());
                    builder.startObject("properties");
                    // 遍历该对象中的所有属性
                    for (Field f2 : df2) {
                        if (f2.isAnnotationPresent(com.cestc.common.es.annotation.Field.class)) {
                            // 获取注解
                            com.cestc.common.es.annotation.Field declaredAnnotation2 = f2.getDeclaredAnnotation(com.cestc.common.es.annotation.Field.class);
                            builder.startObject(f2.getName());
                            builder.field("type", declaredAnnotation2.type().getType());
                            // keyword不需要分词
                            if (declaredAnnotation2.type() == FieldType.TEXT) {
                                builder.field("analyzer", declaredAnnotation2.analyzer().getType());
                            }
                            if (declaredAnnotation2.type() == FieldType.DATE) {
                                builder.field("format", "yyyy-MM-dd HH:mm:ss");
                            }
                            builder.endObject();
                        }
                    }
                    builder.endObject();
                    builder.endObject();

                }else{
                    builder.startObject(f.getName());
                    builder.field("type", declaredAnnotation.type().getType());
                    // keyword不需要分词
                    if (declaredAnnotation.type() == FieldType.TEXT) {
                        builder.field("analyzer", declaredAnnotation.analyzer().getType());
                    }
                    if (declaredAnnotation.type() == FieldType.DATE) {
                        builder.field("format", "yyyy-MM-dd HH:mm:ss");
                    }
                    builder.endObject();
                }
            }
        }
        // 对应property
        builder.endObject();
        builder.endObject();
        return builder;
    }


   public static Field getFieldByAnnotation(Object o ,Class annotationClass){
       Field[] declaredFields = o.getClass().getDeclaredFields();
       if (declaredFields != null && declaredFields.length >0) {
           for(Field f : declaredFields){
                if (f.isAnnotationPresent(annotationClass)) {
                    return f;
                }
           }
       }
       return null;
   }

}

这里会使用反射通过上面标注的注解来使用:比如我有一个实体,根据这个实体自动创建索引的方式如下; 首先在实体上加入我们自定义的注解,来设置索引名称,字段的类型,分词器是什么。这里字符串类型在es中有两种,一是KEY_WORD,不分词, 二是TEXT:会分词

代码语言:javascript复制
package xx.xxx.xxx.es.pojo;

import xx.xxx.xxx.es.annotation.Document;
import xx.xxx.xxx.es.annotation.EsId;
import xx.xxx.xxx.es.annotation.Field;
import xx.xxx.xxx.es.enums.AnalyzerType;
import xx.xxx.xxx.es.enums.FieldType;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;

import java.math.BigDecimal;

/**
 * @className: EsEvent
 * @description:
 * @author: sh.Liu
 * @date: 2020-07-22 10:51
 */
@Data
@Document(index = "es_event", type = "") 
public class EsEvent {
    private static final long serialVersionUID=1L;

    @EsId
    @Field(type = FieldType.KEYWORD)
    private Integer eventId;

    /**
     * 唯一标识码
     */
    @Field(type = FieldType.KEYWORD)
    private String uniqueCode;

    /**
     * 任务号
     */
    @Field(type = FieldType.KEYWORD)
    private String eventCode;

    /**
     * 事件来源编号
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventSrcId;

    /**
     * 事件来源名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String eventSrcName;

    /**
     * 来源分组
     */
    @Field(type = FieldType.KEYWORD)
    private String srcGroupCode;

    /**
     * 事件大类编码
     */
    @Field(type = FieldType.KEYWORD)
    private String eventTypeCode;

    /**
     * 事件大类名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String eventTypeName;

    /**
     * 事件小类编码
     */
    @Field(type = FieldType.KEYWORD)
    private String eventSubtypeCode;

    /**
     * 事件小类名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String eventSubtypeName;

    /**
     * 重要程度
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventGradeId;

    /**
     * 重要程度名称
     */
    @Field(type = FieldType.KEYWORD)
    private String eventGradeName;

    /**
     *紧急程度标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventUrgencyId;

    /**
     *紧急程度名称
     */
    @Field(type = FieldType.KEYWORD)
    private String eventUrgencyName;

    /**
     *事件级别标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventLevelId;

    /**
     *事件级别名称
     */
    @Field(type = FieldType.KEYWORD)
    private String eventLevelName;

    /**
     *事件升级标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventUpgradeFlag;

    /**
     *处置级别标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer dealLevelId;

    /**
     *处置级别标识
     */
    @Field(type = FieldType.KEYWORD)
    private String dealLevelName;

    /**
     *公众上报人名称
     */
    @Field(type = FieldType.TEXT , analyzer = AnalyzerType.IK_SMART)
    private String publicReporterName;

    /**
     *公众上报人身份证号
     */
    @Field(type = FieldType.KEYWORD)
    private String publicReporterIdcard;

    /**
     *公众上报人联系方式
     */
    @Field(type = FieldType.KEYWORD)
    private String publicReporterTel;
    /**
     * 事件描述
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String eventDesc;
    /**
     * 地址
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String address;
    /**
     * 地区名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String areaRegionName;

    /**
     * 地区编码
     */
    @Field(type = FieldType.KEYWORD)
    private String areaRegionCode;

    /**
     * 社区名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String commRegionName;

    /**
     * 区编码
     */
    @Field(type = FieldType.KEYWORD)
    private String commRegionCode;

    /**
     * 街道名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String streetRegionName;

    /**
     * 街道编码
     */
    @Field(type = FieldType.KEYWORD)
    private String streetRegionCode;

    /**
     * 社区名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String regionName;

    /**
     * 社区编码
     */
    @Field(type = FieldType.KEYWORD)
    private String regionCode;

    /**
     * 经度
     */
    private BigDecimal coordX;

    /**
     * 纬度
     */
    private BigDecimal coordY;

    /**
     *坐标系
     */
    private String mapcoordinate;

    /**
     *网格员标识
     */
    private Integer griderId;

    /**
     *网格员标识
     */
    private String griderName;

    /**
     *网格员电话
     */
    private String griderPhone;

    /**
     *核实状态标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer verifyStateId;

    /**
     *核查状态标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer checkStateId;

    /**
     *事件建立时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String createTime;

    /**
     *流程结束时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String endTime;

    /**
     *延期天数
     */
    private Float postponedDays;

    /**
     *延期标志
     */
    private Integer postponedFlag;

    /**
     *受理时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String acceptTime;

    /**
     *立案时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String establishTime;

    /**
     *调度时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String dispatchTime;

    /**
     *流程开始时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String procStartTime;

    /**
     *流程结束时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String procEndTime;

    /**
     *流程截止时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String procDeadLine;

    /**
     *流程警告时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String procWarningTime;

    /**
     *处置开始时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String funcBeginTime;

    /**
     *处置完成时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String funcFinishTime;

    /**
     *自处置标识
     */
    private Integer gridDealFlag;

    /**
     *跨网格标志
     */
    private Integer overGridFlag;

    /**
     *是否督办
     */
    @Field(type = FieldType.INTEGER)
    private Integer pressFlag;

    /**
     *是否催办
     */
    @Field(type = FieldType.INTEGER)
    private Integer hurryFlag;

    /**
     *超期标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer overtimeFlag;

    /**
     *活动属性
     */
    @Field(type = FieldType.INTEGER)
    private Integer actPropertyId;

    /**
     *活动属性名称
     */
    @Field(type = FieldType.KEYWORD)
    private String actPropertyName;

    /**
     *流程实例标识
     */
    @Field(type = FieldType.KEYWORD)
    private String procInstId;

    /**
     *流程定义标识
     */
    @Field(type = FieldType.KEYWORD)
    private String procDefId;

    /**
     *事件状态
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventStateId;

    /**
     * 上一操作项
     */
    @Field(type = FieldType.KEYWORD)
    private String preActionName;

    /**
     * 登记人Id
     */
    @Field(type = FieldType.INTEGER)
    private Integer registerId;

    /**
     * 登记人姓名
     */
    @Field(analyzer = AnalyzerType.IK_SMART)
    private String registerName;

    /**
     * 回访标识:0-未回访 1-已回访 2-无法回访
     */
    @Field(type = FieldType.INTEGER)
    private Integer visitorStateId;

    /**
     * 删除标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer deleteFlag;

    /**
     * 删除用户标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer deleteUserId;

    /**
     * 删除时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String deleteTime;

    /**
     * 是否下发督查
     * 0:否,1:是
     */
    @Field(type = FieldType.INTEGER)
    private Integer overseerFlag;

    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String updateTime;

    @Field(type = FieldType.OBJECT)
    private EsAct act;

}

act结构:

代码语言:javascript复制
package xx.xxx.xxx.es.pojo;

import xx.xxx.xxx.es.annotation.Field;
import xx.xxx.xxx.es.ednums.AnalyzerType;
import xx.xxx.xxx.es.enums.FieldType;
import lombok.Data;

import java.io.Serializable;
import java.math.BigDecimal;

/**
 * @className: EsAct
 * @description: es act表
 * @author: sh.Liu
 * @date: 2020-07-22 13:18
 */
@Data
public class EsAct implements Serializable {
    private static final long serialVersionUID = 1L;

    @Field(type = FieldType.INTEGER)
    private Integer actId;

    /**
     * 任务标识
     */
    @Field(type = FieldType.KEYWORD)
    private String taskId;

    /**
     * 流程定义标识
     */
    @Field(type = FieldType.KEYWORD)
    private String procDefId;

    /**
     * 流程实例标识
     */
    @Field(type = FieldType.KEYWORD)
    private String procInstId;

    /**
     * 子流程实例标识
     */
    @Field(type = FieldType.KEYWORD)
    private String subInstId;

    /**
     * 节点定义标识
     */
    @Field(type = FieldType.KEYWORD)
    private String nodeId;

    /**
     * 节点定义名称
     */
    @Field(type = FieldType.KEYWORD)
    private String nodeName;

    /**
     * 业务主键标识
     */
    @Field(type = FieldType.KEYWORD)
    private String bizId;

    /**
     * 参与者标识
     */
    @Field(type = FieldType.KEYWORD)
    private String partId;
    /**
     * 参与者姓名
     */
    @Field(type = FieldType.TEXT)
    private String partName;
    /**
     * 部门id
     */
    @Field(type = FieldType.KEYWORD)
    private String unitId;
    /**
     * 部门名称
     */
    @Field(type = FieldType.TEXT)
    private String unitName;

    /**
     * 角色标识
     */
    @Field(type = FieldType.KEYWORD)
    private String roleId;

    /**
     * 角色名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String roleName;

    /**
     * 上一活动标识
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preActId;

    /**
     * 上一活动参与者标识
     */
    private String prePartId;

    /**
     * 上一活动参与者名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String prePartName;

    /**
     * 上一活动定义标识
     */
    private String preNodeId;

    /**
     * 上一活动定义名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preNodeName;

    /**
     * 上一活动意见
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preOpinion;

    /**
     * 上一活动操作项名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preActionName;

    /**
     * 上一活动操作项显示名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preActionLabel;

    /**
     * 创建时间
     */
    @Field(type = FieldType.DATE)
    private String createTime;

    /**
     * 截止时间
     */
    @Field(type = FieldType.DATE)
    private String deadLine;

    /**
     * 警告时间
     */
    @Field(type = FieldType.DATE)
    private String warningTime;

    /**
     * 结束时间
     */
    @Field(type = FieldType.DATE)
    private String endTime;

    /**
     * 活动红绿灯
     */
    @Field(type = FieldType.INTEGER)
    private Integer actTimeStateId;

    /**
     * 活动时限
     */
    @Field(type = FieldType.DOUBLE)
    private BigDecimal timeLimit;

    /**
     * 计时单位
     */
    @Field(type = FieldType.INTEGER)
    private Integer timeUnit;

    /**
     * 活动时限分钟
     */
    @Field(type = FieldType.INTEGER)
    private Integer timeLimitM;

    /**
     * 已用时
     */
    @Field(type = FieldType.INTEGER)
    private Integer actUsedTime;

    /**
     * 剩余时
     */
    @Field(type = FieldType.INTEGER)
    private Integer actRemainTime;

    /**
     * 活动时限信息
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actLimitInfo;

    /**
     * 活动已用时间字符串
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actUsedTimeChar;

    /**
     * 活动剩余时间字符串
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actRemainTimeChar;

    /**
     * 累计计时标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer timeAddUpFlag;

    /**
     * 暂停前节点用时
     */
    @Field(type = FieldType.INTEGER)
    private Integer actUsedTimeBeforeStop;

    /**
     * 恢复计时时间
     */
    @Field(type = FieldType.DATE)
    private String actRestart;

    /**
     * 已读标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer readFlag;

    /**
     * 已读时间
     */
    @Field(type = FieldType.DATE)
    private String readTime;

    /**
     * 批转意见
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String transOpinion;

    /**
     * 操作项名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actionName;

    /**
     * 操作项显示名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actionLabel;

    /**
     * 活动属性id
     */
    @Field(type = FieldType.INTEGER)
    private Integer actPropertyId;

    /**
     * 活动属性名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actPropertyName;

    /**
     * 抄送参与者
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String ccPart;

    /**
     * 抄送参与者名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String ccPartName;

    /**
     * 抄送标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer ccFlag;
}

创建索引方式:

代码语言:javascript复制
@RequestMapping("createIndex")
public String createIndex() {
  try {
    esUtil.createIndexIfNotExist(EsEvent.class);

  } catch (Exception e) {
    e.printStackTrace();
  }

  return "success";
}

这样就会根据创建一个叫做 es_event的索引(实体注解上配置)同时EsEvent 和 EsAct是一对一的依赖方式。

通过head插件可以看到索引结构(部分):

五. 分页查询封装:

我的项目中需要使用es做一个分页查询,查询条件通过前端传过来,所以我们需要做一下动态拼接,同时还有排序功能。我们将查询条件和分页条件都封装到一个VO中。

代码语言:javascript复制
/**
     * 综合查询列表
     *
     * @return
     */
    @PostMapping("/comprehensive/list")
    public Result<Result.ResultPage<ComprehensiveQueryEventVO>> listComprehensiveQuery(@RequestBody ComprehensiveQueryEventVO comprehensiveQueryEventVO) {
        PageInfo<ComprehensiveQueryEventVO> returnPageInfo = null;
        try {
            returnPageInfo = esService.pageComprehensiveQuery(comprehensiveQueryEventVO);
        } catch (Exception e) {
            log.error("es 综合查询异常,开始使用数据库做综合查询,错误为 :{}", e);
            // es异常,使用数据库查询
            PageHelper.startPage(comprehensiveQueryEventVO.getPageNum(), comprehensiveQueryEventVO.getPageSize());
            List<ComprehensiveQueryEventVO> comprehensiveQueryEventVOS = eventInfoService.listComprehensiveQuery(comprehensiveQueryEventVO);
            PageInfo<ComprehensiveQueryEventVO> queryPageInfo = new PageInfo<>(comprehensiveQueryEventVOS);
            returnPageInfo = new PageInfo<>();
            // 此处统一VO,此处使用可忽略
            Type type = new TypeToken<List<ComprehensiveQueryEventVO>>() {
            }.getType();
            ModelMapper modelMapper = new ModelMapper();
            modelMapper.map(queryPageInfo, returnPageInfo);
            returnPageInfo.setList(new ModelMapper().map(queryPageInfo.getList(), type));
        }
        Result.ResultPage<ComprehensiveQueryEventVO> resultPage = Result.ResultPage.build(returnPageInfo);
        Result<Result.ResultPage<ComprehensiveQueryEventVO>> result = Result.success(resultPage);
        return result;
    }
代码语言:javascript复制
package com.cestc.cooperative.pojo.vo;


import com.alibaba.excel.annotation.ExcelProperty;
import com.alibaba.excel.annotation.write.style.ColumnWidth;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
 * 综合查询对应javabean
 *
 * @author wanggang
 * @date 2020/1/3
 */
@Data
@ColumnWidth(30)
public class ComprehensiveQueryEventVO implements Serializable {

    @ExcelProperty("序号")
    private Integer orderNum;

    /**
     * 任务号
     */
    @ExcelProperty("事件编号")
    private String eventCode;

    /**
     * 事件大类名称
     */
    @ExcelProperty("类型")
    private String eventTypeName;

    /**
     * 事件大类编码
     */
    @ExcelProperty("事件大类编码")
    private String eventTypeCode;

    /**
     * 事件小类编码
     */
    @ExcelProperty("事件小类编码")
    private String eventSubtypeCode;

    /**
     * 事件小类名称
     */
    @ExcelProperty("事件小类名称")
    private String eventSubtypeName;

    /**
     * 地区名称
     */
    @ExcelProperty("地区名称")
    private String areaRegionName;

    /**
     * 地区编码
     */
    private String areaRegionCode;

    /**
     * 社区名称
     */
    @ExcelProperty("社区名称")
    private String commRegionName;

    /**
     * 社区编码
     */
    private String commRegionCode;

    /**
     * 街道名称
     */
    @ExcelProperty("街道名称")
    private String streetRegionName;

    /**
     * 街道编码
     */
    private String streetRegionCode;

    /**
     * 社区名称
     */
    @ExcelProperty("区域")
    private String regionName;

    /**
     * 社区编码
     */
    private String regionCode;

    /**
     * 地址描述
     */
    @ExcelProperty("地址")
    private String address;

    /**
     * 重要程度标识
     */
    @ExcelProperty("等级标识")
    private Integer eventGradeId;

    /**
     * 重要程度名称
     */
    @ExcelProperty("等级")
    private String eventGradeName;

    /**
     * 活动状态标识
     */
    private Integer actTimeStateId;

    /**
     * 活动状态名称
     */
    @ExcelProperty("计时状态")
    private String actTimeStateName;

    /**
     * 是否督办
     */
    @ExcelProperty("是否督办标志位")
    private Integer pressFlag;

    @ExcelProperty("督办")
    private String pressFlagName;

    /**
     * 是否督查
     */
    @ExcelProperty("是否督查标志位")
    private Integer overseerFlag;

    @ExcelProperty("督查")
    private String overseerFlagName;

    /**
     * 流程开始时间
     */
    @ExcelProperty("上报时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String procStartTime;

    /**
     * 流程截止时间
     */
    @ExcelProperty("截止时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String procDeadLine;

    /**
     * 事件描述
     */
    @ExcelProperty("事件描述")
    private String eventDesc;

    /**
     * 事件状态
     */
    @ExcelProperty("事件状态Id")
    private Integer eventStateId;

    @ExcelProperty("事件状态")
    private String eventStateName;

    /**
     * 事件来源标识
     */
    @ExcelProperty("事件来源标识")
    private Integer eventSrcId;

    /**
     * 事件来源名称
     */
    @ExcelProperty("事件来源")
    private String eventSrcName;

    /**
     * 流程结束时间
     */
    @ExcelProperty("办结时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String procEndTime;

    /**
     * 超时时长
     */
    @ExcelProperty("超时时长")
    private String timeoutDuration;

    /**
     * 回访时间
     */
    @ExcelProperty("回访时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String visitorTime;

    /**
     * 满意度
     */
    @ExcelProperty("满意度")
    private String resultLabel;

    /**
     * 上报开始时间
     */
    @ExcelProperty("上报开始时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String reportStartTime;

    /**
     * 上报结束时间
     */
    @ExcelProperty("上报结束时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String reportEndTime;

    /**
     * 截止开始时间
     */
    @ExcelProperty("截止开始时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String deadLineStartTime;

    /**
     * 截止结束时间
     */
    @ExcelProperty("截止结束时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT 8")
    private String deadLineEndTime;

    /**
     * 上报人电话
     */
    @ExcelProperty("上报人")
    private String publicReporterName;

    @ExcelProperty("联系电话")
    private String phone;

    /**
     * 超时状态(流程红绿灯)
     */
    private Integer procTimeStateId;

    /**
     * 超时状态对应名称
     */
    private String procTimeStateName;

    /**
     * 导出字段List
     */
    private List<String> exportColumnList;

    /**
     * 关键字
     */
    private String keyword;

    /**
     * 活动截止时间
     */
    private String deadLine;

    private Integer pageNum;

    private Integer pageSize;

    /**
     * 排序字段
     */
    private String orderField;

    /**
     * 排序方式
     */
    private String orderSort;

    /**
     * 事件Id
     */
    private Integer eventId;

}

分页查询方法:

代码语言:javascript复制
@Override
public PageInfo<ComprehensiveQueryEventVO> pageComprehensiveQuery(ComprehensiveQueryEventVO comprehensiveQueryEventVO) throws Exception {
        SearchSourceBuilder searchSourceBuilder = getSearchSourceBuilder(comprehensiveQueryEventVO);
        PageInfo page = esUtil.search(searchSourceBuilder, comprehensiveQueryEventVO.getPageNum(), comprehensiveQueryEventVO.getPageSize(), EsEvent.class);
        // 转换
        List<ComprehensiveQueryEventVO> list = convertList(page.getList());
        page.setList(list);

        return page;
}


/**
     * 拼接综合查询 查询条件
     * @param comprehensiveQueryEventVO
     * @return
     */
private SearchSourceBuilder getSearchSourceBuilder(ComprehensiveQueryEventVO comprehensiveQueryEventVO){
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        if (comprehensiveQueryEventVO.getPageNum() == null) {
            comprehensiveQueryEventVO.setPageNum(1);
        }

        if (comprehensiveQueryEventVO.getPageSize() == null) {
            comprehensiveQueryEventVO.setPageSize(10000);
        }

        sourceBuilder.from((comprehensiveQueryEventVO.getPageNum()-1)*comprehensiveQueryEventVO.getPageSize());
        sourceBuilder.size(comprehensiveQueryEventVO.getPageSize());

        // 上报时间拼接时/分/秒
        if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getReportStartTime())) {
            comprehensiveQueryEventVO.setReportStartTime(comprehensiveQueryEventVO.getReportStartTime()   " 00:00:00");
        }
        if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getReportEndTime())) {
            comprehensiveQueryEventVO.setReportEndTime(comprehensiveQueryEventVO.getReportEndTime()   " 23:59:59");
        }
        // 截止时间拼接时/分/秒
        if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getDeadLineStartTime())) {
            comprehensiveQueryEventVO.setDeadLineStartTime(comprehensiveQueryEventVO.getDeadLineStartTime()   " 00:00:00");
        }
        if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getDeadLineEndTime())) {
            comprehensiveQueryEventVO.setDeadLineEndTime(comprehensiveQueryEventVO.getDeadLineEndTime()   " 23:59:59");
        }

        // 符合条件查询
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();

        // event_state_id != 0 and event_state_id != 11 and delete_flag != 1
        boolBuilder.mustNot(QueryBuilders.termQuery("eventStateId", 0));
        boolBuilder.mustNot(QueryBuilders.termQuery("eventStateId", 11));
        boolBuilder.mustNot(QueryBuilders.termQuery("delateFlag", 1));

        // 动态条件----keyword
        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getKeyword())) {
            boolBuilder.must(QueryBuilders.queryStringQuery(comprehensiveQueryEventVO.getKeyword()));
        }

        // 拼接动态查询条件
        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventCode())) {
            boolBuilder.must(QueryBuilders.termQuery("eventCode", comprehensiveQueryEventVO.getEventCode()));
        }

        if (comprehensiveQueryEventVO.getEventGradeId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("eventGradeId", comprehensiveQueryEventVO.getEventGradeId()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventGradeName())) {
            boolBuilder.must(QueryBuilders.termQuery("eventGradeNam", comprehensiveQueryEventVO.getEventGradeName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventTypeName())) {
            boolBuilder.must(QueryBuilders.termQuery("eventTypeName", comprehensiveQueryEventVO.getEventTypeName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventTypeCode())) {
            boolBuilder.must(QueryBuilders.termQuery("eventTypeCode", comprehensiveQueryEventVO.getEventTypeCode()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventSubtypeName())) {
            boolBuilder.must(QueryBuilders.termQuery("eventSubtypeName", comprehensiveQueryEventVO.getEventSubtypeName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventSubtypeCode())) {
            boolBuilder.must(QueryBuilders.termQuery("eventSubtypeCode", comprehensiveQueryEventVO.getEventSubtypeCode()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getAreaRegionName())) {
            boolBuilder.must(QueryBuilders.termQuery("areaRegionName", comprehensiveQueryEventVO.getAreaRegionName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getAreaRegionCode())) {
            boolBuilder.must(QueryBuilders.termQuery("areaRegionCode", comprehensiveQueryEventVO.getAreaRegionCode()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getCommRegionName())) {
            boolBuilder.must(QueryBuilders.termQuery("commRegionName", comprehensiveQueryEventVO.getCommRegionName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getCommRegionCode())) {
            boolBuilder.must(QueryBuilders.termQuery("commRegionCode", comprehensiveQueryEventVO.getCommRegionCode()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getStreetRegionName())) {
            boolBuilder.must(QueryBuilders.termQuery("streetRegionName", comprehensiveQueryEventVO.getStreetRegionName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getStreetRegionCode())) {
            boolBuilder.must(QueryBuilders.termQuery("streetRegionCode", comprehensiveQueryEventVO.getStreetRegionName()));
        }

        if (comprehensiveQueryEventVO.getEventSrcId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("eventSrcId", comprehensiveQueryEventVO.getEventSrcId()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventSrcName())) {
            boolBuilder.must(QueryBuilders.termQuery("eventSrcName", comprehensiveQueryEventVO.getEventSrcName()));
        }

        if (comprehensiveQueryEventVO.getProcTimeStateId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("procTimeStateId", comprehensiveQueryEventVO.getProcTimeStateId()));
        }

        if (comprehensiveQueryEventVO.getPressFlag() != null) {
            boolBuilder.must(QueryBuilders.termQuery("pressFlag", comprehensiveQueryEventVO.getPressFlag()));
        }

        if (comprehensiveQueryEventVO.getOverseerFlag() != null) {
            boolBuilder.must(QueryBuilders.termQuery("overseerFlag", comprehensiveQueryEventVO.getOverseerFlag()));
        }

        if (comprehensiveQueryEventVO.getEventStateId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("eventStateId", comprehensiveQueryEventVO.getEventStateId()));
        }

        if (comprehensiveQueryEventVO.getActTimeStateId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("act.actTimeStateId", comprehensiveQueryEventVO.getActTimeStateId()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getReportStartTime())) {
            boolBuilder.must(QueryBuilders.rangeQuery("procStartTime").gte(comprehensiveQueryEventVO.getReportStartTime()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getReportEndTime())) {
            boolBuilder.must(QueryBuilders.rangeQuery("procStartTime").lte(comprehensiveQueryEventVO.getReportEndTime()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getDeadLineStartTime())) {
            boolBuilder.must(QueryBuilders.rangeQuery("act.deadLine").gte(comprehensiveQueryEventVO.getDeadLineStartTime()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getDeadLineEndTime())) {
            boolBuilder.must(QueryBuilders.rangeQuery("act.deadLine").lte(comprehensiveQueryEventVO.getDeadLineEndTime()));
        }
        sourceBuilder.query(boolBuilder);

        // 排序:
        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getOrderField()) && StringUtils.isNotEmpty(comprehensiveQueryEventVO.getOrderSort())) {
            String esOrderField = null;
            if (EventInfoServiceImpl.QueryFieldEnum.PROC_START_TIME.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
                esOrderField = "procStartTime";
            } else if (EventInfoServiceImpl.QueryFieldEnum.PROC_DEAD_LINE.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
                esOrderField = "act.deadLine";
            } else if (EventInfoServiceImpl.QueryFieldEnum.PROC_END_TIME.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
                esOrderField = "procEndTime";
            } else if (EventInfoServiceImpl.QueryFieldEnum.VISITOR_TIME.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
                esOrderField = "visitor.visitorTime";
            }
            FieldSortBuilder fieldSortBuilder = new FieldSortBuilder(esOrderField);
            fieldSortBuilder = fieldSortBuilder.order("orderDesc".equals(comprehensiveQueryEventVO.getOrderSort()) ? SortOrder.DESC : SortOrder.ASC);
            sourceBuilder.sort(fieldSortBuilder);
        }

    return sourceBuilder;
}
六. 数据同步

基本的方法都有了,下面就是数据同步了,我们需要在所有的对数据库有增删改操作的地方,同时去操作es,这一部分可以分散在代码的各个角落,所以推荐大家通过aop去处理,去定义对应的切面,在切面中统一处理。我这里利用了消息队列,主要原因是想做成异步,因为有些操作比较本身已经比较耗时了,所以不想在操作后,还去等待es的处理结果,所以直接扔到队列里,由队列异步处理:

发送到队列代码:

代码语言:javascript复制
// 同步es
mqService.sendEs(SendEsVO.builder().eventId(Integer.valueOf(actionVO.getBizId())).build());
代码语言:javascript复制
@Override
public void sendEs(SendEsVO sendEsVO) {
	logger.info("发送mq请求【同步ES】===》"   JSONObject.toJSONString(sendEsVO));
	rabbitTemplate.convertAndSend(MqBaseConst.MQ_EXCHANGE_ES,
	MqBaseConst.MQ_KEY_ES, JSONObject.toJSONString(sendEsVO));
}

队列消费者代码:

代码语言:javascript复制
@Component
@Slf4j
public class EsMqListener {

    private final EsService esService;

    public EsMqListener(EsService esService) {
        this.esService = esService;
    }

    @RabbitHandler
    @RabbitListener(queues = MqBaseConst.MQ_QUEUE_ES)
    public void receiveGatewayMsg(String msg){
        log.info("【es】消息系统收到消息,内容为 {}",msg);
        if (StringUtils.isNoneEmpty(msg)) {
            SendEsVO sendEsVO = JSON.parseObject(msg, SendEsVO.class);

            // 判断操作类型,目前都是 单条的saveOrUpdate 操作
            try {
                esService.saveOrUpdate(sendEsVO.getEventId());
            } catch (Exception e) {
                log.error("eventId: [{}], 同步到es失败,错误为:{}", sendEsVO.getEventId(), e);
                //todo: 失败后的处理
                return;
            }
            log.info("eventId: [{}], 同步到es成功", sendEsVO.getEventId());
        }
    }
}
代码语言:javascript复制
@Override
public boolean saveOrUpdate(Integer eventId) throws Exception {

  // 1. 如果索引不存在则创建
  boolean re = esUtil.createIndexIfNotExist(EsEvent.class);
  // 如果返回true, 代表该索引不存在,已经创建了,那么此时应该做一次全量同步
  if (re) {
    batchSaveOrUpdate();
    return true;
  }

  // 2. 执行到此处,说明索引存在,那么做单条数据同步
  EventDO eventDO = eventService.getById(eventId);
  EsEvent event = ModelMapperUtil.map(eventDO, EsEvent.class, "yyyy-MM-dd HH:mm:ss");
  // 查询act
  CooActDO cooActDO = actService.getOne(new LambdaQueryWrapper<CooActDO>().eq(CooActDO::getBizId, event.getEventId()));
  if (cooActDO != null) {
    EsAct act =  ModelMapperUtil.map(cooActDO, EsAct.class, "yyyy-MM-dd HH:mm:ss");
    event.setAct(act);
  }

  IndexResponse indexResponse = esUtil.index(event);
  if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    log.info("eventId {}, 在es中对应的数据创建成功");
    return true;
  } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    log.info("eventId {}, 在es中对应的数据更新成功");
    return true;
  }
  return false;
}

同时定时任务做全量同步:

代码语言:javascript复制
/**
 * @className: MysqlToEsTaskServiceImpl
 * @description: mysql数据同步到es的定时任务
 * @author: sh.Liu
 * @date: 2020-07-24 16:28
 */
@Service
@Slf4j
public class MysqlToEsTaskServiceImpl implements MysqlToEsTaskService {

    private final EsService esService;

    public MysqlToEsTaskServiceImpl(EsService esService) {
        this.esService = esService;
    }

    @Override
    public void syncEventData() {
        log.info("将 mysql【事件数据】 全量同步到 elasticsearch 定时任务开始执行。。。");
        try {
            esService.batchSaveOrUpdate();
        } catch (Exception e) {
            log.error("将 mysql【事件数据】 全量同步到 elasticsearch 定时任务执行失败,{}", e);
        }
        log.info("将 mysql【事件数据】 全量同步到 elasticsearch 定时任务执行结束。。。");

    }
}

全量同步代码;

代码语言:javascript复制
/**
     * mysql数据库和es全量同步
     * @return
     * @throws Exception
     */
@Override
public boolean batchSaveOrUpdate() throws Exception {
  // 1. 如果索引不存在则创建
  esUtil.createIndexIfNotExist(EsEvent.class);

  // 2. 全量同步
  List<EsEvent> esEventList = new ArrayList<>();
  // 查询所有es数据,不包含草稿箱  eventStateId != 0
  List<EventDO> list = eventService.list(
    new LambdaQueryWrapper<EventDO>().ne(EventDO::getEventStateId,EventStateEnum.EVENTSTATE_DRAFT.getCode()));
  if (list != null && list.size() > 0) {
    for (EventDO eventDO : list) {
      EsEvent event = ModelMapperUtil.map(eventDO, EsEvent.class, "yyyy-MM-dd HH:mm:ss");
      // 查询act
      CooActDO cooActDO = actService.getOne(new LambdaQueryWrapper<CooActDO>().eq(CooActDO::getBizId, event.getEventId()));
      if (cooActDO != null) {
        EsAct act =  ModelMapperUtil.map(cooActDO, EsAct.class, "yyyy-MM-dd HH:mm:ss");
        event.setAct(act);
      }
      esEventList.add(event);
    }
  }
  boolean b = esUtil.batchSaveOrUpdate(esEventList);
  return b;
}

然后将上述方法配置到xxl-job中即可。

0 人点赞