编写 App,从 Kafka 读取数据
新建一个Maven项目:spark-streaming-project
在依赖选择上,此次选用 spark-streaming-kafka-0-10_2.11,而非 spark-streaming-kafka-0-8_2.11
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.1</version>
</dependency>
一. 测试是否能够从Kafka消费到数据
- 1. 新建APP(Trait)
package com.buwenbuhuo.streaming.project.app
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
 * Smoke-test entry point: connects to Kafka, subscribes to the `ads_log0814`
 * topic and prints the value of every consumed record, so that the
 * Kafka -> Spark Streaming wiring can be checked before real logic is added.
 *
 * Objects that extend this trait inherit `main` and become runnable.
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN : https://buwenbuhuo.blog.csdn.net/
 */
trait App {
  /**
   * Builds a local streaming context with 3-second batches, attaches a direct
   * Kafka stream and blocks until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("App").setMaster("local[*]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(3))

    // Kafka consumer settings shared by every partition consumer.
    val consumerConfig = Map[String, Object](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "bigdata0814",
      "auto.offset.reset" -> "latest",
      // Offsets are committed automatically by the Kafka consumer.
      // NOTE(review): Spark's Kafka 0-10 integration guide uses `false` here
      // and commits offsets after output -- confirm this is intended.
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val subscribedTopics = Array("ads_log0814")

    // PreferConsistent is the usual default location strategy.
    val subscription = Subscribe[String, String](subscribedTopics, consumerConfig)
    val recordStream =
      KafkaUtils.createDirectStream[String, String](streamingContext, PreferConsistent, subscription)

    // Only the message payload is needed; keys and metadata are dropped.
    val sourceStream: DStream[String] = recordStream.map(record => record.value())
    sourceStream.print(1000)

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
- 2. 新建AreaTopAPP
package com.buwenbuhuo.streaming.project.app
/**
 * Runnable entry point of the tutorial application.
 *
 * The object body is empty: it only extends `App`. In this tutorial that is
 * meant to be the project trait declared in this same package, which provides
 * the `main` method -- NOTE(review): confirm it is not resolved to `scala.App`.
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN : https://buwenbuhuo.blog.csdn.net/
 */
object AreaTopAPP extends App {
}
- 3. 运行AreaTopAPP
上述即为测试。其实在这个 App 内,有一部分逻辑可以单独封装:读取 Kafka 的部分封装成工具类,日志记录封装成样例类。
测试已经能够成功得到想要的结果,下面给出完善后的最终程序源码。
二. 完整程序源码
编写App, 从 kafka 读取数据
- 1. bean 类
AdsInfo
package com.buwenbuhuo.streaming.project.bean
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
/**
 * One ad-click log record.
 *
 * `timestamp`, `dayString` and `hmString` are derived from `ts` in the class
 * body on every construction; any values passed for them are overwritten.
 *
 * @param ts        event time as epoch milliseconds
 * @param area      area field of the log line
 * @param city      city field of the log line
 * @param userId    user id field of the log line
 * @param adsId     ad id field of the log line
 * @param timestamp `ts` as a [[java.sql.Timestamp]] (always recomputed)
 * @param dayString `ts` formatted as `yyyy-MM-dd` (always recomputed)
 * @param hmString  `ts` formatted as `HH:mm` (always recomputed)
 *
 * @author 不温卜火
 * @create 2020-08-14 17:45
 * MyCSDN : https://buwenbuhuo.blog.csdn.net/
 */
case class AdsInfo(ts: Long,
                   area: String,
                   city: String,
                   userId: String,
                   adsId: String,
                   var timestamp: Timestamp = null,
                   var dayString: String = null, // e.g. 2020-08-14
                   var hmString: String = null) { // e.g. 11:20
  /** Event time as a `java.util.Date`; the source for both formatted strings. */
  val date = new Date(ts)

  // A new SimpleDateFormat is created per call, so no formatter is shared.
  private def render(pattern: String): String =
    new SimpleDateFormat(pattern).format(date)

  timestamp = new Timestamp(ts)
  dayString = render("yyyy-MM-dd")
  hmString = render("HH:mm")
}
- 2. 工具类
MyKafkaUtils
package com.buwenbuhuo.streaming.project.util
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
/**
 * Helper for creating Kafka-backed DStreams with one shared consumer
 * configuration.
 *
 * @author 不温卜火
 * @create 2020-08-14 15:20
 * MyCSDN : https://buwenbuhuo.blog.csdn.net/
 */
object MyKafkaUtils {
  /** Kafka consumer settings used by every stream created through this object. */
  val kafkaParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "bigdata0814",
    "auto.offset.reset" -> "latest",
    // Offsets are committed automatically by the Kafka consumer.
    // NOTE(review): Spark's Kafka 0-10 integration guide uses `false` here
    // and commits offsets after output -- confirm this is intended.
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

  /**
   * Creates a direct Kafka stream for the given topics and keeps only the
   * value of each consumed record.
   *
   * @param ssc    streaming context the stream is attached to
   * @param topics names of the topics to subscribe to
   * @return stream of record values
   */
  def getKafkaStream(ssc: StreamingContext, topics: String*): DStream[String] =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, // the usual default location strategy
      Subscribe[String, String](topics, kafkaParams)
    ).map(_.value())

  /**
   * Misspelled original name, kept so existing callers continue to compile.
   * Forwards to [[getKafkaStream]]; new code should call that method instead.
   *
   * @param ssc    streaming context the stream is attached to
   * @param topics names of the topics to subscribe to
   * @return stream of record values
   */
  def getKafkaSteam(ssc: StreamingContext, topics: String*): DStream[String] =
    getKafkaStream(ssc, topics: _*)
}
- 3. 从kafka消费数据(APP)
package com.buwenbuhuo.streaming.project.app
import com.buwenbuhuo.streaming.project.bean.AdsInfo
import com.buwenbuhuo.streaming.project.util.MyKafkaUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Base entry point of the application: reads raw ad-click log lines from the
 * `ads_log0814` Kafka topic, parses each line into an `AdsInfo` and prints
 * the parsed records.
 *
 * Objects that extend this trait inherit `main` and become runnable.
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN : https://buwenbuhuo.blog.csdn.net/
 */
trait App {
  /**
   * Builds a local streaming context with 3-second batches, parses the Kafka
   * stream and blocks until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("App").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Raw stream of comma-separated log lines.
    val sourceStream: DStream[String] = MyKafkaUtils.getKafkaSteam(ssc, "ads_log0814")

    // Expected line layout: ts,area,city,userId,adsId (extra fields are ignored).
    // Lines with fewer than five fields or a non-numeric timestamp are skipped;
    // otherwise one bad record would throw inside the task and fail the job.
    // The parsing is inlined so the closure does not capture the enclosing
    // instance, which would have to be serializable.
    val adsInfoStream: DStream[AdsInfo] = sourceStream.flatMap { line =>
      line.split(",") match {
        case Array(ts, area, city, userId, adsId, _*) =>
          scala.util.Try(ts.toLong).toOption
            .map(millis => AdsInfo(millis, area, city, userId, adsId))
            .toList
        case _ =>
          Nil
      }
    }
    adsInfoStream.print(1000)

    ssc.start()
    ssc.awaitTermination()
  }
}
- 4. AreaTopAPP
package com.buwenbuhuo.streaming.project.app
/**
 * Runnable entry point of the tutorial application (data consumer side).
 *
 * The object body is empty: it only extends `App`. In this tutorial that is
 * meant to be the project trait declared in this same package, which provides
 * the `main` method -- NOTE(review): confirm it is not resolved to `scala.App`.
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN : https://buwenbuhuo.blog.csdn.net/
 */
object AreaTopAPP extends App {
}
三. 运行结果
同时运行 MockRealtimeData(数据生产者)和 AreaTopAPP(数据消费者)
本次的分享就到这里了