springboot实战之nosql整合(elasticsearch7.3版本)

2019-10-16 12:05:09 浏览数 (1)

前言

在正文之前,我先感慨下,elasticsearch版本更新实在是太快了。2019年4月时候,我看到的版本还是7.0版本,现在9月,已经出到了7.3版本了,而且7.X版本较之前版本做了一些比较大改进。改进的内容有如下:

  • 1、彻底废弃多type支持,包括api层面,之前版本可在一个索引库下创建多个type。
  • 2、彻底废弃_all字段支持,为提升性能默认不再支持全文检索,即7.0之后版本进行该项配置会报错。
  • 3、新增应用程序主动监测功能,搭配对应的kibana版本,用户可监测应用服务的健康状态,并在出现问题后及时发出通知。
  • 4、取消query结果中hits count的支持(聚合查询除外),使得查询性能大幅提升(3x-7x faster)。这意味着,每次查询后将不能得到精确的结果集数量。
  • 5、新增intervals query ,用户可设置多字符串在文档中出现的先后顺序进行检索。
  • 6、新增script_core ,通过此操作用户可以精确控制返回结果的score分值。
  • 7、优化集群协调子系统,缩减配置项提升稳定性。
  • 8、新增 alias、date_nanos、features、vector等数据类型。
  • 9、7.x自带java环境,所以我们在安装es时不再需要单独下载和配置java_home。
  • 10、7.x将不会再有OOM的情况,JVM引入了新的circuit breaker(熔断)机制,当查询或聚合的数据量超出单机处理的最大内存限制时会被截断,并抛出异常(有点类似clickhouse)。

正文

什么是elasticsearch

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java语言开发的,并作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎。ElasticSearch用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。官方客户端在Java、.NET(C#)、PHP、Python、Apache Groovy、Ruby和许多其他语言中都是可用的。

ElasticSearch的功能

1、分布式的搜索引擎和数据分析引擎

  • 搜索:网站的站内搜索,IT系统的检索
  • 数据分析:电商网站,统计销售排名前10的商家

2、全文检索,结构化检索,数据分析

  • 全文检索:我想搜索商品名称包含某个关键字的商品
  • 结构化检索:我想搜索商品分类为日化用品的商品都有哪些
  • 数据分析:我们分析每一个商品分类下有多少个商品

3、对海量数据进行近实时的处理

  • 分布式:ES自动可以将海量数据分散到多台服务器上去存储和检索
  • 海联数据的处理:分布式以后,就可以采用大量的服务器去存储和检索数据,自然而然就可以实现海量数据的处理了
  • 近实时:检索数据要花费1小时(这就不要近实时,离线批处理,batch-processing);在秒级别对数据进行搜索和分析

正文

ElasticSearch的优点
  • 横向可扩展性: 作为大型分布式集群,很容易就能扩展新的服务器到ES集群中;也可运行在单机上作为轻量级搜索引擎使用。
  • 更丰富的功能: 与传统关系型数据库相比,ES提供了全文检索、同义词处理、相关度排名、复杂数据分析、海量数据的近实时处理等功能。
  • 分片机制提供更好地分布性: 同一个索引被分为多个分片(Shard),利用分而治之的思想提升处理效率。
  • 高可用: 提供副本(Replica)机制,一个分片可以设置多个副本,即使在某些服务器宕机后,集群仍能正常工作。
  • 开箱即用: 提供简单易用的 API,服务的搭建、部署和使用都很容易操作。
ElasticSearch的缺点

elasticSearch 目前主要用于大量数据的挖掘和搜索。使用的优势是在数据量较大的时候可以进行快速搜索,并且本身还带有分词器,可以对elasticSearch内的数据进行分词搜索。有利于数据管理。

但相对的来说,缺点也很明显。在需要添加新数据与新字段的时候,如果elasticSearch进行搜索是可能需要重新修改格式。之前的数据需要重新同步,对数据的管理有很多困难。一旦数据格式出现改变,会变得非常麻烦。另一个缺点就是在搜索的时候,和之前的mysql不同,有许多mysql可以搜索到的东西,在elasticSearch里就不能搜或很难搜。

ElasticSearch应用场景
  • 搜索领域: 百度、谷歌,全文检索,高亮,搜索推荐等。
  • 内容网站: 用户行为日志(点击、浏览、收藏、评论) 社交网络数据,数据分析(将公众对文章的反馈提交至文章作者),包括网站内容搜索等。
  • Stack Overflow(IT技术论坛): 全文检索,搜索相关问题和答案。
  • GitHub(开源代码管理), 搜索管理其托管的上千亿行代码。
  • 日志数据分析: ELK技术栈(Elasticsearch Logstash Kibana)对日志数据进行采集和分析。
  • 商品价格监控网站: 用户设定某商品的价格阈值,当价格低于该阈值时,向用户推送降价消息。
  • BI系统(Business Intelligence, 商业智能): 分析某区域最近 3 年的用户消费额的趋势、用户群体的组成结构等。
  • 其他应用: 电商、招聘、门户等网站的内部搜索服务,IT系统(OA、CRM、ERP等)的内部搜索服务、数据分析等。

springboot与elasticsearch整合

之前整合大都是以一个用户的增删改查为demo,这次就搞点不一样的。以长链接转短链接为demo,这个例子的搜索采用elasticsearch来搜索。在介绍整合之前,大家先看一张图

用过spring-data-*的同学可能知道,这个玩意儿集成了各种data,比如redis,mongo,elasticsearch等客户端操作,便于开发人员使用。上面的图片是我从官方github截图,以最新版的springboot2.1.8版本,其spring-boot-starter-data-elasticsearch引用的spring-data-elasticsearch最新版jar为3.1.9。这就意味着如果你elasticsearch的版本是7.x版本,目前是无法使用spring-data-elasticsearch封装的客户端的。如果你不信邪,可以试下,基本上会出现客户端与服务端不兼容。那怎么破?答案是可以使用官方提供的High Level REST Client的java客户端。废话说太多了,下边介绍如何整合

1、本例子所使用jar版本

代码语言:javascript复制
<elasticsearch.version>7.3.1</elasticsearch.version>
 <springboot.version>2.1.6.RELEASE</springboot.version>

2、pom.xml

代码语言:javascript复制
<dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <exclusions>
        <exclusion>
          <artifactId>elasticsearch</artifactId>
          <groupId>org.elasticsearch</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-client</artifactId>
      <exclusions>
        <exclusion>
          <artifactId>commons-logging</artifactId>
          <groupId>commons-logging</groupId>
        </exclusion>
      </exclusions>
    </dependency>

3、application.yml配置

代码语言:javascript复制
spring:
  data:
    elasticsearch:
      cluster-name: docker-cluster
      cluster-nodes: localhost:9300
  elasticsearch:
    rest:
      uris: ["http://localhost:9200"]
      ipAddrs: # 自定义的属性
          - localhost:9200

4、创建ElasticsearchRestClient类

代码语言:javascript复制
@ConfigurationProperties(prefix = "spring.elasticsearch.rest")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ElasticsearchRestClientConfig {

  private static final int ES_IPADDRS_LENGTH = 2;
  private static final String HTTP_SCHEME = "http";


  private List<String> ipAddrs = new ArrayList<>();


  @Bean
  public RestClientBuilder restClientBuilder() {
    HttpHost[] hosts = ipAddrs.stream()
        .map(this::makeHttpHost)
        .filter(Objects::nonNull)
        .toArray(HttpHost[]::new);
    return RestClient.builder(hosts);
  }

  @Bean(name = "highLevelClient")
  public RestHighLevelClient highLevelClient(RestClientBuilder restClientBuilder) {
    return new RestHighLevelClient(restClientBuilder);
  }



  private HttpHost makeHttpHost(String s) {
    assert StringUtils.isNotEmpty(s);
    String[] address = s.split(":");
    if (address.length == ES_IPADDRS_LENGTH) {
      String ip = address[0];
      int port = Integer.parseInt(address[1]);
      return new HttpHost(ip, port, HTTP_SCHEME);
    }

      return null;

  }

ps:该类从博主唯有遇见清的一篇文章->ElasticSearch7.2学习—springboot集成elasticsearch,链接如下

https://blog.csdn.net/pyq666/article/details/99639810

拿来稍微修改实现

5、elasticsearch工具类创建

代码语言:javascript复制
@Component
@Slf4j
public class ElasticsearchHelper {

  @Autowired
  private RestHighLevelClient highLevelClient;

  /**
   * 扫描指定包且含有EsDocument的注解,并创建索引
   * @param packageName 包名
   */
  public void createIndexs(String packageName){
    Set<Class<?>> clzSet = ReflectionUtil.getClasses(packageName,
        EsDocument.class);

   if(CollectionUtils.isNotEmpty(clzSet)) {
     for (Class<?> clz : clzSet) {
       createIndex(clz);

     }

   }

  }

  public void createIndex(Class<?> clz) {

    CachedClass cachedClass = ReflectionCache.getCachedClass(clz);
    EsIndex esIndex = convertDocumentToIndexEs(clz);
    boolean isIndexExist = this.isIndexExist(esIndex.getIndexName());
    if (!isIndexExist) {
      CachedField[] fields = cachedClass.getFields();
      Map<String, Map<String, Object>> properties = ReflectionUtil
          .covertFieldsIncludeAnnotationValueToMap(fields, EsField.class);
      boolean isSuccess =  this.createIndex(esIndex,properties);
      if(isSuccess){
        log.info("创建{}索引成功",esIndex.getIndexName());
      }
    }
  }

  public EsIndex convertDocumentToIndexEs(Class<?> clz) {

    EsDocument document = clz.getAnnotation(EsDocument.class);
    EsIndex esIndex = new EsIndex();
    esIndex.setIndexName(document.indexName());
    esIndex.setType(document.type());
    esIndex.setReplicas(document.replicas());
    esIndex.setShards(document.shards());
    return esIndex;
  }

  public Object getEsId(Object retVal) {

    Object id = null;

    List<Field> fields = FieldUtils.getFieldsListWithAnnotation(retVal.getClass(), EsId.class);

    if(CollectionUtils.isNotEmpty(fields)){
      Field idField = fields.get(0);
      try {
        id = FieldUtils.readDeclaredField(retVal,idField.getName(),true);
      } catch (IllegalAccessException e) {
        log.error(e.getMessage(),e);
      }
    }
    return id;
  }

  /**
   * 创建索引
   *
   * @param indexName 索引名称
   * @param properties 最外层的map的key为属性字段名称,value为属性字段的具体需要设置的属性,比如分词、类型等
   * 比如:@Field(type = FieldType.Text, analyzer = "ik_smart",searchAnalyzer = "ik_smart", fielddata=true)
   *      private String urlName;
   * 其最外层map为urlName->@Field,最里层为map为type->FieldType.Text等
   *

   * @return 如果创建成功返回true、创建失败返回 fasle
   */
  public boolean createIndex(String indexName, Map<String,Map<String,Object>> properties){

    try {
      XContentBuilder builder = XContentFactory.jsonBuilder();
      builder.startObject()
          .startObject("mappings")
         // .startObject("_doc")
          .field("properties", properties)
          //.endObject()
          .endObject()
          .startObject("settings")
          .field("number_of_shards", 5)
          .field("number_of_replicas", 1)
          .endObject()
          .endObject();

      CreateIndexRequest request = new CreateIndexRequest(indexName).source(builder);
      CreateIndexResponse response = highLevelClient.indices().create(request, RequestOptions.DEFAULT);

      return response.isAcknowledged();
    } catch (IOException e) {
       log.error("createIndex error:" e.getMessage(),e);
    }

    return false;

  }

  /**
   * 创建索引
   * @param esIndex 索引实体
   * @param properties
   * @return
   */
  public boolean createIndex(EsIndex esIndex, Map<String,Map<String,Object>> properties){

    try {
      XContentBuilder builder = XContentFactory.jsonBuilder();
      builder.startObject()
          .startObject("mappings")
          //.startObject(esIndex.getType()) es7版本已经废弃type
          .field("properties", properties)
          //.endObject()
          .endObject()
          .startObject("settings")
          .field("number_of_shards", esIndex.getShards())
          .field("number_of_replicas", esIndex.getReplicas())
          .endObject()
          .endObject();

      CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndexName()).source(builder);
      CreateIndexResponse response = highLevelClient.indices().create(request, RequestOptions.DEFAULT);

      return response.isAcknowledged();
    } catch (IOException e) {
      log.error("createIndex error:" e.getMessage(),e);
    }

    return false;

  }

  /**
   * 判断索引是否已经存在
   * @param indexName 索引名
   * @return
   */
  public boolean isIndexExist(String indexName){

    try {
      GetIndexRequest request = new GetIndexRequest(indexName);
      request.local(false);
      request.humanReadable(true);
      request.includeDefaults(false);
      return highLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
      log.error("isIndexExist error:" e.getMessage(),e);
    }

    return false;
  }

  /**
   * 新增/更新一条记录
   * @param indexName 索引名称
   * @param entity
   */
  public boolean saveOrUpdate(String indexName, EsEntity entity) {
    boolean isSuccess = false;
    IndexRequest request = new IndexRequest(indexName);
    request.id(entity.getId());
    request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
    try {
      IndexResponse response = highLevelClient.index(request, RequestOptions.DEFAULT);
      isSuccess = response.status() == RestStatus.CREATED || response.status() == RestStatus.OK;
    } catch (IOException e) {
      log.error("saveOrUpdate error:" e.getMessage(),e);
    }

    return isSuccess;
  }

  /**
   *
   * 批量插入
   * @param indexName
   * @param list
   */
  public boolean insertBatch(String indexName, List<EsEntity> list) {
    BulkRequest request = new BulkRequest();
    list.forEach(item -> request.add(new IndexRequest(indexName).id(item.getId())
        .source(JSON.toJSONString(item.getData()), XContentType.JSON)));
    try {
      BulkResponse responses = highLevelClient.bulk(request, RequestOptions.DEFAULT);
      return !responses.hasFailures();
    } catch (Exception e) {
      log.error("insertBatch error:" e.getMessage(),e);
    }
    return false;
  }

  /**
   * 批量删除
   * @param indexName
   * @param idList 主键列表
   * @param <T>
   */
  public <T> boolean deleteBatch(String indexName, Collection<T> idList) {
    BulkRequest request = new BulkRequest();
    idList.forEach(item -> request.add(new DeleteRequest(indexName, item.toString())));
    try {
      BulkResponse responses = highLevelClient.bulk(request, RequestOptions.DEFAULT);
      return !responses.hasFailures();
    } catch (Exception e) {
      log.error("deleteBatch error:" e.getMessage(),e);
    }

    return false;
  }

  /**
   * 搜索
   *
   * @param indexName 索引名称
   * @param builder 查询参数
   * @param searchTargetClz 结果类对象
   *
   */
  public <T> List<T> search(String indexName, SearchSourceBuilder builder, Class<T> searchTargetClz) {
    SearchRequest request = new SearchRequest(indexName);
    request.source(builder);
    try {
      SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
      SearchHit[] hits = response.getHits().getHits();
      List<T> res = new ArrayList<>(hits.length);
      for (SearchHit hit : hits) {
        res.add(JSON.parseObject(hit.getSourceAsString(), searchTargetClz));
      }
      return res;
    } catch (Exception e) {
      log.error("search error:" e.getMessage(),e);
    }

    return null;
  }

  /**
   * 分页搜索
   *
   * @param indexName 索引名称
   * @param builder 查询参数
   * @param searchTargetClz 结果类对象
   *
   */
  public <T> PageResult <T> pageSearch(String indexName, SearchSourceBuilder builder, Class<T> searchTargetClz){
    SearchRequest request = new SearchRequest(indexName);

    request.source(builder);
    PageResult<T> pageResult = new PageResult<>();
    try {
      SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
      SearchHit[] hits = response.getHits().getHits();
      if(ArrayUtils.isNotEmpty(hits)){
        List<T> res = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
          res.add(JSON.parseObject(hit.getSourceAsString(), searchTargetClz));
        }
        int pageSize = builder.size();
        int pageNo = builder.from() / pageSize   1;
        long total = response.getHits().getTotalHits().value;
        Long totalPage =(total   pageSize - 1) / pageSize;
        pageResult.setTotal(total);
        pageResult.setPageNo(pageNo);
        pageResult.setPageSize(pageSize);
        pageResult.setTotalPages(Integer.valueOf(totalPage.toString()));
        pageResult.setList(res);
      }

      return pageResult;
    } catch (Exception e) {
      log.error("pageSearch error:" e.getMessage(),e);
    }
    return null;
  }

  /**
   * 删除索引
   *
   * @param indexName 索引名称
   *
   */
  public boolean deleteIndex(String indexName) {
    try {
      AcknowledgedResponse response =  highLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
      return response.isAcknowledged();
    } catch (Exception e) {
      log.error("deleteIndex error:" e.getMessage(),e);
    }
    return false;
  }

  /**
   * 按查询条件删除记录
   *
   * @param indexName 索引名称
   * @param builder builder 查询条件
   *
   */
  public boolean deleteByQuery(String indexName, QueryBuilder builder) {
    DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
    request.setQuery(builder);
    //设置批量操作数量,最大为10000
    request.setBatchSize(10000);
    request.setConflicts("proceed");
    try {
      BulkByScrollResponse response = highLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
      List<Failure> bulkFailures = response.getBulkFailures();
      return CollectionUtils.isEmpty(bulkFailures);
    } catch (Exception e) {
      log.error("deleteByQuery error:" e.getMessage(),e);
    }

    return false;
  }



}

ps:该类的实现内核主要拿博主FleyX在其发表文章->springboot整合elasticsearch(基于es7.2和官方high level client,其链接如下

https://www.tapme.top/blog/detail/2019-07-29-14-59/

中的工具类进行完善,完善的内容有通过扫描注解自动创建索引、分页等。

通过上面的步骤,基本上就可以实现elasticsearch的增删改查操作了。但是作为一个spring-boot-starter-data-*的频繁使用者,肯定不满足这样。于是自己就瞎折腾一个类似的轮子,因为其具体实现细节,代码还挺多,就不讲了。后边直接讲如何使用

自造的elasticsearch客户端使用

1、在启动类上加上EnableCustomElasticsearchRepositories注解,形如下:

代码语言:javascript复制
@SpringBootApplication(exclude = ElasticsearchDataAutoConfiguration.class)
@EnableCustomElasticsearchRepositories(basePackages = "com.github.lybgeek.elasticsearch.repository")
public class ElasticsearchApplication {

    public static void main(String[] args) {
        /**
         * Springboot整合Elasticsearch 在项目启动前设置一下的属性,防止报错
         * 解决netty冲突后初始化client时还会抛出异常
         * java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]
         */
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        SpringApplication.run(ElasticsearchApplication.class, args);
    }

2、创建一个elasticsearchRepository,代码如下

代码语言:javascript复制
@ElasticsearchRepository
public interface CustomShortUrlEsRepository extends CustomElasticsearchRepository<ShortUrlDTO,Long>{
}

3、在业务service使用Repository,代码如下

代码语言:javascript复制
@Service
public class CustomShortUrlEsServiceImpl implements CustomShortUrlEsService {


  @Autowired
  private CustomShortUrlEsRepository customShortUrlEsRepository ;



  @Override
  public boolean saveShortUrl(ShortUrlDTO shortUrlDTO) {

    return customShortUrlEsRepository.save(shortUrlDTO);
  }

  @Override
  public boolean deleteShortUrlById(Long id) {

    return customShortUrlEsRepository.deleteById(id);
  }

  @Override
  public List<ShortUrlDTO> listShortUrls(ShortUrlDTO shortUrlDTO) {
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
    if (StringUtils.isNotBlank(shortUrlDTO.getLongUrl())) {
      QueryBuilder builder = QueryBuilders.matchQuery("longUrl", shortUrlDTO.getLongUrl());
      boolBuilder.must(builder);

    }

    if (StringUtils.isNotBlank(shortUrlDTO.getRemark())) {
      QueryBuilder builder = QueryBuilders.matchQuery("remark", shortUrlDTO.getRemark());
      boolBuilder.must(builder);
    }

    if (StringUtils.isNotBlank(shortUrlDTO.getUrlName())) {
      QueryBuilder builder = QueryBuilders.matchQuery("urlName", shortUrlDTO.getUrlName());
      boolBuilder.must(builder);
    }

    searchSourceBuilder.query(boolBuilder);




    return customShortUrlEsRepository.search(searchSourceBuilder);
  }

  @Override
  public ShortUrlDTO getShortUrlById(Long id) {

    return customShortUrlEsRepository.getEntityById(id);
  }

4、验证集成结果

代码语言:javascript复制
@Test
   public void testListCustomShortUrls(){
     ShortUrlDTO dto = new ShortUrlDTO();
     dto.setUrlName("美团");
   //  dto.setRemark("门户");
     List<ShortUrlDTO> shortUrlDTOS = customShortUrlEsService.listShortUrls(dto);

     System.out.println(shortUrlDTOS);
   }

代码语言:javascript复制
2019-09-27 11:32:05.740  INFO 37104 --- [           main] org.reflections.Reflections              : Reflections took 198 ms to scan 1 urls, producing 1 keys and 1 values
2019-09-27 11:32:07.087  INFO 37104 --- [           main] g.l.c.e.r.p.ElasticsearchRepositoryProxy : entityClz:class com.github.lybgeek.shorturl.dto.ShortUrlDTO,methodName:search,args:[{"query":{"bool":{"must":[{"match":{"urlName":{"query":"美团","operator":"OR","prefix_length":0,"max_expansions":50,"fuzzy_transpositions":true,"lenient":false,"zero_terms_query":"NONE","auto_generate_synonyms_phrase_query":true,"boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}}}]
[ShortUrlDTO(id=null, longUrl=https://www.meituan.com, urlName=美团外卖, remark=美团外卖APP)]

总结

ElasticSearch这几年发展太快了,现在使用solr的人越来越少,但感觉也越来越学不动了。ElasticSearch目前也出了Elastic相关的证书了,考一次证书需要400刀,哈哈。本文没有对ElasticSearch的概念进行相关介绍,这方面的内容,可以查阅官方文档,或者看下我这边给的链接

ElasticSearch入门,看这篇文章就够了

https://www.toutiao.com/a6585098500915069443/

至于对使用场景的深入详解,可以查看这篇博文

https://www.toutiao.com/a6619185846794846727/

参考文档

ElasticSearch 7 正式发布!

https://blog.csdn.net/csdnnews/article/details/89310209

一网打尽Elasticsearch的优点,使用场景,全文检索,近实时等知识点

https://www.xttblog.com/?p=4372

大白话ElasticSearch是什么以及应用场景

https://blog.csdn.net/paicmis/article/details/82535018

elasticSearch优缺点感想

https://www.clzg.cn/article/47706.html

百度百科

https://baike.baidu.com/item/elasticsearch/3411206?fr=aladdin

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-elasticsearch

0 人点赞