Canal 同步数据坑太多?来试试 Logstash!

2023-10-24 17:14:18 浏览数 (2)

大家好,我是不才陈某~

上一篇文章已经详细介绍了如何使用Canal中间件将MySQL数据同步至ElasticSearch。然而,由于Canal已经很久没有得到维护,使用过程中可能会遇到许多问题。因此,在尝试Canal的同时,我们还可以考虑使用Logstash来实现类似的功能。本章将重点介绍如何使用Logstash将MySQL数据同步至ElasticSearch,如果你已经掌握了上一篇关于Canal的教程,可以直接从环境准备中的Logstash部分开始阅读。

Java技术指南:https://java-family.cn

环境准备

工具

版本

Linux(ARM)

Windows

备注

JDK

1.8

jdk-8u371-linux-aarch64.tar.gz

jdk-8u371-windows-x64.exe

MySQL

8.0.26

mysql-community-server-8.0.26-1.el7.aarch64.rpm

mysql-installer-community-8.0.26.0.msi

Elasticsearch

7.17.11

elasticsearch-7.17.11-linux-aarch64.tar.gz

elasticsearch-7.17.11-windows-x86_64

Logstash

7.17.10

logstash-7.17.10-linux-aarch64

logstash-7.17.10-windows-x86_64

JDK

JDK

MySQL

MySQL

Elasticsearch

Windows操作系统用户,直接解压并前往bin文件夹下运行elasticsearch.bat文件即可。

解压
代码语言:javascript复制
tar -zxvf elasticsearch-7.17.11-linux-aarch64.tar.gz -C /usr/software

mv /usr/software/elasticsearch-7.17.11-linux-aarch64 /usr/software/elasticsearch-7.17.11
创建数据目录
代码语言:javascript复制
cd /usr/software/elasticsearch-7.17.11/

mkdir data
修改配置文件
代码语言:javascript复制
cd /usr/software/elasticsearch-7.17.11/

vi config/elasticsearch.yml

在配置文件中,放开相关注释,主要修改内容如下:

代码语言:javascript复制
# 集群名称
cluster.name: xxx
# 节点名称
node.name: node-1
# 数据与日志存储目录
path.data: /usr/software/elasticsearch-7.17.11/data
path.logs: /usr/software/elasticsearch-7.17.11/logs
# 任何计算机节点访问
network.host: 0.0.0.0
# 默认端口
http.port: 9200
# 初始主节点
cluster.initial_master_nodes: ["node-1"]
开放端口
代码语言:javascript复制
firewall-cmd --add-port=9300/tcp --permanent

firewall-cmd --add-port=9200/tcp --permanent

firewall-cmd --reload

systemctl restart firewalld
用户权限问题

注意:在 Linux 系统下,root 用户无法启动 Elasticsearch,所以需额外创建专属用户用来启动 Elasticsearch。关注公众号:码猿技术专栏,回复关键词:BAT,获取大厂面试真题!

代码语言:javascript复制
# 创建用户
useradd elastic

# 授权
chown -R elastic /usr/software/elasticsearch-7.17.11/
内存不足问题

注意:如果服务器内存足够大此步骤可以跳过,不然启动时会报内存不足错误!

代码语言:javascript复制
cd /usr/software/elasticsearch-7.17.11/

vi config/jvm.options
bash复制代码# 设置堆内存大小
-Xms256m
-Xmx256m
其它问题
  1. max virtual memory areas vm.max_map_count [65530] is too low, increase to at least
代码语言:javascript复制
vi /etc/sysctl.conf 

# 在末尾添加以下配置
vm.max_map_count=655360
bash
复制代码sysctl -p
  1. max file descriptors [4096] for elasticsearch process is too low
代码语言:javascript复制
vi /etc/security/limits.conf

# 在末尾添加以下配置
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* soft nproc 4096
后台启动
代码语言:javascript复制
su elastic

cd /usr/software/elasticsearch-7.17.11/bin/

./elasticsearch -d
可视化插件

Install Multi Elasticsearch Head (opens new window) from the Chrome Web Store.

Click the extension icon in the toolbar of your web browser.

Note that you don’t need to enable CORS (opens new window) with this method.

Java技术指南:https://java-family.cn

Logstash

注意:本教程所使用IP地址为172.16.138.130,请根据实际IP进行替换!

解压
代码语言:javascript复制
tar -zxvf logstash-7.17.10-linux-aarch64.tar.gz -C /usr/software

mv /usr/software/logstash-7.17.10-linux-aarch64 /usr/software/logstash-7.17.10
创建目录
代码语言:javascript复制
cd /usr/software/logstash-7.17.10

# 此文件夹专门存放与MySQL相关管道配置文件与驱动包
mkdir mysql
下载驱动

在Maven Repository中搜索MySQL Connector Java,选择对应MySQL版本,进行下载即可。

代码语言:javascript复制
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
管道配置

mysql文件夹下创建.conf拓展名配置文件,文件名自定义即可:

代码语言:javascript复制
cd /usr/software/logstash-7.17.10/mysql

vi jdbc.conf

配置

说明

input

指定输入数据源。支持的数据源类型,请参见Input plugins。本文使用JDBC数据源,具体参数说明请参见input参数说明。

filter

指定对输入数据进行过滤插件。支持的插件类型,请参见Filter plugins。

output

指定目标数据源类型。支持的数据源类型,请参见Output plugins。本文需要将MySQL中的数据同步至Elasticsearch中,因此output中需要指定目标Elasticsearch的信息。

以下配置按照测试数据配置,在实际业务中,请按照业务需求进行合理配置:

代码语言:javascript复制
input {
   jdbc {
      # 多表同步时,表类型区分,建议命名为“库名_表名”
      type => "mytest_user"
      # 指定JDBC连接MySQL驱动文件
      jdbc_driver_library => "/usr/software/logstash-7.17.10/mysql/mysql-connector-java-8.0.30.jar"
      # MySQL驱动
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      # 数据库连接信息
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
      # 数据库用户名
      jdbc_user => "root"
      # 数据库密码
      jdbc_password => "123456"
      # 是否启用分页
      jdbc_paging_enabled => "true"
      # 分页大小
      jdbc_page_size => "500"
      # 是否记录上次执行结果。如果为true,则会把上次执行到的tracking_column字段值记录下来,保存到last_run_metadata_path指定文件中。
      record_last_run => true
      # 指定最后运行时间文件存放地址
      last_run_metadata_path => "/usr/software/logstash-7.17.10/mysql/last_run_metadata_update_time.txt"
      # 指定跟踪列,该列必须是递增的,一般是MySQL主键。
      tracking_column => "update_time"
      # 是否需要记录某个column值。当该值设置成true时,系统会记录tracking_column参数所指定的列的最新的值,并在下一次管道执行时通过该列的值来判断需要更新的记录。
      use_column_value => "true"
      # 跟踪列类型,默认是numeric。
      tracking_column_type => "timestamp"
      # 同步频率
      schedule => "*/5 * * * * *"
      # 指定SQL语句
      statement => "SELECT * FROM user WHERE update_time > :sql_last_value AND update_time < NOW()"
      # 是否清除last_run_metadata_path记录,默认为false。如果为true,那么每次都要从头开始查询所有数据库记录。
      clean_run => "false"
  }
}

filter {

}

output {
   if [type] == "mytest_user" {
      elasticsearch {
         # 配置ES集群地址
         hosts => ["127.0.0.1:9200"]
         # 索引名称(必须小写)
         index => "user"
         # 用户名
         user => ""
         # 密码
         password => ""
         # 数据唯一索引(建议使用数据库主键)
         document_id => "%{id}"
      }
   }
   stdout {
      codec => json_lines
   }
}

注意:jdbc_driver_library与last_run_metadata_path需要写绝对路径,使用相对路径可能会提示无权限。

数据同步

终于到了数据同步操作环节,现在需求如下:将MySQL中user表数据同步到ES中user索引,那么就跟着我一起动手操作吧!

创建数据库及表

代码语言:javascript复制
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '用户名',
  `age` int DEFAULT NULL COMMENT '年龄',
  `gender` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

SET FOREIGN_KEY_CHECKS = 1;

启动

代码语言:javascript复制
cd /usr/software/logstash-7.17.10

bin/logstash -f mysql/jdbc.conf

注意:管道配置文件所在路径务必保证正确!

检查是否启动成功

同步测试

往MySQL数据库中user表添加一条记录,然后前往Elasticsearch可视化界面查看是否同步成功:

常见问题

删除数据

如果一条记录从MySQL中删除,该操作并不会同步到Elasticsearch中。为了实现删除同步操作,可以考虑使用软删除,即逻辑删除方式:

在MySQL数据表中添加一个is_deleted字段,用来表示记录是否有效。一旦发生更新,is_deleted也会同步更新到Elasticsearch中。使用这个办法,在执行MySQL或Elasticsearch查询时,需要重写查询语句来过滤掉is_deletedtrue的记录,从而达到软删除效果。

0 人点赞