前段时间遇到这么个事
产品和我说项目里的人员搜索功能他看着有点不对劲,说罢,用需要加需求的眼神看着我...
哥们虎躯一震...
搞了半天,原来是因为他在人员搜索框内填写部门名称,结果搜索器觉着他是傻子,没理他,所以他边哭喊着爸爸,边来找我
我说那地方本来就是填人名的呀,正常人都填人名...
他回复道,钉钉有啊,钉钉有的我们也要有,如果我们没有,那就是对像他这种有追求的产品职业操守上的侮辱...
我像看弱智一样看了他一会,然后道,“好吧 就为了你的节操...”
后来我查了下源码和数据发现,搜索的数据存储在ES里,但ES里的部门数据只有ID没有名称,所以面临的问题可能需要同步数据,从mysql同步到ES
可是问题是,如果只是同步数据的话还简单,但是同步数据之后需要维护的事情还有很多
就比如,如果当前人换部门了或者部门名称变了,在变更这些数据的时候同时都要考虑ES要怎么更新,这样会越来越繁琐
万一之后的需求有一项极复杂的骚修改...那真是恶心它儿子回家
这倒让我想起了之前的一件恶心事,也是和ES数据同步有关
倒不是问题有多恶心,而是没太看懂前人的代码,他的代码里没有ES的数据更新,但是在修改完mysql数据后,神奇的事情却发生了
我反复走了好几遍代码,拦截器也翻看了好几遍,都没有操作ES的地方,这确实让我挺懵的...
好巧不巧,那二货中午的时候给我发过来一个并夕夕友尽链接,以此为要挟下才只知道原来果然是魔法!
用的是阿里的一个中间件canal,功能确实比较神奇,它会伪装成mysql集群里的一个子节点,当主节点向子节点同步binlog日志的时候,canal可以解析binlog日志,然后发送一条消息到消息队列来同步es数据
具体来说呢,如此操作
Mysql
首先要有个mysql服务器,肯定有集群才有master和slave
然后在MySQL中需要创建一个用户,并授权
代码语言:javascript复制// 使用命令登录:mysql -u root -p
// 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';
// 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';
下一步在MySQL配置文件my.cnf设置如下信息
代码语言:javascript复制[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1
改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
代码语言:javascript复制show variables like 'log_bin'
//查看bin日志文件列表
show binary logs
//查看当前正在写入的binlog文件
show master status
canal
去官网下载页面进行下载:https://github.com/alibaba/canal/releases
解压canal.deployer-1.1.4.tar.gz,我们可以看到里面有四个文件夹 bin conf lib logs
在bin目录下找到startup.bat启动就可以了
java客户端操作
引入maven依赖
代码语言:javascript复制<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
在CannalClient类使用Spring Bean的生命周期函数afterPropertiesSet():
代码语言:javascript复制@Component
public class CannalClient implements InitializingBean {
private final static int BATCH_SIZE = 1000;
@Override
public void afterPropertiesSet() throws Exception {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
try {
//打开链接
connector.connect();
//订阅全部表
connector.subscribe(".*\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有数据,处理数据
printEntry(message.getEntries());
}
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
/**
* 打印canal server解析binlog获得的实体类信息
*/
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
RowChange rowChage;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" entry.toString(), e);
}
//获取操作类型:insert/update/delete类型
EventType eventType = rowChage.getEventType();
//打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
//判断是否是DDL语句
if (rowChage.getIsDdl()) {
System.out.println("================》;isDdl: true,sql:" rowChage.getSql());
}
//获取RowChange对象里的每一行数据,打印出来
for (RowData rowData : rowChage.getRowDatasList()) {
//删除语句
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//新增语句
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//更新的语句
} else {
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() " : " column.getValue() " update=" column.getUpdated());
}
}
}
这样执行就可以了!
canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时,是很多企业一种比较常见的数据同步的方案
以上