利用logstash将mysql多表数据增量同步到es

2020-09-08 15:35:11 浏览数 (1)

同步原理:

代码语言:javascript复制
第一次发送sql请求查询,修改时间参数值是为系统最开始的时间(1970年),可以查询的
到所有大于1970年的数据,并且会将最后一条数据的update_time时间记录下来,
作为下一次定时查询的条件

一、启动es kibana

代码语言:javascript复制
如何安装,以及如何运行,这里就不做描述,没有装过的,可以参考我的这篇文章
https://www.jianshu.com/p/f52d9c843bd8

二、安装mysql

查询mysql版本
代码语言:javascript复制
docker search mysql
通过docker下载MySQL5.7版本
代码语言:javascript复制
如何安装docker,不是本文重点,这里不做多描述
docker pull mysql:5.7  (这里选择的是第一个mysql镜像, :5.7选择的5.7版本)
docker pull mysql       # 拉取最新版mysql镜像
运行mysql
代码语言:javascript复制
docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7

账号:root
密码:123456
通过工具连接mysql
新建数据库
代码语言:javascript复制
DROP DATABASE IF EXISTS `test`;
CREATE DATABASE `test` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

USE test;

#商品表
DROP TABLE IF EXISTS `goods`;
CREATE TABLE `goods` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `goods` VALUES (1, '黎明电脑', '2020-07-01 00:40:19');
INSERT INTO `goods` VALUES (2, '黎明手机', '2020-07-01 00:40:32');

#用户表
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `user` VALUES (1, '黎明1', '2020-07-01 00:40:01');
INSERT INTO `user` VALUES (2, '黎明2', '2020-07-01 00:40:09');

现在已经启动了3台容器了

三、下载logstash源码包

代码语言:javascript复制
官方地址
https://www.elastic.co/cn/downloads/logstash

国内加速下载网址
https://www.newbe.pro/Mirrors/Mirrors-Logstash/

下载地址
wget https://mirrors.huaweicloud.com/logstash/6.7.2/logstash-6.7.2.zip

下载zip命令解压
yum -y install zip

解压
unzip logstash-6.7.2.zip

四、下载mysql驱动

可能会有人疑问?为什么要下载mysql驱动 因为logstash需要连接mysql,并查询表数据,才确定是否同步数据

如下,是maven仓库,所有版本mysql驱动连接

代码语言:javascript复制
https://mvnrepository.com/artifact/mysql/mysql-connector-java

我的数据库是5.7版本,我这里下载5.1.47的驱动了,当然如果你们的数据库是8.0以上的版本,那么就下相应的版本就行

现在两种下载方式 1.下载到本地,然后通过ftp工具上传到服务器 2.在服务器上下载,右击复制链接地址,通过wget命令下载即可

五、进入logstash目录,安装同步插件

代码语言:javascript复制
安装会有点慢,大概2分钟左右吧
 bin/logstash-plugin install logstash-input-jdbc
 bin/logstash-plugin install logstash-output-elasticsearch

六、添加Mysql与ES同步配置

代码语言:javascript复制
进入logstash/config目录下,新建 user.conf文件
vim user.conf


添加内容
input {
  jdbc {
    jdbc_driver_library => "/usr/local/software/my/mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.137.11:3306/test"
    jdbc_user => "root"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    last_run_metadata_path => "syncpoint_table"
  }
}


output {
    elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.137.11:9200"]
        # 索引名称 可自定义
        index => "user"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{id}"
        document_type => "user"
    }
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}

进入bin目录启动

代码语言:javascript复制
./logstash -f ../config/user.conf

可以看到下图,如我标记的地方,logstash在第一次进行同步数据,会先从1970年开始,进行一次同步数据

之后每隔一分钟,会以最后的update_time作为条件,查询是否同步数据,如果查询的结果update_time时间大于所记录的update_time时间,则会继续同步数据,接下来在记录最后一次同步的update_time时间,依次类推

然后我们通过kibana,查询一下我们的索引结果

七、多表同步

到此,我们的单表同步已经完成,接下来我们开始实现多表同步
代码语言:javascript复制
规则如下:
一个表,一个配置
多个表,多个配置
需要同步多少表,就需要加多少配置
当然配置的内容都差不多,改的地方是查询的表名,和es的索引以及类型的名称

添加第二张表的配置,配置就是上面的配置,稍微改动即可

进入logstash/config目录,修改配置文件
代码语言:javascript复制
vim pipelines.yml
编辑文件

直接到最后,添加配置

代码语言:javascript复制
- pipeline.id: table1
  path.config: "/usr/local/software/my/logstash-6.7.2/config/user.conf"
- pipeline.id: table2
  path.config: "/usr/local/software/my/logstash-6.7.2/config/goods.conf"

启动方式稍有改变

代码语言:javascript复制
进入bin目录
./logstash

这里goods同步,为什么不是1970年呢,因为之前同步一次过,logstash会帮你记录,所以就以logstash最后一次同步时间计算

现在商品表也同步数据了

那如何证明,能够多表同步呢,很简单,我们修改两个表的数据,看是否都能查询的到,如下图,就可以证明商品表和用户表,都是根据各自表的最后时间进行同步的数据的

注意:有数据才会创建索引哦

0 人点赞