【云+社区年度征文】什么?500行就可以撸个CI系统?

2020-12-18 10:57:37 浏览数 (1)

某个平平淡淡的日子,趁着划水时间在逛g站,千篇一律的xx学习路线,已经不足以满足哀家了(其实并不是高深的研究不懂)。

就在准备切换知乎看故事的时候,一不小心瞅到了让哀家都觉得装逼的东西,对,没错,就是号称500行可以实现一些系统,而且可以运行!什么!本杠精在此,一定要数一数有没有五百行。

啊!啊!

当前,在找茬之前,为了让杠来得更猛烈些,哀家决定挑一个亲手试验一下,如果连运行都不能,那肯定要吐槽一番。

废话不多说,先来给大家展示下这个500行就能实现很多系统功能的地址:500lines

大致的描述大致的描述

带着愉悦的(找茬的)心情,下载了那本500lines,哦豁,还是全英文的,正好可以练习英语了(练习如何快速翻译)。不得不说,这本书虽然篇幅不错,但是内容挺硬核,看上去干货满满的样子,如果也想找茬学习的小伙伴,可以下载下来看看,网上也有部分翻译的,但是不全,可以搭配欧路词典哦「微信.jpg」

这里面很多实现都是用不同语言完成的,哀家最近正好在学习python,就挑了个python的来瞅瞅:对,就是下面这个持续集成系统。

目录目录

找好目标之后,开始安装文章实现这个系统,主要目的有两个:

  • 能否成功运行
  • 是否有500行

确定好之后,跟着作者的描述一步一步来。首先作者都会讲下这个实现的是什么,主要是实现开发完成猴能够验证新功能或错误修复是ok的。这个500行代码做的系统主要是用来测试新代码提交的一个专用系统,对于提交后的代码,持续集成系统负责验证,系统会获取新的更改,运行测试并且提交运行结果,还具有抗故障能力,如果发生故障,能够从该节点恢复,还要求 这个测试系统能够处理负载。啧啧,功能要求还挺多啊,不知道能不能实现。

后面作者也进行了一系列的讲解和介绍,这里就不一一展示了,直接跳到实现过程:

步骤果然和代码里一样简洁明了,就算是小白如我,也能做,不错不错,直接跟着一步一步来就行 。

测试目录

(注意 :下面的是测试代码,并不是CI系统代码)

没啥好说的  跟着来就行了没啥好说的 跟着来就行了

项目的目录结构如下:

目录结构目录结构

正式CI系统

这里是正式的CI系统代码(此处记得数一数有没有超过500行[龇牙笑.jpg])

CI项目结构CI项目结构

repo_observer.py(71 lines)

如果发现有新的提交,就会通知调度程序

代码语言:python代码运行次数:0复制
"""
This is the repo observer.

It checks for new commits to the master repo, and will notify the dispatcher of
the latest commit ID, so the dispatcher can dispatch the tests against this
commit ID.
"""
import argparse
import os
import re
import socket
import SocketServer
import subprocess
import sys
import time
import helpers


def poll():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, " 
                             "by default it uses localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo",
                        metavar="REPO",
                        type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()
    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")
    while True:
        try:
            # call the bash script that will update the repo and check
            # for changes. If there's a change, it will drop a .commit_id file
            # with the latest commit in the current working directory
            subprocess.check_output(["./update_repo.sh", args.repo])
        except subprocess.CalledProcessError as e:
            raise Exception("Could not update and check repository. Reason: %s" % e.output)

        if os.path.isfile(".commit_id"):
            # great, we have a change! let's execute the tests
            # First, check the status of the dispatcher server to see
            # if we can send the tests
            try:
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "status")
            except socket.error as e:
                raise Exception("Could not communicate with dispatcher server: %s" % e)
            if response == "OK":
                # Dispatcher is present, let's send it a test
                commit = ""
                with open(".commit_id", "r") as f:
                    commit = f.readline()
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "dispatch:%s" % commit)
                if response != "OK":
                    raise Exception("Could not dispatch the test: %s" %
                                    response)
                print("dispatched!")
            else:
                # Something wrong happened to the dispatcher
                raise Exception("Could not dispatch the test: %s" %
                                response)
        time.sleep(5)


if __name__ == "__main__":
    poll()

dispatcher.py 测试调度器(172 lines)

代码语言:python代码运行次数:0复制
"""
This is the test dispatcher.

It will dispatch tests against any registered test runners when the repo
observer sends it a 'dispatch' message with the commit ID to be used. It
will store results when the test runners have completed running the tests and
send back the results in a 'results' messagee

It can register as many test runners as you like. To register a test runner,
be sure the dispatcher is started, then start the test runner.
"""
import argparse
import os
import re
import socket
import SocketServer
import time
import threading
import helpers


# Shared dispatcher code
def dispatch_tests(server, commit_id):
    # NOTE: usually we don't run this forever
    while True:
        print("trying to dispatch to runners")
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                                           int(runner["port"]),
                                           "runtest:%s" % commit_id)
            if response == "OK":
                print("adding id %s" % commit_id)
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(2)


class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    runners = []  # Keeps track of test runner pool
    dead = False  # Indicate to other threads that we are no longer running
    dispatched_commits = {}  # Keeps track of commits we dispatched
    pending_commits = []  # Keeps track of commits we have yet to dispatch


class DispatcherHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our dispatcher.
    This will dispatch test runners against the incoming commit
    and handle their requests and test results
    """

    command_re = re.compile(r"(w )(:. )*")
    BUF_SIZE = 1024

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(self.BUF_SIZE).strip()
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)
        if command == "status":
            print("in status")
            self.request.sendall("OK")
        elif command == "register":
            # Add this test runner to our pool
            print("register")
            address = command_groups.group(2)
            host, port = re.findall(r":(w*)", address)
            runner = {"host": host, "port": port}
            self.server.runners.append(runner)
            self.request.sendall("OK")
        elif command == "dispatch":
            print("going to dispatch")
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall("No runners are registered")
            else:
                # The coordinator can trust us to dispatch the test
                self.request.sendall("OK")
                dispatch_tests(self.server, commit_id)
        elif command == "results":
            print("got test results")
            results = command_groups.group(2)[1:]
            results = results.split(":")
            commit_id = results[0]
            length_msg = int(results[1])
            # 3 is the number of ":" in the sent command
            remaining_buffer = self.BUF_SIZE - (len(command)   len(commit_id)   len(results[1])   3)
            if length_msg > remaining_buffer:
                self.data  = self.request.recv(length_msg - remaining_buffer).strip()
            del self.server.dispatched_commits[commit_id]
            if not os.path.exists("test_results"):
                os.makedirs("test_results")
            with open("test_results/%s" % commit_id, "w") as f:
                data = self.data.split(":")[3:]
                data = "n".join(data)
                f.write(data)
            self.request.sendall("OK")
        else:
            self.request.sendall("Invalid command")


def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, by default it uses 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()

    # Create the server
    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print('serving on %s:%s' % (args.host, int(args.port)))

    # Create a thread to check the runner pool
    def runner_checker(server):
        def manage_commit_lists(runner):
            for commit, assigned_runner in server.dispatched_commits.iteritems():
                if assigned_runner == runner:
                    del server.dispatched_commits[commit]
                    server.pending_commits.append(commit)
                    break
            server.runners.remove(runner)

        while not server.dead:
            time.sleep(1)
            for runner in server.runners:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    response = helpers.communicate(runner["host"],
                                                   int(runner["port"]),
                                                   "ping")
                    if response != "pong":
                        print("removing runner %s" % runner)
                        manage_commit_lists(runner)
                except socket.error as e:
                    manage_commit_lists(runner)

    # this will kick off tests that failed
    def redistribute(server):
        while not server.dead:
            for commit in server.pending_commits:
                print("running redistribute")
                print(server.pending_commits)
                dispatch_tests(server, commit)
                time.sleep(5)

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl C or Cmd C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # if any exception occurs, kill the thread
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()


if __name__ == "__main__":
    serve()

test_runner.py (173 lines)

代码语言:javascript复制
"""
This is the test runner.

It registers itself with the dispatcher when it first starts up, and then waits
for notification from the dispatcher. When the dispatcher sends it a 'runtest'
command with a commit id, it updates its repository clone and checks out the
given commit. It will then run tests against this version and will send back the
results to the dispatcher. It will then wait for further instruction from the
dispatcher.
"""
import argparse
import errno
import os
import re
import socket
import SocketServer
import subprocess
import time
import threading
import unittest

import helpers


class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    dispatcher_server = None  # Holds the dispatcher server host/port information
    last_communication = None  # Keeps track of last communication from dispatcher
    busy = False  # Status flag
    dead = False  # Status flag


class TestHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.
    """

    command_re = re.compile(r"(w )(:. )*")

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        command_groups = self.command_re.match(self.data)
        command = command_groups.group(1)
        if not command:
            self.request.sendall("Invalid command")
            return
        if command == "ping":
            print("pinged")
            self.server.last_communication = time.time()
            self.request.sendall("pong")
        elif command == "runtest":
            print("got runtest command: am I busy? %s" % self.server.busy)
            if self.server.busy:
                self.request.sendall("BUSY")
            else:
                self.request.sendall("OK")
                print("running")
                commit_id = command_groups.group(2)[1:]
                self.server.busy = True
                self.run_tests(commit_id,
                               self.server.repo_folder)
                self.server.busy = False
        else:
            self.request.sendall("Invalid command")

    def run_tests(self, commit_id, repo_folder):
        # update repo
        output = subprocess.check_output(["./test_runner_script.sh",
                                          repo_folder, commit_id])
        print(output)
        # run the tests
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        result_file = open("results", "w")
        unittest.TextTestRunner(result_file).run(suite)
        result_file.close()
        result_file = open("results", "r")
        # give the dispatcher the results
        output = result_file.read()
        helpers.communicate(self.server.dispatcher_server["host"],
                            int(self.server.dispatcher_server["port"]),
                            "results:%s:%s:%s" % (commit_id, len(output), output))


def serve():
    range_start = 8900
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="runner's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="runner's port, by default it uses values >=%s" % range_start,
                        action="store")
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, by default it uses " 
                             "localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo", metavar="REPO", type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()

    runner_host = args.host
    runner_port = None
    tries = 0
    if not args.port:
        runner_port = range_start
        while tries < 100:
            try:
                server = ThreadingTCPServer((runner_host, runner_port),
                                            TestHandler)
                print(server)
                print(runner_port)
                break
            except socket.error as e:
                if e.errno == errno.EADDRINUSE:
                    tries  = 1
                    runner_port = runner_port   tries
                    continue
                else:
                    raise e
        else:
            raise Exception("Could not bind to ports in range %s-%s" % (range_start, range_start   tries))
    else:
        runner_port = int(args.port)
        server = ThreadingTCPServer((runner_host, runner_port), TestHandler)
    server.repo_folder = args.repo

    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")
    server.dispatcher_server = {"host": dispatcher_host, "port": dispatcher_port}
    response = helpers.communicate(server.dispatcher_server["host"],
                                   int(server.dispatcher_server["port"]),
                                   "register:%s:%s" %
                                   (runner_host, runner_port))
    if response != "OK":
        raise Exception("Can't register with dispatcher!")

    def dispatcher_checker(server):
        # Checks if the dispatcher went down. If it is down, we will shut down
        # if since the dispatcher may not have the same host/port
        # when it comes back up.
        while not server.dead:
            time.sleep(5)
            if (time.time() - server.last_communication) > 10:
                try:
                    response = helpers.communicate(
                        server.dispatcher_server["host"],
                        int(server.dispatcher_server["port"]),
                        "status")
                    if response != "OK":
                        print("Dispatcher is no longer functional")
                        server.shutdown()
                        return
                except socket.error as e:
                    print("Can't communicate with dispatcher: %s" % e)
                    server.shutdown()
                    return

    t = threading.Thread(target=dispatcher_checker, args=(server,))
    try:
        t.start()
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # if any exception occurs, kill the thread
        server.dead = True
        t.join()


if __name__ == "__main__":
    serve()

以上三个文件是主要文件,还有几个shell没展示出来,可以 先看下图关系流程,捋一捋:

作者画的很清晰,不需要哀家画蛇添足了作者画的很清晰,不需要哀家画蛇添足了

shell文件就不一一展示了,想要实现的,可以去git看看,里面有具体的代码实现。但是,shell的行数可以统计下:

行数行数
果然没过500果然没过500

运行效果如下图:

代码语言:javascript复制
.F
======================================================================
FAIL
 test_fail (test_fail.TestFileFail)
----------------------------------------------------------------------
Traceback (most recent call last)

  File "/usr/local/var/www/zoe_projects/test_repo/test_repo_clone_runner/tests/test_fail.py", line 6, in test_fail
    self.fail("I will fail")
AssertionError
 I will fail

----------------------------------------------------------------------
Ran 2 tests in 0.000s

FAILED (failures=1)

鉴定完毕,真香现场,短小精悍,功能可以。

杠精本精退场,不过还是很推荐这个500lines,里面有很多不错的项目,设计思路适合新手和对这方面感兴趣的学习。

0 人点赞