Data processing with Logstash

2023-11-17 14:30:18

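Logstash is a powerful data transfer tool that supports a wide variety of input sources and output destinations, and it can process data inside the pipeline. One complete data transfer path in Logstash is a pipeline; Logstash supports custom configuration of multiple pipelines and runs them in parallel.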

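The demos below cover:

Demo1

  • Transfer only the data that matches a query to the target index

Demo2

  • Write the data of multiple indices that match a wildcard pattern to the same target index

Demo3

  • Write the different types of a source index to separate indices; this scenario is common when migrating indices from an ES 6 cluster to an ES 7 cluster

Demo4

  • Split a single field of the source index into multiple fields

Demo5

  • How to implement a join between different indices in ES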

Query filtering

input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test"
    query => '{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2023-09-26T07:20:48.124111Z",
        "lte": "2023-09-26T08:20:48.126163Z"
      }
    }
  }
}'
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}
output {
    elasticsearch {
        hosts => ["10.0.xx.xx:9200"]
        user => "elastic"
        password => "passwd"
        index => "result"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
    }
}
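
If the time window should move rather than stay fixed, one option is to combine Elasticsearch date math with the input plugin's `schedule` option. The sketch below is a variant of the input above, not part of the original demo; the one-hour window and the hourly cron expression are assumptions chosen for illustration.

```conf
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test"
    # Date math instead of fixed timestamps: only the last hour of data (assumed window).
    query => '{ "query": { "range": { "@timestamp": { "gte": "now-1h", "lte": "now" } } } }'
    # Cron-style schedule: rerun the query at minute 0 of every hour (assumed interval).
    schedule => "0 * * * *"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}
```

Because the output reuses the source `_id` as `document_id`, a document picked up by two overlapping runs is overwritten in the target index rather than duplicated.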

Wildcard matching

Write every source index whose name matches the pattern to the same target index

input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    # Wildcard pattern: read every source index whose name starts with "tes"
    index => "tes*"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}
# Only forward documents that come from indices whose names start with "tes"
output {
  if [@metadata][_index] =~ /^tes/ {
    elasticsearch {
        hosts => ["http://172.16.xx.xx:9200"]
        user => "elastic"
        password => "passwd"
        index => "result"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
    }
  }

}
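
One thing to watch when merging several indices into one: the output above reuses each document's source `_id`, so two documents with the same `_id` from different source indices would overwrite each other in `result`. A minimal sketch of one way around that, assuming it is acceptable for the target IDs to differ from the source IDs, is to prefix the ID with the source index name. The `source_index` field is a hypothetical name introduced here for illustration.

```conf
filter {
  # Keep the originating index name on each document (hypothetical field name).
  mutate {
    add_field => { "source_index" => "%{[@metadata][_index]}" }
  }
}

output {
  if [@metadata][_index] =~ /^tes/ {
    elasticsearch {
      hosts => ["http://172.16.xx.xx:9200"]
      user => "elastic"
      password => "passwd"
      index => "result"
      document_type => "%{[@metadata][_type]}"
      # Source index name plus source ID, so IDs stay unique across source indices.
      document_id => "%{[@metadata][_index]}_%{[@metadata][_id]}"
    }
  }
}
```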

Writing each type of a multi-type index to a separate index

input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}


output {
    if [@metadata][_type] == "type1" {
        elasticsearch {
            hosts => ["http://10.0.xx.xx:9200"]
            user => "elastic"
            password => "passwd"
            index => "type1"
            document_type => "_doc"
            document_id => "%{[@metadata][_id]}"
        }
    } else if [@metadata][_type] == "type2" {
        elasticsearch {
            hosts => ["http://10.0.xx.xx:9200"]
            user => "elastic"
            password => "passwd"
            index => "type2"
            document_type => "_doc"
            document_id => "%{[@metadata][_id]}"
        }
    }
}
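
When each type should simply land in an index named after it, as in the config above where `type1` goes to index `type1`, the conditionals can be collapsed by using the type as the index name. This is a sketch under the assumption that every type name in the source index is also a valid index name (lowercase, no forbidden characters). Unlike the conditional version, it also forwards types other than `type1` and `type2`.

```conf
output {
    elasticsearch {
        hosts => ["http://10.0.xx.xx:9200"]
        user => "elastic"
        password => "passwd"
        # The target index name is taken from each document's source type.
        index => "%{[@metadata][_type]}"
        document_type => "_doc"
        document_id => "%{[@metadata][_id]}"
    }
}
```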

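Splitting a single field into multiple fields

Suppose a field c in some index holds the value abc_123, and you want to split it into two fields, c1: abc and c2: 123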

input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test1"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}

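filter {
    # Split c on "_"; c itself becomes an array, e.g. ["abc", "123"]
    mutate {
        split => ["c", "_"]
    }

    # Copy the first element into c1 if it exists
    if [c][0] {
        mutate {
            add_field => {
                "c1" => "%{[c][0]}"
            }
        }
    }

    # Copy the second element into c2 if it exists
    if [c][1] {
        mutate {
            add_field => {
                "c2" => "%{[c][1]}"
            }
        }
    }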
}


output {
    elasticsearch {
        hosts => ["http://10.0.xx.xx:9200"]
        user => "elastic"
        password => "passwd"
        index => "result"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
    }
}
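
The same split could also be done with the `dissect` filter, shown here as an alternative sketch rather than the original method. It assumes every value of `c` contains at least one underscore; events that do not match the pattern are tagged `_dissectfailure` and get no `c1` or `c2`. Unlike `mutate` with `split`, it leaves `c` itself as a string.

```conf
filter {
    dissect {
        # "abc_123" yields c1 = "abc" and c2 = "123".
        # c2 takes everything after the first "_", so "a_b_c" yields c2 = "b_c".
        mapping => {
            "c" => "%{c1}_%{c2}"
        }
    }
}
```

The two approaches differ when a value contains more than one underscore: the `split` version puts only the second segment into `c2`, while this one keeps the whole remainder.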

join

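Logstash cannot perform join-style aggregation across multiple indices. In ES, a parent-child document relationship (the join field type) achieves a similar effect.

The following statements implement this on the ES side (the data from the separate indices must first be consolidated into a single index).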

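1. Create the index

The linking field is field3; field1 is data unique to index 1, and field2 is data unique to index 2.

field3 acts as the parent relation, with field1 and field2 as its child relations.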

PUT my_index
{
  "mappings": {
    "properties": {
      "my_join_field": {
        "type": "join",
        "relations": {
          "field3": ["field1", "field2"]
        }
      }
    }
  }
}

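2. Insert data

Index the field1 and field2 child documents with parent set to 1, the ID of the field3 parent document, to link them. The routing value must also be the parent ID so that the children are stored on the same shard as the parent.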

PUT my_index/_doc/1
{
  "my_join_field":"field3"
}

PUT my_index/_doc/2?routing=1
{
  "my_join_field": {
    "name": "field2",
    "parent": "1"
  }
}

PUT my_index/_doc/3?routing=1
{
  "my_join_field": {
    "name": "field1",
    "parent": "1"
  }
}
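
Since the child documents have to be consolidated into `my_index`, they could also be written by Logstash instead of by hand. The following is a sketch only: `index1` is a hypothetical source index whose documents carry both `field1` and `field3`, and it assumes that each parent document in `my_index` uses its `field3` value as `_id` and that source IDs do not clash with parent IDs.

```conf
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "index1"   # hypothetical source index
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}

filter {
  mutate {
    # Mark the document as a "field1" child of the parent whose _id is its field3 value.
    add_field => {
      "[my_join_field][name]" => "field1"
      "[my_join_field][parent]" => "%{[field3]}"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "my_index"
    document_id => "%{[@metadata][_id]}"
    # Children must be routed to the same shard as their parent.
    routing => "%{[field3]}"
  }
}
```

A second pipeline of the same shape, with `"field2"` as the relation name, would load the documents from the other index.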

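3. Query data

Querying by parent finds all the data that shares the same field3: the has_parent query below returns the child documents whose parent document has ID 1.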

GET my_index/_search
{
  "query": {
    "has_parent": {
      "parent_type": "field3", 
      "query": {
        "match": {
          "_id": "1"
        }
      }
    }
  }
}

For details, see Parent Id Query | Elasticsearch Guide [6.8] | Elastic

For other Logstash usage, see Elasticsearch filter plugin | Logstash Reference [6.8] | Elastic
