手写一个 Mini Flink 分布式集群

2021-07-09 16:27:50 浏览数 (1)

一、开篇说两句

已经很多天没有写文章了,直到我今天在飞机上看了一本书《如何成为一个不完美主义者》,让我重新认识了“完美主义”这四个字。

所谓“完美主义者”,一般有三种标准:情境、品质和数量。

  • 如果过于追求情境的完美,就会丧失很多情境下行动的机会,比如没有健身房绝不运动(跑步除外),时间错过也绝不运动,没有资源(衣服、运动器械)也难以运动。
  • 过于追求质量,比如有人难以容忍家里有垃圾,有灰尘。
  • 过于追求数量,比如一定要给自己定目标(一个月瘦多少斤),每次运动至少多少分钟,每周读完一本书,等等。这样的目标就像撑杆跳,跳不过就失败,跳过了才成功。

我们总爱把部分成功定义成失败,无法接受微小的价值进步,只看到宏大、顺利、完美的成功。

而作为程序员,多多少少有一点完美主义。就拿写作来说,一定要把一个东西搞的混瓜烂熟,一大块知识点都吃透,所有的东西都准备好了,才愿意下笔。

生活就是一边前行,一边调整,在前行中平衡。

等到一切都完美,那就晚了。

二、切入正题

分布式系统相信大家都很熟悉了,一般都是主从架构,一个主备的 Master,负责协调,很多 Worker,负责具体干活。

而他们的启动流程,如果精简到一定程序也会很相似,有一些通用的功能,比如:

  • Master 启动;
  • Worker 启动向 Master 注册自己;
  • Master 处理注册消息,并返回注册结果给 Worker;
  • Worker 如果注册成功,则开始发送心跳;
  • Master 消息匹配,如果是心跳,则处理心跳;如果是其他消息,则处理其他消息;
  • Master 每隔一段时间扫描一次心跳主机的集合,检测 Woker 的存活状态。

(当然这只是一小块启动流程了)

我们能否自己用代码来实现这个流程呢?

一方面检验一下编码能力,一方面熟悉下分布式架构的代码设计,何乐而不为。

三、代码实现

这里我们使用 akka 框架来实现 Master 和 Worker 的通信。

Akka ,我对这个框架的评价是,一个不温不火的框架,网络性能不是最优,并不是为了解决多线程问题而生。而且几乎把所有分布式通信中会出现的问题全部交给开发人员自己考虑和设计。

所以,还是 Netty 香,哈哈。

这里使用 Akka 的通信能力,来完成这个小 Demo

代码语言:javascript复制
package com.mazh.rpc.akka.flink

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable


class MyJobManager(var hostname: String, var port: Int) extends Actor {
    
    // 用来存储每个注册的 TaskManager 节点的信息
    private var id2taskManagerInfo = new mutable.HashMap[String, TaskManagerInfo]()
    
    // 对所有注册的 TaskManager 进行去重,其实就是一个 HashSet
    private var taskManagerInfos = new mutable.HashSet[TaskManagerInfo]()
    
    //actor 在最开始的时候,会执行一次
    override def preStart(): Unit = {
        import scala.concurrent.duration._
        import context.dispatcher
        
        // 调度一个任务, 每隔五秒钟执行一次
        context.system.scheduler.schedule(0 millis, 5000 millis, self, CheckTimeOut)
    }
    
    // 正经服务方法
    override def receive: Receive = {
        
        //  接收 注册消息
        case RegisterTaskManager(taskmanagerid, memory, cpu) => {
            val taskManagerInfo = new TaskManagerInfo(taskmanagerid, memory, cpu)
            println(s"节点 ${taskmanagerid} 上线")
            
            //  对注册的 TaskManager 节点进行存储管理
            id2taskManagerInfo.put(taskmanagerid, taskManagerInfo)
            taskManagerInfos  = taskManagerInfo
            
            //  把信息存到zookeeper
            //  sender() 谁给我发消息,sender方法返回的就是谁
            sender() ! RegisteredTaskManager(hostname   ":"   port)
        }
        
            // 接收心跳消息
        case Heartbeat(taskManagerId) => {
            val currentTime = System.currentTimeMillis()
            val taskManagerInfo = id2taskManagerInfo(taskManagerId)
            taskManagerInfo.lastHeartBeatTime = currentTime
    
            id2taskManagerInfo(taskManagerId) = taskManagerInfo
            taskManagerInfos  = taskManagerInfo
        }
        
        //  检查过期失效的 NodeManager
        case CheckTimeOut => {
            val currentTime = System.currentTimeMillis()
            
            // TODO_MA 注释:15 秒钟失效
            taskManagerInfos.filter(nm => {
                val heartbeatTimeout = 15000
                val bool = currentTime - nm.lastHeartBeatTime > heartbeatTimeout
                if (bool) {
                    println(s"节点 ${nm.taskmanagerid} 下线")
                }
                bool
            }).foreach(deadnm => {
                taskManagerInfos -= deadnm
                id2taskManagerInfo.remove(deadnm.taskmanagerid)
            })
            println("当前注册成功的节点数"   taskManagerInfos.size   "t分别是:"   taskManagerInfos.map(x => x.toString)
              .mkString(","));
        }
    }
}


object MyJobManager {
    def main(args: Array[String]): Unit = {
        
        //  地址参数
        val str =
        s"""
           |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
           |akka.remote.netty.tcp.hostname = localhost
           |akka.remote.netty.tcp.port = 6789
      """.stripMargin
        val conf = ConfigFactory.parseString(str)
        
        // ActorSystem
        val actorSystem = ActorSystem(Constant.JMAS, conf)
        
        // 启动了一个actor :MyJobManager
        actorSystem.actorOf(Props(new MyJobManager("localhost", 6789)), Constant.JMA)
   
    }
}

代码语言:javascript复制
package com.mazh.rpc.akka.flink

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory


class MyTaskManager(val tmhostname: String, val jobmanagerhostname: String, val jobmanagerport: Int, val memory: Int,
    val cpu: Int) extends Actor {
    
    var taskManagerId: String = tmhostname
    var rmRef: ActorSelection = _
    
    // 会提前执行一次
    // 当前NM启动好了之后,就应该给 RM 发送一个注册消息
    //发给谁,就需要获取这个谁的一个ref实例
    override def preStart(): Unit = {
        
        //获取消息发送对象的一个ref实例
        // 远程path  akka.tcp://(ActorSystem的名称)@(远程地址的IP):(远程地址的端口)/user/(Actor的名称)
        rmRef = context.actorSelection(s"akka.tcp://${
            Constant.JMAS
        }@${jobmanagerhostname}:${jobmanagerport}/user/${Constant.JMA}")
        
        //  发送消息
        println(taskManagerId   " 正在注册")
        rmRef ! RegisterTaskManager(taskManagerId, memory, cpu)
    }
    
    //正常服务方法
    override def receive: Receive = {
        
        //接收到注册成功的消息
        case RegisteredTaskManager(masterURL) => {
            println(masterURL);
            
            //  initialDelay: FiniteDuration, 多久以后开始执行
            //  interval:     FiniteDuration, 每隔多长时间执行一次
            // receiver:     ActorRef, 给谁发送这个消息
            //  message:      Any  发送的消息是啥
            import scala.concurrent.duration._
            import context.dispatcher
            context.system.scheduler.schedule(0 millis, 4000 millis, self, SendMessage)
        }
        
        // 发送心跳
        case SendMessage => {
            // TODO_MA 注释: 向主节点发送心跳信息
            rmRef ! Heartbeat(taskManagerId)
            
            println(Thread.currentThread().getId)
        }
    }
}


object MyTaskManager {
    def main(args: Array[String]): Unit = {
        
        // 远程主机名称
        val HOSTNAME = args(0)
        
        //   JobManager 的 hostname 和 port
        val JM_HOSTNAME = args(1)
        val JM_PORT = args(2).toInt
        
        //  抽象的内存资源 和 CPU 个数
        val TaskManager_MEMORY = args(3).toInt
        val TaskManager_CORE = args(4).toInt
        
        //  当前 TaskManager 的 hostname 和 port
        var TaskManager_PORT = args(5).toInt
        var TaskManager_HOSTNAME = args(6)
        
        //   指定主机名称和端口号相关的配置
        val str =
            s"""
               |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
               |akka.remote.netty.tcp.hostname = ${HOSTNAME}
               |akka.remote.netty.tcp.port = ${TaskManager_PORT}
            """.stripMargin
        val conf = ConfigFactory.parseString(str)
        
        // 启动一个 ActorSystem
        val actorSystem = ActorSystem(Constant.JMAS, conf)
        
        //  启动一个Actor
        actorSystem.actorOf(Props(new MyTaskManager(TaskManager_HOSTNAME, JM_HOSTNAME, JM_PORT, TaskManager_MEMORY,
            TaskManager_CORE)), Constant.TMA)
    }
}

0 人点赞