Learning Takeaways
(1) Setting up the environment is the hardest part.
(2) Write the code yourself: practice, practice, practice.
WordCount
package wordcount
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author CBeann
* @create 2019-08-10 18:02
*/
object WordCount {
def main(args: Array[String]): Unit = {
// Create SparkConf and set the app name
val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
// Create the SparkContext
val sc = new SparkContext(conf)
// Use sc to create an RDD and run the corresponding transformations and actions
val data = sc.textFile("C:\\Users\\Lenovo\\Desktop\\leetcode.txt")
// Word count: split, map to pairs, reduce by key
val result = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1)
// Print to the console
result.collect().foreach(println _)
// Save the result
result.saveAsTextFile("F:\\temp\\aa")
sc.stop()
println("-----over-----")
}
}
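As a small follow-up (not in the original code), the counts can be ranked before printing. The line below is a sketch against the same result RDD and would need to run before sc.stop():
// Hypothetical follow-up: print the 10 most frequent words
result.sortBy(_._2, ascending = false).take(10).foreach(println)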
Sorting
Option 1: sort by a single field
val result = data.sortBy(_._2, false)
Option 2: sort with a case class that extends Ordered
val result = data.sortBy(x => Boy(x._1, x._2, x._3), false)
package mysort
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author CBeann
* @create 2019-08-10 18:26
*/
object MysortDemo {
def main(args: Array[String]): Unit = {
// Create SparkConf and set the app name
val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
// Create the SparkContext
val sc = new SparkContext(conf)
// Use sc to create an RDD and run the corresponding transformations and actions
val data = sc.makeRDD(List(("张三", 10, 14), ("张三", 9, 9), ("张三", 13, 15)))
// Option 1: sort by a single field
// val result = data.sortBy(_._2, false)
// Option 2: sort with a case class that extends Ordered
val result = data.sortBy(x => Boy(x._1, x._2, x._3), false)
// Print to the console
result.collect().foreach(println _)
}
}
case class Boy(name: String, faceVale: Int, age: Int) extends Ordered[Boy] {
// Compare by faceVale first, then by age when faceVale is equal
override def compare(that: Boy): Int = {
if(this.faceVale!=that.faceVale){
this.faceVale-that.faceVale
}else{
this.age-that.age
}
}
}
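A third option, not shown above, is to sort by a composite tuple key and rely on Scala's built-in tuple Ordering, which avoids defining a case class at all. The line below is a sketch against the same data RDD:
// Hypothetical variant: sort by (faceVale, age) descending using the default tuple Ordering
val result3 = data.sortBy(x => (x._2, x._3), ascending = false)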
Custom Partitioning
Custom partitioner
package mypartition
import org.apache.spark.Partitioner
import scala.collection.mutable
/**
* @author CBeann
* @create 2019-08-10 18:36
* Custom partitioner: extend Partitioner
*/
class MyPartitioner extends Partitioner {
val map = new mutable.HashMap[String, Int]()
map.put("Java", 0)
map.put("Scala", 1)
map.put("Go", 2)
// Total number of partitions
override def numPartitions: Int = map.size
// Partitioning logic: map a key to its partition index (unknown keys go to partition 0)
override def getPartition(key: Any): Int = {
map.getOrElse(key.toString, 0)
}
}
Test class
package mypartition
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author CBeann
* @create 2019-08-10 18:59
*/
object PartitionDemo {
def main(args: Array[String]): Unit = {
// Create SparkConf and set the app name
val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
// Create the SparkContext
val sc = new SparkContext(conf)
val data = sc.makeRDD(List(("Java", 11), ("Java", 9), ("Scala", 13), ("Go", 11)))
val result = data.partitionBy(new MyPartitioner)
result.saveAsTextFile("F:\\temp\\aaa")
println("--------------OVER------------")
}
}
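To verify the partitioning without opening the saved text files, a quick check (not in the original) is to tag every element with its partition index. The sketch below assumes the same result RDD:
// Hypothetical check: print (partitionIndex, (key, value)) for every element
result.mapPartitionsWithIndex((idx, iter) => iter.map(kv => (idx, kv)))
  .collect()
  .foreach(println)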
SparkSQL
person.json
{ "name": "王小二", "age": 15}
{ "name": "王小三", "age": 25}
{ "name": "王小四", "age": 35}
Test class
package sparksql
import org.apache.spark.sql.SparkSession
/**
* @author CBeann
* @create 2019-08-10 18:20
*/
object SparkSqlDemo {
def main(args: Array[String]): Unit = {
// Create the SparkSession and set the app name
val spark = SparkSession
.builder()
.appName("Spark SQL basic example").master("local[8]")
//.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.json("E:\\IntelliJ IDEA 2019.1.3Workspace\\ScalaSpark\\SparkDemo\\src\\main\\resources\\json\\person.json")
// Displays the content of the DataFrame to stdout
df.show()
df.filter($"age" > 21).show()
df.createOrReplaceTempView("persons")
spark.sql("SELECT * FROM persons where age > 21").show()
spark.stop()
printf("-----over---------")
}
}
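The same data can also be handled as a typed Dataset. The sketch below is an addition under stated assumptions: the Person case class is hypothetical and would need to be defined at the top level (outside main), and the conversion relies on the import spark.implicits._ already present in the code; run it before spark.stop().
case class Person(name: String, age: Long) // top-level; spark.read.json infers integers as Long
// inside main:
val ds = df.as[Person]
ds.filter(_.age > 21).show()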
Spark Streaming
Stateless WordCount
package stream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author CBeann
* @create 2019-08-10 18:38
*/
object StreamDemo {
def main(args: Array[String]): Unit = {
// Create a SparkConf
val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
// Create a StreamingContext with a 5-second batch interval
val ssc = new StreamingContext(conf, Seconds(5))
// Continuously receive data from port 9999 on the given host
val lines = ssc.socketTextStream("iZm5ea99qngm2v98asii1aZ", 9999)
//val lines = ssc.receiverStream(new MyRecever("iZm5ea99qngm2v98asii1aZ", 9999))
// Process the data (word count)
val result = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
result.print()
// Start the streaming application
ssc.start()
// Block until the computation is stopped
ssc.awaitTermination()
printf("--------OVER-------------")
}
}
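For local testing there has to be something listening on that port (for example, nc -lk 9999 on the host). A related variant, not in the original, is a sliding-window count; the sketch below assumes the same lines DStream and the 5-second batch interval, so both durations are multiples of it.
// Hypothetical windowed variant: counts over the last 30 seconds, updated every 10 seconds
val windowed = lines.flatMap(_.split(" ")).map((_, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowed.print()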
Stateful WordCount
The key is the updateStateByKey method, which takes a function with a fixed parameter signature.
package stream.withstatus
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import stream.MyRecever
/**
* @author CBeann
* @create 2019-08-10 19:24
*/
object UpdateStateByKeyTest {
def main(args: Array[String]): Unit = {
// Create a SparkConf
val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
// Create a StreamingContext with a 5-second batch interval
val ssc = new StreamingContext(conf, Seconds(5))
// updateStateByKey requires a checkpoint directory
ssc.checkpoint("F:\\temp\\aaa")
// Continuously receive data from port 9999 on the given host
val lines = ssc.socketTextStream("iZm5ea99qngm2v98asii1aZ", 9999)
// Process the data
val result = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
// The key step: fold each batch's counts into the running state
val allresult = result.updateStateByKey(updateFunction)
allresult.print()
// Start the streaming application
ssc.start()
// Block until the computation is stopped
ssc.awaitTermination()
printf("--------OVER-------------")
}
// The parameter types are fixed; the parameter names are not
// currValues is the collection of values for the same key in the current batch
// preValue is the previous state for that key, supplied by the framework
def updateFunction(currValues: Seq[Int], preValue: Option[Int]): Option[Int] = {
// Sum for the current batch
val currValueSum = currValues.sum
// Accumulated sum from earlier batches
val oldValueSum = preValue.getOrElse(0)
// New state: current sum plus the historical sum
Some(currValueSum + oldValueSum)
}
}
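An alternative worth knowing (not used in the original) is mapWithState, which only touches keys that appear in the current batch and tends to scale better than updateStateByKey for large key spaces. The sketch below assumes the same result DStream and checkpoint setup as above.
// Hypothetical alternative using mapWithState
import org.apache.spark.streaming.{State, StateSpec}
val spec = StateSpec.function((word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum) // persist the running total for this key
  (word, sum)       // emit the updated count
})
val statefulCounts = result.mapWithState(spec)
statefulCounts.print()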
Custom Receiver
package stream
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
* @author CBeann
* @create 2019-08-10 18:39
*/
class MyRecever (host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
// Called when the receiver starts: spawn a thread that reads from the socket
override def onStart(): Unit = {
new Thread() {
override def run(): Unit = {
receive()
}
}.start()
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while (!isStopped && userInput != null) {
// Hand the line over to Spark
store(userInput)
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
// Nothing to clean up here: receive() checks isStopped() and closes the socket when the loop ends
override def onStop(): Unit = {}
}
Plug it into the StreamingContext in place of socketTextStream:
val lines = ssc.receiverStream(new MyRecever("iZm5ea99qngm2v98asii1aZ", 9999))
pom.xml
<properties>
<mysql.version>6.0.5</mysql.version>
<spring.version>4.3.6.RELEASE</spring.version>
<spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
<log4j.version>1.2.17</log4j.version>
<quartz.version>2.2.3</quartz.version>
<slf4j.version>1.7.22</slf4j.version>
<hibernate.version>5.2.6.Final</hibernate.version>
<camel.version>2.18.2</camel.version>
<config.version>1.10</config.version>
<jackson.version>2.8.6</jackson.version>
<servlet.version>3.0.1</servlet.version>
<net.sf.json.version>2.4</net.sf.json.version>
<activemq.version>5.14.3</activemq.version>
<spark.version>2.1.1</spark.version>
<scala.version>2.11.11</scala.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>