全文搜索属于最常见的需求,开源的 Elasticsearch是目前全文搜索引擎的首选。它可以快速地储存、搜索和分析海量数据。维基百科、Stack Overflow、Github 都采用它。 ElasticSearch 的底层是开源库 Lucene,Elasticsearch 是 Lucene 的封装,它提供了 REST API 的操作接口,开箱即用。
本文从零开始,讲解如何使用 Elastic 搭建自己的全文搜索引擎。每一步都有详细的说明,大家跟着做就能学会。
一、搭建ElasticSearch单机/集群版
《搭建教程》
二、基本概念介绍
2.1 Index
Elasticsearch会索引所有字段,经过处理后写入一个反向索引(Inverted Index)。查找数据的时候,直接查找该索引。 所以,【ES5.0之前的版本】Elasticsearch数据管理的顶层单位就叫做 Index(索引),相当于数据库中的DataBase,且每个 Index (即数据库)的名字必须是小写。
Index是指存储具有相同结构文档(Document)的数据,每个索引都有自己的mapping定义,用于定义字段名和类型。
而在ES5.0之前一个Index可以拥有多个Type,所以在ES5.0之前大部分人都把Index当做数据库中的DataBase,但是[在ES6后],官网禁止在一个Index下建立多个type,意味着一个Index下只能包含一种相同文档的数据,所以在新版的ES中将Index比喻为DataBase并不合适,它更像Table。
查看当前节点的所有 Index:
代码语言:javascript复制$ curl -X GET 'http://hdp-01:9200/_cat/indices?v'
一个集群中可以拥有多个索引,比如nginx日志存储的时候可以按照日期每天生成一个索引来存储[索引内部应具有相同的结构]
nginx-log-2019-07-19
nginx-log-2019-07-20
nginx-log-2019-07-21
2.2 Document
Index 里面单条的记录称为 Document(文档),许多条 Document 构成了一个 Index,Document 使用 JSON 格式表示。
代码语言:javascript复制{
"name": "张三",
"age": "22",
"job": "Java后端工程师"
}
常见的数据类型:
字符串:text,keyword 【text具有分词效果,而keyword不分次】 数值:long,integer,short,byte,double,float,half_float,scaled_float 布尔:boolean 日期:date 二进制:binary 范围类型:integer_range,float_range,long_range,double_range,data_range
虽然同一个 Index 里面的 Document,不要求有相同的结构(scheme),但是最好保持相同,因为这样有利于提高搜索效率。
2.3 Document元数据
_index: 文档所在的索引名
_type:文档所在的类型名
_id:文档的唯一id
_uid:组合id,在ES5之前由_type和_id组成(ES6之后与_id一致)
_source:文档的原始Json数据,可以获取每个字段的值
_all:可以将所有字段整合到该元数据字段,默认是禁止的【它对所有的字段进行分词,占用空间大,性能也不算很高】
2.4 Type
Document 可以分组,比如weather这个 Index 里面,可以按城市分组(北京和上海),也可以按气候分组(晴天和雨天)。这种分组就叫做 Type,它是虚拟的逻辑分组,用来过滤 Document,相当于数据库中的table。 不同的 Type 应该有相似的结构(schema),举例来说,id字段不能在这个组是字符串,在另一个组是数值。这是与关系型数据库的表的一个区别。性质完全不同的数据(比如products和logs)应该存成两个 Index,而不是一个 Index 里面的两个 Type(虽然可以做到)。 下面的命令可以列出每个 Index 所包含的 Type。
代码语言:javascript复制$ curl 'hdp-01:9200/_mapping?pretty=true'
根据规划,ElasticSearch 6.x 版只允许每个 Index 包含一个 Type,7.x 版将会彻底移除 Type。
往ElasticSearch中存储数据,实际上就是往Index下的Type中存储JSON数据。
2.5 id
ElasticSearch中的id,实际上就相当于数据库中的主键id。id是可选的,不提供id时,ES也会自动生成。
2.6 RESTFul接口的URL格式
代码语言:javascript复制http://hdp-01:9200/<index>/<type>/[<id>]
Rest-> 表现层状态转换 可以通过RestAPI为资源(Document、Index)进行指定的操作。
RestAPI常用的两种交互方式: Curl、Kibana DevTools
其中index、type是必须提供的[ES5]。 id是可选的,不提供es会自动生成。 index、type将信息进行分层,利于管理。 index可以理解为数据库;type理解为数据表;id相当于数据库表中记录的主键,是唯一的。
三、新建和删除 Index
新建 Index,可以直接向 ElasticSearch服务器发出 PUT 请求。下面的例子是新建一个名叫weather
的 Index。
$ curl -X PUT 'hdp-01:9200/weather'
服务器返回一个 JSON 对象,里面的acknowledged
字段表示操作成功。
{
"acknowledged":true,
"shards_acknowledged":true
}
然后,我们发出 DELETE 请求,删除这个 Index。
代码语言:javascript复制$ curl -X DELETE 'hdp-01:9200/weather'
四、中文分词设置【IK】
首先,安装中文分词插件。这里使用的是IK分词器,也可以考虑其他插件(比如 smartcn)。 下载对应版本的插件【ElasticSearch是什么版本IK就用啥版本吧】 https://github.com/medcl/elasticsearch-analysis-ik/releases
上面代码安装的是5.4.3版的插件,与 ElasticSearch 5.4.3 配合使用。 1、下载ES对应版本的ik分词器的zip包,上传到ES服务器上。
2、在ES的安装目录下有一个plugins的目录,在这个目录下创建一个叫ik的目录【叫什么名字无所谓,但一定要创建一个文件夹,如果直接unzip到plugins内是不行的】,然后将解压好的内容,拷贝到ik目录。
3、使用scp命令将ik目录拷贝到其他的ES节点,重新启动所有的ES。
关闭es命令可以参考:
代码语言:javascript复制kill `ps -ef | grep Elasticsearch | grep -v grep | awk '{print $2}'`
方式1:新建一个 Index,指定需要分词的字段。这一步根据数据结构而异,下面的命令只针对本文。基本上,凡是需要搜索的中文字段,都要单独设置一下。【-d是指传参】
代码语言:javascript复制$ curl -X PUT 'hdp-01:9200/accounts' -d '
{
"mappings": {
"person": {
"properties": {
"user": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
},
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
},
"desc": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
}
}
}'
上面代码中,首先新建一个名称为accounts
的 Index,里面有一个名称为person
的 Type。person
有三个字段。
user
title
desc
这三个字段都是中文,而且类型都是文本(text),所以需要指定中文分词器,不能使用默认的英文分词器。 ElasticSearch 的分词器称为 analyzer。我们对每个字段指定分词器。
代码语言:javascript复制"user": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
方式2:创建索引名字叫news
代码语言:javascript复制curl -XPUT http://hdp-01:9200/news
创建mapping(相当于数据中的schema信息,表名和字段名以及s字段的类型)
代码语言:javascript复制curl -XPOST http://hdp-01:9200/news/fulltext/_mapping -d'
{
"properties": {
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
}'
上面代码中,analyzer
是字段文本的分词器,search_analyzer
是搜索词的分词器。ik_max_word
分词器是插件ik
提供的,可以对文本进行最大数量的分词。
分词测试:
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/_analyze?pretty&analyzer=ik_max_word' -d '联想是全球最大的笔记本厂商'
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/_analyze?pretty&analyzer=ik_smart' -d '联想是全球最大的笔记本厂商'
代码语言:javascript复制curl -XPOST http://hdp-01:9200/news/fulltext/1 -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}'
curl -XPOST http://hdp-01:9200/news/fulltext/2 -d'
{"content":"公安部:各地校车将享最高路权"}'
curl -XPOST http://hdp-01:9200/news/fulltext/3 -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}'
curl -XPOST http://hdp-01:9200/news/fulltext/4 -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}'
向content字段插入数据,分析和查找时都使用ik分词器进行分词
代码语言:javascript复制curl -XPOST http://hdp-01:9200/news/fulltext/_search -d'
{
"query" : { "match" : { "content" : "中国" }},
"highlight" : {
"pre_tags" : ["<font color='red'>", "<tag2>"],
"post_tags" : ["</font>", "</tag2>"],
"fields" : {
"content" : {}
}
}
}'
参考官方例子:
https://github.com/medcl/elasticsearch-analysis-ik
五、数据操作
5.1 新增记录
向指定的 /Index/Type 发送 PUT 请求,就可以在 Index 里面新增一条记录。比如,向store索引中添加一些书籍
Index :store Type :books id :1
代码语言:javascript复制curl -XPUT 'http://hdp-01:9200/store/books/1' -d '{
"title": "Elasticsearch: The Definitive Guide",
"name" : {
"first" : "Zachary",
"last" : "Tong"
},
"publish_date":"2015-02-06",
"price":"49.99"
}'
服务器返回的 JSON 对象,会给出 Index、Type、Id、Version 等信息
代码语言:javascript复制{
"_index":"store",
"_type":"books",
"_id":"1",
"_version":1,
"result":"created",
"_shards":{
"total":2,
"successful":2,
"failed":0
},
"created":true
}
如果你仔细看,会发现请求路径是/store/books/1
,最后的1
是该条记录的 Id。它不一定是数字,任意字符串(比如abc
)都可以。新增记录的时候,也可以不指定 Id,这时要改成 POST 请求。
curl -X POST 'http://hdp-01:9200/store/books' -d '{
"title": "Elasticsearch: The Definitive Guide",
"name" : {
"first" : "cn",
"last" : "itcats"
},
"publish_date":"2019-04-26",
"price":"99999"
}'
上面代码中,向/store/books
发出一个 POST 请求,添加一个记录。这时,服务器返回的 JSON 对象里面,_id
字段就是一个随机字符串。
{
"_index":"store",
"_type":"books",
"_id":"AWpWQ-w1_0DpkACoUwPR",
"_version":1,
"result":"created",
"_shards":{
"total":2,
"successful":2,
"failed":0
},
"created":true
}
注意,如果没有先创建 Index(这个例子是store
),直接执行上面的命令,ElasticSearch 也不会报错,而是直接生成指定的 Index。所以,打字的时候要小心,不要写错 Index 的名称。
5.2 查看记录
在linux中通过curl的方式查询
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/1'
通过浏览器查询
代码语言:javascript复制http://hdp-01:9200/store/books/1
代码语言:javascript复制{"_index":"store","_type":"books","_id":"1","_version":1,"found":true,"_source":{
"title": "Elasticsearch: The Definitive Guide",
"name" : {
"first" : "Zachary",
"last" : "Tong"
},
"publish_date":"2015-02-06",
"price":"49.99"
}}
返回的数据中,found
字段表示查询是否成功,_source
字段返回原始记录。
如果 Id 不正确,就查不到数据,found
字段就是false,
URL 的参数pretty=true
表示以易读的格式【即格式化的JSON】返回。
[itcats@hdp-01 ~]$ curl -XGET 'http://hdp-01:9200/store/books/abc?pretty=true'
代码语言:javascript复制{
"_index" : "store",
"_type" : "books",
"_id" : "aa",
"found" : false
}
通过_source获取指定的字段
只查title,相当于select title ...
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/1?pretty=true&_source=title'
代码语言:javascript复制{
"_index" : "store",
"_type" : "books",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
"title" : "Elasticsearch: The Definitive Guide"
}
}
只查title和price,相当于select title,price ...
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/1?pretty=true&_source=title,price'
代码语言:javascript复制{
"_index" : "store",
"_type" : "books",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
"price" : "49.99",
"title" : "Elasticsearch: The Definitive Guide"
}
}
_source全查询,跟没加没区别,相当于select *
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/1?pretty=true&_source'
代码语言:javascript复制{
"_index" : "store",
"_type" : "books",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
"title" : "Elasticsearch: The Definitive Guide",
"name" : {
"first" : "Zachary",
"last" : "Tong"
},
"publish_date" : "2015-02-06",
"price" : "49.99"
}
}
5.3 删除记录
删除记录就是发出 DELETE 请求
代码语言:javascript复制curl -XDELETE 'http://hdp-01:9200/store/books/1'
代码语言:javascript复制{"found":true,"_index":"store","_type":"books","_id":"1","_version":4,"result":"deleted","_shards":{"total":2,"successful":2,"failed":0}}
5.4 更新记录
更新记录就是使用 PUT 请求,重新发送一次数据。
代码语言:javascript复制curl -XPUT 'http://hdp-01:9200/store/books/1' -d '{
"title": "Elasticsearch: The Definitive Guide",
"name" : {
"first" : "Zachary",
"last" : "Tong"
},
"publish_date":"2019-04-26",
"price":"99.99"
}'
代码语言:javascript复制{"_index":"store","_type":"books","_id":"1","_version":2,"result":"updated","_shards":{"total":2,"successful":2,"failed":0},"created":false}
可以看到,记录的 Id 没变,但是版本(version)从1
变成2
,操作类型(result)从created
变成updated
,created
字段变成false
,因为这次不是新建记录。
或者通过 _update API的方式单独更新你想要更新的
代码语言:javascript复制curl -XPOST 'http://hdp-01:9200/store/books/1/_update' -d '{
"doc": {
"price" : 88.88
}
}'
代码语言:javascript复制{"_index":"store","_type":"books","_id":"1","_version":3,"result":"updated","_shards":{"total":2,"successful":2,"failed":0}}
6、数据查询
6.1 返回所有记录
使用 GET 方法,直接请求/Index/Type/_search
,就会返回所有记录。
curl 'hdp-01:9200/store/books/_search?pretty=true'
代码语言:javascript复制{
"took" : 36,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 1.0,
"hits" : [
{
"_index" : "store",
"_type" : "books",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"title" : "Elasticsearch Blueprints",
"name" : {
"first" : "Vineeth",
"last" : "Mohan"
},
"publish_date" : "2015-06-06",
"price" : "35.99"
}
},
{
"_index" : "store",
"_type" : "books",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"title" : "Elasticsearch: The Definitive Guide",
"author" : "Guide",
"publish_date" : "2016-02-06",
"price" : "35.99"
}
},
{
"_index" : "store",
"_type" : "books",
"_id" : "AWpWQ-w1_0DpkACoUwPR",
"_score" : 1.0,
"_source" : {
"title" : "Elasticsearch: The Definitive Guide",
"name" : {
"first" : "cn",
"last" : "itcats"
},
"publish_date" : "2019-04-26",
"price" : "99999"
}
}
]
}
}
上面代码中,返回结果的 took
字段表示该操作的耗时(单位为毫秒),timed_out
字段表示是否超时,hits
字段表示命中的记录,里面子字段的含义如下:
total:返回记录数,本例是2条。
max_score:最高的匹配程度,本例是1.0。
hits:返回的记录组成的数组。
返回的记录中,每条记录都有一个_score
字段,表示匹配的程序,默认是按照这个字段降序排列。
6.2 最简单filter查询
代码语言:javascript复制SELECT * FROM books WHERE price = 35.99
filtered 查询价格是35.99的【返回的的分是1.0】先查所有再过滤
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
"query": {
"bool": {
"must": {
"match_all": {}
},
"filter": {
"term": {
"price": 35.99
}
}
}
}
}'
【返回的的分是1.0】直接过滤
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
"query": {
"constant_score": {
"filter": {
"term": {
"price": 35.99
}
}
}
}
}'
【返回的的分是0.0】直接过滤
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
"query": {
"bool": {
"filter" : {
"term" : {
"price" : 35.99
}
}
}
}
}'
【指定多个值】 select * from books where price = 35.99 or price = 99.99
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
"query" : {
"bool" : {
"filter" : {
"terms" : {
"price" : [35.99, 99.99]
}
}
}
}
}'
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
"query" : {
"bool" : {
"must": {
"match_all": {}
},
"filter" : {
"terms" : {
"price" : [35.99, 99.99]
}
}
}
}
}'
6.3 bool过滤查询,可做组合过滤查询
代码语言:javascript复制SELECT * FROM books WHERE (price = 35.99 OR price = 99.99) AND publish_date != "2016-02-06"
类似的,Elasticsearch也有 and, or, not这样的组合条件的查询方式 格式如下:
代码语言:javascript复制 {
"bool" : {
"must" : [],
"should" : [],
"must_not" : [],
}
}
代码语言:javascript复制 must: 条件必须满足,相当于 and
should: 条件可以满足也可以不满足,相当于 or
must_not: 条件不需要满足,相当于 not
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
"query" : {
"bool" : {
"should" : [
{ "term" : {"price" : 35.99}},
{ "term" : {"price" : 99.99}}
],
"must_not" : {
"term" : {"publish_date" : "2016-02-06"}
}
}
}
}'
6.4 嵌套查询
代码语言:javascript复制SELECT * FROM books WHERE price = 35.99 OR ( publish_date = "2016-02-06" AND price = 99.99 )
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
"query": {
"bool": {
"should": [
{
"term": {
"price": 35.99
}
},
{
"bool": {
"must": [
{
"term": {
"publish_date": "2016-02-06"
}
},
{
"term": {
"price": 99.99
}
}
]
}
}
]
}
}
}'
6.5 range范围过滤
代码语言:javascript复制SELECT * FROM books WHERE price >= 10 AND price < 99
代码语言:javascript复制gt : > 大于
lt : < 小于
gte : >= 大于等于
lte : <= 小于等于
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
"query": {
"range" : {
"price" : {
"gte" : 10,
"lt" : 99
}
}
}
}'
name和author都必须包含Guide,并且价钱等于33.99或者188.99
代码语言:javascript复制curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
"query": {
"bool": {
"must": {
"multi_match": {
"operator": "and",
"fields": [
"name",
"author"
],
"query": "Guide"
}
},
"filter": {
"terms": {
"price": [
35.99,
188.99
]
}
}
}
}
}'
7、ES-head插件安装
安装过程详见我的另外一篇博客:《ES图形化管理界面——ElasticSearch-head安装配置教程》
后台进程脚本代码
代码语言:javascript复制#/bin/sh
npm run start >> .log 2>&1 &
8、ElasticSearch-JavaAPI介绍
代码语言:javascript复制 <!-- es的客户端-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.4.3</version>
</dependency>
<!-- 依赖2.x的log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
Plus : ES Java-API操作需要log4j相关的依赖
在resources/ 下 创建log4j2.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
书写第一个入门程序:
代码语言:javascript复制public static void main(String[] args) {
try {
//设置集群名称
Settings settings = Settings.builder()
.put("cluster.name", "my-es")
.build();
//创建client
TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses(
//用java访问ES用的端口是9300
new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300));
//搜索数据(.actionGet()方法是同步的,没有返回就等待)
GetResponse response = client.prepareGet("news", "fulltext", "1").execute().actionGet();
//输出结果
System.out.println(response);
//关闭client
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
返回结果:
代码语言:javascript复制{"_index":"news","_type":"fulltext","_id":"1","_version":1,"found":true,"_source":
{"content":"美国留给伊拉克的是个烂摊子吗"}}
8.1 简单的CURD操作
代码语言:javascript复制package cn.itcats.es;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
public class ESCrud {
private TransportClient client = null;
@Before
public void init() throws Exception {
//设置集群名称
Settings settings = Settings.builder()
.put("cluster.name", "my-es")
//自动感知的功能(可以通过当前指定的节点获取所有es节点的信息),但至少也写两个节点信息
.put("client.transport.sniff", true)
.build();
//创建client
client = new PreBuiltTransportClient(settings).addTransportAddresses(
new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300));
}
@Test
public void testCreate() throws IOException {
//index:"gamelog" type:"users" id:"1"
IndexResponse response = client.prepareIndex("gamelog", "users", "1")
.setSource(
jsonBuilder()
//相当于{
.startObject()
.field("username", "itcats_cn")
.field("gender", "female")
.field("birthday", new Date())
.field("fv", 9999.99)
.field("message", "trying out Elasticsearch")
//相当于}
.endObject()
).get();
}
//查找一条
@Test
public void testGet() throws IOException {
GetResponse response = client.prepareGet("gamelog", "users", "1").get();
System.out.println(response.getSourceAsString());
}
//查找多条
@Test
public void testMultiGet() throws IOException {
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("gamelog", "users", "1")
.add("gamelog", "users", "2", "3")
.add("news", "fulltext", "1")
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
GetResponse response = itemResponse.getResponse();
if (response.isExists()) {
String json = response.getSourceAsString();
System.out.println(json);
}
}
}
@Test
public void testUpdate() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("gamelog");
updateRequest.type("users");
updateRequest.id("2");
updateRequest.doc(
jsonBuilder()
.startObject()
.field("fv", 999.9)
.endObject());
client.update(updateRequest).get();
}
@Test
public void testDelete() {
DeleteResponse response = client.prepareDelete("gamelog", "users", "2").get();
System.out.println(response);
}
@Test
public void testDeleteByQuery() {
BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
//指定查询条件
.filter(QueryBuilders.matchQuery("username", "itcats"))
//指定索引名称
.source("gamelog")
.get();
long deleted = response.getDeleted();
System.out.println(deleted);
}
//异步删除
@Test
public void testDeleteByQueryAsync() {
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male"))
.source("gamelog")
.execute(new ActionListener<BulkByScrollResponse>() {
@Override //execute(args) args内传递了一个监听器,若成功查找到结果则回调onResponse()
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted();
System.out.println("数据删除了");
System.out.println(deleted);
}
@Override //未成功查找到时则回调onFailure()
public void onFailure(Exception e) {
e.printStackTrace();
}
});
try {
System.out.println("异步删除");
//Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testRange() {
QueryBuilder qb = rangeQuery("fv")
// [88.99, 10000)
.from(88.99)
.to(10000) //注意字符串的坑
.includeLower(true)
.includeUpper(false);
SearchResponse response = client.prepareSearch("gamelog").setQuery(qb).get();
System.out.println(response);
}
}
8.2 JavaAPI创建Mapping
代码语言:javascript复制public class AdminAPI {
private TransportClient client = null;
//在所有的测试方法之前执行
@Before
public void init() throws Exception {
//设置集群名称
Settings settings = Settings.builder().put("cluster.name", "my-es").build();
//创建client
client = new PreBuiltTransportClient(settings).addTransportAddresses(
new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300));
}
//创建索引,并配置一些参数
@Test
public void createIndexWithSettings() {
//获取Admin的API
AdminClient admin = client.admin();
//使用Admin API对索引进行操作
IndicesAdminClient indices = admin.indices();
//准备创建索引
indices.prepareCreate("gamelog")
//配置索引参数
.setSettings(
//参数配置器
Settings.builder()//指定索引分区的数量
.put("index.number_of_shards", 4)
//指定索引副本的数量(注意:不包括本身,如果设置数据存储副本为2,实际上数据存储了3份)
.put("index.number_of_replicas", 2)
)
//真正执行
.get();
}
//跟索引添加mapping信息(给表添加schema信息)
@Test
public void putMapping() {
//创建索引
client.admin().indices().prepareCreate("twitter")
//创建一个type,并指定type中属性的名字和类型
.addMapping("tweet",
"{n"
" "tweet": {n"
" "properties": {n"
" "message": {n"
" "type": "string"n"
" }n"
" }n"
" }n"
" }")
.get();
}
/**
* 你可以通过dynamic设置来控制这一行为,它能够接受以下的选项:
* true:默认值。动态添加字段
* false:忽略新字段
* strict:如果碰到陌生字段,抛出异常
* @throws IOException
*/
@Test
public void testSettingsMappings() throws IOException {
//1:settings
HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
settings_map.put("number_of_shards", 3);
settings_map.put("number_of_replicas", 2);
//2:mappings(映射、schema)
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("dynamic", "true")
//设置type中的属性
.startObject("properties")
//id属性
.startObject("num")
//类型是integer
.field("type", "integer")
//不分词,但是建索引
.field("index", "not_analyzed")
//在文档中存储
.field("store", "yes")
.endObject()
//name属性
.startObject("name")
//string类型
.field("type", "string")
//在文档中存储
.field("store", "yes")
//建立索引
.field("index", "analyzed")
//使用ik_smart进行分词
.field("analyzer", "ik_smart")
.endObject()
.endObject()
.endObject();
CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("user_info");
//管理索引(user_info)然后关联type(user)
prepareCreate.setSettings(settings_map).addMapping("user", builder).get();
}
/**
* index这个属性,no代表不建索引
* not_analyzed,建索引不分词
* analyzed 即分词,又建立索引
* expected [no], [not_analyzed] or [analyzed]
* @throws IOException
*/
@Test
public void testSettingsPlayerMappings() throws IOException {
//1:settings
HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
settings_map.put("number_of_shards", 3);
settings_map.put("number_of_replicas", 1);
//2:mappings
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()//
.field("dynamic", "true")
.startObject("properties")
.startObject("id")
.field("type", "integer")
.field("store", "yes")
.endObject()
.startObject("name")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("age")
.field("type", "integer")
.endObject()
.startObject("salary")
.field("type", "integer")
.endObject()
.startObject("team")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("position")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("description")
.field("type", "string")
.field("store", "no")
.field("index", "analyzed")
.field("analyzer", "ik_smart")
.endObject()
.startObject("addr")
.field("type", "string")
.field("store", "yes")
.field("index", "analyzed")
.field("analyzer", "ik_smart")
.endObject()
.endObject()
.endObject();
CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player_info");
prepareCreate.setSettings(settings_map).addMapping("player", builder).get();
}
}
8.3 JavaAPI对ES做复杂的聚合查询
代码语言:javascript复制/**
* https://elasticsearch.cn/article/102
*
* select team, count(*) as player_count from player group by team;
*/
@Test
public void testAgg1() {
//指定索引和type
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//按team分组然后聚合,但是并没有指定聚合函数,别名为:player_count 对team字段分组
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("player_count").field("team");
//添加聚合器
builder.addAggregation(teamAgg);
//触发
SearchResponse response = builder.execute().actionGet();
//System.out.println(response);
//将返回的结果放入到一个map中
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
// Set<String> keys = aggMap.keySet();
//
// for (String key: keys) {
// System.out.println(key);
// }
//取出聚合属性
StringTerms terms = (StringTerms) aggMap.get("player_count");
//依次迭代出分组聚合数据
// for (Terms.Bucket bucket : terms.getBuckets()) {
// //分组的名字
// String team = (String) bucket.getKey();
// //count,分组后一个组有多少数据
// long count = bucket.getDocCount();
// System.out.println(team " " count);
// }
Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
while (teamBucketIt .hasNext()) {
Terms.Bucket bucket = teamBucketIt.next();
String team = (String) bucket.getKey();
long count = bucket.getDocCount();
System.out.println(team " " count);
}
}
/**
* select team, position, count(*) as pos_count from player group by team, position;
*/
@Test
public void testAgg2() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//指定别名和分组的字段
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
TermsAggregationBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");
//添加两个聚合构建器
builder.addAggregation(teamAgg.subAggregation(posAgg));
//执行查询
SearchResponse response = builder.execute().actionGet();
//将查询结果放入map中
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//根据属性名到map中查找
StringTerms teams = (StringTerms) aggMap.get("team_name");
//循环查找结果
for (Terms.Bucket teamBucket : teams.getBuckets()) {
//先按球队进行分组
String team = (String) teamBucket.getKey();
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
StringTerms positions = (StringTerms) subAggMap.get("pos_count");
//因为一个球队有很多位置,那么还要依次拿出位置信息
for (Terms.Bucket posBucket : positions.getBuckets()) {
//拿到位置的名字
String pos = (String) posBucket.getKey();
//拿出该位置的数量
long docCount = posBucket.getDocCount();
//打印球队,位置,人数
System.out.println(team " " pos " " docCount);
}
}
}
/**
* select team, max(age) as max_age from player group by team;
*/
@Test
public void testAgg3() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//指定按球队进行分组
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
//指定分组求最大值
MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
//分组后求最大值
builder.addAggregation(teamAgg.subAggregation(maxAgg));
//查询
SearchResponse response = builder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//根据team属性,获取map中的内容
StringTerms teams = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket teamBucket : teams.getBuckets()) {
//分组的属性名
String team = (String) teamBucket.getKey();
//在将聚合后取最大值的内容取出来放到map中
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
//取分组后的最大值
InternalMax ages = (InternalMax)subAggMap.get("max_age");
double max = ages.getValue();
System.out.println(team " " max);
}
}
/**
* select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team;
*/
@Test
public void testAgg4() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//指定分组字段
TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
//指定聚合函数是求平均数据
AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
//指定另外一个聚合函数是求和
SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
//分组的聚合器关联了两个聚合函数
builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
SearchResponse response = builder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//按分组的名字取出数据
StringTerms teams = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket teamBucket : teams.getBuckets()) {
//获取球队名字
String team = (String) teamBucket.getKey();
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
//根据别名取出平均年龄
InternalAvg avgAge = (InternalAvg)subAggMap.get("avg_age");
//根据别名取出薪水总和
InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
double avgAgeValue = avgAge.getValue();
double totalSalaryValue = totalSalary.getValue();
System.out.println(team " " avgAgeValue " " totalSalaryValue);
}
}
/**
* select team, sum(salary) as total_salary from player group by team order by total_salary desc;
*/
@Test
public void testAgg5() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//按team进行分组,然后指定排序规则
TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team").order(Terms.Order.aggregation("total_salary ", true));
SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
builder.addAggregation(termsAgg.subAggregation(sumAgg));
SearchResponse response = builder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
StringTerms teams = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket teamBucket : teams.getBuckets()) {
String team = (String) teamBucket.getKey();
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
double totalSalaryValue = totalSalary.getValue();
System.out.println(team " " totalSalaryValue);
}
}