logstash-input-jdbc
A situation we often run into in practice: our data lives in MySQL, but as it keeps growing, searching with "like" becomes far too slow, so we want to use Elasticsearch for full-text search.
That means syncing the data from MySQL into ES. There are many ways to do this; here we use the logstash-input-jdbc plugin.
Side note: when you get a chance, look into canal, which does the sync via the MySQL binlog:
https://mp.weixin.qq.com/s/YKqKW0n5JTPgTd9kv9RDhQ
github:https://github.com/alibaba/canal
https://www.jianshu.com/p/87944efe1005
Reference
https://segmentfault.com/a/1190000011784259
Installing logstash-input-jdbc
First install Logstash itself (see the ELK installation post), then install the plugin from its bin directory:
cd logstash/bin
./logstash-plugin install logstash-input-jdbc
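To confirm the plugin was installed, you can list the installed plugins (an optional check; piping through grep just narrows the output):
./logstash-plugin list | grep jdbc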
Once the plugin is installed, create a configuration file in the config directory: blog_log.conf
input {
  file {
    path => "/root/blog/blog_error.log"
    type => "blog-error-log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
      negate => true
      auto_flush_interval => 3
      what => previous
    }
  }
  file {
    path => "/root/blog/blog_info.log"
    type => "blog-info-log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
      negate => true
      auto_flush_interval => 3
      what => previous
    }
  }
  jdbc {
    jdbc_driver_library => "/usr/local/logstash-5.2.0/lib/mysql-connector-java-5.1.29.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/blog"
    jdbc_user => "root"
    jdbc_password => "root#$3"
    # statement_filepath => "filename.sql"
    statement => "SELECT * from t_article where test_time> :sql_last_value"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    type => "t_article"
    use_column_value => true
    tracking_column => "test_time"
    record_last_run => true
    last_run_metadata_path => "/usr/local/logstash-5.2.0/config/last_run_metadata.txt"
    schedule => "* * * * *"
  }
}
filter {
  if [type] != "t_article" {
    mutate {
      split => ["host", "8"]
      add_field => { "add_field_test" => "test3" }
    }
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
  }
  if [type] == "t_article" {
    json {
      source => "message"
      remove_field => ["message"]
    }
  }
}
output {
  if [type] == "t_article" {
    elasticsearch {
      hosts => ["39.108.231.144:9200"]
      index => "table_blog"
      document_id => "%{id}"
    }
  }
  if [type] != "t_article" {
    elasticsearch {
      hosts => ["39.108.231.144:9200"]
      index => "blog-%{+YYYY.MM.dd}"
    }
  }
}
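Before starting Logstash you can check the configuration for syntax errors with the -t (--config.test_and_exit) flag. The relative path here assumes you are still in the bin directory and that blog_log.conf sits in the config directory next to it:
./logstash -f ../config/blog_log.conf -t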
Notes:
test_time here is a database timestamp that changes whenever the row is updated, so changes in MySQL are also synced to ES. If you track the id column instead, rows that are updated (but keep the same id) will never be re-synced.
Map the MySQL id to the ES document id with document_id => "%{id}" instead of letting ES generate ids. Otherwise every run only inserts: when you modify a row, a new document is added in ES rather than overwriting the existing one.
schedule => "* * * * *" runs the sync every minute.
Deleting data is not synced. Use logical (soft) deletes, and physically remove documents later with a scheduled job of your own.
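For reference, here is a minimal SQL sketch of how test_time and a soft-delete flag could be set up in MySQL. The real t_article schema is not shown in this post, so the exact column definition and the is_deleted name are assumptions:
-- Assumed: make test_time update automatically on every UPDATE so changed rows get re-synced.
ALTER TABLE t_article
  MODIFY test_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
-- Assumed: a soft-delete flag; the jdbc statement would then also filter on is_deleted = 0.
ALTER TABLE t_article
  ADD COLUMN is_deleted TINYINT(1) NOT NULL DEFAULT 0;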
Problems encountered
[root@iZwz9278r1bks3b80puk6fZ config]# cat last_run_metadata.txt
--- 2019-09-12 14:30:54.000000000 +08:00
Console output:
[2019-09-12T15:04:00,250][INFO ][logstash.inputs.jdbc ] (0.020000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> 0) AS `t1` LIMIT 1
[2019-09-12T15:04:00,270][INFO ][logstash.inputs.jdbc ] (0.003000s) SELECT * FROM (SELECT * from t_article where test_time> 0) AS `t1` LIMIT 50000 OFFSET 0
[2019-09-12T15:05:00,092][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT version() AS `v` LIMIT 1
[2019-09-12T15:05:00,100][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:30:54') AS `t1` LIMIT 1
[2019-09-12T15:05:00,108][INFO ][logstash.inputs.jdbc ] (0.004000s) SELECT * FROM (SELECT * from t_article where test_time> '2019-09-12 14:30:54') AS `t1` LIMIT 50000 OFFSET 0
[2019-09-12T15:06:00,191][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:56:00') AS `t1` LIMIT 1
[2019-09-12T15:07:00,267][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:56:00') AS `t1` LIMIT 1
From the logs above:
Problem 1:
SELECT * FROM (SELECT * from t_article where test_time> 0) AS `t1` LIMIT 50000 OFFSET 0
test_time is a timestamp, but the tracking column type defaults to numeric, so the first query starts from 0, which is clearly wrong here. Set the type explicitly (tracking_column_type is a string, one of ["numeric", "timestamp"]):
tracking_column_type => "timestamp"
use_column_value => true
tracking_column => "test_time"
With this setting the first run starts from the epoch instead:
[2019-09-12T15:31:02,525][INFO ][logstash.inputs.jdbc ] (0.006000s) SELECT * FROM (SELECT * from t_article where test_time> '1970-01-01 00:00:00') AS `t1` LIMIT 10 OFFSET 60
Problem 2:
Suppose the rows come back in this order:
test_time
2019-09-12 13:56:00
2019-09-12 14:56:00
2019-09-12 14:30:54
last_run_metadata.txt records the value of the last row returned, not the maximum: --- 2019-09-12 14:30:54.000000000 +08:00
[2019-09-12T15:05:00,100][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:30:54') AS `t1` LIMIT 1
This can lead to rows being imported (and overwritten) repeatedly. Suppose the values arrive from the database in this order:
1
2
3
4
6
2
last_run_metadata.txt then records 2, and on every scheduled run (every minute) the following rows are synced again:
SELECT * from t_article where test_time > '2' returns 3, 4, 6, 2
The fix is to sort the query with order by test_time asc, so that the last row returned is also the largest value, as shown in the snippet below.
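Concretely, the statement in the jdbc input becomes (this is the statement used in the final configuration at the end of this post):
statement => "SELECT * from t_article where test_time> :sql_last_value order by test_time asc"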
Problem 3:
The timestamps shown in Kibana are 8 hours off (a timezone issue), even though last_run_metadata.txt carries the +8 offset and matches the database:
--- 2019-09-12 14:30:54.000000000 +08:00
The document stored in ES keeps the times in UTC:
{
  "took": 79,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "table_blog",
        "_type": "t_article",
        "_id": "125",
        "_score": 1,
        "_source": {
          "summary": "本文讲解的Elasticsearch的中文分词器IKAnalyzer。",
          "created_time": "2019-09-12 10:44:10",
          "thumb_url": "/upload/BCB71BBC-FC70-4b9e-A55E-0559BCA99006.png",
          "creator": "丁D",
          "allow_comment": 0,
          "recommend": 0,
          "click_num": 1,
          "title": "Elasticsearch(七)",
          "type": "t_article",
          "updator_id": 1,
          "upator": "丁D",
          "states": 1,
          "update_time": "2019-09-12 10:44:10",
          "test_time": "2019-09-12T06:30:54.000Z",
          "@timestamp": "2019-09-12T07:31:03.060Z",
          "category_id": 2,
          "publish_time": "2019-09-12 10:19",
          "creator_id": 1,
          "@version": "1",
          "support_num": 2,
          "id": 125
        }
      }
    ]
  }
}
Setting the timezone with jdbc_default_timezone => "Asia/Shanghai" did not help either... still to be resolved.
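One workaround that is often suggested for the Kibana offset (not verified here, and it hard-codes the +8 hours, so treat it purely as a sketch) is to shift @timestamp with a ruby filter:
ruby {
  # shift the event timestamp forward by 8 hours into a temporary field
  code => "event.set('tmp_ts', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
  # write the shifted value back to @timestamp
  code => "event.set('@timestamp', event.get('tmp_ts'))"
}
mutate {
  remove_field => ["tmp_ts"]
}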
Final configuration file
input {
  file {
    path => "/root/blog/blog_error.log"
    type => "blog-error-log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
      negate => true
      auto_flush_interval => 3
      what => previous
    }
  }
  file {
    path => "/root/blog/blog_info.log"
    type => "blog-info-log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
      negate => true
      auto_flush_interval => 3
      what => previous
    }
  }
  jdbc {
    jdbc_driver_library => "/usr/local/logstash-5.2.0/lib/mysql-connector-java-5.1.29.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/blog"
    jdbc_user => "root"
    jdbc_password => "root#$3"
    # statement_filepath => "filename.sql"
    statement => "SELECT * from t_article where test_time> :sql_last_value order by test_time asc"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "10"
    jdbc_default_timezone => "Asia/Shanghai"
    type => "t_article"
    tracking_column_type => "timestamp"
    use_column_value => true
    tracking_column => "test_time"
    record_last_run => true
    last_run_metadata_path => "/usr/local/logstash-5.2.0/config/last_run_metadata.txt"
    schedule => "* * * * *"
  }
}
filter {
  if [type] != "t_article" {
    mutate {
      split => ["host", "8"]
      add_field => { "add_field_test" => "test3" }
    }
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
  }
  if [type] == "t_article" {
    json {
      source => "message"
      remove_field => ["message"]
    }
  }
}
output {
  if [type] == "t_article" {
    elasticsearch {
      hosts => ["39.108.231.144:9200"]
      index => "table_blog"
      document_id => "%{id}"
    }
  }
  if [type] != "t_article" {
    elasticsearch {
      hosts => ["39.108.231.144:9200"]
      index => "blog-%{+YYYY.MM.dd}"
    }
  }
}
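With the final file saved, Logstash can be started against it. The paths here are assumptions based on the /usr/local/logstash-5.2.0 layout used above:
cd /usr/local/logstash-5.2.0/bin
./logstash -f ../config/blog_log.conf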