ElasticSearch 使用 Logstash 从 MySQL 中同步数据

2020-05-06 17:59:26 浏览数 (1)

目的是希望将现有的数据导入到 ElasticSearch 中,研究了好几种,除了写代码的方式,最简便的就是使用 Logstash 来导入数据到 ElasticSearch 中了。

因为现有的数据在 MySQL 数据库中,所以希望采用 logstash-input-jdbc 插件来导入数据。

安装 ElasticSearch 和 Logstash

首先需要安装 ElasticSearch 和 Logstash 环境,我选择的版本是 6.3.0

ELK 都是 Elastic 公司的产品,所以安装包可以到 http://www.elastic.co/downloads/elasticsearch 下载,老版本的归档在 https://www.elastic.co/downloads/past-releases 页面选择下载。

代码语言:javascript复制
liuqianfei@Master:~$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.0.tar.gz
liuqianfei@Master:~$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.0.tar.gz
liuqianfei@Master:~$ tar -zxvf elasticsearch-6.3.0.tar.gz
liuqianfei@Master:~$ tar -zxvf logstash-6.3.0.tar.gz
liuqianfei@Master:~$ ls -lh
总用量 228M
drwxrwxr-x  2 liuqianfei liuqianfei 4.0K 11月 21 09:56 connector
drwxr-xr-x  9 liuqianfei liuqianfei 4.0K 11月 21 09:24 elasticsearch-6.3.0
-rw-r--r--  1 liuqianfei liuqianfei  88M 11月 21 09:13 elasticsearch-6.3.0.tar.gz
drwxrwxr-x 14 liuqianfei liuqianfei 4.0K 11月 21 10:15 logstash-6.3.0
-rw-r--r--  1 liuqianfei liuqianfei 141M 11月 21 09:13 logstash-6.3.0.tar.gz

在安装上都很简单,基本上就是解压即用,ElasticSearch 的安装可以参考 ElasticSearch 6.0.0 安装配置,注意配置 IP 和修改系统参数。

直接解压 logstash 到指定目录就可以了,然后执行 ./logstash -e

代码语言:javascript复制
liuqianfei@Master:~/logstash-6.3.0$ 
liuqianfei@Master:~/logstash-6.3.0$ ./bin/logstash -e

输入 hello 输出如下则表示安装成功:

安装 logstash-input-jdbc 插件

现在使用 Logstash 比较幸福的是,logstash-6.1.1 以后已经默认支持 logstash-input-jdbc 插件,不需要再单独安装了。

如果你有不得已的原因必须要使用老版本的 Logstash,那么可以这么安装 logstash-input-jdbc 插件:

代码语言:javascript复制
liuqianfei@Master:~/logstash-6.3.0$ ./bin/logstash-plugin install logstash-input-jdbc

需要注意网络配置,因为这是在线安装的。

在线安装网络问题

建议大家在使用 Logstash 的时候使用最新版本,如果必须用老版本在先安装 logstash-input-jdbc 插件。

本节从网上摘录了一段配置,没有经过充分验证。

logstash-input-jdbc 插件是 logstash 的一个插件,使用 ruby 语言开发。下载插件过程中最大的坑是下载插件相关的依赖的时候下不动,因为国内网络的原因,访问不到亚马逊的服务器。解决办法,改成国内的 ruby 仓库镜像。此镜像托管于淘宝的阿里云服务器上。

如果没有安装 gem 的话先安装 gem,这是 ruby 的管理工具包。

代码语言:javascript复制
yum install gem
gem sources --add https://ruby.taobao.org/ --remove https://rubygems.org/
gem sources -l

*** CURRENT SOURCES ***

https://ruby.taobao.org

# 请确保只有 ruby.taobao.org
# 如果 还是显示 https://rubygems.org/ 进入 home的 .gemrc 文件
vim ~/.gemrc 
# 手动删除 https://rubygems.org/

修改 Gemfile 的数据源地址。步骤:

进入 logstash 根目录,vi Gemfile 修改 Gemfile 文件,修改 source 的值为 "https://ruby.taobao.org"

还是在 logstash 根目录,修改 lock 文件 Gemfile.lock,找到 remote: https://rubygems.org/ 修改为 remote: https://ruby.taobao.org

或者直接替换源这样你不用改你的 Gemfile 的 source。

代码语言:javascript复制
sudo gem install bundler
$ bundle config mirror.https://rubygems.org https://ruby.taobao.org

然后就可以使用 ./logstash-plugin install logstash-input-jdbc 命令来安装 jdbc 插件了。

JDBC

logstash-input-jdbc 运行任务需要对应数据库的 JDBC 驱动文件。

我们在 home 目录新建目录 connector,把 MySQL 的驱动文件放在里面。

代码语言:javascript复制
liuqianfei@Master:~$ mkdir connector
liuqianfei@Master:~$ cd connector
liuqianfei@Master:~$ ls -lah
liuqianfei@Master:~/connector$ ls -lah
总用量 836K
drwxrwxr-x 2 liuqianfei liuqianfei 4.0K 11月 21 09:56 .
drwxr-xr-x 8 liuqianfei liuqianfei 4.0K 11月 21 19:19 ..
-rw-rw-r-- 1 liuqianfei liuqianfei 827K 11月 21 09:56 mysql-connector-java-5.1.24-bin.jar
liuqianfei@Master:~/connector$

Logstash 导入导出脚本

我在 logstash 根目录新建了一个目录 script,专门存放 logstash 导入导出脚本配置。

代码语言:javascript复制
liuqianfei@Master:~/logstash-6.3.0$ cd script/
liuqianfei@Master:~/logstash-6.3.0/script$ ls -lah
总用量 12K
drwxrwxr-x  2 liuqianfei liuqianfei 4.0K 11月 21 11:33 .
drwxrwxr-x 14 liuqianfei liuqianfei 4.0K 11月 21 19:19 ..
-rw-rw-r--  1 liuqianfei liuqianfei  444 11月 21 10:34 mysql.conf
liuqianfei@Master:~/logstash-6.3.0/script$

我的脚本内容很简单,从远程 MySQL 库 test_data_100w 导入表 test1_text 的全部数据到 ElasticSearch,任务只执行一次。

代码语言:javascript复制
input {
  jdbc {
    jdbc_driver_library => "/home/liuqianfei/connector/mysql-connector-java-5.1.24-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.190.10.119:3306/test_data_100w?serverTimezone=GMT+8"
    jdbc_user => "root"
    jdbc_password => "root"
    statement => "SELECT * from test1_text"
  }
}

output {
        elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}

执行 Logstash 导入任务

使用命令 ./bin/logstash -f ./script/mysql.conf 执行导入脚本。

代码语言:javascript复制
liuqianfei@Master:~/logstash-6.3.0$ ./bin/logstash -f ./script/mysql.conf
Sending Logstash's logs to /home/liuqianfei/logstash-6.3.0/logs which is now configured via log4j2.properties
[2018-11-21T10:55:37,328][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-11-21T10:55:39,760][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.3.0"}
[2018-11-21T10:55:47,803][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-11-21T10:55:49,717][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2018-11-21T10:55:49,802][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-21T10:55:50,643][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2018-11-21T10:55:50,861][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-11-21T10:55:50,879][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2018-11-21T10:55:50,939][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-11-21T10:55:51,040][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-11-21T10:55:51,273][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2018-11-21T10:55:52,344][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x349c342d run>"}
[2018-11-21T10:55:52,688][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-11-21T10:55:54,146][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-11-21T10:55:59,368][INFO ][logstash.inputs.jdbc     ] (2.011717s) SELECT * from test1_text
[2018-11-21T10:57:18,337][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x349c342d run>"}

等待几分钟任务执行完毕。

这时候查询 ElasticSearch 索引,发现已经有数据了。

代码语言:javascript复制
D:
λ curl -X GET http://192.190.10.170:9200/_cat/indices?v
health status index               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   logstash-2018.11.21 SFS4m0oWSh6O30vEE3INhg   5   1     100000            0     66.mb         66.mb

D:
λ

如果执行脚本的时候报错:

代码语言:javascript复制
Java::JavaSql::SQLException: The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents more than one time zone. 

在 JDBC 链接的 url 后要带入时区参数 ?serverTimezone=GMT+8

注意 MySQL 要支持远程连接才行(如果是本地的 mysql 可以不管),不然后报拒绝访问的异常:

代码语言:javascript复制
 is not allowed to connect to this MySql server

脚本说明

最后附一个从 MySQL 定时增量导入数据的脚本和参数说明,仅供参考。

代码语言:javascript复制
#-----------------------------------start-----------------------------------
#输入部分
input {
  jdbc {
    #连接MySQL驱动,需要自己下载
    jdbc_driver_library => "/es/mysql-connector-java-5.1.31.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.112.29.30:3306/mstore"
    #连接数据库账号信息
    jdbc_user => "MySQL_admin"
    jdbc_password => "password"
    #分页
    jdbc_paging_enabled => true
    #分页大小
    jdbc_page_size => 100000
    #流式获取数据,每次取10000.
    jdbc_fetch_size => 10000
    #Maximum number of times to try connecting to database
    connection_retry_attempts => 3
    #Number of seconds to sleep between connection attempts
    connection_retry_attempts_wait_time => 1
    #Connection pool configuration. The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5)
    jdbc_pool_timeout => 5
    #Whether to force the lowercasing of identifier fields
    lowercase_column_names => true
    #Whether to save state or not in last_run_metadata_path
    #保存上次运行记录,增量提取数据时使用
    record_last_run = > true
    #"* * * * *"为每分钟执行一次
    schedule => "* * * * *"
    #Use an incremental column value rather than a timestamp
    use_column_value => true
    #sql_last_value
    #The value used to calculate which rows to query. Before any query is run, this is set to Thursday, 1 January 1970, or 0 if use_column_value is true and tracking_column is set. It is updated accordingly after subsequent queries are run.
    tracking_column => "id"
    #查询语句
    statement => "SELECT id,package_name,name,sub_name,editor_comment,high_quality,sub_category,tag,update_time FROM tbl_app WHERE id > :sql_last_value"
  }
}

#过滤部分
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
  date{
    match => ["update_time","yyy-MM-dd HH:mm:ss"]
  }
}

#输出到elastsicearch
output {
  elasticsearch {
    #elasticsearch集群地址,不用列出所有节点,默认端口号也可省略
    hosts => ["10.127.92.181:9200", "10.127.92.212:9200", "10.127.92.111:9200"]
    #索引值,查询的时候会用到;需要先在elasticsearch中创建对应的mapping,也可以采用默认的mapping
    index => "store"
    #指定插入elasticsearch文档ID,对应input中sql字段id
    document_id => "%{id}"
  }
}

#------------------------------------end------------------------------------

使用时请去掉此文件中的注释,不然会报错。logstash 会把执行记录默认存在账户根目录下: /root/.logstash_jdbc_last_run,如果需要重新加载数据到 elasticsearch,需要删除这个文件。

0 人点赞