ElasticSearch入门

2021-05-14 17:29:10 浏览数 (1)

    全文搜索属于最常见的需求,开源的 Elasticsearch是目前全文搜索引擎的首选。它可以快速地储存、搜索和分析海量数据。维基百科、Stack Overflow、Github 都采用它。 ElasticSearch 的底层是开源库 Lucene,Elasticsearch 是 Lucene 的封装,它提供了 REST API 的操作接口,开箱即用。

本文从零开始,讲解如何使用 Elastic 搭建自己的全文搜索引擎。每一步都有详细的说明,大家跟着做就能学会。

一、搭建ElasticSearch单机/集群版

《搭建教程》

二、基本概念介绍

2.1    Index

Elasticsearch会索引所有字段,经过处理后写入一个反向索引(Inverted Index)。查找数据的时候,直接查找该索引。 所以,【ES5.0之前的版本】Elasticsearch数据管理的顶层单位就叫做 Index(索引),相当于数据库中的DataBase,且每个 Index (即数据库)的名字必须是小写

Index是指存储具有相同结构文档(Document)的数据,每个索引都有自己的mapping定义,用于定义字段名和类型。

而在ES5.0之前一个Index可以拥有多个Type,所以在ES5.0之前大部分人都把Index当做数据库中的DataBase,但是[在ES6后],官网禁止在一个Index下建立多个type,意味着一个Index下只能包含一种相同文档的数据,所以在新版的ES中将Index比喻为DataBase并不合适,它更像Table。

查看当前节点的所有 Index:

代码语言:javascript复制
$ curl -X GET 'http://hdp-01:9200/_cat/indices?v'

一个集群中可以拥有多个索引,比如nginx日志存储的时候可以按照日期每天生成一个索引来存储[索引内部应具有相同的结构]

nginx-log-2019-07-19

nginx-log-2019-07-20

nginx-log-2019-07-21

2.2    Document

Index 里面单条的记录称为 Document(文档),许多条 Document 构成了一个 Index,Document 使用 JSON 格式表示

代码语言:javascript复制
{
  "name": "张三",
  "age": "22",
  "job": "Java后端工程师"
}

常见的数据类型:

字符串:text,keyword 【text具有分词效果,而keyword不分次】 数值:long,integer,short,byte,double,float,half_float,scaled_float 布尔:boolean 日期:date 二进制:binary 范围类型:integer_range,float_range,long_range,double_range,data_range

虽然同一个 Index 里面的 Document,不要求有相同的结构(scheme),但是最好保持相同,因为这样有利于提高搜索效率。

2.3    Document元数据

_index: 文档所在的索引名

_type:文档所在的类型名

_id:文档的唯一id

_uid:组合id,在ES5之前由_type和_id组成(ES6之后与_id一致)

_source:文档的原始Json数据,可以获取每个字段的值

_all:可以将所有字段整合到该元数据字段,默认是禁止的【它对所有的字段进行分词,占用空间大,性能也不算很高】

2.4    Type

Document 可以分组,比如weather这个 Index 里面,可以按城市分组(北京和上海),也可以按气候分组(晴天和雨天)。这种分组就叫做 Type,它是虚拟的逻辑分组,用来过滤 Document,相当于数据库中的table。 不同的 Type 应该有相似的结构(schema),举例来说,id字段不能在这个组是字符串,在另一个组是数值。这是与关系型数据库的表的一个区别。性质完全不同的数据(比如products和logs)应该存成两个 Index,而不是一个 Index 里面的两个 Type(虽然可以做到)。 下面的命令可以列出每个 Index 所包含的 Type。

代码语言:javascript复制
$ curl 'hdp-01:9200/_mapping?pretty=true'

根据规划,ElasticSearch 6.x 版只允许每个 Index 包含一个 Type,7.x 版将会彻底移除 Type

往ElasticSearch中存储数据,实际上就是往Index下的Type中存储JSON数据。

2.5    id

ElasticSearch中的id,实际上就相当于数据库中的主键idid是可选的,不提供id时,ES也会自动生成

2.6    RESTFul接口的URL格式

代码语言:javascript复制
http://hdp-01:9200/<index>/<type>/[<id>]

Rest-> 表现层状态转换   可以通过RestAPI为资源(Document、Index)进行指定的操作。

RestAPI常用的两种交互方式: Curl、Kibana DevTools

其中index、type是必须提供的[ES5]。 id是可选的,不提供es会自动生成。 index、type将信息进行分层,利于管理。 index可以理解为数据库;type理解为数据表;id相当于数据库表中记录的主键,是唯一的。

三、新建和删除 Index

新建 Index,可以直接向 ElasticSearch服务器发出 PUT 请求。下面的例子是新建一个名叫weather的 Index。

代码语言:javascript复制
$ curl -X PUT 'hdp-01:9200/weather'

服务器返回一个 JSON 对象,里面的acknowledged字段表示操作成功。

代码语言:javascript复制
{
  "acknowledged":true,
  "shards_acknowledged":true
}

然后,我们发出 DELETE 请求,删除这个 Index。

代码语言:javascript复制
$ curl -X DELETE 'hdp-01:9200/weather'

四、中文分词设置【IK】

首先,安装中文分词插件。这里使用的是IK分词器,也可以考虑其他插件(比如 smartcn)。 下载对应版本的插件【ElasticSearch是什么版本IK就用啥版本吧】 https://github.com/medcl/elasticsearch-analysis-ik/releases

上面代码安装的是5.4.3版的插件,与 ElasticSearch 5.4.3 配合使用。 1、下载ES对应版本的ik分词器的zip包,上传到ES服务器上。

2、在ES的安装目录下有一个plugins的目录,在这个目录下创建一个叫ik的目录【叫什么名字无所谓,但一定要创建一个文件夹,如果直接unzip到plugins内是不行的】,然后将解压好的内容,拷贝到ik目录。

3、使用scp命令将ik目录拷贝到其他的ES节点,重新启动所有的ES。

关闭es命令可以参考:

代码语言:javascript复制
kill `ps -ef | grep Elasticsearch | grep -v grep | awk '{print $2}'`

方式1:新建一个 Index,指定需要分词的字段。这一步根据数据结构而异,下面的命令只针对本文。基本上,凡是需要搜索的中文字段,都要单独设置一下。【-d是指传参】

代码语言:javascript复制
$ curl -X PUT 'hdp-01:9200/accounts' -d '
{
  "mappings": {
    "person": {
      "properties": {
        "user": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "title": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "desc": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        }
      }
    }
  }
}'

上面代码中,首先新建一个名称为accounts的 Index,里面有一个名称为person的 Type。person有三个字段。

代码语言:javascript复制
user
title
desc

这三个字段都是中文,而且类型都是文本(text),所以需要指定中文分词器,不能使用默认的英文分词器。 ElasticSearch 的分词器称为 analyzer。我们对每个字段指定分词器。

代码语言:javascript复制
"user": {
  "type": "text",
  "analyzer": "ik_max_word",
  "search_analyzer": "ik_max_word"
}

方式2:创建索引名字叫news

代码语言:javascript复制
curl -XPUT http://hdp-01:9200/news

创建mapping(相当于数据中的schema信息,表名和字段名以及s字段的类型)

代码语言:javascript复制
curl -XPOST http://hdp-01:9200/news/fulltext/_mapping -d'
{
        "properties": {
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_max_word"
            }
        }
    
}'

 上面代码中,analyzer是字段文本的分词器,search_analyzer是搜索词的分词器ik_max_word分词器是插件ik提供的,可以对文本进行最大数量的分词。

分词测试:

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/_analyze?pretty&analyzer=ik_max_word' -d '联想是全球最大的笔记本厂商'
代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/_analyze?pretty&analyzer=ik_smart' -d '联想是全球最大的笔记本厂商'
代码语言:javascript复制
curl -XPOST http://hdp-01:9200/news/fulltext/1 -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}'

curl -XPOST http://hdp-01:9200/news/fulltext/2 -d'
{"content":"公安部:各地校车将享最高路权"}'

curl -XPOST http://hdp-01:9200/news/fulltext/3 -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}'

curl -XPOST http://hdp-01:9200/news/fulltext/4 -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}'

向content字段插入数据,分析和查找时都使用ik分词器进行分词 

代码语言:javascript复制
curl -XPOST http://hdp-01:9200/news/fulltext/_search  -d'
{
    "query" : { "match" : { "content" : "中国" }},
    "highlight" : {
        "pre_tags" : ["<font color='red'>", "<tag2>"],
        "post_tags" : ["</font>", "</tag2>"],
        "fields" : {
            "content" : {}
        }
    }
}'

参考官方例子:

https://github.com/medcl/elasticsearch-analysis-ik

五、数据操作

5.1 新增记录

向指定的 /Index/Type 发送 PUT 请求,就可以在 Index 里面新增一条记录。比如,向store索引中添加一些书籍

Index :store       Type :books     id :1

代码语言:javascript复制
curl -XPUT 'http://hdp-01:9200/store/books/1' -d '{
  "title": "Elasticsearch: The Definitive Guide",
  "name" : {
    "first" : "Zachary",
    "last" : "Tong"
  },
  "publish_date":"2015-02-06",
  "price":"49.99"
}'

服务器返回的 JSON 对象,会给出 Index、Type、Id、Version 等信息

代码语言:javascript复制
{
    "_index":"store",
    "_type":"books",
    "_id":"1",
    "_version":1,
    "result":"created",
    "_shards":{
        "total":2,
        "successful":2,
        "failed":0
    },
    "created":true
}

如果你仔细看,会发现请求路径是/store/books/1,最后的1是该条记录的 Id。它不一定是数字,任意字符串(比如abc)都可以。新增记录的时候,也可以不指定 Id,这时要改成 POST 请求。

代码语言:javascript复制
curl -X POST 'http://hdp-01:9200/store/books' -d '{
  "title": "Elasticsearch: The Definitive Guide",
  "name" : {
    "first" : "cn",
    "last" : "itcats"
  },
  "publish_date":"2019-04-26",
  "price":"99999"
}'

上面代码中,向/store/books发出一个 POST 请求,添加一个记录。这时,服务器返回的 JSON 对象里面,_id字段就是一个随机字符串。

代码语言:javascript复制
{
    "_index":"store",
    "_type":"books",
    "_id":"AWpWQ-w1_0DpkACoUwPR",
    "_version":1,
    "result":"created",
    "_shards":{
        "total":2,
        "successful":2,
        "failed":0
    },
    "created":true
}

注意,如果没有先创建 Index(这个例子是store),直接执行上面的命令,ElasticSearch 也不会报错,而是直接生成指定的 Index。所以,打字的时候要小心,不要写错 Index 的名称。

5.2 查看记录

在linux中通过curl的方式查询

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/1'

通过浏览器查询

代码语言:javascript复制
http://hdp-01:9200/store/books/1
代码语言:javascript复制
{"_index":"store","_type":"books","_id":"1","_version":1,"found":true,"_source":{
  "title": "Elasticsearch: The Definitive Guide",
  "name" : {
    "first" : "Zachary",
    "last" : "Tong"
  },
  "publish_date":"2015-02-06",
  "price":"49.99"
}}

返回的数据中,found字段表示查询是否成功,_source字段返回原始记录。

如果 Id 不正确,就查不到数据,found字段就是false,URL 的参数pretty=true表示以易读的格式【即格式化的JSON】返回

代码语言:javascript复制
[itcats@hdp-01 ~]$ curl -XGET 'http://hdp-01:9200/store/books/abc?pretty=true'
代码语言:javascript复制
{
  "_index" : "store",
  "_type" : "books",
  "_id" : "aa",
  "found" : false
}

通过_source获取指定的字段

只查title,相当于select title ...

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/1?pretty=true&_source=title'
代码语言:javascript复制
{
  "_index" : "store",
  "_type" : "books",
  "_id" : "1",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "title" : "Elasticsearch: The Definitive Guide"
  }
}

 只查title和price,相当于select title,price ...

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/1?pretty=true&_source=title,price'
代码语言:javascript复制
{
  "_index" : "store",
  "_type" : "books",
  "_id" : "1",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "price" : "49.99",
    "title" : "Elasticsearch: The Definitive Guide"
  }
}

_source全查询,跟没加没区别,相当于select *

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/1?pretty=true&_source'
代码语言:javascript复制
{
  "_index" : "store",
  "_type" : "books",
  "_id" : "1",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "title" : "Elasticsearch: The Definitive Guide",
    "name" : {
      "first" : "Zachary",
      "last" : "Tong"
    },
    "publish_date" : "2015-02-06",
    "price" : "49.99"
  }
}

5.3    删除记录

删除记录就是发出 DELETE 请求

代码语言:javascript复制
curl -XDELETE 'http://hdp-01:9200/store/books/1'
代码语言:javascript复制
{"found":true,"_index":"store","_type":"books","_id":"1","_version":4,"result":"deleted","_shards":{"total":2,"successful":2,"failed":0}}

5.4    更新记录

更新记录就是使用 PUT 请求,重新发送一次数据。

代码语言:javascript复制
curl -XPUT 'http://hdp-01:9200/store/books/1' -d '{
  "title": "Elasticsearch: The Definitive Guide",
  "name" : {
    "first" : "Zachary",
    "last" : "Tong"
  },
  "publish_date":"2019-04-26",
  "price":"99.99"
}'
代码语言:javascript复制
{"_index":"store","_type":"books","_id":"1","_version":2,"result":"updated","_shards":{"total":2,"successful":2,"failed":0},"created":false}

可以看到,记录的 Id 没变,但是版本(version)从1变成2操作类型(result)从created变成updatedcreated字段变成false,因为这次不是新建记录。

或者通过 _update  API的方式单独更新你想要更新的

代码语言:javascript复制
curl -XPOST 'http://hdp-01:9200/store/books/1/_update' -d '{
  "doc": {
     "price" : 88.88
  }
}'
代码语言:javascript复制
{"_index":"store","_type":"books","_id":"1","_version":3,"result":"updated","_shards":{"total":2,"successful":2,"failed":0}}

6、数据查询

6.1    返回所有记录

使用 GET 方法,直接请求/Index/Type/_search,就会返回所有记录。

代码语言:javascript复制
curl 'hdp-01:9200/store/books/_search?pretty=true'
代码语言:javascript复制
{
  "took" : 36,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "store",
        "_type" : "books",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "title" : "Elasticsearch Blueprints",
          "name" : {
            "first" : "Vineeth",
            "last" : "Mohan"
          },
          "publish_date" : "2015-06-06",
          "price" : "35.99"
        }
      },
      {
        "_index" : "store",
        "_type" : "books",
        "_id" : "4",
        "_score" : 1.0,
        "_source" : {
          "title" : "Elasticsearch: The Definitive Guide",
          "author" : "Guide",
          "publish_date" : "2016-02-06",
          "price" : "35.99"
        }
      },
      {
        "_index" : "store",
        "_type" : "books",
        "_id" : "AWpWQ-w1_0DpkACoUwPR",
        "_score" : 1.0,
        "_source" : {
          "title" : "Elasticsearch: The Definitive Guide",
          "name" : {
            "first" : "cn",
            "last" : "itcats"
          },
          "publish_date" : "2019-04-26",
          "price" : "99999"
        }
      }
    ]
  }
}

上面代码中,返回结果的 took字段表示该操作的耗时(单位为毫秒),timed_out字段表示是否超时,hits字段表示命中的记录,里面子字段的含义如下:

代码语言:javascript复制
total:返回记录数,本例是2条。
max_score:最高的匹配程度,本例是1.0。
hits:返回的记录组成的数组。

返回的记录中,每条记录都有一个_score字段,表示匹配的程序,默认是按照这个字段降序排列。

6.2    最简单filter查询

代码语言:javascript复制
SELECT * FROM books WHERE price = 35.99

filtered 查询价格是35.99的【返回的的分是1.0】先查所有再过滤

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
  "query": {
    "bool": {
      "must": {
        "match_all": {}
      },
      "filter": {
        "term": {
          "price": 35.99
        }
      }
    }
  }
}'

【返回的的分是1.0】直接过滤

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
  "query": {
    "constant_score": {
      "filter": {
        "term": {
          "price": 35.99
        }
      }
    }
  }
}'

【返回的的分是0.0】直接过滤

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
    "query": {
        "bool": {
           "filter" : {
                "term" : {
                  "price" : 35.99
                }
            }
        }
    }
}'

【指定多个值】  select * from books where price = 35.99 or price = 99.99

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
    "query" : {
        "bool" : {
            "filter" : {
                "terms" : {
                    "price" : [35.99, 99.99]
                  }
              }
        }
    }
}'
代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
    "query" : {
        "bool" : {
            "must": {
                "match_all": {}
            },
            "filter" : {
                "terms" : {
                    "price" : [35.99, 99.99]
                  }
              }
        }
    }
}'

6.3    bool过滤查询,可做组合过滤查询

代码语言:javascript复制
SELECT * FROM books WHERE (price = 35.99 OR price = 99.99) AND publish_date != "2016-02-06"

 类似的,Elasticsearch也有 and, or, not这样的组合条件的查询方式  格式如下:

代码语言:javascript复制
  {
    "bool" : {
    "must" :     [],
    "should" :   [],
    "must_not" : [],
    }
  }
代码语言:javascript复制
 must: 条件必须满足,相当于 and
 should: 条件可以满足也可以不满足,相当于 or
 must_not: 条件不需要满足,相当于 not
代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
  "query" : {
    "bool" : {
      "should" : [
        { "term" : {"price" : 35.99}},
        { "term" : {"price" : 99.99}}
      ],
      "must_not" : {
        "term" : {"publish_date" : "2016-02-06"}
      }
    }
  }
}'

6.4    嵌套查询

代码语言:javascript复制
SELECT * FROM books WHERE price = 35.99 OR ( publish_date = "2016-02-06" AND price = 99.99 )
代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
    "query": {
        "bool": {
            "should": [
                {
                    "term": {
                        "price": 35.99
                    }
                },
                {
                    "bool": {
                        "must": [
                            {
                                "term": {
                                    "publish_date": "2016-02-06"
                                }
                            },
                            {
                                "term": {
                                    "price": 99.99
                                }
                            }
                        ]
                    }
                }
            ]
        }
    }
}'

6.5    range范围过滤

代码语言:javascript复制
SELECT * FROM books WHERE price >= 10 AND price < 99
代码语言:javascript复制
gt :  > 大于
lt :  < 小于
gte :  >= 大于等于
lte :  <= 小于等于
代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
    "query": {
        "range" : {
            "price" : {
                "gte" : 10,
                "lt" : 99
            }
        }
    }
}'

name和author都必须包含Guide,并且价钱等于33.99或者188.99

代码语言:javascript复制
curl -XGET 'http://hdp-01:9200/store/books/_search' -d '{
    "query": {
        "bool": {
            "must": {
                "multi_match": {
                    "operator": "and",
                    "fields": [
                        "name",
                        "author"
                    ],
                    "query": "Guide"
                }
            },
            "filter": {
                "terms": {
                    "price": [
                        35.99,
                        188.99
                    ]
                }
            }
        }
    }
}'

7、ES-head插件安装

安装过程详见我的另外一篇博客:《ES图形化管理界面——ElasticSearch-head安装配置教程》

后台进程脚本代码

代码语言:javascript复制
#/bin/sh
npm run start >> .log 2>&1 &

8、ElasticSearch-JavaAPI介绍

代码语言:javascript复制
   <!-- es的客户端-->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>5.4.3</version>
    </dependency>

    <!-- 依赖2.x的log4j -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>

    <!-- 单元测试 -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    Plus  :  ES Java-API操作需要log4j相关的依赖

    在resources/ 下 创建log4j2.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="error">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

书写第一个入门程序:

代码语言:javascript复制
public static void main(String[] args) {
        try {
            //设置集群名称
            Settings settings = Settings.builder()
                    .put("cluster.name", "my-es")
                    .build();
            //创建client
            TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses(
                    //用java访问ES用的端口是9300
                    new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
                    new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
                    new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300));
            //搜索数据(.actionGet()方法是同步的,没有返回就等待)
            GetResponse response = client.prepareGet("news", "fulltext", "1").execute().actionGet();
            //输出结果
            System.out.println(response);
            //关闭client
            client.close();

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

返回结果:

代码语言:javascript复制
{"_index":"news","_type":"fulltext","_id":"1","_version":1,"found":true,"_source":
{"content":"美国留给伊拉克的是个烂摊子吗"}}

8.1    简单的CURD操作

代码语言:javascript复制
package cn.itcats.es;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;

public class ESCrud {
    private TransportClient client = null;

    @Before
    public void init() throws Exception {
        //设置集群名称
        Settings settings = Settings.builder()
                .put("cluster.name", "my-es")
                //自动感知的功能(可以通过当前指定的节点获取所有es节点的信息),但至少也写两个节点信息
                .put("client.transport.sniff", true)
                .build();
        //创建client
        client = new PreBuiltTransportClient(settings).addTransportAddresses(
                new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
                new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
                new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300));
    }
    @Test
    public void testCreate() throws IOException {

        //index:"gamelog"  type:"users"  id:"1"
        IndexResponse response = client.prepareIndex("gamelog", "users", "1")
                .setSource(
                        jsonBuilder()
                                //相当于{
                                .startObject()
                                .field("username", "itcats_cn")
                                .field("gender", "female")
                                .field("birthday", new Date())
                                .field("fv", 9999.99)
                                .field("message", "trying out Elasticsearch")
                                //相当于}
                                .endObject()
                ).get();
    }

    //查找一条
    @Test
    public void testGet() throws IOException {
        GetResponse response = client.prepareGet("gamelog", "users", "1").get();
        System.out.println(response.getSourceAsString());
    }

    //查找多条
    @Test
    public void testMultiGet() throws IOException {
        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                .add("gamelog", "users", "1")
                .add("gamelog", "users", "2", "3")
                .add("news", "fulltext", "1")
                .get();

        for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
            GetResponse response = itemResponse.getResponse();
            if (response.isExists()) {
                String json = response.getSourceAsString();
                System.out.println(json);
            }
        }
    }

    @Test
    public void testUpdate() throws Exception {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("gamelog");
        updateRequest.type("users");
        updateRequest.id("2");
        updateRequest.doc(
                jsonBuilder()
                        .startObject()
                        .field("fv", 999.9)
                        .endObject());
        client.update(updateRequest).get();
    }

    @Test
    public void testDelete() {
        DeleteResponse response = client.prepareDelete("gamelog", "users", "2").get();
        System.out.println(response);
    }

    @Test
    public void testDeleteByQuery() {
        BulkByScrollResponse response =
                DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                        //指定查询条件
                        .filter(QueryBuilders.matchQuery("username", "itcats"))
                        //指定索引名称
                        .source("gamelog")
                        .get();

        long deleted = response.getDeleted();
        System.out.println(deleted);
    }

    //异步删除
    @Test
    public void testDeleteByQueryAsync() {
        DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("gender", "male"))
                .source("gamelog")
                .execute(new ActionListener<BulkByScrollResponse>() {
                    @Override   //execute(args) args内传递了一个监听器,若成功查找到结果则回调onResponse()
                    public void onResponse(BulkByScrollResponse response) {
                        long deleted = response.getDeleted();
                        System.out.println("数据删除了");
                        System.out.println(deleted);
                    }

                    @Override  //未成功查找到时则回调onFailure()
                    public void onFailure(Exception e) {
                        e.printStackTrace();
                    }
                });
        try {
            System.out.println("异步删除");
            //Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testRange() {
        QueryBuilder qb = rangeQuery("fv")
                // [88.99, 10000)
                .from(88.99)
                .to(10000)   //注意字符串的坑
                .includeLower(true)
                .includeUpper(false);

        SearchResponse response = client.prepareSearch("gamelog").setQuery(qb).get();

        System.out.println(response);
    }
}

8.2    JavaAPI创建Mapping

代码语言:javascript复制
public class AdminAPI {
    private TransportClient client = null;

    //在所有的测试方法之前执行
    @Before
    public void init() throws Exception {
        //设置集群名称
        Settings settings = Settings.builder().put("cluster.name", "my-es").build();
        //创建client
        client = new PreBuiltTransportClient(settings).addTransportAddresses(
                new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
                new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300),
                new InetSocketTransportAddress(InetAddress.getByName("hdp-01"), 9300));
    }

    //创建索引,并配置一些参数
    @Test
    public void createIndexWithSettings() {
        //获取Admin的API
        AdminClient admin = client.admin();
        //使用Admin API对索引进行操作
        IndicesAdminClient indices = admin.indices();
        //准备创建索引
        indices.prepareCreate("gamelog")
                //配置索引参数
                .setSettings(
                        //参数配置器
                        Settings.builder()//指定索引分区的数量
                                .put("index.number_of_shards", 4)
                                //指定索引副本的数量(注意:不包括本身,如果设置数据存储副本为2,实际上数据存储了3份)
                                .put("index.number_of_replicas", 2)
                )
                //真正执行
                .get();
    }

    //跟索引添加mapping信息(给表添加schema信息)
    @Test
    public void putMapping() {
        //创建索引
        client.admin().indices().prepareCreate("twitter")
                //创建一个type,并指定type中属性的名字和类型
                .addMapping("tweet",
                        "{n"  
                                "      "tweet": {n"  
                                "      "properties": {n"  
                                "        "message": {n"  
                                "          "type": "string"n"  
                                "        }n"  
                                "      }n"  
                                "    }n"  
                                "  }")
                .get();
    }

    /**
     * 你可以通过dynamic设置来控制这一行为,它能够接受以下的选项:
     * true:默认值。动态添加字段
     * false:忽略新字段
     * strict:如果碰到陌生字段,抛出异常
     * @throws IOException
     */
    @Test
    public void testSettingsMappings() throws IOException {
        //1:settings
        HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
        settings_map.put("number_of_shards", 3);
        settings_map.put("number_of_replicas", 2);

        //2:mappings(映射、schema)
        XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject()
                    .field("dynamic", "true")
                    //设置type中的属性
                    .startObject("properties")
                        //id属性
                        .startObject("num")
                            //类型是integer
                            .field("type", "integer")
                            //不分词,但是建索引
                            .field("index", "not_analyzed")
                            //在文档中存储
                            .field("store", "yes")
                        .endObject()
                        //name属性
                        .startObject("name")
                            //string类型
                            .field("type", "string")
                            //在文档中存储
                            .field("store", "yes")
                            //建立索引
                            .field("index", "analyzed")
                            //使用ik_smart进行分词
                            .field("analyzer", "ik_smart")
                        .endObject()
                    .endObject()
                .endObject();

        CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("user_info");
        //管理索引(user_info)然后关联type(user)
        prepareCreate.setSettings(settings_map).addMapping("user", builder).get();
    }


    /**
     * index这个属性,no代表不建索引
     * not_analyzed,建索引不分词
     * analyzed 即分词,又建立索引
     * expected [no], [not_analyzed] or [analyzed]
     * @throws IOException
     */

    @Test
    public void testSettingsPlayerMappings() throws IOException {
        //1:settings
        HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
        settings_map.put("number_of_shards", 3);
        settings_map.put("number_of_replicas", 1);

        //2:mappings
        XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject()//
                    .field("dynamic", "true")
                    .startObject("properties")
                        .startObject("id")
                            .field("type", "integer")
                            .field("store", "yes")
                        .endObject()
                        .startObject("name")
                            .field("type", "string")
                            .field("index", "not_analyzed")
                        .endObject()
                        .startObject("age")
                            .field("type", "integer")
                        .endObject()
                        .startObject("salary")
                            .field("type", "integer")
                        .endObject()
                        .startObject("team")
                            .field("type", "string")
                            .field("index", "not_analyzed")
                        .endObject()
                        .startObject("position")
                            .field("type", "string")
                            .field("index", "not_analyzed")
                        .endObject()
                        .startObject("description")
                            .field("type", "string")
                            .field("store", "no")
                            .field("index", "analyzed")
                            .field("analyzer", "ik_smart")
                        .endObject()
                        .startObject("addr")
                            .field("type", "string")
                            .field("store", "yes")
                            .field("index", "analyzed")
                            .field("analyzer", "ik_smart")
                        .endObject()
                    .endObject()
                .endObject();

        CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player_info");
        prepareCreate.setSettings(settings_map).addMapping("player", builder).get();
    }
}

8.3    JavaAPI对ES做复杂的聚合查询

代码语言:javascript复制
/**
     * https://elasticsearch.cn/article/102
     *
     * select team, count(*) as player_count from player group by team;
     */
    @Test
    public void testAgg1() {

        //指定索引和type
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //按team分组然后聚合,但是并没有指定聚合函数,别名为:player_count  对team字段分组
        TermsAggregationBuilder teamAgg = AggregationBuilders.terms("player_count").field("team");
        //添加聚合器
        builder.addAggregation(teamAgg);
        //触发
        SearchResponse response = builder.execute().actionGet();
        //System.out.println(response);
        //将返回的结果放入到一个map中
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//        Set<String> keys = aggMap.keySet();
//
//        for (String key: keys) {
//            System.out.println(key);
//        }

        //取出聚合属性
        StringTerms terms = (StringTerms) aggMap.get("player_count");

        //依次迭代出分组聚合数据
//        for (Terms.Bucket bucket : terms.getBuckets()) {
//            //分组的名字
//            String team = (String) bucket.getKey();
//            //count,分组后一个组有多少数据
//            long count = bucket.getDocCount();
//            System.out.println(team   " "   count);
//        }

        Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
        while (teamBucketIt .hasNext()) {
            Terms.Bucket bucket = teamBucketIt.next();
            String team = (String) bucket.getKey();

            long count = bucket.getDocCount();

            System.out.println(team   " "   count);
        }
    }

    /**
     * select team, position, count(*) as pos_count from player group by team, position;
     */
    @Test
    public void testAgg2() {
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //指定别名和分组的字段
        TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
        TermsAggregationBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");
        //添加两个聚合构建器
        builder.addAggregation(teamAgg.subAggregation(posAgg));
        //执行查询
        SearchResponse response = builder.execute().actionGet();
        //将查询结果放入map中
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        //根据属性名到map中查找
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        //循环查找结果
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            //先按球队进行分组
            String team = (String) teamBucket.getKey();
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            StringTerms positions = (StringTerms) subAggMap.get("pos_count");
            //因为一个球队有很多位置,那么还要依次拿出位置信息
            for (Terms.Bucket posBucket : positions.getBuckets()) {
                //拿到位置的名字
                String pos = (String) posBucket.getKey();
                //拿出该位置的数量
                long docCount = posBucket.getDocCount();
                //打印球队,位置,人数
                System.out.println(team   " "   pos   " "   docCount);
            }
        }
    }


    /**
     * select team, max(age) as max_age from player group by team;
     */
    @Test
    public void testAgg3() {
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //指定按球队进行分组
        TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
        //指定分组求最大值
        MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
        //分组后求最大值
        builder.addAggregation(teamAgg.subAggregation(maxAgg));
        //查询
        SearchResponse response = builder.execute().actionGet();
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        //根据team属性,获取map中的内容
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            //分组的属性名
            String team = (String) teamBucket.getKey();
            //在将聚合后取最大值的内容取出来放到map中
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            //取分组后的最大值
            InternalMax ages = (InternalMax)subAggMap.get("max_age");
            double max = ages.getValue();
            System.out.println(team   " "   max);
        }
    }

    /**
     * select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team;
     */
    @Test
    public void testAgg4() {
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //指定分组字段
        TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
        //指定聚合函数是求平均数据
        AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
        //指定另外一个聚合函数是求和
        SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
        //分组的聚合器关联了两个聚合函数
        builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
        SearchResponse response = builder.execute().actionGet();
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        //按分组的名字取出数据
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            //获取球队名字
            String team = (String) teamBucket.getKey();
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            //根据别名取出平均年龄
            InternalAvg avgAge = (InternalAvg)subAggMap.get("avg_age");
            //根据别名取出薪水总和
            InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
            double avgAgeValue = avgAge.getValue();
            double totalSalaryValue = totalSalary.getValue();
            System.out.println(team   " "   avgAgeValue   " "   totalSalaryValue);
        }
    }


    /**
     * select team, sum(salary) as total_salary from player group by team order by total_salary desc;
     */
    @Test
    public void testAgg5() {
        SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
        //按team进行分组,然后指定排序规则
        TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team").order(Terms.Order.aggregation("total_salary ", true));
        SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
        builder.addAggregation(termsAgg.subAggregation(sumAgg));
        SearchResponse response = builder.execute().actionGet();
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            String team = (String) teamBucket.getKey();
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
            double totalSalaryValue = totalSalary.getValue();
            System.out.println(team   " "   totalSalaryValue);
        }
    }

0 人点赞