Code language: XML (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shi</groupId>
    <artifactId>flinkTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Make source/resource encoding explicit instead of depending on the build machine. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <!-- Scala suffix shared by every Scala-dependent Flink artifact below; they must all match. -->
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!-- Flink core / streaming / local execution -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--
            Logging: SLF4J API bound to Log4j 1.x via slf4j-log4j12; log4j-to-slf4j forwards
            Log4j 2 API calls into SLF4J.
            NOTE(review): slf4j-log4j12 pulls in Log4j 1.x, which is end-of-life and has known CVEs.
            Consider log4j-slf4j-impl + log4j-core 2.17+ instead (that switches the config file from
            log4j.properties to log4j2.properties).
        -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <!-- Connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--
            JSON parsing of Kafka payloads. 1.2.83 fixes the autoType deserialization bypasses
            present in earlier 1.2.x releases (1.2.34 is affected); the public API is unchanged.
        -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for Java 1.8. The plugin version is pinned for reproducible builds. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Code language: Java (KafkSourceEsSinkTest.java)
package com.shi.kafkaTest;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
/**
* @author shiye
* @create 2023-02-20 17:06
*/
public class KafkSourceEsSinkTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>("sungrow_cdc", new SimpleStringSchema(), getKafkaPropertes()));
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("10.0.81.167", 9201, "http"));
httpHosts.add(new HttpHost("10.0.81.168", 9201, "http"));
httpHosts.add(new HttpHost("10.0.81.169", 9201, "http"));
ElasticsearchSink.Builder<String> esBuilder = new ElasticsearchSink.Builder<>(httpHosts, getEsSinkFunction());
//刷新前最大缓存的操作数。
esBuilder.setBulkFlushMaxActions(1);
//刷新前最大缓存的数据量(以兆字节为单位)。
esBuilder.setBulkFlushMaxSizeMb(100);
//刷新的时间间隔(不论缓存操作的数量或大小如何)
esBuilder.setBulkFlushInterval(10000);
//设置用户名密码
esBuilder.setRestClientFactory(getRestClientFactory());
stream.addSink(esBuilder.build());
env.execute();
}
//定义kafka配置信息
public static Properties getKafkaPropertes() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.0.81.158:9092,10.0.81.159:9092,10.0.81.160:9092");
properties.setProperty("group.id", "sungrow_cdc_shiye_test_group4");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "earliest");
return properties;
}
/**
* 定义ES Sink算子
*
* @return
*/
public static ElasticsearchSinkFunction<String> getEsSinkFunction() {
ElasticsearchSinkFunction<String> esSinkFunction = new ElasticsearchSinkFunction<String>() {
@Override
public void process(String event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
System.out.println("------> " event);
JSONObject jsonObject = JSONObject.parseObject(event);
// Map<String, Object> map = new HashMap<>();
// map.put("dbName", jsonObject.get("dbName"));
// map.put("tableName", jsonObject.get("tableName"));
// map.put("operation", jsonObject.get("operation"));
IndexRequest request = Requests.indexRequest()
.index("temp_cdc_test")
.source(jsonObject);
requestIndexer.add(request);
}
};
return esSinkFunction;
}
/**
* es 用户名密码
*/
public static RestClientFactory getRestClientFactory() {
RestClientFactory restClientFactory = new RestClientFactory() {
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "123456"));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.disableAuthCaching();
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
}
};
return restClientFactory;
}
}