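When building a Spark Streaming application, we may need to save the computed results to MySQL. To interact with MySQL efficiently, we use HikariCP, a high-performance database connection pool.
Adding the dependency
Gradle
To add the HikariCP dependency with Gradle, use a build.gradle file like the following: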
plugins {
    id 'idea'
    id 'java'
    id 'scala'
}
group 'io.zhanghao'
version '1.0-SNAPSHOT'
repositories {
    mavenLocal()
    maven {
        url = 'http://maven.aliyun.com/nexus/content/groups/public'
    }
}
dependencies {
    compile 'org.apache.spark:spark-core_2.11:2.4.0'
    compile 'org.apache.spark:spark-streaming_2.11:2.4.0'
    compile 'org.apache.spark:spark-sql_2.11:2.4.0'
    compile 'mysql:mysql-connector-java:8.0.13'
    compile 'com.zaxxer:HikariCP:3.1.0'
    compileOnly 'org.scala-lang:scala-compiler:2.11.8'
}
Maven
If your build tool is Maven, add the following dependency to pom.xml:
<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP</artifactId>
    <version>3.1.0</version>
</dependency>
Creating a singleton HikariDataSource
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

object DataSourceSingleton {
  @transient private var instance: HikariDataSource = _

  // synchronized: several tasks can run concurrently in one executor JVM
  def getDataSourceInstance: HikariDataSource = synchronized {
    if (instance == null) {
      // If the pool cannot be created, let the exception propagate
      // instead of handing back a null data source
      val config = new HikariConfig
      config.setJdbcUrl("jdbc:mysql://localhost:3306/spark")
      config.setUsername("root")
      config.setPassword("123456")
      // MySQL driver settings that enable prepared-statement caching
      config.addDataSourceProperty("cachePrepStmts", "true")
      config.addDataSourceProperty("prepStmtCacheSize", "250")
      config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
      instance = new HikariDataSource(config)
    }
    instance
  }
}
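Several tasks can run at the same time inside one executor JVM, so a lazily created singleton is usually guarded against concurrent initialization. The following is a minimal sketch of that idea, not the article's code: `LazySingleton` and `LazySingletonDemo` are hypothetical names, and a counter plus a plain string stand in for building a real HikariDataSource.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical holder: creates its value at most once, even under concurrent access
final class LazySingleton[T](create: () => T) {
  @volatile private var instance: Option[T] = None

  def get: T = instance.getOrElse {
    synchronized {
      // Re-check inside the lock: another thread may have created the value already
      instance.getOrElse {
        val created = create()
        instance = Some(created)
        created
      }
    }
  }
}

object LazySingletonDemo {
  // Counts how often the factory runs; stands in for constructing a connection pool
  val creations = new AtomicInteger(0)
  val holder = new LazySingleton[String](() => {
    creations.incrementAndGet()
    "pool"
  })

  def main(args: Array[String]): Unit = {
    // Eight threads race to fetch the value for the first time
    val threads = (1 to 8).map(_ => new Thread(new Runnable {
      override def run(): Unit = holder.get
    }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(s"factory ran ${creations.get} time(s)")
  }
}
```

In Scala, a `lazy val` inside an `object` gives the same once-only guarantee with less code, which may be the simpler choice when no custom error handling is needed.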
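Using HikariDataSource
Inside foreachPartition, obtain one Connection per partition and release it once the partition has been processed. Note that evictConnection(connection) removes the connection from the pool, whereas connection.close() returns it to the pool for reuse, so close() is usually the right call for releasing a pooled connection.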
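Whichever call hands the connection back, it should run even when processing a record throws. One way to guarantee that is a small loan-pattern helper. This is a sketch under assumptions: `withResource` is a hypothetical helper (it is not part of HikariCP or Spark), and `FakeConnection` stands in for a pooled `java.sql.Connection`, which is also an `AutoCloseable`.

```scala
object ResourceLoan {
  // Hypothetical helper: runs `body` with the resource and always closes it afterwards
  def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.close()
}

object ResourceLoanDemo {
  // Stand-in for a pooled connection; records whether close() was called
  final class FakeConnection extends AutoCloseable {
    var closed = false
    override def close(): Unit = closed = true
  }

  def main(args: Array[String]): Unit = {
    val connection = new FakeConnection
    val rows = ResourceLoan.withResource(connection) { c =>
      // Real code would execute its statements on `c` here
      3
    }
    println(s"rows=$rows closed=${connection.closed}")
  }
}
```

With a real pool, the call would look like `ResourceLoan.withResource(dsPool.getConnection) { connection => ... }`, so the connection goes back to the pool on both the success and the failure path.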
Here I count word frequencies and insert or update the word-count results in MySQL.
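result.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    // Get the connection pool (one per executor JVM)
    val dsPool = DataSourceSingleton.getDataSourceInstance
    // Borrow one connection for the whole partition
    val connection = dsPool.getConnection
    // Prepared statements avoid SQL injection and are reused for every record
    val select = connection.prepareStatement("SELECT wordcount FROM wordcount WHERE word = ?")
    val update = connection.prepareStatement("UPDATE wordcount SET wordcount = ? WHERE word = ?")
    val insert = connection.prepareStatement("INSERT INTO wordcount(word, wordcount) VALUES (?, ?)")
    try {
      partition.foreach(record => {
        select.setString(1, record._1)
        val set = select.executeQuery()
        var wordcount = 0
        while (set.next()) {
          wordcount = set.getInt("wordcount")
        }
        set.close()
        if (wordcount != 0) {
          // The word already exists: add the new count to the stored count
          update.setInt(1, record._2.toInt + wordcount)
          update.setString(2, record._1)
          update.executeUpdate()
        } else {
          // First occurrence of the word: insert a new row
          insert.setString(1, record._1)
          insert.setInt(2, record._2.toInt)
          insert.executeUpdate()
        }
      })
    } finally {
      select.close()
      update.close()
      insert.close()
      // close() returns the connection to the pool; evictConnection would remove it from the pool
      connection.close()
    }
  })
})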
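The select-then-write step can also be expressed as a single statement using MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`. This is a different technique from the one shown above, sketched under an assumption the article does not state: the `word` column must carry a PRIMARY KEY or UNIQUE index. `WordCountUpsert` and `upsertAll` are hypothetical names introduced for illustration.

```scala
import java.sql.Connection

object WordCountUpsert {
  // Assumption: wordcount.word has a PRIMARY KEY or UNIQUE index,
  // otherwise MySQL never detects a duplicate and always inserts
  val Sql: String =
    "INSERT INTO wordcount(word, wordcount) VALUES (?, ?) " +
      "ON DUPLICATE KEY UPDATE wordcount = wordcount + VALUES(wordcount)"

  // Writes all records of one partition as a single JDBC batch
  def upsertAll(connection: Connection, records: Iterator[(String, Int)]): Unit = {
    val statement = connection.prepareStatement(Sql)
    try {
      records.foreach { case (word, count) =>
        statement.setString(1, word)
        statement.setInt(2, count)
        statement.addBatch()
      }
      statement.executeBatch()
    } finally {
      statement.close()
    }
  }
}
```

Inside foreachPartition this might be called as `WordCountUpsert.upsertAll(connection, partition.map(r => (r._1, r._2.toInt)))`. Because the database adds the counts itself, two partitions updating the same word at the same time no longer overwrite each other's result.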