This article describes how to use Flink to implement near-real-time CDC data synchronization from MySQL to Elasticsearch (ES).
CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (INSERT, UPDATE and DELETE operations on data or tables),
record those changes in the order they occur, and write them to a message middleware so that other services can subscribe to and consume them.
Most Flink CDC to ES solutions today go through the Flink SQL client: create a source table that mirrors the MySQL table, create a sink table bound to the target ES index, and submit a single synchronization job:

insert into es_table select * from mysql_table;

This keeps MySQL and ES in sync in real time, relies entirely on the Flink runtime, and is very convenient. However, if the data needs to be processed during the CDC pipeline, you have to build the CDC job by hand with the DataStream API, which is what the rest of this article does.
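For reference, the SQL-only route can also be driven from Java through the Table API. The following is only a minimal sketch: it assumes the SQL connector jars (flink-sql-connector-mysql-cdc, the elasticsearch-7 connector) and a Table API planner are on the classpath, and the table schema, hosts, credentials and the mysql_es index name are placeholders for illustration.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MysqlToEsSqlJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Source table backed by the mysql-cdc connector (hostname/credentials are placeholders)
        tEnv.executeSql(
                "CREATE TABLE mysql_table (id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED) WITH (" +
                " 'connector' = 'mysql-cdc', 'hostname' = 'ip', 'port' = '3306'," +
                " 'username' = 'root', 'password' = 'password'," +
                " 'database-name' = 'database_name', 'table-name' = 'table_name')");

        // Sink table backed by the elasticsearch-7 connector
        tEnv.executeSql(
                "CREATE TABLE es_table (id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED) WITH (" +
                " 'connector' = 'elasticsearch-7', 'hosts' = 'http://es-ip:9200', 'index' = 'mysql_es')");

        // A single INSERT statement keeps the index in sync with the MySQL table
        tEnv.executeSql("INSERT INTO es_table SELECT * FROM mysql_table");
    }
}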
1. Environment preparation
MySQL 8.0
Elasticsearch 7.16
Flink 1.14.4
JDK 1.8
pom.xml dependencies (the ${flink.version}, ${scala.binary.version} and ${log4j.version} properties are assumed to be defined in the pom's <properties> section, e.g. 1.14.4, 2.12 and a 2.17.x log4j release to match the environment above)
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
2. Connect to MySQL and obtain the binlog DataStream
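The code below omits its import statements. A sketch of the ones it relies on is listed here, assuming the dependency versions above; the exact package locations may differ between releases.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;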
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("es-ip", 9200, "http")));
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("ip")
.port(3306)
.databaseList("database_name")
.tableList("table_name")
.username("root")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
Enable checkpointing every three seconds and build the binlog DataStream from the MySQL source:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
DataStream<String> input = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(1);
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("es-ip", 9200, "http"));
3. Parse the binlog, apply the corresponding change to the ES data, and sink it to ES
// Create the ElasticsearchSink builder that writes to ES
ElasticsearchSink.Builder<String> esSinkBuilder =
new ElasticsearchSink.Builder<>(httpHosts,
new ElasticsearchSinkFunction<String>() {
// Build an index request to add a document to ES
public IndexRequest createIndexRequest(
String index,HashMap<String,Object> map)
{
return Requests.indexRequest().index(index).source(map);
}
// Delete the matching document from ES
public void delete(String index,String element,
String before_after) throws IOException {
System.out.println("删除该数据");
client.delete(Requests.deleteRequest(index)
.id(getID(element,before_after)),
RequestOptions.DEFAULT);
}
// Use the fields in the binlog record to run a multi-match search in ES and find the document ID
public String getID(String element,String before_after) throws IOException {
JSONObject object = JSON.parseObject(element);
JSONObject json_value =object.getJSONObject(before_after);
if (json_value == null) {
System.out.println("This is a delete binlog record; the row has already been removed and cannot be looked up");
return "";
}
int i = 0;
Set<Map.Entry<String, Object>> entrySet = json_value.entrySet();
HashMap<String,Object> map = new HashMap<>();
// Iterate over the JSON object and collect every field name and value
for (Map.Entry<String, Object> entry : entrySet) {
map.put("field" + i, entry.getKey());
map.put("value" + i, entry.getValue());
i++;
}
// Add a match query for every field
MultiSearchRequest request = new MultiSearchRequest();
SearchRequest firstSearchRequest = new SearchRequest();
for (i = 0; i < entrySet.size(); i++) {
SearchSourceBuilder searchSourceBuilder =
new SearchSourceBuilder();
// One match query per field/value pair
searchSourceBuilder.query(QueryBuilders.matchQuery(map.get("field" + i).toString(), map.get("value" + i).toString()));
firstSearchRequest.source(searchSourceBuilder);
request.add(firstSearchRequest);
}
// Read the ID of the matching document from the response
MultiSearchResponse response = client.msearch
(request, RequestOptions.DEFAULT);
MultiSearchResponse.Item firstResponse = response
.getResponses()[0];
SearchResponse searchResponse=firstResponse.getResponse();
SearchHits hits = searchResponse.getHits();
return firstResponse.getResponse().toString()
.contains(""hits":[]") ? "空数据" : hits.getHits()[0].getId();
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
// Inspect the binlog record and perform the corresponding insert / delete / update in ES
String index = "mysql_es";
if(element.contains(""before":null")){
//解析增加数据binlog
JSONObject json_value = JSON.parseObject(element)
.getJSONObject("after");
int i = 0;
Set<Map.Entry<String, Object>> entrySet = json_value
.entrySet();
HashMap<String,Object> map = new HashMap<>();
// Collect every key/value pair from the JSON object
for (Map.Entry<String, Object> entry : entrySet) {
map.put(entry.getKey(),entry.getValue());
}
indexer.add(createIndexRequest(index,map));
}else if (element.contains(""after":null")){
//解析删除数据binlog
try {
delete(index,element,"before");
} catch (IOException e) {
System.out.println("运行异常");
throw new RuntimeException(e);
}
} else if (!element.contains("\"after\":null") && !element.contains("\"before\":null")) {
try {
delete(index,element,"before");
// Parse an update binlog record: delete the old document, then index the new one
} catch (IOException e) {
throw new RuntimeException(e);
}
JSONObject json_value = JSON.parseObject(element)
.getJSONObject("after");
Set<Map.Entry<String, Object>> entrySet = json_value
.entrySet();
HashMap<String,Object> map = new HashMap<>();
// Collect every key/value pair from the JSON object
for (Map.Entry<String, Object> entry : entrySet) {
map.put(entry.getKey(),entry.getValue());
}
indexer.add(createIndexRequest(index,map));
}else {
System.out.println("binlog不在判断范围内");
}
}
}
);
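To see why process() branches on "before":null and "after":null, it helps to look at the shape of a record produced by JsonDebeziumDeserializationSchema. The snippet below uses a made-up UPDATE record following Debezium's before/after envelope convention; the field values are placeholders.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class DebeziumRecordDemo {
    public static void main(String[] args) {
        // A made-up UPDATE record in Debezium's JSON envelope format
        String element =
                "{\"before\":{\"id\":1,\"name\":\"old\"}," +
                 "\"after\":{\"id\":1,\"name\":\"new\"}," +
                 "\"op\":\"u\",\"ts_ms\":1690000000000}";

        // INSERT records carry "before":null and DELETE records carry "after":null,
        // which is what the contains() checks in process() rely on.
        JSONObject record = JSON.parseObject(element);
        System.out.println(record.getJSONObject("before")); // old row image (null for an INSERT)
        System.out.println(record.getJSONObject("after"));  // new row image (null for a DELETE)
    }
}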
4. Configure the ES sink
// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);
// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {}
);
// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());
env.execute();
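The RestClientFactory above is left empty. If the ES cluster needs basic authentication or longer timeouts, that callback is where to configure the internally created REST client. The sketch below replaces the empty lambda; the user name, password and timeout values are placeholders.

// Additional imports assumed: org.apache.http.auth.AuthScope,
// org.apache.http.auth.UsernamePasswordCredentials,
// org.apache.http.client.CredentialsProvider,
// org.apache.http.impl.client.BasicCredentialsProvider
esSinkBuilder.setRestClientFactory(restClientBuilder -> {
    // Basic authentication for the REST client (credentials are placeholders)
    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials("elastic", "password"));
    restClientBuilder.setHttpClientConfigCallback(
            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));

    // Longer timeouts for slow bulk requests
    restClientBuilder.setRequestConfigCallback(
            requestConfigBuilder -> requestConfigBuilder
                    .setConnectTimeout(5000)
                    .setSocketTimeout(60000));
});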
5. Limitations
- Does not handle MySQL tables that contain duplicate rows; the MySQL table must have a primary key
- No resume from a checkpoint: every time the program starts, it re-synchronizes from scratch
- Nested fields (multi-level JSON) are not handled