Logstash: 应用实践 - 装载 CSV 文档到 Elasticsearch

2021-01-08 16:25:00 浏览数 (1)

腾讯云 Elasticsearch Service】高可用,可伸缩,云端全托管。集成X-Pack高级特性,适用日志分析/企业搜索/BI分析等场景


在进行我们这个实践之前,相信大家已经安装好自己的 Logstash 环境。如果大家还没安装好Logstash,可以参照我之前的文章 “如何安装Elastic栈中的Logstash”。

Logstash 到底是做什么的?

我们先看一下如下的图:

简单地说,Logstash 就是位于 Data 和 Elasticsearch 之间的一个中间件。Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件。 它从数据源实时地把数据进行采集,可帮助您解析,丰富,转换和缓冲来自各种来源的数据,并最终把数据传入到Elasticsearch之中。 如果您的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。Logstash 部署于 ingest node 之中。

更具体的描述:

在Logstash之中,它也分为三个部分:

  • Inputs
  • Filters
  • Ouput

Input 创建事件,Filter 修改输入事件,然后 Ouput 将它们发送到目标。 Input 和 Output 支持编解码器,使用编解码器,你可以在数据进入或退出管道时进行编码或解码,而不必使用单独的过滤器。 默认情况下,Logstash 在管道(pipeline)阶段之间使用内存中有界队列(输入到过滤器和过滤器到输出)来缓冲事件。 如果Logstash 不安全地终止,则存储在内存中的所有事件都将丢失。 为防止数据丢失,您可以使 Logstash 通过使用持久队列将正在进行的事件持久化到磁盘上。可以通过在 logstash.yml 文件中设置 queue.type: persistent 属性来启用持久队列,该文件位于LOGSTASH_HOME/config 文件夹下。 logstash.yml 是一个配置文件,其中包含与 Logstash 相关的设置。 默认情况下,文件存储在 LOGSTASH_HOME/data /queue 中。 你可以通过在 logstash.yml 中设置 path.queue 属性来覆盖它。

Logstash 配置由一系列输入,过滤器和输出插件及其相应的属性组成。 每个插件在解析,处理和最终以所需格式放置数据方面起着重要作用。 输入插件生成事件,过滤器修改它们,输出将它们发送到其他系统。 Logstash 提供超过 200 个插件,以及创建和贡献自己的灵活性:

在我们安装的 Logstash bin 目录下,我们可以使用如下的命令来获得所有的 plugins:

代码语言:javascript复制
  $ ./logstash-plugin list

下载 Data

为了能够使得我的练习能够进行,我们必须先得到数据。我们可以到网址 kaggle.com 进行下载。该网站含有大量的数据可以供我们进行下载。我们在该网页搜索 “cars”:

我们可以得到一叫做“Classified Ads for Cars”的 dataset。从上面我们可以看到有大概有 92M 的数据,不多,但是确实也不少啊。对于我们做这个练习来说,确实应该是可以的了。在网页上,我们可以看到这个 csv 文件的所有的数据描述。我们可以先注册一个账号,然后点击 “Download” 按钮,下载这个数据,并解压这个文件。在我们的电脑的 “Downloads” 文件目录下可以看到被解压的文件 “all_anonymized_2015_11_2017_03.csv”。我们可以在我们的 home 目录下创建一个叫做 data 的目录,并把这个文件拷贝过去,并重新命名为 cars.csv。

代码语言:javascript复制
localhost:data liuxg$ tree.└── cars.csv 0 directories, 1 filelocalhost:data liuxg$ pwd/Users/liuxg/data

我们可以看到在data目录下只有叫做cars.csv的文件。我们可以打开这个文件,我们可以看到如下的内容:

这是一个非常大的文件。在这个文件里,我们可以看到 maker, model, mileage, manufacture 等等字段。

Index CSV 文件到 Elasticsearch

在上一节中,我们已经把我们的数据存入到我们的data目录中。在这节里我们来讲述如何把数据写入到 Elasticsearch 之中。首先,我们可以参阅链接 “Configuring Logstash”。我们需要创建一个属于我们自己的 config 文件。通常一个 config 文件由如下的三个部分组成:

代码语言:javascript复制
# This is a comment. You should use comments to describe# parts of your configuration.input {  ...} filter {  ...} output {  ...}

我们在之前已经创建好的目录data下创建一个叫做 longstash_cars.config 的文件。我们可以使用我们喜欢的编辑器来编辑这个文件。

logstash_cars.config 文件的内容如下:

代码语言:javascript复制
input {	file {		path => "/Users/liuxg/data/cars.csv"		start_position => "beginning"		sincedb_path => "/dev/null"	}} filter {	csv {		separator => "," 		columns => [ "maker", "model", "mileage", "manufacture_year", "engine_displacement",		"engine_power", "body_type", "color_slug", "stk_year", "transmission", "door_count",		"seat_count", "fuel_type", "date_created", "date_last_seen", "price_eur" ]	} 	mutate { convert => ["mileage", "integer"] }	mutate { convert => ["price_eur", "float"] }	mutate { convert => ["engine_power", "integer"] }	mutate { convert => ["door_count", "integer"] }	mutate { convert => ["seat_count", "integer"] }} output {	elasticsearch {		hosts => "localhost:9200"		index => "cars"		document_type => "sold_cars"	} 	stdout {}}

这里有几点需要说明的:

  • 在 input 中,我们定义了一个文件,它的path指向我们的 csv 文件的位置。start_position 指向beginning。如果对于一个实时的数据源来说,它通常是 ending,这样表示它每次都是从最后拿到那个数据。sincedb_path 通常指向一个文件。这个文件保存上次操作的位置。针对我们的情况,我们设置为 /dev/null,表明,我们不存储这个数据
  • 在 filter 中,CSV filter 是非常直接的,不太需要很多的解释。这里的 column 都来自于我们的 csv 表格。通常 Logstash 会把每个数据都当做是字符串。针对我们的情况,我们可看到 mileage 是一个整型数,price_eur 是一个浮点数。这些我们都需要进行调整。这个调整我们可以通过 mutate 来完成
  • 在 output 之中,我们制定本地的 Elasticsearch 为我们的数据库,它的 index 是 cars,同时 document_type 为_doc。我们也同时使用 stdout,这样我们可以在terminal屏幕中看出数据在处理之中

装载数据到 Elasticsearch

我们首先进入到 Logstash 的安装目录,然后打入如下的命令:

代码语言:javascript复制
sudo ./bin/logstash -f ~/data/logstash_cars.config

提示:在运行 Logstash 时使用 -r 标志可让您在更改和保存配置后自动重新加载配置。 在测试新配置时,这将很有用,因为你可以对其进行修改,这样就不必在每次更改配置时都手动启动Logstash。

然后,我们可以在屏幕上看到如下的输出:

同时如果我们这在 Kibana 上可以看到正在 index 的 cars index数量(count)是一直变化的。

因为这是一个很大的文件,所以建立索引需要一段时间,而且我的电脑也将会是非常的热。

经过一段时间的运行,我们可以看到屏幕上不再滚动了,表明Logstash已经完成了数据的传输。我们可以在 Kibana上打入如下的命令:

代码语言:javascript复制
GET cars/_stats

我看可以看到我们的cars Index的统计数据:

我们可以看到共有 3,167,984 条数据,并且它的大小有 2.466G 这么大的数据。

我们可以打入如下的命令:

代码语言:javascript复制
GET cars

我们可以看到 Elasticsearch 已经为我们创建好了 mapping,并且它们的数据类型也可以看到:

代码语言:javascript复制
{  "cars" : {    "aliases" : { },    "mappings" : {      "properties" : {        "@timestamp" : {          "type" : "date"        },     ...        "color_slug" : {          "type" : "text",          "fields" : {            "keyword" : {              "type" : "keyword",              "ignore_above" : 256            }          }        },        "door_count" : {          "type" : "long"        },        "engine_power" : {          "type" : "long"        },        "fuel_type" : {          "type" : "text",          "fields" : {            "keyword" : {              "type" : "keyword",              "ignore_above" : 256            }          }        },        "host" : {          "type" : "text",          "fields" : {            "keyword" : {              "type" : "keyword",              "ignore_above" : 256            }          }        },    ...    },    "settings" : {      "index" : {        "creation_date" : "1567949013407",        "number_of_shards" : "1",        "number_of_replicas" : "1",        "uuid" : "tYT-XWpFTGiqwn_rMX4S3A",        "version" : {          "created" : "7030099"        },        "provided_name" : "cars"      }    }  }}

我们可以看到 door_count 及 engine_power 都已经修改为 long 数据类型了。在默认的情况下已经为我们创建了一个 replica。

在 Kibana 上显示数据

我们打开我们的 Kibana。我们首先现在我们的 Kibana 进行如下的操作:

我们建立一个叫做 cars* 的 index pattern。

最后我的到如下的画面:

再接下来,我们选择 Discover(左边最上面的那个图标)来装载我们的数据:

我们可以选择最近一个小时的数据的数据。我们立马可以看到一个统计图,显示在每个时间有多少个数据进来。

我们也可以做一个报表通过点击 “add” 按钮选择 maker, fuel_type,price_eur 及

最终我们看到如下的一个列表:

创建 Visualization

我们选择创建一个 Visualization:

我们选择 “Pie” Chart:

我们可以按照如下的选择生产一个 top 10 的生产厂商

按照如下的方法可以生产 top 10 的汽车型号:

生成最终 Dashboard


最新活动

包含文章发布时段最新活动,前往ES产品介绍页,可查找ES当前活动统一入口

Elasticsearch Service自建迁移特惠政策>>

Elasticsearch Service 新用户特惠狂欢,最低4折首购优惠 >>

Elasticsearch Service 企业首购特惠,助力企业复工复产>>

关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站Get~

0 人点赞