elastic是go语言中与ElasticSearch交互使用最多的一个库。
首先要根据ElasticSearch版本选择对应的库:
Elasticsearch version | Elastic version | Package URL | Remarks |
---|---|---|---|
7.x | 7.0 | github.com/olivere/elastic/v7 (source doc) | Use Go modules. |
6.x | 6.0 | github.com/olivere/elastic (source doc) | Use a dependency manager |
5.x | 5.0 | gopkg.in/olivere/elastic.v5 (source doc) | Actively maintained. |
2.x | 3.0 | gopkg.in/olivere/elastic.v3 (source doc) | Deprecated. Please update. |
1.x | 2.0 | gopkg.in/olivere/elastic.v2 (source doc) | Deprecated. Please update. |
0.9-1.3 | 1.0 | gopkg.in/olivere/elastic.v1 (source doc) | Deprecated. Please update. |
下面以7.0为例:
下载安装
代码语言:javascript复制go get gopkg.in/olivere/elastic.v7
初始化
代码语言:javascript复制esUrl = "http://127.0.0.1:9200"
//初始化
func init() {
var err error
// sniff: false, 表示关闭集群,默认是开启的
client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(host))
if err != nil {
panic(err)
}
_,_,err = client.Ping(host).Do(context.Background())
if err != nil {
panic(err)
}
//fmt.Printf("Elasticsearch returned with code %d and version %sn", code, info.Version.Number)
_,err = client.ElasticsearchVersion(host)
if err != nil {
panic(err)
}
//fmt.Printf("Elasticsearch version %sn", esversion)
}
配置client时还有以下参数:
- elastic.SetURL(url) 用来设置ES服务地址,如果是本地,就是127.0.0.1:9200。支持多个地址,用逗号分隔即可。
- elastic.SetBasicAuth("user", "secret") 这个是基于http base auth 验证机制的账号密码。
- elastic.SetGzip(true) 启动gzip压缩
- elastic.SetHealthcheckInterval(10*time.Second) 用来设置监控检查时间间隔
- elastic.SetMaxRetries(5) 设置请求失败最大重试次数,v7版本以后已被弃用
- elastic.SetSniff(false) 允许指定弹性是否应该定期检查集群(默认为true)
- elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)) 设置错误日志输出
- elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)) 设置info日志输出
创建索引
上一步,我们创建了client,接下来我们就要创建对应的索引以及mapping。根据开始介绍的功能,我们来设计我们的mapping结构:
代码语言:javascript复制mappingTpl = `{
"mappings":{
"properties":{
"id": { "type": "long" },
"name": { "type": "keyword" },
"sex": { "type": "text" },
"married": { "type": "keyword" },
"age": { "type": "long" },
"interests": { "type": "keyword" },
}
}
}`
索引设计为:index =user。 设计好了index及mapping后,我们开始编写代码进行创建:
代码语言:javascript复制func initIndex() {
ctx := context.Background()
exists, err := client.IndexExists(es.index).Do(ctx)
if err != nil {
fmt.Printf("userEs init exist failed err is %sn", err)
return
}
if !exists {
_, err := client.CreateIndex(es.index).Body(es.mapping).Do(ctx)
if err != nil {
fmt.Printf("index init failed err is %sn", err)
return
}
}
}
这里我们首先判断es中是否已经存在要创建的索引,不存在,调用CreateIndex进行创建。
添加文档
两种方式,API分别为BodyJson和BodyString(观察来说,BodyString就是用反引号包裹的es原生语法)
代码语言:javascript复制func insertDoc(){
// 添加文档方法1
user1 := User{Name:"bob",Sex:"male",Married:false,Age:23}
put1, err := client.Index().Index("user").BodyJson(user1).Id("1").Do(ctx)
if err != nil{
panic(err)
}
fmt.Printf("Indexed user %s to index %s, type %sn", put1.Id, put1.Index, put1.Type) //Indexed user 1 to index user, type _doc
// 添加文档方法2
user2 := `{"name":"mike","sex":"male","married":true,"age":"22"}`
put2, err := client.Index().Index("user").BodyString(user2).Do(ctx)// 不指定id
if err != nil{
panic(err)
}
fmt.Printf("Indexed user %s to index %s, type %sn", put1.Id, put1.Index, put1.Type) //Indexed user 1 to index user, type _doc
}
查询文档
代码语言:javascript复制func queryDoc(){
// 使用文档id查询
get1, err := client.Get().Index("user").Id("1").Do(ctx)
if err != nil{
panic(err)
}
if get1.Found{
fmt.Printf("Got document %s in version %d from index %s, type %sn", get1.Id, get1.Version, get1.Index, get1.Type)
// 数据永久化,Flush to make sure the documents got written.将文档涮入磁盘
_, err = client.Flush().Index("user").Do(ctx)
if err != nil {
panic(err)
}
// 按"term"搜索Search with a term query
termQuery := elastic.NewTermQuery("name", "mike")
searchResult, err := client.Search().
Index("user"). // 搜索的索引"user"
Query(termQuery). // specify the query
Sort("age", true). //按字段"age"排序,升序排列
From(0).Size(10). // 分页,单页显示10条
Pretty(true). // pretty print request and response JSON以json的形式返回信息
Do(ctx) // 执行
if err != nil {
// Handle error
panic(err)
}
fmt.Printf("Query took %d millisecondsn", searchResult.TookInMillis)// Query took 3 milliseconds
var user User
Each是一个简便函数,此函数忽略了错误输出
for _, item1 := range searchResult.Each(reflect.TypeOf(user)) {
if u, ok := item1.(User); ok {
fmt.Printf("Person by %s,age:%d,married:%t,Sex:%sn", u.Name, u.Age, u.Married,u.Sex) //Person by bob,age:23,married:false,Sex:male
}
}
// 搜索文档方法2
// 使用hits,获得更详细的输出结果
if searchResult.Hits.TotalHits.Value >0{
fmt.Printf("找到的数据总数是 %d n", searchResult.Hits.TotalHits.Value)
for _,hits := range searchResult.Hits.Hits{
u :=User{}
err := json.Unmarshal([]byte(hits.Source), &u)
if err != nil{
fmt.Println("反序列化失败",err)
}
fmt.Printf("User by %s,age:%d,married:%t,Sex:%sn", u.Name, u.Age, u.Married,u.Sex)
}
}else {
fmt.Println("没有搜到用户")
}
}
更新文档
代码语言:javascript复制func updateDoc(){
// 根据id更新文档 update
update, err := client.Update().Index("user").Id("1").
Script(elastic.NewScriptInline("ctx._source.age = params.num").Lang("painless").Param("num", 1)).
//Upsert(map[string]interface{}{"created": "2020-06-17"}). // 插入未初始化的字段value
Do(ctx)
if err != nil {
panic(err)
}
fmt.Printf("New version of user %q is now %dn", update.Id, update.Version)
// 根据查出来的结果更新方法2
termQuery := elastic.NewTermQuery("name", "bob")
update,err := client.UpdateByQuery("user").Query(termQuery).
Script(elastic.NewScriptInline("ctx._source.age = params.num").Lang("painless").Param("num", 1)).
Do(ctx)
if err != nil{
panic(err)
}
fmt.Printf("New version of user %q is now %dn", update.Id, update.Version)
}
删除文档
代码语言:javascript复制func deleteDoc(){
termQuery := elastic.NewTermQuery("name", "mike")
_, err = client.DeleteByQuery().Index("user"). // search in index "user"
Query(termQuery). // specify the query
Do(ctx)
if err != nil {
// Handle error
panic(err)
}
// 按文档id删除
_,err = client.Delete().Index("user").Id("2").Do(ctx)
if err != nil{
panic(err)
}
// 删除索引
_,err = client.DeleteIndex("user").Do(ctx)
if err != nil{
panic(err)
}
}
搜索文档
代码语言:javascript复制func searchDoc(){
var res *elastic.SearchResult
var err error
//取所有
res, err = client.Search("user").Type("employee").Do(context.Background())
printEmployee(res, err)
//字段相等
q := elastic.NewQueryStringQuery("name:bob")
res, err = client.Search("user").Type("employee").Query(q).Do(context.Background())
if err != nil {
println(err.Error())
}
printEmployee(res, err)
//条件查询
//年龄大于30岁的
boolQ := elastic.NewBoolQuery()
boolQ.Must(elastic.NewMatchQuery("name", "smith"))
boolQ.Filter(elastic.NewRangeQuery("age").Gt(30))
res, err = client.Search("user").Type("employee").Query(q).Do(context.Background())
printDoc(res, err)
//短语搜索 搜索interests字段中有 rock climbing
matchPhraseQuery := elastic.NewMatchPhraseQuery("interests", "rock climbing")
res, err = client.Search("user").Type("employee").Query(matchPhraseQuery).Do(context.Background())
printDoc(res, err)
//分析 interests
aggs := elastic.NewTermsAggregation().Field("interests")
res, err = client.Search("user").Type("employee").Aggregation("all_interests", aggs).Do(context.Background())
printDoc(res, err)
}
//打印查询到的文档
func printDoc(res *elastic.SearchResult, err error) {
if err != nil {
print(err.Error())
return
}
var typ Employee
for _, item := range res.Each(reflect.TypeOf(typ)) { //从搜索结果中取数据的方法
t := item.(User)
fmt.Printf("%#vn", t)
}
}
////简单分页
func list(size,page int) {
if size < 0 || page < 1 {
fmt.Printf("param error")
return
}
res,err := client.Search("user").
Type("employee").
Size(size).
From((page-1)*size).
Do(context.Background())
printEmployee(res, err)
}