Using the HikariCP Database Connection Pool with MySQL in Spark Streaming

2018-12-24 11:15:04

When a Spark Streaming application needs to save its computed results to MySQL, it is worth making the database interaction efficient. Here we use HikariCP, a fast JDBC connection pool.

Adding the dependency

Gradle

With Gradle, add the HikariCP dependency to build.gradle; the full file looks like this:

plugins {
    id 'idea'
    id 'java'
    id 'scala'
}

group 'io.zhanghao'
version '1.0-SNAPSHOT'

repositories {
    mavenLocal()
    maven {
        url = 'http://maven.aliyun.com/nexus/content/groups/public'
    }
}

dependencies {
    compile 'org.apache.spark:spark-core_2.11:2.4.0'
    compile 'org.apache.spark:spark-streaming_2.11:2.4.0'
    compile 'org.apache.spark:spark-sql_2.11:2.4.0'
    compile 'mysql:mysql-connector-java:8.0.13'
    compile 'com.zaxxer:HikariCP:3.1.0'
    compileOnly 'org.scala-lang:scala-compiler:2.11.8'
}

Maven

If your build tool is Maven, the HikariCP dependency in pom.xml looks like this (the Spark and MySQL driver dependencies from the Gradle file above are needed as well):

<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP</artifactId>
    <version>3.1.0</version>
</dependency>

Creating a singleton HikariDataSource

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

object DataSourceSingleton {
  @transient private var instance: HikariDataSource = _

  def getDataSourceInstance: HikariDataSource = {
    if (instance == null) {
      try {
        val config = new HikariConfig
        config.setJdbcUrl("jdbc:mysql://localhost:3306/spark")
        config.setUsername("root")
        config.setPassword("123456")
        // MySQL-specific tuning: cache prepared statements on the driver side
        config.addDataSourceProperty("cachePrepStmts", "true")
        config.addDataSourceProperty("prepStmtCacheSize", "250")
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
        instance = new HikariDataSource(config)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
    instance
  }
}
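As a minimal usage sketch (not part of the original example), here is how a connection can be borrowed from the singleton on an executor and handed back in a finally block; with a pooled DataSource, connection.close() returns the connection to the pool rather than physically closing it:

val connection = DataSourceSingleton.getDataSourceInstance.getConnection
try {
  val statement = connection.createStatement()
  try {
    statement.execute("SELECT 1") // placeholder query
  } finally {
    statement.close()
  }
} finally {
  connection.close() // returns the connection to the pool
}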

Using the HikariDataSource

Inside foreachPartition, create one Connection per partition, and release the database connection with evictConnection(connection) once the partition has been processed. (Note that evictConnection permanently removes the connection from the pool; calling connection.close() instead would return it to the pool for reuse.)

The example below counts word frequencies and inserts or updates the counts in a MySQL wordcount table.

result.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    // Get the singleton connection pool
    val dsPool = DataSourceSingleton.getDataSourceInstance
    // Borrow one connection for the whole partition
    val connection = dsPool.getConnection
    partition.foreach(record => {
      val wordcountQuery = s"SELECT wordcount FROM wordcount WHERE word='${record._1}'"
      val set = connection.createStatement().executeQuery(wordcountQuery)
      var wordcount = 0
      while (set.next()) {
        wordcount = set.getInt("wordcount")
      }
      var wordCountSql = ""
      if (wordcount != 0) {
        // Add this batch's count to the stored total
        wordCountSql = s"UPDATE wordcount SET wordcount=${record._2.toInt + wordcount} WHERE word='${record._1}'"
      } else {
        wordCountSql = s"INSERT INTO wordcount(word, wordcount) VALUES('${record._1}', ${record._2.toInt})"
      }
      // Execute the UPDATE or INSERT
      connection.createStatement().execute(wordCountSql)
    })
    // Release the connection once the partition is done
    dsPool.evictConnection(connection)
  })
})
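As an alternative sketch, not from the original article: the per-word SELECT-then-write round trip above can be collapsed into a single upsert, assuming the wordcount table has word as its primary (or unique) key, e.g. CREATE TABLE wordcount(word VARCHAR(64) PRIMARY KEY, wordcount INT). Using a PreparedStatement also lets the writes benefit from the cachePrepStmts settings configured earlier:

result.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val connection = DataSourceSingleton.getDataSourceInstance.getConnection
    // Single-statement upsert; assumes `word` is a primary/unique key
    val upsert = connection.prepareStatement(
      "INSERT INTO wordcount(word, wordcount) VALUES (?, ?) " +
        "ON DUPLICATE KEY UPDATE wordcount = wordcount + VALUES(wordcount)")
    try {
      partition.foreach { case (word, count) =>
        upsert.setString(1, word)
        upsert.setInt(2, count.toInt)
        upsert.addBatch() // accumulate the partition's writes into one batch
      }
      upsert.executeBatch()
    } finally {
      upsert.close()
      connection.close() // return the connection to the pool
    }
  })
})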
