logstash-input-jdbc



In practice we often run into this situation: our data lives in MySQL, but it keeps growing, and searching it with "LIKE" is far too slow, so we want to use Elasticsearch for full-text search (a quick sketch of the kind of query this replaces follows below). That means the data has to be synced into ES. There are many ways to do that; here we use the logstash-input-jdbc plugin.
Side note: when you get the chance, also look at canal, which does this through the MySQL binlog.
https://mp.weixin.qq.com/s/YKqKW0n5JTPgTd9kv9RDhQ
github: https://github.com/alibaba/canal
https://www.jianshu.com/p/87944efe1005
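As an illustration of the "LIKE" problem mentioned above, here is a minimal sketch of the kind of query ES replaces. The column names title and summary are taken from the t_article document shown later in this post; the keyword is made up.

-- A leading wildcard prevents MySQL from using an index,
-- so this scans the whole table as t_article grows.
SELECT id, title, summary
FROM t_article
WHERE title LIKE '%elasticsearch%'
   OR summary LIKE '%elasticsearch%';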

Reference

https://segmentfault.com/a/1190000011784259

Installing logstash-input-jdbc

First install Logstash itself (see the ELK installation post), then install the plugin:
cd logstash/bin
./logstash-plugin install logstash-input-jdbc
After the installation finishes, create the configuration file blog_log.conf in the config directory:
input {
  file {
    path => "/root/blog/blog_error.log"
    type => "blog-error-log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
      negate => true
      auto_flush_interval => 3
      what => previous
    }
  }
  file {
    path => "/root/blog/blog_info.log"
    type => "blog-info-log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
      negate => true
      auto_flush_interval => 3
      what => previous
    }
  }
  jdbc {
    jdbc_driver_library => "/usr/local/logstash-5.2.0/lib/mysql-connector-java-5.1.29.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/blog"
    jdbc_user => "root"
    jdbc_password => "root#$3"
    # statement_filepath => "filename.sql"
    statement => "SELECT * from t_article where test_time> :sql_last_value"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    type => "t_article"
    # incremental sync: track the test_time column and remember the last value seen
    use_column_value => true
    tracking_column => "test_time"
    record_last_run => true
    last_run_metadata_path => "/usr/local/logstash-5.2.0/config/last_run_metadata.txt"
    # run the query every minute
    schedule => "* * * * *"
  }
}
filter {
  if [type] != "t_article" {
    mutate {
      split => ["host", "8"]
      add_field => { "add_field_test" => "test3" }
    }
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
  }
  if [type] == "t_article" {
    json {
      source => "message"
      remove_field => ["message"]
    }
  }
}
output {
  if [type] == "t_article" {
    elasticsearch {
      hosts => ["39.108.231.144:9200"]
      index => "table_blog"
      document_id => "%{id}"
    }
  }
  if [type] != "t_article" {
    elasticsearch {
      hosts => ["39.108.231.144:9200"]
      index => "blog-%{+YYYY.MM.dd}"
    }
  }
}

Note:
- test_time here is a database timestamp: whenever the row's data changes, the timestamp changes with it, so updates in MySQL are also synced to ES. If you track a column like id, which never changes, updated rows will not be picked up again (see the sketch below).
- Map the MySQL id to the ES document id (document_id => "%{id}") instead of letting ES generate its own; otherwise every run only adds new documents, and editing a row in MySQL creates yet another document in ES instead of overwriting the existing one.
- schedule => "* * * * *" runs the sync every minute.
- Deletes are not synced. Use logical (soft) deletes, and physically remove the rows later with a scheduled job of your own.
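The original post does not show the t_article table definition. As a minimal sketch (the table and column names come from the config above; everything else is an assumption), an auto-updating timestamp column in MySQL could be declared like this:

-- Hypothetical DDL fragment: MySQL refreshes test_time on every INSERT
-- and UPDATE, so a modified row satisfies "test_time > :sql_last_value"
-- on the next scheduled run and gets re-synced to Elasticsearch.
ALTER TABLE t_article
  MODIFY COLUMN test_time TIMESTAMP NOT NULL
    DEFAULT CURRENT_TIMESTAMP
    ON UPDATE CURRENT_TIMESTAMP;

For the logical-delete approach, the same idea applies: add a flag column (a hypothetical is_deleted, for example), filter it out in the SELECT statement, and let a separate job remove the flagged rows from both MySQL and ES.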

Problems encountered

[root@iZwz9278r1bks3b80puk6fZ config]# cat last_run_metadata.txt
--- 2019-09-12 14:30:54.000000000 +08:00
Console output:
[2019-09-12T15:04:00,250][INFO ][logstash.inputs.jdbc ] (0.020000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> 0) AS `t1` LIMIT 1 
[2019-09-12T15:04:00,270][INFO ][logstash.inputs.jdbc ] (0.003000s) SELECT * FROM (SELECT * from t_article where test_time> 0) AS `t1` LIMIT 50000 OFFSET 0 
[2019-09-12T15:05:00,092][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT version() AS `v` LIMIT 1 
[2019-09-12T15:05:00,100][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:30:54') AS `t1` LIMIT 1 
[2019-09-12T15:05:00,108][INFO ][logstash.inputs.jdbc ] (0.004000s) SELECT * FROM (SELECT * from t_article where test_time> '2019-09-12 14:30:54') AS `t1` LIMIT 50000 OFFSET 0 
[2019-09-12T15:06:00,191][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:56:00') AS `t1` LIMIT 1 
[2019-09-12T15:07:00,267][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:56:00') AS `t1` LIMIT 1 
Analysis of the log above.
Problem 1:
SELECT * FROM (SELECT * from t_article where test_time> 0) AS `t1` LIMIT 50000 OFFSET 0
test_time is a timestamp, so starting from 0 (the tracking column defaults to numeric) is clearly wrong. Set the column type explicitly:
tracking_column_type: string, one of ["numeric", "timestamp"]
tracking_column_type => "timestamp"
use_column_value => true
tracking_column => "test_time"
With that, the query starts from the epoch instead:
[2019-09-12T15:31:02,525][INFO ][logstash.inputs.jdbc ] (0.006000s) SELECT * FROM (SELECT * from t_article where test_time> '1970-01-01 00:00:00') AS `t1` LIMIT 10 OFFSET 60
Problem 2:
Suppose the rows come back in this order:
test_time
2019-09-12 13:56:00
2019-09-12 14:56:00
2019-09-12 14:30:54
last_run_metadata.txt records the value of the last row processed (not the maximum): --- 2019-09-12 14:30:54.000000000 +08:00
[2019-09-12T15:05:00,100][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:30:54') AS `t1` LIMIT 1
What happens: rows are imported (and overwritten) again and again. Suppose the tracking values in the database come back in this order:
1
2
3
4
6
2
Then last_run_metadata.txt records 2, and every scheduled run (once a minute) re-syncs everything after it:
SELECT * from t_article where test_time> '2' returns 3, 4, 6, 2 every time.
The fix is to sort the query with order by test_time asc, so the last row processed is also the largest value; a quick sketch follows below.
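A minimal sketch of the ordered incremental statement (the same statement used in the final configuration below), shown here to make the fix concrete; :sql_last_value is the placeholder that logstash-input-jdbc replaces with the value stored in last_run_metadata.txt:

-- With ORDER BY, the last row of every batch carries the largest
-- test_time, so the tracked value only ever moves forward.
SELECT *
FROM t_article
WHERE test_time > :sql_last_value
ORDER BY test_time ASC;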
Problem 3:
The timestamps displayed in Kibana are 8 hours behind. This is a timezone issue, even though last_run_metadata.txt carries the +08:00 offset and matches the database:
--- 2019-09-12 14:30:54.000000000 +08:00
{
  "took": 79,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "table_blog",
        "_type": "t_article",
        "_id": "125",
        "_score": 1,
        "_source": {
          "summary": "本文讲解的Elasticsearch的中文分词器IKAnalyzer。",
          "created_time": "2019-09-12 10:44:10",
          "thumb_url": "/upload/BCB71BBC-FC70-4b9e-A55E-0559BCA99006.png",
          "creator": "丁D",
          "allow_comment": 0,
          "recommend": 0,
          "click_num": 1,
          "title": "Elasticsearch(七)",
          "type": "t_article",
          "updator_id": 1,
          "upator": "丁D",
          "states": 1,
          "update_time": "2019-09-12 10:44:10",
          "test_time": "2019-09-12T06:30:54.000Z",
          "@timestamp": "2019-09-12T07:31:03.060Z",
          "category_id": 2,
          "publish_time": "2019-09-12 10:19",
          "creator_id": 1,
          "@version": "1",
          "support_num": 2,
          "id": 125
        }
      }
    ]
  }
}
Setting the timezone with jdbc_default_timezone => "Asia/Shanghai" did not seem to help either... still to be resolved. (For what it's worth, Elasticsearch stores dates in UTC, so "2019-09-12T06:30:54.000Z" in the _source above is the same instant as 2019-09-12 14:30:54 +08:00, and Kibana normally converts to the browser's timezone for display.)

Final configuration

input {
  file {
    path => "/root/blog/blog_error.log"
    type => "blog-error-log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
      negate => true
      auto_flush_interval => 3
      what => previous
    }
  }
  file {
    path => "/root/blog/blog_info.log"
    type => "blog-info-log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
      negate => true
      auto_flush_interval => 3
      what => previous
    }
  }
  jdbc {
    jdbc_driver_library => "/usr/local/logstash-5.2.0/lib/mysql-connector-java-5.1.29.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/blog"
    jdbc_user => "root"
    jdbc_password => "root#$3"
    # statement_filepath => "filename.sql"
    statement => "SELECT * from t_article where test_time> :sql_last_value order by test_time asc"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "10"
    jdbc_default_timezone => "Asia/Shanghai"
    type => "t_article"
    tracking_column_type => "timestamp"
    use_column_value => true
    tracking_column => "test_time"
    record_last_run => true
    last_run_metadata_path => "/usr/local/logstash-5.2.0/config/last_run_metadata.txt"
    schedule => "* * * * *"
  }
}
filter {
  if [type] != "t_article" {
    mutate {
      split => ["host", "8"]
      add_field => { "add_field_test" => "test3" }
    }
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
  }
  if [type] == "t_article" {
    json {
      source => "message"
      remove_field => ["message"]
    }
  }
}
output {
  if [type] == "t_article" {
    elasticsearch {
      hosts => ["39.108.231.144:9200"]
      index => "table_blog"
      document_id => "%{id}"
    }
  }
  if [type] != "t_article" {
    elasticsearch {
      hosts => ["39.108.231.144:9200"]
      index => "blog-%{+YYYY.MM.dd}"
    }
  }
}
