Maxwell简介
Maxwell 是一个开源的实时数据捕获工具,主要用于从 MySQL 数据库中捕获数据变化并将这些变化实时推送到其他系统,比如消息队列、数据仓库等。通俗来说,Maxwell 就像一个“数据监控员”,它时刻关注数据库的变化,并将这些变化及时报告给需要的地方。
例子:
想象一下一个餐厅的点餐系统。
实时数据捕获:当顾客在餐厅点菜时,点单系统会立即记录下顾客的选择,比如点了什么菜、数量等。Maxwell 就像这个点单系统,它实时监控数据库中的变化。
数据推送:一旦顾客下单,点单信息会被迅速发送到厨房,厨师可以立即开始准备食物。这个过程就像 Maxwell 将数据变化推送到其他系统,确保信息及时传递。
简化数据集成:如果餐厅还使用外部的配送服务,点单信息可以直接发送给配送系统,让外卖员及时了解订单情况。这就类似于 Maxwell 将数据变化推送到不同的系统,简化了信息流转的过程。
Maxwell输出数据格式
参数解释
Maxwell原理
Maxwell的工作原理是实时读取MySQL数据库的二进制日志(Binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka等流处理平台。其实就是Maxwell将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。
MySQL二进制日志
二进制日志(Binlog)是MySQL服务端非常重要的一种日志,它会保存MySQL数据库的所有数据变更记录。Binlog的主要作用包括主从复制和数据恢复。Maxwell的工作原理和主从复制密切相关。
MySQL主从复制
MySQL的主从复制,就是用来建立一个和主数据库完全一样的数据库环境,这个数据库称为从数据库。
<1> MySQL主从复制应用场景
代码语言:txt复制1)主数据库的备用:主数据库服务器故障后,可切换到从数据库继续工作。
2)读写分离:主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率。
<2>MySQL主从复制原理
代码语言:txt复制类似一个图书馆和一个分馆的关系。
1)主馆记录变化:主图书馆(Master)购入了新书,并将这些变更记录在一个清单上(相当于二进制日志)。
2)分馆请求更新:分馆(Slave)定期向主馆询问新书的清单,以便获取最新的书籍信息。这个过程中,分馆把收到的新书信息放到自己的临时清单上(相当于中继日志)。
3)应用变化:分馆根据这个临时清单将新书上架,以确保分馆的藏书与主馆的一样。
具体情况如下:
代码语言:txt复制1)主库记录变化:当主数据库(Master)上发生数据变化(比如新增、更新或删除数据)时,这些变化会被记录到一个特殊的日志文件中,叫做二进制日志(binary log)。
2)从库请求数据:从数据库(Slave)会向主数据库请求这些日志数据,获取记录的数据变化。这些变化会被存储到从库的一个临时日志中,叫做中继日志(relay log)。
3)应用数据变化:接下来,从库会读取中继日志中的记录,并将这些变化应用到自己的数据库中,以确保从库的数据与主库保持一致。
Maxwell部署
下载安装包: https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
解压缩重命名
代码语言:shell复制cd /opt/module
tar -zxvf maxwell-1.29.2.tar.gz
mv maxwell-1.29.2/ maxwell
MySQL开启Binlog
代码语言:shell复制# MySQL服务器的Binlog默认是未开启的,如需进行同步,需要先进行开启
sudo vim /etc/my.cnf
#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall
MySQL Binlog模式
代码语言:txt复制Statement-based:
基于语句,Binlog会记录所有写操作的SQL语句,包括insert、update、delete等。
优点:节省空间
缺点:有可能造成数据不一致,例如insert语句中包含now()函数。
Row-based:
基于行,Binlog会记录每次写操作后被操作行记录的变化。
优点:保持数据的绝对一致性。
缺点:占用较大空间。
mixed:
混合模式,默认是Statement-based,如果SQL语句可能导致数据不一致,就自动切换到Row-based。
初始化MySQL的用户Maxwell
代码语言:shell复制sudo systemctl restart mysqld
mysql -uroot -p000000
# 创建数据库
msyql> CREATE DATABASE maxwell;
# 创建Maxwell用户并赋予其必要权限
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
配置Maxwell
代码语言:shell复制# 修改Maxwell配置文件名称
cd /opt/module/maxwell
cp config.properties.example config.properties
# 修改Maxwell配置文件
vim config.properties
#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# 目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=topic_db
# MySQL相关配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
# 过滤gmall中的z_log表数据,该表是日志数据的备份,无须采集
filter=exclude:gmall.z_log
# 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜
producer_partition_by=primary_key
Maxwell启停脚本
/root/bin目录下创建
代码语言:shell复制vim mxw.sh
#!/bin/bash
MAXWELL_HOME=/opt/module/maxwell
status_maxwell(){
result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
return $result
}
start_maxwell(){
status_maxwell
if [[ $? -lt 1 ]]; then
echo "启动Maxwell"
$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
else
echo "Maxwell正在运行"
fi
}
stop_maxwell(){
status_maxwell
if [[ $? -gt 0 ]]; then
echo "停止Maxwell"
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
else
echo "Maxwell未在运行"
fi
}
case $1 in
start )
start_maxwell
;;
stop )
stop_maxwell
;;
restart )
stop_maxwell
start_maxwell
;;
esac
启停命令
代码语言:shell复制mxw.sh start
mxw.sh stop
启动成功会有个Maxwell进程