Mysql To ES By Flink-CDC

2023-11-27 16:59:31 浏览数 (3)

本文将介绍如何通过Flink实现Mysql到ES的CDC近实时数据同步。

CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、

更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

目前市面上大多数flink cdc到ES的方法都是flink sql client建源端表同步mysql表,建终端表同步关联ES索引,建立一个同步任务

代码语言:javascript复制
insert into es_table select * from mysql_table;

实时地完成Mysql到ES的数据同步,依赖flink内核实现,非常简便。但如果需要在CDC过程中进行数据处理则需要手动建立CDC

1. 环境准备

Mysql 8.0

ElasticSearch 7.16

Flink 1.14.4

JDK 1.8

pom文件

代码语言:javascript复制
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>1.15.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>   <artifactId>flink-walkthrough-common_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
2. 连接Mysql获取binlog Datastream
代码语言:javascript复制
RestHighLevelClient  client = new RestHighLevelClient(RestClient.builder(new HttpHost("es-ip", 9200, "http")));
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("ip")
                .port(3306)
                .databaseList("database_name")
                .tableList("table_name")
                .username("root")
                .password("password")
             .deserializer(newJsonDebeziumDeserializationSchema())
                .build();

每隔三秒向mysql查询新的binlog

代码语言:javascript复制
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        DataStream<String> input = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1);
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("es-ip", 9200, "http"));
3. 解析binlog-对应处理ES中数据后sink到ES
代码语言:javascript复制
//创建ElasticsearchSink sink to es
        ElasticsearchSink.Builder<String> esSinkBuilder = 
        new ElasticsearchSink.Builder<>(httpHosts,
        new ElasticsearchSinkFunction<String>() {
        //向ES添加数据
        public IndexRequest createIndexRequest(
        String index,HashMap<String,Object> map) 
       {
       return Requests.indexRequest().index(index).source(map);
       }
        //向ES删除数据
       public void delete(String index,String element,
       String before_after) throws IOException {
       System.out.println("删除该数据");
       client.delete(Requests.deleteRequest(index)
       .id(getID(element,before_after)),
       RequestOptions.DEFAULT);
       }
        //根据binlog中字段,在ES中进行多重匹配查询数据ID
        public String getID(String element,String before_after) throws IOException {
        JSONObject object = JSON.parseObject(element);
        JSONObject json_value =object.getJSONObject(before_after);
        if(json_value.toString().equals("null")){
           System.out.println("这是条删除binlog,数据已删除无法找到");
              return "";
           }
        int i = 0;
Set<Map.Entry<String, Object>> entrySet = json_value.entrySet();

        HashMap<String,Object> map = new HashMap<>();
        //通过迭代器获取这段json当中所有的key值
        for (Map.Entry<String, Object> entry : entrySet) {
            map.put("field" i,entry.getKey());
            map.put("value" i,entry.getValue());
            i  ;
            }
        //添加字段匹配查询
        MultiSearchRequest request = new MultiSearchRequest();
        SearchRequest firstSearchRequest = new SearchRequest();
        for (i = 0; i < entrySet.size(); i  ) {
            SearchSourceBuilder searchSourceBuilder = 
            new SearchSourceBuilder();
        //多重查询
searchSourceBuilder.query(QueryBuilders.matchQuery(map.get("field" i).toString(), map.get("value" i).toString()));
                              firstSearchRequest.source(searchSourceBuilder);
        request.add(firstSearchRequest);
        }
        //在response中拿到配对数据id
        MultiSearchResponse response = client.msearch
        (request, RequestOptions.DEFAULT);
        MultiSearchResponse.Item firstResponse = response
        .getResponses()[0];
        SearchResponse searchResponse=firstResponse.getResponse();
        SearchHits hits = searchResponse.getHits();
        return firstResponse.getResponse().toString()
        .contains(""hits":[]") ? "空数据" : hits.getHits()[0].getId();
        }
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        //对binlog进行判断,对ES数据做出"增" | "删" | "改"动作
        String index = "mysql_es";
        if(element.contains(""before":null")){   
        //解析增加数据binlog
        JSONObject json_value = JSON.parseObject(element)
        .getJSONObject("after");
        int i = 0;
        Set<Map.Entry<String, Object>> entrySet = json_value
        .entrySet();
        HashMap<String,Object> map = new HashMap<>();
        //通过迭代器获取这段json当中所有的key value
        for (Map.Entry<String, Object> entry : entrySet) {
                map.put(entry.getKey(),entry.getValue());
            }
                                      indexer.add(createIndexRequest(index,map));
         }else if (element.contains(""after":null")){         
         //解析删除数据binlog
         try {
             delete(index,element,"before");
            } catch (IOException e) {
             System.out.println("运行异常");
             throw new RuntimeException(e);
            }
          }else if (!element.contains(""after":null") && !element.contains(""before":null)")){
          try {
              delete(index,element,"before");  
              //解析更新数据binlog
             } catch (IOException e) {
               throw new RuntimeException(e);
            }

           JSONObject json_value = JSON.parseObject(element)
           .getJSONObject("after");

           Set<Map.Entry<String, Object>> entrySet = json_value
           .entrySet();

           HashMap<String,Object> map = new HashMap<>();
            //通过迭代器获取这段json当中所有的key值
           for (Map.Entry<String, Object> entry : entrySet) {
                  map.put(entry.getKey(),entry.getValue());
               }
                            indexer.add(createIndexRequest(index,map));
           }else {
           System.out.println("binlog不在判断范围内");
             }
            }
          }
        );
4. 配置ES sink
代码语言:javascript复制
// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
        esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {}
        );

// finally, build and add the sink to the job's pipeline
        input.addSink(esSinkBuilder.build());

        env.execute();
程序局限
  1. 不适用mysql数据库内有相同数据场景,mysql表需要有主键
  2. 不支持断点进行,每次启动程序重新同步
  3. 未考虑字段嵌套场景(多层JSON)

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

0 人点赞