一、Spark中Master与Worker之间的通信过程
1、在启动时,Worker会向Master注册自己的信息(内存、核数等),以便Master统一管理并据此分配任务
2、Master收到各Worker的注册信息后,会回复Worker已注册成功的信息
3、Worker收到Master的注册成功信息后,会定期向Master发送心跳包,汇报自己的状态信息
4、Master定期收到Worker的心跳信息后,会更新各个Worker的状态信息。因为Worker在发送心跳包的时候会携带发送时间,Master会检查接收的心跳时间和当前的时间,如果两者的时间差值大于规定的时间,则表示Worker已挂掉。Master在分配任务的时候则不会给已挂掉的Worker分配任务
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcats</groupId>
<artifactId>akka-test</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 定义一下常量 -->
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<akka.version>2.4.17</akka.version>
</properties>
<dependencies>
<!-- 添加scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 添加akka的actor依赖 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- 多进程之间的Actor通信 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
<!-- 指定插件-->
<build>
<!-- 指定源码包和测试包的位置 -->
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- 指定编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<!-- maven打包的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<!-- 指定main方法 -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.itcats.spark.SparkMaster</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
SparkWorker
import java.util.UUID
import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._ //导入时间单位
//Worker向Master注册自己的信息
/**
 * Worker actor: registers itself with the master and, once the master has
 * acknowledged the registration, sends it a heartbeat at a fixed interval.
 *
 * Handled messages:
 *  - "started": sends a RegisterWorkerInfo carrying this worker's id, core count and memory size.
 *  - RegisteredWorkerInfo: the master's acknowledgement; starts the periodic heartbeat timer.
 *  - SendHeartBeat: timer tick sent to self; forwards a HeartBeat to the master.
 *
 * @param masterURL actor path used to look up the master via actorSelection
 */
class SparkWorker(masterURL: String) extends Actor {
  // Handle to the master, resolved from masterURL in preStart.
  var masterProxy: ActorSelection = _
  // Random id that identifies this worker in every message it sends.
  val workId = UUID.randomUUID().toString
  // Handle of the running heartbeat timer; kept so it can be cancelled and so
  // that a repeated acknowledgement does not start a second timer.
  private var heartBeatTask: Option[akka.actor.Cancellable] = None

  override def preStart(): Unit = {
    masterProxy = context.actorSelection(masterURL)
  }

  // Cancel the timer so it does not keep firing at a stopped actor.
  override def postStop(): Unit = {
    heartBeatTask.foreach(_.cancel())
    heartBeatTask = None
  }

  override def receive: Receive = {
    case "started" =>
      // Register with the master: (id, core count, memory size).
      masterProxy ! RegisterWorkerInfo(workId, 4, 32 * 1024)

    case RegisteredWorkerInfo =>
      if (heartBeatTask.isEmpty) {
        // The scheduler needs an implicit ExecutionContext.
        import context.dispatcher
        // Tick immediately, then every 1500 ms.
        heartBeatTask = Some(
          context.system.scheduler.schedule(
            Duration.Zero,
            Duration(1500, TimeUnit.MILLISECONDS),
            self,
            SendHeartBeat))
      }

    case SendHeartBeat =>
      masterProxy ! HeartBeat(workId)
      println(s"--------------- $workId 发送心跳 ---------------")
  }
}
/**
 * Entry point that boots a worker actor system.
 *
 * Expects exactly four arguments: host, port, worker actor name and the
 * master's actor URL. With any other argument count it prints a usage line
 * and exits.
 */
object SparkWorker {
  def main(args: Array[String]): Unit = args match {
    case Array(host, port, workerName, masterURL) =>
      // Remoting settings, one per line, wrapped in a leading and trailing newline.
      val remoteSettings = Seq(
        "akka.actor.provider=\"akka.remote.RemoteActorRefProvider\"",
        s"akka.remote.netty.tcp.hostname=$host",
        s"akka.remote.netty.tcp.port=$port"
      ).mkString("\n", "\n", "\n")

      val system = ActorSystem("sparkWorker", ConfigFactory.parseString(remoteSettings))
      val worker = system.actorOf(Props(new SparkWorker(masterURL)), workerName)
      // Kick off registration with the master.
      worker ! "started"

    case _ =>
      println("请输入参数 <host> <port> <workName> <masterURL>")
      sys.exit()
  }
}
SparkMaster
import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration.Duration
/**
 * Master actor: keeps track of registered workers and evicts those whose
 * heartbeats have stopped.
 *
 * Handled messages:
 *  - RegisterWorkerInfo: stores a not-yet-known worker and replies with RegisteredWorkerInfo.
 *  - HeartBeat: refreshes the last-heartbeat timestamp of a known worker.
 *  - CheckTimeOutWorker: starts the periodic timeout sweep (at most once).
 *  - RemoveTimeOutWorker: removes every worker whose last heartbeat is too old.
 */
class SparkMaster extends Actor {
  // Registered workers keyed by worker id.
  val idToWorkerInfoMap = scala.collection.mutable.HashMap[String, WorkerInfo]()

  // A worker is dropped once its last heartbeat is older than this many milliseconds.
  private val workerTimeoutMillis = 3000L
  // Interval in milliseconds between two timeout sweeps.
  private val checkIntervalMillis = 6000L
  // Handle of the running sweep timer; kept so it can be cancelled and so that
  // a repeated CheckTimeOutWorker does not start a second timer.
  private var timeoutCheckTask: Option[akka.actor.Cancellable] = None

  // Cancel the timer so it does not keep firing at a stopped actor.
  override def postStop(): Unit = {
    timeoutCheckTask.foreach(_.cancel())
    timeoutCheckTask = None
  }

  override def receive: Receive = {
    case RegisterWorkerInfo(workId, core, ram) =>
      if (!idToWorkerInfoMap.contains(workId)) {
        val workerInfo = new WorkerInfo(workId, core, ram)
        // Count registration as the first sign of life; otherwise the timestamp
        // stays at 0 and a sweep before the first heartbeat evicts the worker.
        workerInfo.lastHeartBeatTime = System.currentTimeMillis()
        idToWorkerInfoMap += ((workId, workerInfo))
        // Tell the worker that registration succeeded.
        sender() ! RegisteredWorkerInfo
      }

    case HeartBeat(workId) =>
      if (workId != null && workId.trim.nonEmpty) {
        // Heartbeats from unknown (e.g. already evicted) workers are ignored
        // instead of failing the actor with NoSuchElementException.
        idToWorkerInfoMap.get(workId).foreach { info =>
          info.lastHeartBeatTime = System.currentTimeMillis()
        }
      }

    case CheckTimeOutWorker =>
      if (timeoutCheckTask.isEmpty) {
        // The scheduler needs an implicit ExecutionContext.
        import context.dispatcher
        // Sweep immediately, then once per check interval.
        timeoutCheckTask = Some(
          context.system.scheduler.schedule(
            Duration.Zero,
            Duration(checkIntervalMillis, TimeUnit.MILLISECONDS),
            self,
            RemoveTimeOutWorker))
      }

    case RemoveTimeOutWorker =>
      val now = System.currentTimeMillis()
      // Collect the ids first so the map is not modified while it is traversed.
      val timedOutIds = idToWorkerInfoMap.values
        .filter(info => now - info.lastHeartBeatTime > workerTimeoutMillis)
        .map(_.id)
        .toList
      idToWorkerInfoMap --= timedOutIds
      println(s"还剩 ${idToWorkerInfoMap.size}个 存活的worker")
  }
}
/**
 * Entry point that boots the master actor system.
 *
 * Expects exactly three arguments: host, port and the master actor's name.
 * With any other argument count it prints a usage line and exits.
 */
object SparkMaster {
  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      // The third argument names the master actor, not a worker.
      println("请输入参数 <host> <port> <masterName>")
      sys.exit()
    }
    val host = args(0)
    val port = args(1)
    val masterName = args(2)
    // Enable remoting and bind it to the requested host and port.
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$host
         |akka.remote.netty.tcp.port=$port
         |""".stripMargin)
    val masterSystem = ActorSystem("sparkMaster", config)
    val masterRef = masterSystem.actorOf(Props[SparkMaster], masterName)
    // Ask the master to start its periodic worker-timeout check.
    masterRef ! CheckTimeOutWorker
  }
}
协议样例类MessageProtocol
//worker -> master
/** Worker -> master: registration request carrying the worker's id, core count and memory size. */
final case class RegisterWorkerInfo(id: String, core: Int, ram: Int)

/** Worker -> master: heartbeat; `id` tells the master which worker is reporting. */
final case class HeartBeat(id: String)

/** Master -> worker: registration succeeded. */
case object RegisteredWorkerInfo

/** Worker -> itself: timer tick telling the worker to send a heartbeat to the master. */
case object SendHeartBeat

/** Sent to the master to make it start its periodic check for timed-out workers. */
case object CheckTimeOutWorker

/** Master -> itself: timer tick telling the master to remove timed-out workers. */
case object RemoveTimeOutWorker
//存储worker信息的对象类
/**
 * Mutable record describing one worker.
 *
 * @param id   worker id
 * @param core core count supplied at construction
 * @param ram  memory size supplied at construction (unit is not specified here)
 */
class WorkerInfo(val id: String, val core: Int, val ram: Int) {
  // Timestamp in milliseconds of the most recent heartbeat; starts at 0 until set.
  var lastHeartBeatTime: Long = _
}
编辑Program arguments
SparkMaster
127.0.0.1 8877 master
SparkWorker
127.0.0.1 8878 wk-01 akka.tcp://sparkMaster@127.0.0.1:8877/user/master