分布式学习十四:协调任务

2022-04-06 18:43:13 浏览数 (1)

分布式协调/通知服务

mysql备份数据时,我们会通过读取binlog方式备份,但是如果当从服务器宕机时,则备份就会停止,我们可以通过zookeeper实现分布式协调备份

主服务进行备份提交,其他服务监听主服务器状态,如果宕机失去联系,则替代主服务进行工作.

实现原理

在zookeeper节点结构如下:

代码语言:javascript复制
test
└── customBackUp
    └── tasks  任务列表
        └── task01  任务
            ├── instance  服务实例列表
            │   └── server_1_00000001  有序/临时 节点
            ├── lastCommit 最后提交id
            └── status 当前状态

server进程:

1:判断tasks是否存在 task01 任务

2:如果不存在则初始化 task01 任务的节点列表

monitor进程:

1:监听tasks所有任务下的 status 节点,进行监控报警

task进程

1:多台服务初始化之后,先获取指定任务列表的节点数据(task01)

2:在instance中注册自己的有序/临时节点

3:注册完成之后,判断instance自己的节点是否为最小的,如果是,则节点状态为 "主服务"

4:如果不是最小的,则节点状态为:"从服务"

5:主服务进行处理数据,将status状态更新为"running",并将处理的进度id保留到 lastCommit 中

6:从服务进行监听instance节点列表,当主服务断线后,临时节点将会被删除,从而触发监听

7:从服务将status改为"stop"状态,重新进行判断节点是否最小

8:重复3-7 

完整架构图解

简单实现代码

代码语言:javascript复制
package main

import (
   "errors"
   "fmt"
   "github.com/go-zookeeper/zk"
   "os"
   "strconv"
   "strings"
   "time"
)

var serverId int = 1

type instanceStatus int

var (
   StatusRunning instanceStatus = 1
   StatusStandby instanceStatus = -1
)

func main() {
   serverId, _ = strconv.Atoi(os.Args[1])
   conn, _, err := zk.Connect([]string{"127.0.0.1:20005"}, time.Second*10)
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("serverId:%v start.",serverId))
   //task path
   path := "/customBackUp/tasks/task01"

   //check path exits
   exists, _, err := conn.Exists(path)
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("check task node exist."))

   //if path  not exits,create path
   if exists == false {
      err := createPath(conn, path)
      if err != nil {
         panic(err)
      }
      err = createPath(conn, path "/instance/")
      if err != nil {
         panic(err)
      }
      err = createPath(conn, path "/lastCommit/")
      if err != nil {
         panic(err)
      }
      err = createPath(conn, path "/status/")
      if err != nil {
         panic(err)
      }
      logWithTime(fmt.Sprintf("create task node"))
   }

   //register task(create a node sequence and ephemeral path)
   registerInstanceNodePath := path   "/instance/"   "server_"   strconv.Itoa(serverId)
   createPath, err := conn.Create(registerInstanceNodePath, []byte{}, zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("register instance node: %v",createPath))

start_worker:
   status, err := checkInstanceStatus(conn, createPath, path "/instance")
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("current server status: %v",status))

   if status == StatusRunning {
      //status need to update to running
      _, err := conn.Set(path "/status", []byte("1"), -1)
      if err != nil {
          panic(err)
      }
      logWithTime(fmt.Sprintf("current server handle task..."))
      handleTask(conn, path "/lastCommit")

   } else if status == StatusStandby {

      logWithTime(fmt.Sprintf("current server watch node..."))
      watchTaskNode(conn, path "/instance")
      //status need to update to stop
      _, err := conn.Set(path "/status", []byte("0"), -1)
      if err != nil {
         panic(err)
      }
      goto start_worker

   }
}

func handleTask(conn *zk.Conn, commitPath string) {
   //get commitId
   bytes, _, err := conn.Get(commitPath)
   if err != nil {
      panic(err)
   }
   str := string(bytes)
   id, _ := strconv.Atoi(str)
   for {
      id  
      str := strconv.Itoa(id)
      _, err := conn.Set(commitPath, []byte(str), -1)
      if err != nil {
         panic(err)
      }
      fmt.Printf("[%v]serverId(%v),commitId:%v n", time.Now().Format("2006-01-02 15:04:05"), serverId, id)
      time.Sleep(time.Second * 5)
   }
}

func watchTaskNode(conn *zk.Conn, watchPath string) {
   _, _, events, err := conn.ChildrenW(watchPath)
   if err != nil {
      panic(err)
   }
   fmt.Printf("event change: %v", <-events)
   return
}

func checkInstanceStatus(conn *zk.Conn, nodeName string, path string) (status instanceStatus, err error) {
   currentId := getInstanceNodeId(nodeName)
   minId := currentId
   //get all child nodes of task Instance node
   nodeArr, _, err := conn.Children(path)
   if err != nil {
      return 0, err
   }
   for _, v := range nodeArr {
      nodeId := getInstanceNodeId(v)
      if nodeId <= minId {
         minId = nodeId
      }
   }
   if minId == currentId {
      return StatusRunning, nil
   } else {
      return StatusStandby, nil
   }
}

func getInstanceNodeId(nodeName string) int {
   //nodeanme='/customBackUp/tasks/task01/instance/server_10000000001'
   //only need to intercept the last 8 digits
   id := nodeName[len(nodeName)-8:]
   intId, _ := strconv.Atoi(id)
   return intId
}

func createPath(conn *zk.Conn, path string) (err error) {
   strArr := strings.Split(path, "/")
   var node string
   for _, str := range strArr {
      if str == "" {
         continue
      }
      node = node   "/"   str
      exists, _, err := conn.Exists(node)
      if err != nil {
         return errors.New(err.Error())
      }
      if exists {
         continue
      } else {
         _, err = conn.Create(node, []byte{}, 0, zk.WorldACL(zk.PermAll))
         if err != nil {
            return errors.New(err.Error())
         }
      }
   }
   return err
}

func logWithTime(log string) {
   fmt.Printf("%v %vn", time.Now().Format("2006-01-02 15:04:05"), log)
}

运行工作图:

注意:此代码部分逻辑缺失,例如:

1:发布任务的task进程没有体现

2:监控任务的monitor没有体现

本文为仙士可原创文章,转载无需和我联系,但请注明来自仙士可博客www.php20.cn

0 人点赞