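This article describes two ways to read and write Elasticsearch (ES) from Spark in local mode
Reading and writing ES with Spark RDD
Writing to ES with Spark Streaming
Environment setup
Elasticsearch-7.14.2
Spark-3.2.1
jdk-1.8
Maven dependencies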
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qcloud.abi</groupId>
<artifactId>esspark</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-30_2.12</artifactId>
<version>7.14.2</version>
</dependency>
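<!-- Must match the Spark 3.2.1 / Scala 2.12 build used by the other Spark artifacts -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<!-- Needed by the Kafka streaming example (KafkaUtils, ConsumerStrategies) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.2.1</version>
</dependency>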
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!-- Fully qualified name of the entry-point class -->
<mainClass>com.xx.TestMain</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
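Reading ES with Spark RDD
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class ReadES {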
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("readEs").setMaster("local[2]")
.set("es.nodes", "https://es-jnycbqnd.public.tencentelasticsearch.com")
.set("es.port", "9200")
.set("es.net.http.auth.user", "elastic")
.set("es.net.http.auth.pass", "passwd")
.set("es.nodes.wan.only", "true")
.set("es.nodes.discovery","false")
.set("es.input.use.sliced.partitions","false")
.set("es.resource", "spark_write")
.set("es.scroll.size","500");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
for ( Map<String, Object> item : rdd.values().collect()) {
System.out.println(item);
}
sc.stop();
}
}
Spark can also read and write ES data as JSON
// Read directly as Map objects
JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
// A nested ES document as returned in Map form:
// {test={data=39.0, feature1=1.39, feature2=0.78, feature3=-0.83}}
// Read as raw JSON strings instead
JavaPairRDD<String, String> jsonRdd = JavaEsSpark.esJsonRDD(sc);
// A document as returned in JSON form:
// {"test":{"data":50.0,"feature1":1.5,"feature2":1.0,"feature3":-0.5}}
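Writing to ES with Spark RDD
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class SparkWriteEs {
public static void main(String[] args) {
// Write data to ES through an RDD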
SparkConf conf = new SparkConf().setAppName("my-app").setMaster("local[2]")
.set("es.nodes", "https://es-jnycbqnd.public.tencentelasticsearch.com")
.set("es.port", "9200")
.set("es.net.http.auth.user", "elastic")
.set("es.net.http.auth.pass", "passwd")
.set("es.nodes.wan.only", "true")
.set("es.resource", "spark_write/_doc")
.set("es.nodes.discovery","false")
.set("es.input.use.sliced.partitions","false")
.set("es.scroll.size","500");
JavaSparkContext sc = new JavaSparkContext(conf);
Map<String, ?> logs = ImmutableMap.of("yesyes", "255.255.255.254",
"request", "POST /write/using_spark_rdd HTTP/1.1",
"status", 200,"size", 802,
"@timestamp", 895435190);
List<Map<String, ?>> list = ImmutableList.of(logs);
JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(list);
JavaEsSpark.saveToEs(javaRDD, "spark_write/_doc");
sc.stop();
}
}
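Consuming Kafka data with Spark Streaming and writing it to ES
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

public class RealTime_Data {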
public static void main(String[] args) throws Exception {
String master = "local[2]";
SparkConf conf = new SparkConf().setMaster(master).setAppName("StreamingTest")
.set("spark.es.nodes", "43.139.24.126") // ES address
.set("spark.es.port", "9200") // ES port
.set("spark.es.nodes.wan.only","true"); // connect only through the declared node
// Pull data from Kafka every 5 seconds (the batch interval)
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.sparkContext().setLogLevel("WARN");
String brokers = "43.139.24.126:9092";
String groupId = "kafka"; // consumer group id
String topics = "test"; // comma-separated list of topics
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Create a direct stream; each batch holds the records received during one 5-second interval
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
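// Extract the value of each message
JavaDStream<String> lines = messages.map(record -> record.value());
// Wrap each value in a JSON document that Elasticsearch can index
// (this assumes each value is itself valid JSON, e.g. a number or an object)
JavaDStream<String> out = lines.map(str -> "{\"test\":" + str + "}");
// Print the first records of each batch
out.print();
// Write to Elasticsearch
JavaEsSparkStreaming.saveJsonToEs(out, "/spark/doc");
// Start streaming
jssc.start();
// Wait for the producer to send data
jssc.awaitTermination();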
jssc.stop();
}
}
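The step that wraps each Kafka value into a JSON document relies on plain string concatenation, which produces invalid JSON as soon as a value is arbitrary text containing quotes or newlines. The following is a minimal sketch of one way to guard against that, assuming the values should be stored as JSON strings. `JsonFieldWrapper` and its methods are hypothetical helpers written for illustration; they are not part of the original job or of elasticsearch-hadoop.

```java
// Hypothetical helper (an assumption for illustration, not from the original article).
final class JsonFieldWrapper {
    private JsonFieldWrapper() {}

    /** Escapes a raw string so it can sit inside a JSON string literal. */
    static String escape(String raw) {
        StringBuilder sb = new StringBuilder(raw.length() + 8);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be written as \\uXXXX
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds a one-field JSON document such as {"test":"..."} . */
    static String wrap(String field, String value) {
        return "{\"" + escape(field) + "\":\"" + escape(value) + "\"}";
    }

    public static void main(String[] args) {
        // prints {"test":"hello \"kafka\""}
        System.out.println(wrap("test", "hello \"kafka\""));
    }
}
```

In the streaming job this could replace the concatenation with something like `lines.map(str -> JsonFieldWrapper.wrap("test", str))`. If the Kafka values are already JSON objects, embedding them unquoted is the right choice and this helper is not needed.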
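You can also write objects to ES directly, or write a pair DStream whose keys carry document metadata (such as the document ID) and pass a Map<String,String> of extra settings
JavaEsSparkStreaming.saveToEs(JavaDStream, "<resource>");
JavaEsSparkStreaming.saveToEsWithMeta(JavaPairDStream, "spark/docs", Map<String,String>);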
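- Parameters
Parameter | Description |
---|---|
es.nodes | Address of the Elasticsearch node(s) to connect to |
es.port | ES HTTP port (default 9200) |
es.net.http.auth.user | ES user name |
es.net.http.auth.pass | ES user password |
es.nodes.wan.only | Set to true for cloud or WAN-restricted clusters: the connector then talks only to the declared es.nodes and skips node discovery |
es.nodes.discovery | Whether to discover the other nodes in the cluster (true) or use only the nodes listed in es.nodes (false) |
es.index.auto.create | Whether to create the index automatically when writing |
es.resource | The index and type to read from or write to |
es.mapping.names | Mapping between table field names and Elasticsearch index field names |
es.input.use.sliced.partitions | Whether to use sliced partitions when reading |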
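The settings in the table are plain string key/value pairs, so they can be collected in one place instead of being repeated in every job. The sketch below shows one possible way to do that with only the JDK. `EsSettings` and `forWanCluster` are hypothetical names introduced here, and the host, user, and password values are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (an assumption for illustration, not from the original article).
final class EsSettings {
    private EsSettings() {}

    /** Collects the connector settings used in this article for a cluster reached over a public address. */
    static Map<String, String> forWanCluster(String nodes, String user, String pass, String resource) {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("es.nodes", nodes);
        cfg.put("es.port", "9200");
        cfg.put("es.net.http.auth.user", user);
        cfg.put("es.net.http.auth.pass", pass);
        cfg.put("es.nodes.wan.only", "true");   // talk only to the declared node
        cfg.put("es.nodes.discovery", "false"); // do not look for other nodes
        cfg.put("es.resource", resource);
        return cfg;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own cluster details.
        Map<String, String> cfg =
                forWanCluster("https://example-es-host", "elastic", "passwd", "spark_write/_doc");
        cfg.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Each entry can then be applied with `conf.set(key, value)` on the `SparkConf`. To my understanding `JavaEsSpark.saveToEs` also has an overload that accepts such a map as its configuration argument, but check this against the connector version you use.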
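Running locally
Packaging
Replace the public IP addresses in the code with private (intranet) ones, build the project with the maven assembly plugin, upload the jar with dependencies to EMR, and run "ReadES"
su - hadoop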
cd /usr/local/service/spark
./bin/spark-submit --master yarn --executor-cores 1 --class "ReadES" /home/hadoop/esspark-1.0-SNAPSHOT-jar-with-dependencies.jar
Run "SparkWriteEs"
./bin/spark-submit --master yarn --executor-cores 1 --class "SparkWriteEs" /home/hadoop/esspark-1.0-SNAPSHOT-jar-with-dependencies.jar
Query the data in Kibana
GET spark_write/_search
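Troubleshooting
1. A class-not-found error occurs when running the packaged project after uploading it
Exception in thread "main" java.lang.NoClassDefFoundError: org/elasticsearch/spark/rdd/api/java/JavaEsSpark...
Analysis
The error shows that the ES-Spark dependency is missing at runtime, which means the jar was built without its dependencies.
Solution
Package with the assembly plugin and upload the jar with dependencies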
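A missing dependency like this one can also be detected at the very start of a job, before any Spark context is created, so that the failure message is explicit. The following is a minimal sketch using only the JDK. `ClasspathCheck` and `isOnClasspath` are hypothetical names introduced for illustration; only the class name being probed comes from the error above.

```java
// Hypothetical helper (an assumption for illustration, not from the original article).
final class ClasspathCheck {
    private ClasspathCheck() {}

    /** Returns true if the named class can be located, without running its static initializers. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String connector = "org.elasticsearch.spark.rdd.api.java.JavaEsSpark";
        if (isOnClasspath(connector)) {
            System.out.println("ES-Spark connector found");
        } else {
            System.out.println("ES-Spark connector missing: build and submit the jar-with-dependencies");
        }
    }
}
```

Calling `ClasspathCheck.isOnClasspath(...)` as the first statement of `main` in the submitted job would turn the `NoClassDefFoundError` into a readable message that points at the packaging step.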
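2. A connection problem occurs when the client accesses ES directly
Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'...
Analysis:
The ES public address was directly reachable, the user name and password were set, and 'es.nodes.wan.only' was set correctly, yet the connection to ES still failed and the index was not even created. This pointed to a problem with either the parameter configuration or the dependency versions.
Solution
The es.resource parameter was misconfigured: the type was missing. Correct example: "spark_write/_doc"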