Best Practices for Reading and Writing Elasticsearch from Spark

2023-11-14 19:31:44

This article walks through two ways to read and write ES from Spark in local mode:

Reading and writing ES with Spark RDDs

Writing to ES with Spark Streaming

Environment

Elasticsearch 7.14.2

Spark 3.2.1

JDK 1.8

Maven dependencies

Code language: xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.qcloud.abi</groupId>
    <artifactId>esspark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.12.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-30_2.12</artifactId>
            <version>7.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-network-common_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>
        <!-- Required by the Spark Streaming + Kafka example below -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>
        <!-- ImmutableMap/ImmutableList used in the RDD write example, in case Guava is not already pulled in transitively -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1.1-jre</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- Main class used as the jar entry point -->
                            <mainClass>com.xx.TestMain</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>
                            jar-with-dependencies
                        </descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Reading ES with a Spark RDD

Code language: java
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class ReadES {
    public static void main(String[] args) {
        // ES connection and read options are passed through the SparkConf
        SparkConf conf = new SparkConf().setAppName("readEs").setMaster("local[2]")
                .set("es.nodes", "https://es-jnycbqnd.public.tencentelasticsearch.com")
                .set("es.port", "9200")
                .set("es.net.http.auth.user", "elastic")
                .set("es.net.http.auth.pass", "passwd")
                .set("es.nodes.wan.only", "true")
                .set("es.nodes.discovery", "false")
                .set("es.input.use.sliced.partitions", "false")
                .set("es.resource", "spark_write")
                .set("es.scroll.size", "500");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Each document comes back as a (docId, fieldMap) pair
        JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);

        for (Map<String, Object> item : rdd.values().collect()) {
            System.out.println(item);
        }

        sc.stop();
    }
}

Spark can also read and write ES data as JSON:

Code language: java

// Read directly: each document comes back as a nested Map
JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
// Nested Map format
{test={data=39.0, feature1=1.39, feature2=0.78, feature3=-0.83}}

// Read as JSON strings instead
JavaPairRDD<String, String> rdd = JavaEsSpark.esJsonRDD(sc);
// JSON string format
{"test":{"data":50.0,"feature1":1.5,"feature2":1.0,"feature3":-0.5}}

Writing to ES with a Spark RDD

Code language: java
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class SparkWriteEs {
    public static void main(String[] args) {
        // Write data to ES through an RDD
        SparkConf conf = new SparkConf().setAppName("my-app").setMaster("local[2]")
                .set("es.nodes", "https://es-jnycbqnd.public.tencentelasticsearch.com")
                .set("es.port", "9200")
                .set("es.net.http.auth.user", "elastic")
                .set("es.net.http.auth.pass", "passwd")
                .set("es.nodes.wan.only", "true")
                .set("es.resource", "spark_write/_doc")
                .set("es.nodes.discovery", "false")
                .set("es.input.use.sliced.partitions", "false")
                .set("es.scroll.size", "500");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // A single document represented as a Map of fields
        Map<String, ?> logs = ImmutableMap.of("yesyes", "255.255.255.254",
                "request", "POST /write/using_spark_rdd HTTP/1.1",
                "status", 200, "size", 802,
                "@timestamp", 895435190);

        List<Map<String, ?>> list = ImmutableList.of(logs);

        JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(list);

        // Index the RDD into the spark_write index
        JavaEsSpark.saveToEs(javaRDD, "spark_write/_doc");

        sc.stop();
    }
}
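
Documents that are already serialized as JSON strings can be written with saveJsonToEs instead of building Maps. A minimal sketch reusing the sc from the example above (the sample documents are illustrative):

Code language: java

// Sketch: write pre-serialized JSON documents directly (requires java.util.Arrays)
JavaRDD<String> jsonDocs = sc.parallelize(Arrays.asList(
        "{\"yesyes\":\"255.255.255.254\",\"status\":200}",
        "{\"yesyes\":\"10.0.0.1\",\"status\":404}"));

JavaEsSpark.saveJsonToEs(jsonDocs, "spark_write/_doc");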

Writing Kafka data to ES with Spark Streaming

Code language: java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

public class RealTime_Data {
    public static void main(String[] args) throws Exception {
        String master = "local[2]";

        SparkConf conf = new SparkConf().setMaster(master).setAppName("StreamingTest")
                .set("spark.es.nodes", "43.139.24.126")      // ES address
                .set("spark.es.port", "9200")                // ES port
                .set("spark.es.nodes.wan.only", "true");

        // Pull data from Kafka in 5-second batches
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        jssc.sparkContext().setLogLevel("WARN");

        String brokers = "43.139.24.126:9092";
        String groupId = "kafka";   // consumer group id
        String topics = "test";     // topic(s) to subscribe to
        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Create a direct stream over the Kafka topics
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        // Keep only the value of each Kafka record
        JavaDStream<String> lines = messages.map(record -> record.value());

        // Wrap each value in a JSON document that Elasticsearch can index
        JavaDStream<String> out = lines.map(str -> "{\"test\":" + str + "}");

        // Print a sample of each batch for debugging
        out.print();

        // Write the JSON documents to Elasticsearch
        JavaEsSparkStreaming.saveJsonToEs(out, "spark/_doc");

        // Start the streaming job and wait for the producer to send data
        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
    }
}

You can also write a DStream to ES directly with saveToEs, or call saveToEsWithMeta with a pair DStream whose keys carry per-document metadata (such as the _id), plus an optional Map<String,String> of configuration overrides:

Code language: java

JavaEsSparkStreaming.saveToEs(javaDStream, "<resource>");
JavaEsSparkStreaming.saveToEsWithMeta(javaPairDStream, "spark/docs", cfgMap);
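
As a minimal sketch of the saveToEsWithMeta variant (assuming the same lines DStream as in the streaming example above; the ID scheme and index name are illustrative), the key of each pair is used as the document metadata, here the _id:

Code language: java

// Sketch only; additionally requires scala.Tuple2, java.util.Collections and
// org.apache.spark.streaming.api.java.JavaPairDStream imports.
JavaPairDStream<String, Map<String, String>> docsWithId = lines.mapToPair(
        str -> new Tuple2<>(String.valueOf(str.hashCode()),          // key -> document _id
                            Collections.singletonMap("test", str))); // value -> document fields

JavaEsSparkStreaming.saveToEsWithMeta(docsWithId, "spark/_doc");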

Parameter reference

- es.nodes: Elasticsearch address
- es.port: ES HTTP port (typically 9200)
- es.net.http.auth.user: ES username
- es.net.http.auth.pass: ES password
- es.nodes.wan.only: connect only through the declared nodes and skip node sniffing (set to true for WAN/cloud endpoints)
- es.nodes.discovery: whether to discover additional nodes in the cluster (set to false for WAN/cloud access)
- es.index.auto.create: whether to create the index automatically if it does not exist
- es.resource: the index (and type) to read from or write to, e.g. spark_write/_doc
- es.mapping.names: mapping between source field names and Elasticsearch field names
- es.input.use.sliced.partitions: whether to use sliced scrolls when creating input partitions
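
For example, these options are set on the SparkConf in the same way as the connection settings in the earlier examples (a sketch; the option values are illustrative):

Code language: java

// Sketch: illustrative option values only
SparkConf conf = new SparkConf().setAppName("es-options-demo").setMaster("local[2]")
        .set("es.nodes", "https://es-jnycbqnd.public.tencentelasticsearch.com")
        .set("es.port", "9200")
        .set("es.net.http.auth.user", "elastic")
        .set("es.net.http.auth.pass", "passwd")
        .set("es.nodes.wan.only", "true")          // WAN/cloud endpoint: use declared nodes only
        .set("es.nodes.discovery", "false")        // do not discover other nodes
        .set("es.index.auto.create", "true")       // create the index if it does not exist
        .set("es.resource", "spark_write/_doc");   // target index/type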

Running on EMR

Packaging

Replace the public IP in the code with the ES cluster's private (VPC) IP, package the project with the maven-assembly-plugin, upload the jar-with-dependencies to the EMR cluster, and run "ReadES":

Code language: bash
su - hadoop
cd /usr/local/service/spark

./bin/spark-submit  --master yarn --executor-cores 1 --class "ReadES"  /home/hadoop/esspark-1.0-SNAPSHOT-jar-with-dependencies.jar

Run "SparkWriteEs":

Code language: bash
./bin/spark-submit  --master yarn --executor-cores 1 --class "SparkWriteEs"  /home/hadoop/esspark-1.0-SNAPSHOT-jar-with-dependencies.jar

Query the data in Kibana:

Code language: text
GET spark_write/_search

Troubleshooting

1. NoClassDefFoundError after packaging and uploading the job

Code language: text
Exception in thread "main" java.lang.NoClassDefFoundError: org/elasticsearch/spark/rdd/api/java/JavaEsSpark...

Analysis

The error shows that the elasticsearch-spark classes are missing, which means the jar was built without its dependencies.

Solution

Package with the maven-assembly-plugin (for example, mvn clean package assembly:single, since the pom above does not bind the plugin to a lifecycle phase) and upload the jar-with-dependencies.

2. Connection failure when the client accesses ES directly

Code language: text
Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'...

Analysis

The ES public endpoint is reachable, the username and password are set, and 'es.nodes.wan.only' is configured correctly, yet the connection still fails and the index is never created. This points to a problem in the configuration or in the dependency versions.

Solution

The es.resource setting was misconfigured: the type was missing. Correct example: "spark_write/_doc".
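
A minimal sketch of the corrected setting (the other connection options are the same as in the examples above):

Code language: java

// Per the analysis above, include the type in es.resource.
// Failing setting:  .set("es.resource", "spark_write")   // type missing
// Working setting:
SparkConf conf = new SparkConf()
        .set("es.resource", "spark_write/_doc");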

