在docker中使用`canal`同步数据到`elasticsearch`

2021-12-28 12:43:08 浏览数 (1)

准备:

  • mysql:v5.7
  • canal-server:v1.1.4
  • elasticsearch:v7.5.1

创建网络:

代码语言:javascript复制
# Create the Docker network "net"; the containers started below join it via --net/--network.
docker network create net

创建volume

代码语言:javascript复制
# Create the two named volumes, "elasticsearch" and "mysql".
docker volume create elasticsearch
docker volume create mysql

创建container

代码语言:javascript复制
# mysql: binary logging enabled in ROW format with an explicit server id
docker run -d --name mysql -p 3306:3306 --privileged=true  -v mysql:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123 --net net mysql:5.7 --log-bin=mysql-bin --binlog-format=ROW --server_id=101

# canal server: reaches MySQL through the container name "mysql" on the shared network.
# Every continued line must end with a backslash, otherwise the shell runs four separate commands.
docker run -d --name canal-server --net net  -p 11111:11111 -e canal.instance.master.address=mysql:3306 \
         -e canal.instance.dbUsername=root \
         -e canal.instance.dbPassword=123 \
         -e canal.instance.connectionCharset=UTF-8 canal/canal-server:v1.1.4

# elasticsearch: single-node mode; the named volume is mounted on the data directory so indices persist
docker run -d --name elasticsearch -p 9200:9200 -v elasticsearch:/usr/share/elasticsearch/data -e "discovery.type=single-node"  --network net elasticsearch:7.5.1

查看mysql配置是否成功:

代码语言:javascript复制
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
2 rows in set (0.00 sec)

mysql> show variables like '%binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

mysql> show variables like '%server_id';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| server_id     | 101   |
+---------------+-------+
1 row in set (0.00 sec)

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      884 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.01 sec)

为 canal 连接 MySQL 所使用的账号授予作为 MySQL slave 所需的权限:

代码语言:javascript复制
-- Create the account "canal" with password 'canal'.
CREATE USER canal IDENTIFIED BY 'canal';  
-- Grant read access plus the two replication privileges on every database and table.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- Broader alternative, left disabled:
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

elasticsearch客户端依赖:

代码语言:javascript复制
  <!-- One version property shared by the three Elasticsearch artifacts declared below. -->
  <properties>
        <elasticsearch.version>7.5.1</elasticsearch.version>
    </properties>
 <!-- High-level REST client. -->
 <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <!-- Low-level REST client, pinned to the same version. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <!-- Elasticsearch core artifact, pinned to the same version. -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>

实体类:

代码语言:javascript复制
   /**
    * Entity that is mirrored into the search index.
    *
    * <p>Exposes JavaBean accessors because the surrounding code reads and writes the fields
    * through {@code getId()}, {@code setId(...)} and {@code setName(...)}.
    */
   public class Person {
       /** Primary key; {@code null} until it has been assigned. */
       private Long id;
       /** Display name; may be {@code null}. */
       private String name;

       /**
        * Returns the primary key.
        *
        * @return the id, or {@code null} if none has been set
        */
       public Long getId() {
           return id;
       }

       /**
        * Sets the primary key.
        *
        * @param id the new id
        */
       public void setId(Long id) {
           this.id = id;
       }

       /**
        * Returns the name.
        *
        * @return the name, or {@code null} if none has been set
        */
       public String getName() {
           return name;
       }

       /**
        * Sets the name.
        *
        * @param name the new name
        */
       public void setName(String name) {
           this.name = name;
       }
   }

基本的增删改:

代码语言:javascript复制
RestHighLevelClient client;

    /**
     * Creates the helper and immediately builds its client by calling {@code init()}.
     */
    public SearchIndex() {
        init();
    }

    private void init() {
        client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"));
    }

    /**
     * Deletes the document that corresponds to the given person.
     *
     * <p>The index name comes from {@code generateIdx(person.getClass())} and the document id is
     * the person's id rendered as a string. The response is printed to standard output.
     *
     * <p>The shared {@code client} is intentionally left open: closing it here would make every
     * later call on this instance fail, because the same client serves all operations.
     *
     * @param person entity whose document is removed; its id must not be {@code null}
     * @throws IOException if the delete request cannot be executed
     */
    public void deleteIdx(Person person) throws IOException {
        DeleteRequest request = new DeleteRequest(
                generateIdx(person.getClass()),
                Long.toString(person.getId()));
        DeleteResponse deleteResponse = client.delete(
                request, RequestOptions.DEFAULT);
        System.out.println(deleteResponse.toString());
    }


    /**
     * Applies a partial update to the document that corresponds to the given person.
     *
     * <p>The person is serialized to JSON and sent as the update document. The request uses a
     * five second timeout and the {@code WAIT_UNTIL} refresh policy.
     *
     * <p>The shared {@code client} is intentionally left open: closing it here would make every
     * later call on this instance fail, because the same client serves all operations.
     *
     * @param person entity holding the new field values; its id must not be {@code null}
     * @throws IOException if the update request cannot be executed
     */
    public void updateIdx(Person person) throws IOException {
        UpdateRequest request = new UpdateRequest(
                generateIdx(person.getClass()),
                Long.toString(person.getId()));
        String jsonStr = JSON.toJSONString(person);
        request.doc(jsonStr, XContentType.JSON);
        request.timeout(TimeValue.timeValueSeconds(5));
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        String[] includes = new String[]{"name", "birthday",};
        String[] excludes = new String[]{"id", "gender"}; // original author's note: this has no effect here
        request.fetchSource(
                new FetchSourceContext(true, includes, excludes));
        // The response was never used, so it is no longer bound to a local.
        client.update(request, RequestOptions.DEFAULT);
    }


    /**
     * Indexes the given person as a new document without waiting for the result.
     *
     * <p>The document id is the person's id rendered as a string and the body is the person
     * serialized to JSON, which is also echoed to standard output. The request is submitted
     * asynchronously and its outcome is delivered to an {@code EsIndexListener}.
     *
     * @param person entity to index; its id must not be {@code null}
     * @throws IOException declared by the signature
     */
    public void createIdx(Person person) throws IOException {
        final IndexRequest indexRequest = new IndexRequest(generateIdx(person.getClass()));
        indexRequest.id(Long.toString(person.getId()));

        final String payload = JSON.toJSONString(person);
        System.out.println(payload);
        indexRequest.source(payload, XContentType.JSON);

        // Request options: create-only, internal versioning, bounded wait, refresh-aware.
        indexRequest.opType(DocWriteRequest.OpType.CREATE);
        indexRequest.versionType(VersionType.INTERNAL);
        indexRequest.timeout(TimeValue.timeValueSeconds(5));
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

        final EsIndexListener listener = new EsIndexListener(client);
        client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
    }

canal-client依赖:

代码语言:javascript复制
<!-- canal client library. -->
<!-- NOTE(review): client 1.1.0 differs from the canal-server image tag v1.1.4 used above; confirm compatibility. -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

客户端代码:

代码语言:javascript复制
public class SyncData {
    public static void main(String[] args) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost",
                11111), "topic", "", "");

        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                    continue;
                }

                new Thread(new DataProcessor(message.getEntries())).start();
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

        } finally {
            connector.disconnect();
        }
    }

业务处理:

代码语言:javascript复制
public class DataProcessor implements Runnable {
    private List<Entry> entries;
    private SearchIndex idx = new SearchIndex();

    /**
     * Creates a processor for one batch of entries.
     *
     * @param entries the entries to process; the list is stored as given, not copied
     */
    public DataProcessor(List<Entry> entries) {
        this.entries = entries;
    }

    /**
     * Processes the batch by delegating to {@code processor()}.
     *
     * <p>An {@link IOException} is caught here and only its stack trace is printed, so it does
     * not propagate to the caller.
     */
    @Override
    public void run() {
        try {
            processor();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * processor
     */
    private void processor() throws IOException {
        for (Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"   entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                Person person = new Person();
                List<CanalEntry.Column> columns = null;
                switch (eventType) {
                    case DELETE:
                        columns = rowData.getBeforeColumnsList();
                        break;
                    case UPDATE:
                    case INSERT:
                        columns = rowData.getAfterColumnsList();
                        break;
                }
                columns.forEach(data -> {
                    if ("id".equals(data.getName())) {
                        person.setId(Long.parseLong(data.getValue()));
                    } else if ("name".equals(data.getName())) {
                        person.setName(data.getValue());
                    }

                });
                if(person.getId() == null){
                    continue;
                }
                switch (eventType) {
                    case DELETE:
                        idx.deleteIdx(person);
                        break;
                    case INSERT:
                        idx.createIdx(person);
                        break;
                    case UPDATE:
                        idx.updateIdx(person);
                        break;
                }
            }
        }
    }

或者可以直接使用canal-adapter

需要注意的是使用最新版本的mysql(8.x)可能会导致canal server无法启动

0 人点赞