前言
在正文之前,我先感慨下,elasticsearch版本更新实在是太快了。2019年4月时候,我看到的版本还是7.0版本,现在9月,已经出到了7.3版本了,而且7.X版本较之前版本做了一些比较大改进。改进的内容有如下:
- 1、彻底废弃多type支持,包括api层面,之前版本可在一个索引库下创建多个type。
- 2、彻底废弃_all字段支持,为提升性能默认不再支持全文检索,即7.0之后版本进行该项配置会报错。
- 3、新增应用程序主动监测功能,搭配对应的kibana版本,用户可监测应用服务的健康状态,并在出现问题后及时发出通知。
- 4、取消query结果中hits count的支持(聚合查询除外),使得查询性能大幅提升(3x-7x faster)。这意味着,每次查询后将不能得到精确的结果集数量。
- 5、新增intervals query ,用户可设置多字符串在文档中出现的先后顺序进行检索。
- 6、新增script_core ,通过此操作用户可以精确控制返回结果的score分值。
- 7、优化集群协调子系统,缩减配置项提升稳定性。
- 8、新增 alias、date_nanos、features、vector等数据类型。
- 9、7.x自带java环境,所以我们在安装es时不再需要单独下载和配置java_home。
- 10、7.x将不会再有OOM的情况,JVM引入了新的circuit breaker(熔断)机制,当查询或聚合的数据量超出单机处理的最大内存限制时会被截断,并抛出异常(有点类似clickhouse)。
正文
什么是elasticsearch
ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java语言开发的,并作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎。ElasticSearch用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。官方客户端在Java、.NET(C#)、PHP、Python、Apache Groovy、Ruby和许多其他语言中都是可用的。
ElasticSearch的功能
1、分布式的搜索引擎和数据分析引擎
- 搜索:网站的站内搜索,IT系统的检索
- 数据分析:电商网站,统计销售排名前10的商家
2、全文检索,结构化检索,数据分析
- 全文检索:我想搜索商品名称包含某个关键字的商品
- 结构化检索:我想搜索商品分类为日化用品的商品都有哪些
- 数据分析:我们分析每一个商品分类下有多少个商品
3、对海量数据进行近实时的处理
- 分布式:ES自动可以将海量数据分散到多台服务器上去存储和检索
- 海联数据的处理:分布式以后,就可以采用大量的服务器去存储和检索数据,自然而然就可以实现海量数据的处理了
- 近实时:检索数据要花费1小时(这就不要近实时,离线批处理,batch-processing);在秒级别对数据进行搜索和分析
正文
ElasticSearch的优点
- 横向可扩展性: 作为大型分布式集群,很容易就能扩展新的服务器到ES集群中;也可运行在单机上作为轻量级搜索引擎使用。
- 更丰富的功能: 与传统关系型数据库相比,ES提供了全文检索、同义词处理、相关度排名、复杂数据分析、海量数据的近实时处理等功能。
- 分片机制提供更好地分布性: 同一个索引被分为多个分片(Shard),利用分而治之的思想提升处理效率。
- 高可用: 提供副本(Replica)机制,一个分片可以设置多个副本,即使在某些服务器宕机后,集群仍能正常工作。
- 开箱即用: 提供简单易用的 API,服务的搭建、部署和使用都很容易操作。
ElasticSearch的缺点
elasticSearch 目前主要用于大量数据的挖掘和搜索。使用的优势是在数据量较大的时候可以进行快速搜索,并且本身还带有分词器,可以对elasticSearch内的数据进行分词搜索。有利于数据管理。
但相对的来说,缺点也很明显。在需要添加新数据与新字段的时候,如果elasticSearch进行搜索是可能需要重新修改格式。之前的数据需要重新同步,对数据的管理有很多困难。一旦数据格式出现改变,会变得非常麻烦。另一个缺点就是在搜索的时候,和之前的mysql不同,有许多mysql可以搜索到的东西,在elasticSearch里就不能搜或很难搜。
ElasticSearch应用场景
- 搜索领域: 百度、谷歌,全文检索,高亮,搜索推荐等。
- 内容网站: 用户行为日志(点击、浏览、收藏、评论) 社交网络数据,数据分析(将公众对文章的反馈提交至文章作者),包括网站内容搜索等。
- Stack Overflow(IT技术论坛): 全文检索,搜索相关问题和答案。
- GitHub(开源代码管理), 搜索管理其托管的上千亿行代码。
- 日志数据分析: ELK技术栈(Elasticsearch Logstash Kibana)对日志数据进行采集和分析。
- 商品价格监控网站: 用户设定某商品的价格阈值,当价格低于该阈值时,向用户推送降价消息。
- BI系统(Business Intelligence, 商业智能): 分析某区域最近 3 年的用户消费额的趋势、用户群体的组成结构等。
- 其他应用: 电商、招聘、门户等网站的内部搜索服务,IT系统(OA、CRM、ERP等)的内部搜索服务、数据分析等。
springboot与elasticsearch整合
之前整合大都是以一个用户的增删改查为demo,这次就搞点不一样的。以长链接转短链接为demo,这个例子的搜索采用elasticsearch来搜索。在介绍整合之前,大家先看一张图
用过spring-data-*的同学可能知道,这个玩意儿集成了各种data,比如redis,mongo,elasticsearch等客户端操作,便于开发人员使用。上面的图片是我从官方github截图,以最新版的springboot2.1.8版本,其spring-boot-starter-data-elasticsearch引用的spring-data-elasticsearch最新版jar为3.1.9。这就意味着如果你elasticsearch的版本是7.x版本,目前是无法使用spring-data-elasticsearch封装的客户端的。如果你不信邪,可以试下,基本上会出现客户端与服务端不兼容。那怎么破?答案是可以使用官方提供的High Level REST Client的java客户端。废话说太多了,下边介绍如何整合
代码语言:javascript复制1、本例子所使用jar版本
<elasticsearch.version>7.3.1</elasticsearch.version>
<springboot.version>2.1.6.RELEASE</springboot.version>
代码语言:javascript复制2、pom.xml
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<exclusions>
<exclusion>
<artifactId>elasticsearch</artifactId>
<groupId>org.elasticsearch</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
代码语言:javascript复制3、application.yml配置
spring:
data:
elasticsearch:
cluster-name: docker-cluster
cluster-nodes: localhost:9300
elasticsearch:
rest:
uris: ["http://localhost:9200"]
ipAddrs: # 自定义的属性
- localhost:9200
代码语言:javascript复制4、创建ElasticsearchRestClient类
@ConfigurationProperties(prefix = "spring.elasticsearch.rest")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ElasticsearchRestClientConfig {
private static final int ES_IPADDRS_LENGTH = 2;
private static final String HTTP_SCHEME = "http";
private List<String> ipAddrs = new ArrayList<>();
@Bean
public RestClientBuilder restClientBuilder() {
HttpHost[] hosts = ipAddrs.stream()
.map(this::makeHttpHost)
.filter(Objects::nonNull)
.toArray(HttpHost[]::new);
return RestClient.builder(hosts);
}
@Bean(name = "highLevelClient")
public RestHighLevelClient highLevelClient(RestClientBuilder restClientBuilder) {
return new RestHighLevelClient(restClientBuilder);
}
private HttpHost makeHttpHost(String s) {
assert StringUtils.isNotEmpty(s);
String[] address = s.split(":");
if (address.length == ES_IPADDRS_LENGTH) {
String ip = address[0];
int port = Integer.parseInt(address[1]);
return new HttpHost(ip, port, HTTP_SCHEME);
}
return null;
}
ps:该类从博主唯有遇见清的一篇文章->ElasticSearch7.2学习—springboot集成elasticsearch,链接如下
https://blog.csdn.net/pyq666/article/details/99639810
拿来稍微修改实现
代码语言:javascript复制5、elasticsearch工具类创建
@Component
@Slf4j
public class ElasticsearchHelper {
@Autowired
private RestHighLevelClient highLevelClient;
/**
* 扫描指定包且含有EsDocument的注解,并创建索引
* @param packageName 包名
*/
public void createIndexs(String packageName){
Set<Class<?>> clzSet = ReflectionUtil.getClasses(packageName,
EsDocument.class);
if(CollectionUtils.isNotEmpty(clzSet)) {
for (Class<?> clz : clzSet) {
createIndex(clz);
}
}
}
public void createIndex(Class<?> clz) {
CachedClass cachedClass = ReflectionCache.getCachedClass(clz);
EsIndex esIndex = convertDocumentToIndexEs(clz);
boolean isIndexExist = this.isIndexExist(esIndex.getIndexName());
if (!isIndexExist) {
CachedField[] fields = cachedClass.getFields();
Map<String, Map<String, Object>> properties = ReflectionUtil
.covertFieldsIncludeAnnotationValueToMap(fields, EsField.class);
boolean isSuccess = this.createIndex(esIndex,properties);
if(isSuccess){
log.info("创建{}索引成功",esIndex.getIndexName());
}
}
}
public EsIndex convertDocumentToIndexEs(Class<?> clz) {
EsDocument document = clz.getAnnotation(EsDocument.class);
EsIndex esIndex = new EsIndex();
esIndex.setIndexName(document.indexName());
esIndex.setType(document.type());
esIndex.setReplicas(document.replicas());
esIndex.setShards(document.shards());
return esIndex;
}
public Object getEsId(Object retVal) {
Object id = null;
List<Field> fields = FieldUtils.getFieldsListWithAnnotation(retVal.getClass(), EsId.class);
if(CollectionUtils.isNotEmpty(fields)){
Field idField = fields.get(0);
try {
id = FieldUtils.readDeclaredField(retVal,idField.getName(),true);
} catch (IllegalAccessException e) {
log.error(e.getMessage(),e);
}
}
return id;
}
/**
* 创建索引
*
* @param indexName 索引名称
* @param properties 最外层的map的key为属性字段名称,value为属性字段的具体需要设置的属性,比如分词、类型等
* 比如:@Field(type = FieldType.Text, analyzer = "ik_smart",searchAnalyzer = "ik_smart", fielddata=true)
* private String urlName;
* 其最外层map为urlName->@Field,最里层为map为type->FieldType.Text等
*
* @return 如果创建成功返回true、创建失败返回 fasle
*/
public boolean createIndex(String indexName, Map<String,Map<String,Object>> properties){
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject()
.startObject("mappings")
// .startObject("_doc")
.field("properties", properties)
//.endObject()
.endObject()
.startObject("settings")
.field("number_of_shards", 5)
.field("number_of_replicas", 1)
.endObject()
.endObject();
CreateIndexRequest request = new CreateIndexRequest(indexName).source(builder);
CreateIndexResponse response = highLevelClient.indices().create(request, RequestOptions.DEFAULT);
return response.isAcknowledged();
} catch (IOException e) {
log.error("createIndex error:" e.getMessage(),e);
}
return false;
}
/**
* 创建索引
* @param esIndex 索引实体
* @param properties
* @return
*/
public boolean createIndex(EsIndex esIndex, Map<String,Map<String,Object>> properties){
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject()
.startObject("mappings")
//.startObject(esIndex.getType()) es7版本已经废弃type
.field("properties", properties)
//.endObject()
.endObject()
.startObject("settings")
.field("number_of_shards", esIndex.getShards())
.field("number_of_replicas", esIndex.getReplicas())
.endObject()
.endObject();
CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndexName()).source(builder);
CreateIndexResponse response = highLevelClient.indices().create(request, RequestOptions.DEFAULT);
return response.isAcknowledged();
} catch (IOException e) {
log.error("createIndex error:" e.getMessage(),e);
}
return false;
}
/**
* 判断索引是否已经存在
* @param indexName 索引名
* @return
*/
public boolean isIndexExist(String indexName){
try {
GetIndexRequest request = new GetIndexRequest(indexName);
request.local(false);
request.humanReadable(true);
request.includeDefaults(false);
return highLevelClient.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("isIndexExist error:" e.getMessage(),e);
}
return false;
}
/**
* 新增/更新一条记录
* @param indexName 索引名称
* @param entity
*/
public boolean saveOrUpdate(String indexName, EsEntity entity) {
boolean isSuccess = false;
IndexRequest request = new IndexRequest(indexName);
request.id(entity.getId());
request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
try {
IndexResponse response = highLevelClient.index(request, RequestOptions.DEFAULT);
isSuccess = response.status() == RestStatus.CREATED || response.status() == RestStatus.OK;
} catch (IOException e) {
log.error("saveOrUpdate error:" e.getMessage(),e);
}
return isSuccess;
}
/**
*
* 批量插入
* @param indexName
* @param list
*/
public boolean insertBatch(String indexName, List<EsEntity> list) {
BulkRequest request = new BulkRequest();
list.forEach(item -> request.add(new IndexRequest(indexName).id(item.getId())
.source(JSON.toJSONString(item.getData()), XContentType.JSON)));
try {
BulkResponse responses = highLevelClient.bulk(request, RequestOptions.DEFAULT);
return !responses.hasFailures();
} catch (Exception e) {
log.error("insertBatch error:" e.getMessage(),e);
}
return false;
}
/**
* 批量删除
* @param indexName
* @param idList 主键列表
* @param <T>
*/
public <T> boolean deleteBatch(String indexName, Collection<T> idList) {
BulkRequest request = new BulkRequest();
idList.forEach(item -> request.add(new DeleteRequest(indexName, item.toString())));
try {
BulkResponse responses = highLevelClient.bulk(request, RequestOptions.DEFAULT);
return !responses.hasFailures();
} catch (Exception e) {
log.error("deleteBatch error:" e.getMessage(),e);
}
return false;
}
/**
* 搜索
*
* @param indexName 索引名称
* @param builder 查询参数
* @param searchTargetClz 结果类对象
*
*/
public <T> List<T> search(String indexName, SearchSourceBuilder builder, Class<T> searchTargetClz) {
SearchRequest request = new SearchRequest(indexName);
request.source(builder);
try {
SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
List<T> res = new ArrayList<>(hits.length);
for (SearchHit hit : hits) {
res.add(JSON.parseObject(hit.getSourceAsString(), searchTargetClz));
}
return res;
} catch (Exception e) {
log.error("search error:" e.getMessage(),e);
}
return null;
}
/**
* 分页搜索
*
* @param indexName 索引名称
* @param builder 查询参数
* @param searchTargetClz 结果类对象
*
*/
public <T> PageResult <T> pageSearch(String indexName, SearchSourceBuilder builder, Class<T> searchTargetClz){
SearchRequest request = new SearchRequest(indexName);
request.source(builder);
PageResult<T> pageResult = new PageResult<>();
try {
SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
if(ArrayUtils.isNotEmpty(hits)){
List<T> res = new ArrayList<>(hits.length);
for (SearchHit hit : hits) {
res.add(JSON.parseObject(hit.getSourceAsString(), searchTargetClz));
}
int pageSize = builder.size();
int pageNo = builder.from() / pageSize 1;
long total = response.getHits().getTotalHits().value;
Long totalPage =(total pageSize - 1) / pageSize;
pageResult.setTotal(total);
pageResult.setPageNo(pageNo);
pageResult.setPageSize(pageSize);
pageResult.setTotalPages(Integer.valueOf(totalPage.toString()));
pageResult.setList(res);
}
return pageResult;
} catch (Exception e) {
log.error("pageSearch error:" e.getMessage(),e);
}
return null;
}
/**
* 删除索引
*
* @param indexName 索引名称
*
*/
public boolean deleteIndex(String indexName) {
try {
AcknowledgedResponse response = highLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
return response.isAcknowledged();
} catch (Exception e) {
log.error("deleteIndex error:" e.getMessage(),e);
}
return false;
}
/**
* 按查询条件删除记录
*
* @param indexName 索引名称
* @param builder builder 查询条件
*
*/
public boolean deleteByQuery(String indexName, QueryBuilder builder) {
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
request.setQuery(builder);
//设置批量操作数量,最大为10000
request.setBatchSize(10000);
request.setConflicts("proceed");
try {
BulkByScrollResponse response = highLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
List<Failure> bulkFailures = response.getBulkFailures();
return CollectionUtils.isEmpty(bulkFailures);
} catch (Exception e) {
log.error("deleteByQuery error:" e.getMessage(),e);
}
return false;
}
}
ps:该类的实现内核主要拿博主FleyX在其发表文章->springboot整合elasticsearch(基于es7.2和官方high level client,其链接如下
https://www.tapme.top/blog/detail/2019-07-29-14-59/
中的工具类进行完善,完善的内容有通过扫描注解自动创建索引、分页等。
通过上面的步骤,基本上就可以实现elasticsearch的增删改查操作了。但是作为一个spring-boot-starter-data-*的频繁使用者,肯定不满足这样。于是自己就瞎折腾一个类似的轮子,因为其具体实现细节,代码还挺多,就不讲了。后边直接讲如何使用
自造的elasticsearch客户端使用
代码语言:javascript复制1、在启动类上加上EnableCustomElasticsearchRepositories注解,形如下:
@SpringBootApplication(exclude = ElasticsearchDataAutoConfiguration.class)
@EnableCustomElasticsearchRepositories(basePackages = "com.github.lybgeek.elasticsearch.repository")
public class ElasticsearchApplication {
public static void main(String[] args) {
/**
* Springboot整合Elasticsearch 在项目启动前设置一下的属性,防止报错
* 解决netty冲突后初始化client时还会抛出异常
* java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]
*/
System.setProperty("es.set.netty.runtime.available.processors", "false");
SpringApplication.run(ElasticsearchApplication.class, args);
}
代码语言:javascript复制2、创建一个elasticsearchRepository,代码如下
@ElasticsearchRepository
public interface CustomShortUrlEsRepository extends CustomElasticsearchRepository<ShortUrlDTO,Long>{
}
代码语言:javascript复制3、在业务service使用Repository,代码如下
@Service
public class CustomShortUrlEsServiceImpl implements CustomShortUrlEsService {
@Autowired
private CustomShortUrlEsRepository customShortUrlEsRepository ;
@Override
public boolean saveShortUrl(ShortUrlDTO shortUrlDTO) {
return customShortUrlEsRepository.save(shortUrlDTO);
}
@Override
public boolean deleteShortUrlById(Long id) {
return customShortUrlEsRepository.deleteById(id);
}
@Override
public List<ShortUrlDTO> listShortUrls(ShortUrlDTO shortUrlDTO) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
if (StringUtils.isNotBlank(shortUrlDTO.getLongUrl())) {
QueryBuilder builder = QueryBuilders.matchQuery("longUrl", shortUrlDTO.getLongUrl());
boolBuilder.must(builder);
}
if (StringUtils.isNotBlank(shortUrlDTO.getRemark())) {
QueryBuilder builder = QueryBuilders.matchQuery("remark", shortUrlDTO.getRemark());
boolBuilder.must(builder);
}
if (StringUtils.isNotBlank(shortUrlDTO.getUrlName())) {
QueryBuilder builder = QueryBuilders.matchQuery("urlName", shortUrlDTO.getUrlName());
boolBuilder.must(builder);
}
searchSourceBuilder.query(boolBuilder);
return customShortUrlEsRepository.search(searchSourceBuilder);
}
@Override
public ShortUrlDTO getShortUrlById(Long id) {
return customShortUrlEsRepository.getEntityById(id);
}
代码语言:javascript复制4、验证集成结果
@Test
public void testListCustomShortUrls(){
ShortUrlDTO dto = new ShortUrlDTO();
dto.setUrlName("美团");
// dto.setRemark("门户");
List<ShortUrlDTO> shortUrlDTOS = customShortUrlEsService.listShortUrls(dto);
System.out.println(shortUrlDTOS);
}
代码语言:javascript复制2019-09-27 11:32:05.740 INFO 37104 --- [ main] org.reflections.Reflections : Reflections took 198 ms to scan 1 urls, producing 1 keys and 1 values
2019-09-27 11:32:07.087 INFO 37104 --- [ main] g.l.c.e.r.p.ElasticsearchRepositoryProxy : entityClz:class com.github.lybgeek.shorturl.dto.ShortUrlDTO,methodName:search,args:[{"query":{"bool":{"must":[{"match":{"urlName":{"query":"美团","operator":"OR","prefix_length":0,"max_expansions":50,"fuzzy_transpositions":true,"lenient":false,"zero_terms_query":"NONE","auto_generate_synonyms_phrase_query":true,"boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}}}]
[ShortUrlDTO(id=null, longUrl=https://www.meituan.com, urlName=美团外卖, remark=美团外卖APP)]
总结
ElasticSearch这几年发展太快了,现在使用solr的人越来越少,但感觉也越来越学不动了。ElasticSearch目前也出了Elastic相关的证书了,考一次证书需要400刀,哈哈。本文没有对ElasticSearch的概念进行相关介绍,这方面的内容,可以查阅官方文档,或者看下我这边给的链接
ElasticSearch入门,看这篇文章就够了
https://www.toutiao.com/a6585098500915069443/
至于对使用场景的深入详解,可以查看这篇博文
https://www.toutiao.com/a6619185846794846727/
参考文档
ElasticSearch 7 正式发布!
https://blog.csdn.net/csdnnews/article/details/89310209
一网打尽Elasticsearch的优点,使用场景,全文检索,近实时等知识点
https://www.xttblog.com/?p=4372
大白话ElasticSearch是什么以及应用场景
https://blog.csdn.net/paicmis/article/details/82535018
elasticSearch优缺点感想
https://www.clzg.cn/article/47706.html
百度百科
https://baike.baidu.com/item/elasticsearch/3411206?fr=aladdin
demo链接
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-elasticsearch