准备:
- mysql:v5.7
- canal-server:v1.1.4
- elasticsearch:v7.5.1
创建网络:
docker network create net
创建volume:
# Create the two named volumes that the elasticsearch and mysql containers reference with -v.
docker volume create elasticsearch
docker volume create mysql
创建container:
# MySQL 5.7 with the binary log enabled in ROW format and an explicit server id,
# which is what Canal needs in order to read changes as a replication client.
docker run -d --name mysql -p 3306:3306 --privileged=true -v mysql:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123 --net net mysql:5.7 --log-bin=mysql-bin --binlog-format=ROW --server_id=101
# Canal server, pointed at the mysql container over the shared "net" network.
# Every wrapped line ends with a backslash so the shell treats it as ONE command.
docker run -d --name canal-server --net net -p 11111:11111 \
  -e canal.instance.master.address=mysql:3306 \
  -e canal.instance.dbUsername=root \
  -e canal.instance.dbPassword=123 \
  -e canal.instance.connectionCharset=UTF-8 \
  canal/canal-server:v1.1.4
# Single-node Elasticsearch. The named volume must be mapped to a container path;
# /usr/share/elasticsearch/data is the data directory of the official image.
docker run -d --name elasticsearch -p 9200:9200 -v elasticsearch:/usr/share/elasticsearch/data -e "discovery.type=single-node" --network net elasticsearch:7.5.1
查看mysql配置是否成功:
mysql> show variables like 'log_bin';
--------------- -------
| Variable_name | Value |
--------------- -------
| log_bin | ON |
--------------- -------
2 rows in set (0.00 sec)
mysql> show variables like '%binlog_format';
--------------- -------
| Variable_name | Value |
--------------- -------
| binlog_format | ROW |
--------------- -------
1 row in set (0.00 sec)
mysql> show variables like '%server_id';
--------------- -------
| Variable_name | Value |
--------------- -------
| server_id | 101 |
--------------- -------
1 row in set (0.00 sec)
mysql> show master status;
------------------ ---------- -------------- ------------------ -------------------
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
------------------ ---------- -------------- ------------------ -------------------
| mysql-bin.000001 | 884 | | | |
------------------ ---------- -------------- ------------------ -------------------
1 row in set (0.01 sec)
授权 canal 连接 MySQL 的账号,使其具有作为 MySQL slave 的权限:
-- Create a dedicated "canal" account and grant it read access plus the two
-- replication privileges on all databases, for connections from any host.
-- NOTE(review): the canal-server container in this guide is started with the
-- root account, not this one - confirm which credentials are intended.
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
elasticsearch 客户端依赖:
<!-- One shared version property keeps the three Elasticsearch artifacts below aligned;
     7.5.1 matches the elasticsearch image tag used in the docker run command. -->
<properties>
<elasticsearch.version>7.5.1</elasticsearch.version>
</properties>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
实体类:
/**
 * Plain data holder for one synced row: the row id and the person's name.
 *
 * <p>Accessors are required because the sync code reads and writes these
 * fields through {@code getId()}, {@code setId(...)} and {@code setName(...)},
 * and bean-style JSON serialization relies on getters.
 */
public class Person {
    private Long id;
    private String name;

    /** @return the row id, or {@code null} if it has not been set */
    public Long getId() {
        return id;
    }

    /** @param id the row id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the person's name, or {@code null} if it has not been set */
    public String getName() {
        return name;
    }

    /** @param name the person's name */
    public void setName(String name) {
        this.name = name;
    }
}
基本的增删改:
RestHighLevelClient client;
/**
 * Creates the index helper and immediately builds its Elasticsearch client
 * by delegating to {@code init()}.
 */
public SearchIndex() {
init();
}
private void init() {
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"));
}
/**
 * Removes the document that corresponds to the given person.
 *
 * <p>The target index comes from {@code generateIdx(...)} applied to the
 * person's class, and the document id is the person's id rendered as a string.
 * The response is printed to standard output.
 *
 * <p>NOTE(review): the client is closed at the end of this call, so this
 * instance presumably cannot issue further requests afterwards - confirm
 * against how callers reuse it.
 *
 * @param person the person whose document should be deleted
 * @throws IOException if the delete request or closing the client fails
 */
public void deleteIdx(Person person) throws IOException {
    String indexName = generateIdx(person.getClass());
    String documentId = Long.toString(person.getId());
    DeleteRequest deleteRequest = new DeleteRequest(indexName, documentId);

    DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT);
    client.close();
    System.out.println(response.toString());
}
/**
 * Applies a partial update to the document that corresponds to the given person.
 *
 * <p>The person is serialized to JSON and sent as the update document, with a
 * five second timeout and the {@code WAIT_UNTIL} refresh policy.
 *
 * <p>NOTE(review): the client is closed at the end of this call, so this
 * instance presumably cannot issue further requests afterwards - confirm
 * against how callers reuse it.
 *
 * @param person the person whose document should be updated
 * @throws IOException if the update request or closing the client fails
 */
public void updateIdx(Person person) throws IOException {
    String indexName = generateIdx(person.getClass());
    String documentId = Long.toString(person.getId());
    UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);

    String payload = JSON.toJSONString(person);
    updateRequest.doc(payload, XContentType.JSON);
    updateRequest.timeout(TimeValue.timeValueSeconds(5));
    updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

    // Source filtering for the returned document.
    String[] includedFields = {"name", "birthday"};
    // The original author marked the excludes as having no effect here.
    String[] excludedFields = {"id", "gender"};
    FetchSourceContext sourceContext = new FetchSourceContext(true, includedFields, excludedFields);
    updateRequest.fetchSource(sourceContext);

    client.update(updateRequest, RequestOptions.DEFAULT);
    client.close();
}
/**
 * Indexes the given person as a new document, asynchronously.
 *
 * <p>The person is serialized to JSON (also printed to standard output) and
 * sent with op type {@code CREATE}, internal versioning, the
 * {@code WAIT_UNTIL} refresh policy and a five second timeout. Completion is
 * handed to an {@code EsIndexListener} that receives the client.
 *
 * @param person the person to index; its id becomes the document id
 * @throws IOException declared by the signature for callers
 */
public void createIdx(Person person) throws IOException {
    String indexName = generateIdx(person.getClass());
    IndexRequest indexRequest = new IndexRequest(indexName);
    indexRequest.id(Long.toString(person.getId()));

    String payload = JSON.toJSONString(person);
    System.out.println(payload);
    indexRequest.source(payload, XContentType.JSON);

    indexRequest.versionType(VersionType.INTERNAL);
    indexRequest.opType(DocWriteRequest.OpType.CREATE);
    indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    indexRequest.timeout(TimeValue.timeValueSeconds(5));

    EsIndexListener listener = new EsIndexListener(client);
    client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
}
canal-client依赖:
<!-- Canal client library used by the sync program.
     NOTE(review): 1.1.0 differs from the canal-server image tag (v1.1.4) used in this guide - confirm compatibility. -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
客户端代码:
代码语言:javascript复制public class SyncData {
public static void main(String[] args) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost",
11111), "topic", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
continue;
}
new Thread(new DataProcessor(message.getEntries())).start();
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}
业务处理:
代码语言:javascript复制public class DataProcessor implements Runnable {
private List<Entry> entries;
private SearchIndex idx = new SearchIndex();
public DataProcessor(List<Entry> entries) {
this.entries = entries;
}
@Override
public void run() {
try {
processor();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* processor
*/
private void processor() throws IOException {
for (Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
Person person = new Person();
List<CanalEntry.Column> columns = null;
switch (eventType) {
case DELETE:
columns = rowData.getBeforeColumnsList();
break;
case UPDATE:
case INSERT:
columns = rowData.getAfterColumnsList();
break;
}
columns.forEach(data -> {
if ("id".equals(data.getName())) {
person.setId(Long.parseLong(data.getValue()));
} else if ("name".equals(data.getName())) {
person.setName(data.getValue());
}
});
if(person.getId() == null){
continue;
}
switch (eventType) {
case DELETE:
idx.deleteIdx(person);
break;
case INSERT:
idx.createIdx(person);
break;
case UPDATE:
idx.updateIdx(person);
break;
}
}
}
}
或者可以直接使用canal-adapter
需要注意的是使用最新版本的mysql(8.x)可能会导致canal server无法启动