用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中

2020-11-26 16:39:53 浏览数 (1)

有一段时间没好好写博客了,因为一直在做一个比较小型的工程项目,也常常用在企业里,就是将流式数据处理收集,再将这些流式数据进行一些计算以后再保存在mysql上,这是一套比较完整的流程,并且可以从数据库中的数据再导入到hadoop上,再在hadoop上进行离线较慢的mapreduce计算,这是我后面要进行的项目。

项目准备环境

(1)zookeeper:

(2)spark

(3)kafka

(4)mysql

(5)navicat

(6)三台虚拟机

(7)jdk

(8)intellij IDEA

(9)虚拟机vmware

虚拟机分别配置

虚拟机

安装环境

node01

kafka zookeeper jdk 192.168.19.110

node02

kafka zookeeper jdk spark 192.168.19.111

node03

kafka zookeeper jdk mysql 192.168.19.112

具体的虚拟机的细节配置就不多说了,肯定是要关闭防火墙的。

开始实行

(1)分别在三台主机上开启zookeeper(zookeeper的集群配置可以看我这篇博客zookeeper的安装和使用)

(2)分别在三台主机上开启kafka

(3)开启产生消息队列命令(前提创建好topic:spark(我这里是spark话题))

(4)在node3上开启mysql

在mysql地下创建bigdata数据库,进入数据库后新建wordcount表,创建相应字段即可

(5)将写好的代码打成jar包:

写代码时是要写scala语言,所以要加载好相应的插件:

代码语言:javascript复制
package com.gzq.spark

import java.sql.DriverManager
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level,Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}




/**
 * @Auther: gzq
 * @Date: 2020/11/23 - 11 - 23 - 22:37 
 * @Description:
 */
object Sparkstream_kafka202020 {
  Logger.getLogger("org").setLevel(Level.WARN)
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark_stream")

    val ssc : StreamingContext = new StreamingContext(conf,Seconds(3))

    val kafkaParams: Map[String,Object] =  Map[String,Object](

      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "spark2020",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("spark"), kafkaParams)
    )
    kafkaDStream
      .map(_.value())
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_ _)
      .print()
    kafkaDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
        System.out.println(record)
        // wordcount里的record.value()一定要加双引号这样才能是字符串类型
          val sql = "insert into wordcount(word, wordcount) values("   '"'  record.value()   '"'   ","   record.offset()  ");"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })
    })



    ssc.start()
    ssc.awaitTermination()
  }
  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://192.168.19.112:3306/bigdata", "root", "123456")
  }


}

maven依赖(可以根据自己的版本修改)

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gzq.spark2020</groupId>
    <artifactId>spark2020</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
             <dependency>
                 <groupId>mysql</groupId>
                 <artifactId>mysql-connector-java</artifactId>
                 <version>5.1.1</version>
             </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

    </dependencies>



</project>

点击进去

选择自己的main

接下来apply ok

再点击

随后点击build即可:

输出在out目录下

将jar包上传到node02(有spark,直接本地运行)

输入上面的3条内容,可以看见node02上的输出:

查看数据库也输出了:

ps:踩过的坑

(1):

这行sql语句一定要注意。

因为我的word列定义的是varchar类型,所以必须传入的是字符串类型,lang.String,所以要在record.value()两侧加入双引号。

(2):

为什么我打jar包时没有用maven,是因为maven打出来jar包没有我写的主函数,所以在用spark执行时它会报错说找不到main函数的入口,找不到类,后来发现需要在pom文件中做相关的配置:

代码语言:javascript复制
    <build>
        <finalName>WordCount</finalName>
        <plugins>
            <plugin>
            <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.gzq.spark._01.WordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

(3):

在开启kafka时我发现开一会它就自动关闭,查看日志文件后发现我的kafka-logs文件出了问题,所以我将三台主机这个文件夹下的所有文件全部删除重启kafka成功

(4):

因为我的zookeeper是多集群模式,所以它的选举机制是必须要开启半数以上,所以开启zookeeper时要都开启,如果只开启了其中一台也会启动不起来。

0 人点赞