painless数字类型转换_笔记四十五: Ingest Pipeline 与 Painless Script

2022-07-23 16:01:19 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

需求:修复与增强写入的数据

Tags 字段中,逗号分割的文本应该是数组,而不是一个字符串需求:后期需要对 Tags 进行 Aggregation 统计

Ingest Node

Elasticsearch 5.0 后,引入的一种新的节点类型。默认配置下,每个节点都是 Ingest Node具有预处理数据的能力,可拦截 Index 或者 Bulck API 的请求

对数据进行转换,并重新返回给 Index 和 Bluck API

无需 Logstash ,就可以进行数据的预处理,例如为某个字段设置默认值;重命名某个字段的字段名;对字段值进行 Split 操作

支持设置 Painless 脚本,对数据进行更加复杂的加工

Pipeline & Processor

Pipeline – 管道会对通过的数据(文档),按照顺序进行加工

Processor – Elasticsearch 对一些加工的行为进行了抽象包装Elasticsearch 有很多内置的 Processors。也支持通过插件的方式,实现自己的 Processsor

使用 Pipeline 切分字符串

# 测试split tags

POST _ingest/pipeline/_simulate

{

“pipeline”: {

“description”: “to split blog tags”,

“processors”: [

{

“split”: {

“field”: “tags”,

“separator”: “,”

}

}

]

},

“docs”: [

{

“_index”: “index”,

“_id”: “id”,

“_source”: {

“title”: “Introducing big data……”,

“tags”: “hadoop,elasticsearch,spark”,

“content”: “You konw, for big data”

}

},

{

“_index”: “index”,

“_id”: “idxx”,

“_source”: {

“title”: “Introducing cloud computering”,

“tags”: “openstack,k8s”,

“content”: “You konw, for cloud”

}

}

]

}

为文档增加字段

#同时为文档,增加一个字段。blog查看量

POST _ingest/pipeline/_simulate

{

“pipeline”: {

“description”: “to split blog tags”,

“processors”: [

{

“split”: {

“field”: “tags”,

“separator”: “,”

}

},

{

“set”: {

“field”: “views”,

“value”: 0

}

}

]

},

“docs”: [

{

“_index”: “index”,

“_id”: “id”,

“_source”: {

“title”: “Introducing big data……”,

“tags”: “hadoop,elasticsearch,spark”,

“content”: “You konw, for big data”

}

},

{

“_index”: “index”,

“_id”: “idxx”,

“_source”: {

“title”: “Introducing cloud computering”,

“tags”: “openstack,k8s”,

“content”: “You konw, for cloud”

}

}

]

}

Pipeline API

添加 Pipeline 并测试

# 为ES添加一个 Pipeline

PUT _ingest/pipeline/blog_pipeline

{

“description”: “a blog pipeline”,

“processors”: [

{

“split”: {

“field”: “tags”,

“separator”: “,”

}

},

{

“set”: {

“field”: “views”,

“value”: 0

}

}

]

}

#测试pipeline

POST _ingest/pipeline/blog_pipeline/_simulate

{

“docs”: [

{

“_source”: {

“title”: “Introducing cloud computering”,

“tags”: “openstack,k8s”,

“content”: “You konw, for cloud”

}

}

]

}

Index & Update By Query

#不使用pipeline更新数据

PUT tech_blogs/_doc/1

{

“title”:”Introducing big data……”,

“tags”:”hadoop,elasticsearch,spark”,

“content”:”You konw, for big data”

}

#使用pipeline更新数据

PUT tech_blogs/_doc/2?pipeline=blog_pipeline

{

“title”: “Introducing cloud computering”,

“tags”: “openstack,k8s”,

“content”: “You konw, for cloud”

}

#查看两条数据,一条被处理,一条未被处理

POST tech_blogs/_search

{}

#update_by_query 会导致错误

POST tech_blogs/_update_by_query?pipeline=blog_pipeline

{

}

#增加update_by_query的条件

POST tech_blogs/_update_by_query?pipeline=blog_pipeline

{

“query”: {

“bool”: {

“must_not”: {

“exists”: {

“field”: “views”

}

}

}

}

}

一些内置的 Processors

https://www.elastic.co/guide/en/elasticsea…Split Processor (例如:将给定字段分成一个数组)

Remove / Rename Processor (移除一个重命名字段)

Append(为商品增加一个新的标签)

Convert (将商品价格,从字符串转换成 float 类型)

Date / JSON (日期格式转换,字符串转 JSON 对象)

Date Index Name Processor (将通过该处理器的文档,分配到指定时间格式的索引中)

Fail Processor (一旦出现异常,该 Pipeline 指定的错误信息能返回给用户)

Foreach Process (数组字段,数组的每个元素都会使用到一个相同的处理器)

Grok Processor (日志的日志格式切割)

Gsub / Join / Split (字符串替换、数组转字符串、字符串转数组)

Lowercase / Upcase(大小写转换)

Ingest Node v.s Logstash

|| Logstash| Ingest Node|

|–|–|

|数据输入与输出|支持从不同的数据源读取,并写入不同的数据源|支持从ES REST API 获取数据,并且写入ES|

|数据源缓冲| 实现了简单的数据队列,支持重写| 不支持缓冲|

|数据处理| 支持大量的的插件,也支持定制开发|内置的插件,可以开发 Plugin 进行扩展(Plugin 更新需要重启)|

|配置和使用| 增加了一定的架构复杂度| 无需额外部署|

https://www.elastic.co/cn/blog/should-i-us…

Painless 简介

自 ES 5.x 后引入,专门为 ES 设置,扩展了 Java 的语法

6.0 开始,ES 只支持 Painless。Grooby ,JavaScript 和 Python 都不在支持

Painless 支持所有的 Java 的数据类型及 Java API 子集

Painless Script 具备以下特性高性能 、 安全

支持显示类型或者动态定义类型

Painless 的用途

可以对文档字段进行加工处理更新或者删除字段,处理数据聚合操作

Script Field: 对返回的字段提前进行计算

Function Score:对文档的算分进行处理

在Ingest Pipeline 中执行脚本

在Reindex API,Update By Query 时,对数据进行处理

通过 Painless 脚本访问字段

上线文

语法Ingestion

ctx.field_name

Update

ctx._source.field_name

Search & Aggregation

doc{“field_name”]

案例1:Script Processsor

# 增加一个 Script Prcessor

POST _ingest/pipeline/_simulate

{

“pipeline”: {

“description”: “to split blog tags”,

“processors”: [

{

“split”: {

“field”: “tags”,

“separator”: “,”

}

},

{

“script”: {

“source”: “””

if(ctx.containsKey(“content”)){

ctx.content_length = ctx.content.length();

}else{

ctx.content_length=0;

}

“””

}

},

{

“set”: {

“field”: “views”,

“value”: 0

}

}

]

},

“docs”: [

{

“_index”: “index”,

“_id”: “id”,

“_source”: {

“title”: “Introducing big data……”,

“tags”: “hadoop,elasticsearch,spark”,

“content”: “You konw, for big data”

}

},

{

“_index”: “index”,

“_id”: “idxx”,

“_source”: {

“title”: “Introducing cloud computering”,

“tags”: “openstack,k8s”,

“content”: “You konw, for cloud”

}

}

]

}

案例2:文档更新计数

DELETE tech_blogs

PUT tech_blogs/_doc/1

{

“title”:”Introducing big data……”,

“tags”:”hadoop,elasticsearch,spark”,

“content”:”You konw, for big data”,

“views”:0

}

POST tech_blogs/_update/1

{

“script”: {

“source”: “ctx._source.views = params.new_views”,

“params”: {

“new_views”:100

}

}

}

# 查看views计数

POST tech_blogs/_search

案例3:搜索时的Script 字段

GET tech_blogs/_search

{

“script_fields”: {

“rnd_views”: {

“script”: {

“lang”: “painless”,

“source”: “””

java.util.Random rnd = new Random();

doc[‘views’].value rnd.nextInt(1000);

“””

}

}

},

“query”: {

“match_all”: {}

}

}

Script :Inline v.s Stored

#保存脚本在 Cluster State

POST _scripts/update_views

{

“script”:{

“lang”: “painless”,

“source”: “ctx._source.views = params.new_views”

}

}

POST tech_blogs/_update/1

{

“script”: {

“id”: “update_views”,

“params”: {

“new_views”:1000

}

}

}

脚本缓存

编译的开销相较大

Elasticsearch 会将甲苯编译后缓存在 Cache 中Inline scripts 和 Stored Scripts 都会被缓存

默认缓存 100个脚本

本节知识点

概念讲解:Ingest Node,Pipeline 与 Processor

Ingest Node 与 Logstash 的⽐较

Pipeline 的 相关操作 / 内置 Processor 讲解与演示

Painless 脚本与Ingestion (Pipeline)

Update

Search & Aggregation

本作品采用《CC 协议》,转载必须注明作者和本文链接

快乐就是解决一个又一个的问题!

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/126741.html原文链接:https://javaforall.cn

0 人点赞