k8s 安装canal 注意事项

2022-12-23 08:48:12 浏览数 (1)

1. MySQL信息

代码语言:javascript复制
vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
# 保存 my.cnf 后,在 shell 中执行以下命令重启 MySQL 使配置生效:
service mysqld restart
代码语言:javascript复制
CREATE USER canal IDENTIFIED BY 'canal!3AD';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2. canal操作

代码语言:javascript复制
# 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir -p /usr/local/canal
tar -zxv -f canal.deployer-1.1.4.tar.gz -C /usr/local/canal

# 修改 canal 服务端配置文件(Kafka 相关配置)
vim /usr/local/canal/conf/canal.properties
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092


# 修改连接数据库的配置文件
cd /usr/local/canal
vim conf/example/instance.properties
	## mysql serverId
	canal.instance.mysql.slaveId = 123
	#position info,需要改成自己的数据库信息
	canal.instance.master.address = 127.0.0.1:3306
	canal.instance.master.journal.name = 
	canal.instance.master.position = 
	canal.instance.master.timestamp = 
	#canal.instance.standby.address = 
	#canal.instance.standby.journal.name =
	#canal.instance.standby.position = 
	#canal.instance.standby.timestamp = 
	#username/password,需要改成自己的数据库信息
	canal.instance.dbUsername = canal
	canal.instance.dbPassword = canal!3AD
	canal.instance.defaultDatabaseName =
	canal.instance.connectionCharset = UTF-8
	#table regex
	canal.instance.filter.regex = .*\\..*
 
 # kafka主题
 canal.mq.topic=canal_manager


# 启动
bash bin/startup.sh
# 关闭
bash bin/stop.sh

3. canal-adapter使用操作

代码语言:javascript复制
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: canal_manager  # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or transport
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: docker-cluster
#        - name: kudu
#          key: kudu
#          properties:

在 canal-adapter 的 conf/es7/ 目录下编写映射文件 mer_index.yml(注意:SQL 语句中每张表的主键都必须出现在查询字段中):

代码语言:javascript复制
dataSourceKey: defaultDS
destination: canal_manager
groupId: g1
esMapping:
  _index: mer_index
  _id: _id
  _type: _doc
  upsert: true
#  relations:
#    customer_order:
#      name: customer
  sql: "SELECT a.Role_No _id,a.Mer_Id merId,a.Role_Code roleCode,b.Mer_logo merlogo,b.Mer_Name merName,b.Mer_Status merStatus,c.Src_Tbl_No srcTblNo,c.Province_Code provinceCode,c.City_Code cityCode,c.County_Code countyCode,c.Addr addr,c.Latitude latitude,c.Longitude longitude FROM b_merchants_role a INNER JOIN b_merchants_info b ON a.Mer_Id=b.Mer_Id INNER JOIN s_address c ON b.Mer_Id=c.Src_Tbl_No"
  etlCondition: "where a.c_time>={}" # 表别名必须与上面 sql 中的别名(a/b/c)一致,原文的 s 不存在;请确认 c_time 所在的表
  commitBatch: 3000

使用startup.sh脚本启动canal-adapter服务。

4. es操作

在 Elasticsearch 中创建索引(建议在 canal-adapter 开始写入数据之前创建,否则 ES 可能按动态映射自动创建索引,字段类型与下面的定义不一致):

代码语言:javascript复制
PUT activy_index
{
  "mappings": {
    "properties": {
      "channelTypeCode":{
        "type":"text"
      },
      "actvyId":{
        "type":"text"
      },
      "actvyName":{
        "type":"text"
      },
      "actvyStatusCd":{
        "type":"text"
      },
      "provinceShrCd":{
        "type":"text"
      },
      "cityCd":{
        "type":"text"
      },
      "districtCd":{
        "type":"text"
      },
      "competitionOrgId":{
        "type":"text"
      },
      "enrollStartTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "enrollEndTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "actvyStartTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "actvyEndTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "addr":{
        "type":"text"
      },
      "longitude":{
        "type":"text"
      },
      "latitude":{
        "type":"text"
      },
      "actvyAdvertisePicLink":{
        "type":"text"
      }
    }
  }
}
代码语言:javascript复制
PUT info_index
{
  "mappings": {
    "properties": {
      "channelTypeCode":{
        "type":"text"
      },
      "infoId":{
        "type":"text"
      },
      "title":{
        "type":"text"
      },
      "infoTypeCd":{
        "type":"text"
      },
      "provinceShrCd":{
        "type":"text"
      },
      "cityCd":{
        "type":"text"
      },
      "picLink":{
        "type":"text"
      },
      "topFlag":{
        "type":"text",
        "fields":{
		        "keyword":{
		          "type":"keyword"
		        }
		      }
      },
      "validFlag":{
        "type":"text"
      },
      "relFlag":{
        "type":"text"
      },
      "relTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"

      },
      "infoTypeDesc":{
        "type":"text"
      }
    }
  }
}
代码语言:javascript复制
PUT mer_index
{
  "mappings": {
    "properties": {
      "merId":{
        "type":"text"
      },
      "roleCode":{
        "type":"text"
      },
	  "merlogo":{
		"type":"text"
	  },
      "merName":{
        "type":"text"
      },
      "merStatus":{
        "type":"text"
      },
      "provinceCode":{
        "type":"text"
      },
      "cityCode":{
        "type":"text"
      },
      "countyCode":{
        "type":"text"
      },
      "addr":{
        "type":"text"
      },
      "latitude":{
        "type":"text"
      },
      "longitude":{
        "type":"text"
      }
    }
  }
}
代码语言:javascript复制
PUT resource_index
{
  "mappings": {
    "properties": {
      "resGuid":{
        "type":"text"
      },
      "resLink":{
        "type":"text"
      },
      "validFlag":{
        "type":"text"
      }
    }
  }
}

5.数据同步:

curl -X POST http://127.0.0.1:8081/etl/es7/mer_index.yml

0 人点赞