1. MySQL 配置(开启 binlog 并创建 canal 账号)
vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
# 保存退出后重启 MySQL,使 binlog 配置生效
service mysqld restart
CREATE USER canal IDENTIFIED BY 'canal!3AD';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2. canal 服务端(canal-deployer)操作
# 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir -p /usr/local/canal
tar -zxv -f canal.deployer-1.1.4.tar.gz -C /usr/local/canal
# 修改 canal 服务端配置文件 canal.properties:投递模式改为 kafka,并指定 kafka 地址
vim /usr/local/canal/conf/canal.properties
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092
# 修改连接数据库的配置文件
cd /usr/local/canal
vim conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 123
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息(须与第 1 步创建的 canal 账号及密码一致)
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal!3AD
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .*\\..*
# kafka主题
canal.mq.topic=canal_manager
# 启动
bash bin/startup.sh
# 关闭
bash bin/stop.sh
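3. canal-adapter使用操作
下载并解压 canal.adapter,编辑其 conf/application.yml。注意:下面的配置(es7 适配器、consumerProperties 写法)对应 canal.adapter 1.1.5 及以上版本的格式。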
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: canal_manager # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or transport
          # security.auth: test:123456 # only used for rest mode
          cluster.name: docker-cluster
#      - name: kudu
#        key: kudu
#        properties:
在 canal-adapter 的 conf/es7/ 目录下编写 mer_index.yml(注意:SQL 语句中每张表的主键都要出现在查询结果中):
dataSourceKey: defaultDS
destination: canal_manager
groupId: g1
esMapping:
  _index: mer_index
  _id: _id
  _type: _doc
  upsert: true
#  relations:
#    customer_order:
#      name: customer
  sql: "SELECT a.Role_No _id,a.Mer_Id merId,a.Role_Code roleCode,b.Mer_logo merlogo,b.Mer_Name merName,b.Mer_Status merStatus,c.Src_Tbl_No srcTblNo,c.Province_Code provinceCode,c.City_Code cityCode,c.County_Code countyCode,c.Addr addr,c.Latitude latitude,c.Longitude longitude FROM b_merchants_role a INNER JOIN b_merchants_info b ON a.Mer_Id=b.Mer_Id INNER JOIN s_address c ON b.Mer_Id=c.Src_Tbl_No"
  # 条件中的表别名必须是上面 sql 中出现的别名(a/b/c);c_time 请替换为表中实际存在的时间字段
  etlCondition: "where a.c_time>={}"
  commitBatch: 3000
使用 canal-adapter 目录下的 bin/startup.sh 脚本启动 canal-adapter 服务。
4. es操作
在 Elasticsearch 中创建索引(应在 canal-adapter 开始写入数据之前创建;否则索引可能已按动态映射被自动创建,此时 PUT 会因索引已存在而失败):
PUT activy_index
{
"mappings": {
"properties": {
"channelTypeCode":{
"type":"text"
},
"actvyId":{
"type":"text"
},
"actvyName":{
"type":"text"
},
"actvyStatusCd":{
"type":"text"
},
"provinceShrCd":{
"type":"text"
},
"cityCd":{
"type":"text"
},
"districtCd":{
"type":"text"
},
"competitionOrgId":{
"type":"text"
},
"enrollStartTm":{
"type":"date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"enrollEndTm":{
"type":"date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"actvyStartTm":{
"type":"date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"actvyEndTm":{
"type":"date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"addr":{
"type":"text"
},
"longitude":{
"type":"text"
},
"latitude":{
"type":"text"
},
"actvyAdvertisePicLink":{
"type":"text"
}
}
}
}
PUT info_index
{
"mappings": {
"properties": {
"channelTypeCode":{
"type":"text"
},
"infoId":{
"type":"text"
},
"title":{
"type":"text"
},
"infoTypeCd":{
"type":"text"
},
"provinceShrCd":{
"type":"text"
},
"cityCd":{
"type":"text"
},
"picLink":{
"type":"text"
},
"topFlag":{
"type":"text",
"fields":{
"keyword":{
"type":"keyword"
}
}
},
"validFlag":{
"type":"text"
},
"relFlag":{
"type":"text"
},
"relTm":{
"type":"date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"infoTypeDesc":{
"type":"text"
}
}
}
}
PUT mer_index
{
"mappings": {
"properties": {
"merId":{
"type":"text"
},
"roleCode":{
"type":"text"
},
"merlogo":{
"type":"text"
},
"merName":{
"type":"text"
},
"merStatus":{
"type":"text"
},
"provinceCode":{
"type":"text"
},
"cityCode":{
"type":"text"
},
"countyCode":{
"type":"text"
},
"addr":{
"type":"text"
},
"latitude":{
"type":"text"
},
"longitude":{
"type":"text"
}
}
}
}
PUT resource_index
{
"mappings": {
"properties": {
"resGuid":{
"type":"text"
},
"resLink":{
"type":"text"
},
"validFlag":{
"type":"text"
}
}
}
}
5. 数据同步(通过 canal-adapter 的 ETL 接口全量导入已有数据,路径中的文件名即第 3 步编写的映射文件):
curl -X POST http://127.0.0.1:8081/etl/es7/mer_index.yml