• 为了保证你在浏览本网站时有着更好的体验,建议使用类似Chrome、Firefox之类的浏览器~~
    • 如果你喜欢本站的内容何不Ctrl+D收藏一下呢,与大家一起分享各种编程知识~
    • 本网站研究机器学习、计算机视觉、模式识别~当然不局限于此,生命在于折腾,何不年轻时多折腾一下

500lines or less 之集成系统

Python admin 2年前 (2017-07-24) 965次浏览 0个评论 扫描二维码

原文参考地址a-continuous-integration-system,本文基本上跟翻译差不多,当然也会解释下其工作原理,英文看得不爽就看看这篇

原来给了一个 git 管理代码仓库的例子,在测试代码仓库出现任何 commit 的时候会自动更新代码并执行相应的测试,这样的话可以实现自动化管理,减少了人力操作

实现步骤:

(1)初始化
初始化仓库 test_repo:

$ mkdir test_repo 
$ cd test_repo 
$ git init

test_repo 是我们 master 分支上的代码仓库,现在要 copy 几个文件到当前的仓库中并且 commit

$ cp -r /this/directory/tests /path/to/test_repo/ 
$ cd /path/to/test\_repo 
$ git add tests/ 
$ git commit -m ”add tests”

test_repo_clone_obs 仓库是为了监控 master 分支数据建立的中间仓库:

$ git clone /path/to/test_repo test_repo_clone_obs

test_repo_clone_runner是为了测试,假设当前的 master 分支出现新的提交,则自动同步数据然后执行相应的测试软件,所以在此之前需要同步一下 master 分支上的代码:

$ git clone /path/to/test_repo test_repo_clone_runner

实时获取当前代码仓库的情况代码如下

import argparse
import os
import re
import socket
import SocketServer
import subprocess
import sys
import time

import helpers


def poll():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, " \
                        "by default it uses localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo", metavar="REPO", type=str,
                        help="~/test_repo")
    args = parser.parse_args()
    print args
    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")
    while True:
        try:
            # call the bash script that will update the repo and check
            # for changes. If there's a change, it will drop a .commit_id file
            # with the latest commit in the current working directory
            subprocess.check_output(["./update_repo.sh", args.repo])
        except subprocess.CalledProcessError as e:
            raise Exception("Could not update and check repository. Reason: %s" % e.output)

parse 是为了解析命令行参数,具体你可以查看上一篇文章python argparse 用法

总共两个参数一个时端口,另外一个是 master 代码仓库地址

然后开启子进程执行 update_repo.sh 脚本,该脚本的代码如下

 

#!/bin/bash

source run_or_fail.sh

# delete previous id 
rm -f .commit_id

# go to repo and update it to given commit
run_or_fail "Repository folder not found!" pushd $1 1> /dev/null
run_or_fail "Could not reset git" git reset --hard HEAD

# get the most recent commit
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
# get its id
COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

# update the repo
run_or_fail "Could not pull from repository" git pull

# get the most recent commit
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
# get its id
NEW_COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

# if the id changed, then write it to a file
if [ $NEW_COMMIT_ID != $COMMIT_ID ]; then
  popd 1> /dev/null
  echo $NEW_COMMIT_ID > .commit_id
fi

监听的方式是先同步,然后观察 commit id 与没同步之前的 id 是否相同,如果相同则当前 master 仓库没有任何更新,如果不一样则出现的新的代码更新

如果出现更新则将当前的 commit id 写入到文件当中,文件的名字是.commit_id

如果存在这个文件则需要通知 dispatcher 模块操作,通知代码都是通过 socket 实现的

代码如下:

 if os.path.isfile(".commit_id"):
            # great, we have a change! let's execute the tests
            # First, check the status of the dispatcher server to see
            # if we can send the tests
            try:
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "status")
            except socket.error as e:
                raise Exception("Could not communicate with dispatcher server: %s" % e)
            if response == "OK":
                # Dispatcher is present, let's send it a test
                commit = ""
                with open(".commit_id", "r") as f:
                    commit = f.readline()
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "dispatch:%s" % commit)
                if response != "OK":
                    raise Exception("Could not dispatch the test: %s" %
                    response)
                print "dispatched!"
            else:
                # Something wrong happened to the dispatcher
                raise Exception("Could not dispatch the test: %s" %
                response)
        time.sleep(5)

当存在记录 commit id 文件是就会向 dispatcher 发送消息,然后等待 dispatcher 返回消息


2017-7-26 23:07 深夜继续撰写

之前已经提到 dispatcher 这个东西,它在这个集成系统中占据很重要的位置,可以说是枢纽,oberserver 和 test runner 都会与它有通信

"""
This is the test dispatcher.

It will dispatch tests against any registered test runners when the repo
observer sends it a 'dispatch' message with the commit ID to be used. It
will store results when the test runners have completed running the tests and
send back the results in a 'results' messagee

It can register as many test runners as you like. To register a test runner,
be sure the dispatcher is started, then start the test runner.
"""
import argparse
import os
import re
import socket
import SocketServer
import time
import threading

import helpers


# Shared dispatcher code
def dispatch_tests(server, commit_id):
    # NOTE: usually we don't run this forever
    while True:
        print "trying to dispatch to runners"
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                                           int(runner["port"]),
                                           "runtest:%s" % commit_id)
            if response == "OK":
                print "adding id %s" % commit_id
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(2)


class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    runners = [] # Keeps track of test runner pool
    dead = False # Indicate to other threads that we are no longer running
    dispatched_commits = {} # Keeps track of commits we dispatched
    pending_commits = [] # Keeps track of commits we have yet to dispatch


class DispatcherHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our dispatcher.
    This will dispatch test runners against the incoming commit
    and handle their requests and test results
    """

    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(self.BUF_SIZE).strip()
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)
        if command == "status":
            print "in status"
            self.request.sendall("OK")
        elif command == "register":
            # Add this test runner to our pool
            print "register"
            address = command_groups.group(2)
            host, port = re.findall(r":(\w*)", address)
            runner = {"host": host, "port":port}
            self.server.runners.append(runner)
            self.request.sendall("OK")
        elif command == "dispatch":
            print "going to dispatch"
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall("No runners are registered")
            else:
                # The coordinator can trust us to dispatch the test
                self.request.sendall("OK")
                dispatch_tests(self.server, commit_id)
        elif command == "results":
            print "got test results"
            results = command_groups.group(2)[1:]
            results = results.split(":")
            commit_id = results[0]
            length_msg = int(results[1])
            # 3 is the number of ":" in the sent command
            remaining_buffer = self.BUF_SIZE - (len(command) + len(commit_id) + len(results[1]) + 3)
            if length_msg > remaining_buffer:
                self.data += self.request.recv(length_msg - remaining_buffer).strip()
            del self.server.dispatched_commits[commit_id]
            if not os.path.exists("test_results"):
                os.makedirs("test_results")
            with open("test_results/%s" % commit_id, "w") as f:
                data = self.data.split(":")[3:]
                data = "\n".join(data)
                f.write(data)
            self.request.sendall("OK")
        else:
            self.request.sendall("Invalid command")


def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, by default it uses 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()

    # Create the server
    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print 'serving on %s:%s' % (args.host, int(args.port))
    # Create a thread to check the runner pool
    def runner_checker(server):
        def manage_commit_lists(runner):
            for commit, assigned_runner in server.dispatched_commits.iteritems():
                if assigned_runner == runner:
                    del server.dispatched_commits[commit]
                    server.pending_commits.append(commit)
                    break
            server.runners.remove(runner)

        while not server.dead:
            time.sleep(1)
            for runner in server.runners:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    response = helpers.communicate(runner["host"],
                                                   int(runner["port"]),
                                                   "ping")
                    if response != "pong":
                        print "removing runner %s" % runner
                        manage_commit_lists(runner)
                except socket.error as e:
                    manage_commit_lists(runner)

    # this will kick off tests that failed
    def redistribute(server):
        while not server.dead:
            for commit in server.pending_commits:
                print "running redistribute"
                print server.pending_commits
                dispatch_tests(server, commit)
                time.sleep(5)

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl+C or Cmd+C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # if any exception occurs, kill the thread
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()


if __name__ == "__main__":
    serve()

在 dispatcher 的 handle 中处理各种来自其他 client 访问,oberserver 会向 diapatcher 发送 commit id ,但是在发送之前还是要询问下当前的 dispatcher 服务是否正在运行,所以会有个 status 状态咨询请求,如果返回 ok,则继续下一步向其发送 commit id

于此同时 dispatcher 还要管理 runner 测试,当前的脚本是可以跑一百个测试线程,每一个线程如果想跑起来就必须要向其注册,只有注册成功的 runner 才会继续完成测试

test runner 文件

"""
This is the test runner.

It registers itself with the dispatcher when it first starts up, and then waits
for notification from the dispatcher. When the dispatcher sends it a 'runtest'
command with a commit id, it updates its repository clone and checks out the
given commit. It will then run tests against this version and will send back the
results to the dispatcher. It will then wait for further instruction from the
dispatcher.
"""
import argparse
import errno
import os
import re
import socket
import SocketServer
import subprocess
import time
import threading
import unittest

import helpers


class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    dispatcher_server = None # Holds the dispatcher server host/port information
    last_communication = None # Keeps track of last communication from dispatcher
    busy = False # Status flag
    dead = False # Status flag


class TestHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.
    """

    command_re = re.compile(r"(\w+)(:.+)*")

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        command_groups = self.command_re.match(self.data)
        command = command_groups.group(1)
        if not command:
            self.request.sendall("Invalid command")
            return
        if command == "ping":
            print "pinged"
            self.server.last_communication = time.time()
            self.request.sendall("pong")
        elif command == "runtest":
            print "got runtest command: am I busy? %s" % self.server.busy
            if self.server.busy:
                self.request.sendall("BUSY")
            else:
                self.request.sendall("OK")
                print "running"
                commit_id = command_groups.group(2)[1:]
                self.server.busy = True
                self.run_tests(commit_id,
                               self.server.repo_folder)
                self.server.busy = False
        else:
            self.request.sendall("Invalid command")

    def run_tests(self, commit_id, repo_folder):
        # update repo
        output = subprocess.check_output(["./test_runner_script.sh",
                                        repo_folder, commit_id])
        print output
        # run the tests
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        result_file = open("results", "w")
        unittest.TextTestRunner(result_file).run(suite)
        result_file.close()
        result_file = open("results", "r")
        # give the dispatcher the results
        output = result_file.read()
        helpers.communicate(self.server.dispatcher_server["host"],
                            int(self.server.dispatcher_server["port"]),
                            "results:%s:%s:%s" % (commit_id, len(output), output))


def serve():
    range_start = 8900
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="runner's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="runner's port, by default it uses values >=%s" % range_start,
                        action="store")
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, by default it uses " \
                        "localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo", metavar="REPO", type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()

    runner_host = args.host
    runner_port = None
    tries = 0
    if not args.port:
        runner_port = range_start
        while tries < 100: try: server = ThreadingTCPServer((runner_host, runner_port), TestHandler) print server print runner_port break except socket.error as e: if e.errno == errno.EADDRINUSE: tries += 1 runner_port = runner_port + tries continue else: raise e else: raise Exception("Could not bind to ports in range %s-%s" % (range_start, range_start+tries)) else: runner_port = int(args.port) server = ThreadingTCPServer((runner_host, runner_port), TestHandler) server.repo_folder = args.repo dispatcher_host, dispatcher_port = args.dispatcher_server.split(":") server.dispatcher_server = {"host":dispatcher_host, "port":dispatcher_port} response = helpers.communicate(server.dispatcher_server["host"], int(server.dispatcher_server["port"]), "register:%s:%s" % (runner_host, runner_port)) if response != "OK": raise Exception("Can't register with dispatcher!") def dispatcher_checker(server): # Checks if the dispatcher went down. If it is down, we will shut down # if since the dispatcher may not have the same host/port # when it comes back up. while not server.dead: time.sleep(5) if (time.time() - server.last_communication) > 10:
                try:
                    response = helpers.communicate(
                                       server.dispatcher_server["host"],
                                       int(server.dispatcher_server["port"]),
                                       "status")
                    if response != "OK":
                        print "Dispatcher is no longer functional"
                        server.shutdown()
                        return
                except socket.error as e:
                    print "Can't communicate with dispatcher: %s" % e
                    server.shutdown()
                    return

    t = threading.Thread(target=dispatcher_checker, args=(server,))
    try:
        t.start()
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # if any exception occurs, kill the thread
        server.dead = True
        t.join()


if __name__ == "__main__":
    serve()

 

test_runner_script.sh

#!/bin/bash
REPO=$1
COMMIT=$2

source run_or_fail.sh

run_or_fail "Repository folder not found" pushd "$REPO" 1> /dev/null
run_or_fail "Could not clean repository" git clean -d -f -x
run_or_fail "Could not call git pull" git pull
run_or_fail "Could not update to given commit hash" git reset --hard "$COMMIT"

 


Deeplearn, 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权 , 转载请注明500lines or less 之集成系统
喜欢 (0)
admin
关于作者:
互联网行业码农一枚/业余铲屎官/数码影音爱好者/二次元

您必须 登录 才能发表评论!