Logstash is a powerful data transfer tool: it supports a wide range of input sources and output destinations, and it can transform data inside the pipeline. A complete data transfer path in Logstash is a pipeline, and Logstash allows multiple user-defined pipelines to run in parallel.
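As a minimal sketch of running several pipelines in parallel, the pipelines can be declared in Logstash's pipelines.yml (the pipeline ids and config paths below are placeholders, not taken from this article):
# pipelines.yml: each entry defines one pipeline; Logstash runs all of them in parallel
- pipeline.id: migrate-query-filter
  path.config: "/etc/logstash/conf.d/query_filter.conf"
- pipeline.id: migrate-wildcard
  path.config: "/etc/logstash/conf.d/wildcard.conf"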
Below are the demos:
Demo 1
- Transfer only the documents that match a query to the target index
Demo 2
- Output data from multiple source indices that match a wildcard pattern into a single target index
Demo 3
- Output data of different types in a source index into separate target indices; this is typically used when migrating indices from an ES 6 cluster to an ES 7 cluster
Demo 4
- Split a single field of a source index into multiple fields
Demo 5
- How to achieve a join-like operation across different indices in ES
Query filtering
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test"
    query => '{
      "query": {
        "range": {
          "@timestamp": {
            "gte": "2023-09-26T07:20:48.124111Z",
            "lte": "2023-09-26T08:20:48.126163Z"
          }
        }
      }
    }'
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}
output {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "result"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
  }
}
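The configs in these demos are all run the same way. As a hedged usage sketch (the file name query_filter.conf is a placeholder), save the pipeline above to a file and point Logstash at it:
# without a schedule, the elasticsearch input runs the query once and the pipeline exits when the scroll completes
bin/logstash -f query_filter.conf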
Wildcard matching
Output the source indices that match the pattern into the same target index.
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    # wildcard pattern: read from all indices whose name starts with tes
    index => "tes*"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}
# write only documents coming from indices whose name starts with tes
output {
  if [@metadata][_index] =~ /^tes/ {
    elasticsearch {
      hosts => ["http://172.16.xx.xx:9200"]
      user => "elastic"
      password => "passwd"
      index => "result"
      document_type => "%{[@metadata][_type]}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
Output different types of a multi-type index into separate indices
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}
output {
  if [@metadata][_type] == "type1" {
    elasticsearch {
      hosts => ["http://10.0.xx.xx:9200"]
      user => "elastic"
      password => "passwd"
      index => "type1"
      document_type => "_doc"
      document_id => "%{[@metadata][_id]}"
    }
  } else if [@metadata][_type] == "type2" {
    elasticsearch {
      hosts => ["http://10.0.xx.xx:9200"]
      user => "elastic"
      password => "passwd"
      index => "type2"
      document_type => "_doc"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
Split one index field into multiple fields
Suppose a field in an index holds c: abc_123 and we want to split it into two fields, c1: abc and c2: 123.
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test1"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}
filter {
  mutate {
    split => ["c", "_"]
  }
  if [c][0] {
    mutate {
      add_field => {
        "c1" => "%{[c][0]}"
      }
    }
  }
  if [c][1] {
    mutate {
      add_field => {
        "c2" => "%{[c][1]}"
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "result"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
  }
}
Join
Logstash cannot perform join operations across multiple indices. In ES, a similar effect can be achieved through a parent-child (join) relationship between documents.
The following statements implement the join on the ES side (the data from the multiple source indices must first be written into the same index).
1. Create the index
The join key is field3; field1 is data unique to index 1 and field2 is data unique to index 2.
field3 acts as the parent document and is related to the child documents field1 and field2.
PUT my_index
{
  "mappings": {
    "properties": {
      "my_join_field": {
        "type": "join",
        "relations": {
          "field3": ["field1", "field2"]
        }
      }
    }
  }
}
2. Insert data
The parent document field3 is indexed with id 1; the child documents field1 and field2 specify parent 1 (and routing=1) to establish the relationship.
PUT my_index/_doc/1
{
  "my_join_field": "field3"
}
PUT my_index/_doc/2?routing=1
{
  "my_join_field": {
    "name": "field2",
    "parent": "1"
  }
}
PUT my_index/_doc/3?routing=1
{
  "my_join_field": {
    "name": "field1",
    "parent": "1"
  }
}
3. Query data
Querying against the parent document returns all documents that share the same field3 parent.
GET my_index/_search
{
  "query": {
    "has_parent": {
      "parent_type": "field3",
      "query": {
        "match": {
          "_id": "1"
        }
      }
    }
  }
}
For details, see Parent Id Query | Elasticsearch Guide [6.8] | Elastic.
For other Logstash usage, see Elasticsearch filter plugin | Logstash Reference [6.8] | Elastic.