基于 Python 及 zookeeper 实现简单分布式任务调度系统设计思路及核心代码实现
by: 授客 QQ:1033553122
测试环境
功能需求
实现思路
代码实践 (关键技术点实现)
代码模块组织结构
配置文件解析
MyTCPServer.py
MyTCPClient.py
appClient.py
loadAgent.py
运行效果
测试环境
Win7 64 位
Linux 64 位
- Python 3.3.4
- kazoo-2.6.1-py2.py3-none-any.whl(Windows)
- kazoo-2.6.1.tar.gz (Linux)
- https://pypi.org/project/kazoo/#files
- zookeeper-3.4.13.tar.gz
下载地址 1:
- http://zookeeper.apache.org/releases.html#download
- https://www.apache.org/dyn/closer.cgi/zookeeper/
- https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
功能需求
把不同的负载主机, 注册为 zookeeper 的节点, 其它应用模块请求 zookeeper 获取相关节点信息 (服务器 ip, 端口号, 服务器任务执行状态), 通过服务器任务状态选择没有运行指定任务的服务器执行相关任务.
针对以上需求, 做一个技术预研, 核心代码实现
实现思路
负载服务器启动时, 初始化 zookeeper 客户端, 创建 tcp 服务器, 注册节点信息到 zookeeper 服务器 (信息包含 tcp 服务器 ip, 端口, 负载服务器任务执行状态), 然后定时检测负载服务器任务执行状态 (通过检测某个进程的名称是否存在进行判断), 其它应用模块通过 zookeeper 获取节点信息后, 通过 tcp socket 通信, 向负载服务器发送执行命令, 然后负载服务器根据这些命令进行不同的处理.
代码实践 (关键技术点实现)
代码模块组织结构
配置文件解析
- conf/agent.conf
- [AGENT]
- interval = 5
- proc = sftp-server
- [README]
interval = 更新服务器节点信息频率 (单位: 秒)
proc = 需要检测的进程名称 (程序通过查找对应进程名称来判断负载程序是否还在运行, 从而判断服务器状态)
- conf/tcpserver.conf
- [TCPSERVER]
- host=10.202.7.165
- port = 8000
- [README]
host = tcp 服务器主机地址
port = tcp 服务器监听端口
- conf/zookeeper.conf
- [ZOOKEEPER]
- hosts = 10.118.52.26:2181
- nodeParentPath=/rootNode
- [README]
hosts = zookeeper 地址, 如果是集群地址, 即有多个, 用英文逗号分隔
nodeParentPath = 负载机节点所在父级路径
MyTCPServer.py
- #!/usr/bin/env python 3.4.0
- #-*- encoding:utf-8 -*-
- __author__ = 'shouke'
- importsocketserver
- fromlog importlogger
class MyTCPHandler(socketserver.BaseRequestHandler):
    """Request handler that echoes each client message back in upper case.

    It is instantiated once per connection to the server and overrides
    handle() to implement the conversation with the client. The conversation
    ends when the client sends ``bye`` or closes its end of the socket.
    """

    def handle(self):
        """Serve one client connection until it says ``bye`` or disconnects."""
        while True:
            # self.request is the TCP socket connected to the client.
            raw = self.request.recv(1024)
            if not raw:
                # recv() returns b'' once the peer has closed the connection.
                # Without this check the loop would spin forever on a dead
                # socket, logging empty messages.
                logger.info('client[host:%s port:%s] closed the connection' % (self.client_address[0], self.client_address[1]))
                break

            self.data = raw.decode('utf-8').strip()
            logger.info('receive data from client[host:%s port:%s]:%s' % (self.client_address[0], self.client_address[1], self.data))

            if self.data == 'bye':
                # Acknowledge the farewell, then drop the connection.
                self.request.sendall(bytes('bye', encoding='utf-8'))
                self.request.close()
                break

            self.request.sendall(self.data.upper().encode('utf-8'))
class MyTCPServer:
    """Thin wrapper that owns a ``socketserver.TCPServer`` bound to host/port."""

    def __init__(self, host, port):
        """Bind the server to (host, port); log the error and exit(1) on failure."""
        try:
            self.host = host
            self.port = port
            # Create the server, binding to self.host on port self.port.
            bind_address = (self.host, self.port)
            self.server = socketserver.TCPServer(bind_address, MyTCPHandler)
        except Exception as e:
            logger.error('初始化 TCPServer 失败:%s'% e)
            exit(1)

    def start(self):
        """Activate the server; keeps running until the program is interrupted."""
        self.server.serve_forever()
MyTCPClient.py
- #!/usr/bin/env python 3.4.0
- #-*- encoding:utf-8 -*-
- __author__ = 'shouke'
- importsocket
- importconfigparser
- importtime
- fromlog importlogger
if __name__ == '__main__':
    # Demo client: reads the server address from conf/tcpserver.conf, sends a
    # greeting every 5 seconds, and says 'bye' on the 1001st iteration.
    try:
        config_parser = configparser.ConfigParser()
        config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')
        host = config_parser.get('TCPSERVER', 'host')
        port = int(config_parser.get('TCPSERVER', 'port'))

        # Create a socket (SOCK_STREAM means a TCP socket). The context manager
        # closes it on every exit path, including a failed connect(), which the
        # previous "connected" flag did not cover.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Connect to server and send data
            sock.connect((host, port))

            i = 0
            while i < 10000:
                if i == 1000:
                    sock.sendall(bytes('bye\n', "utf-8"))
                else:
                    sock.sendall(bytes('hello world with tcp\n', "utf-8"))

                # Receive data from the server
                reply = sock.recv(1024)
                if not reply:
                    # An empty read means the server closed the connection.
                    logger.info('server closed the connection')
                    break

                received = str(reply, "utf-8")
                logger.info('receive data from server:%s' % received)
                if received == 'bye':
                    break

                time.sleep(5)
                i += 1
    except Exception as e:
        logger.error('程序运行出错:%s' % e)
appClient.py
- #!/usr/bin/env python
- #-*- encoding:utf-8 -*-
- __author__ = 'shouke'
- importtime
- fromlog importlogger
- fromkazoo.client importKazooClient
- fromkazoo.client importKazooState
def my_listener(state):
    """Log the ZooKeeper connection state passed to this listener."""
    if state == KazooState.LOST:
        # Register somewhere that the session was lost
        label = 'LOST'
    elif state == KazooState.SUSPENDED:
        # Handle being disconnected from Zookeeper
        label = 'SUSPENDED'
    else:
        # Handle being connected/reconnected to Zookeeper
        label = 'CONNECTED'
    logger.info(label)
def my_event_listener(event):
    """Watch callback: log the event object it is called with."""
    logger.info(event)
# Demo consumer: connects to ZooKeeper, watches the agent nodes under
# node_path and logs the child list every 5 seconds.
zk_client = KazooClient(hosts='10.118.52.26:2181')
zk_client.add_listener(my_listener)
zk_client.start()

try:
    node_path = '/rootNode'
    sub_node = 'loaderAgent102027165'

    children = zk_client.get_children(node_path, watch=my_event_listener)
    logger.info('there are %s children with names %s' % (len(children), children))

    @zk_client.ChildrenWatch(node_path)
    def watch_children(children):
        """Log the current child list of node_path."""
        logger.info("Children are now: %s" % children)

    @zk_client.DataWatch("%s/%s" % (node_path, sub_node))
    def watch_node(data, state):
        """Log the node's version and data; skipped when state is falsy."""
        if state:
            logger.info('Version:%s, data:%s' % (state.version, data))

    i = 0
    while i < 1000:
        time.sleep(5)
        children = zk_client.get_children(node_path, watch=my_event_listener)
        logger.info('there are %s children with names %s' % (len(children), children))
        i += 1
finally:
    # Always release the ZooKeeper session, even when a call above raises or
    # the loop is interrupted.
    zk_client.stop()
    zk_client.close()
loadAgent.py
- #!/usr/bin/env python 3.4.0
- #-*- encoding:utf-8 -*-
- __author__ = 'shouke'
- importtime
- importthreading
- importconfigparser
- importjson
- importsubprocess
- fromkazoo.client importKazooClient
- fromkazoo.client importKazooState
- fromlog importlogger
- frommyTCPServer importMyTCPServer
# Module-level state shared between the listener, registration and update code.
# ZooKeeper connection state: 1 = LOST, 2 = SUSPENDED, 3 = CONNECTED/RECONNECTED
zk_conn_stat = 0
# Registration state of this server's node in ZooKeeper:
# 0 = not registered / registration in progress, 1 = registered
registry_status = 0
def restart_zk_client():
    """Restart the ZooKeeper client, then register this agent's node again.

    Any exception raised by either step is logged and swallowed.
    """
    global zk_client
    global zk_conn_stat

    try:
        zk_client.restart()
        registry_zookeeper()
    except Exception as e:
        logger.error('重启 zookeeper 客户端异常:%s'% e)
def zk_conn_listener(state):
    """ZooKeeper connection-state listener.

    Records the new state in the module-level ``zk_conn_stat``
    (1 = LOST, 2 = SUSPENDED, 3 = connected/reconnected). When the session is
    lost it also resets ``registry_status`` to 0 and starts a thread that
    restarts the client and re-registers the node.

    Args:
        state: the connection state reported by the kazoo client.
    """
    global zk_conn_stat
    global registry_status

    if state == KazooState.LOST:
        logger.warning('zookeeper connection lost')
        zk_conn_stat = 1
        registry_status = 0  # registration must be redone after a lost session
        # The restart runs on its own thread so this callback returns at once.
        # NOTE(review): kazoo's docs advise against blocking in a state
        # listener -- confirm against the kazoo version in use.
        thread = threading.Thread(target=restart_zk_client)
        thread.start()
    elif state == KazooState.SUSPENDED:
        # Handle being disconnected from Zookeeper
        logger.warning('zookeeper connection disconnected')
        zk_conn_stat = 2
    else:
        # Handle being connected/reconnected to Zookeeper
        zk_conn_stat = 3
        logger.info('zookeeper connection connected/reconnected')
def registry_zookeeper():
    """Register this load agent's node in ZooKeeper.

    Waits until the connection listener has marked the client as connected
    (``zk_conn_stat == 3``), ensures ``node_parent_path`` exists, and creates
    the ephemeral node ``<node_parent_path>/loaderAgent<host without dots>``
    if it does not exist yet. The node data is a JSON object with the TCP
    server host, port and the initial status "idle". On success
    ``registry_status`` is set to 1.

    Reads the module-level ``node_parent_path``, ``host``, ``port``,
    ``zk_client`` and ``zk_conn_stat``.

    Returns:
        True when registration finished, False when any step raised (the
        error is logged).
    """
    global registry_status

    try:
        # Do not register until the client is connected. Sleep between checks
        # so the wait does not burn a full CPU core.
        while zk_conn_stat != 3:
            time.sleep(0.1)

        logger.info('正在注册负载机到 zookeeper...')
        zk_client.ensure_path(node_parent_path)

        node_path = '%s/loaderAgent%s' % (node_parent_path, host.replace('.', ''))
        loader_agent_info = '{"host":"%s","port":%s,"status":"idle"}' % (host, port)
        if not zk_client.exists(node_path):
            zk_client.create(node_path, loader_agent_info.encode('utf-8'), ephemeral=True, sequence=False)

        registry_status = 1  # registration complete
        logger.info('注册负载机到 zookeeper 成功')
        return True
    except Exception as e:
        logger.error('注册负载机到 zookeeper 失败:%s' % e)
        return False
def start_tcpserver(tcpserver):
    """Thread target: call ``start()`` on the given TCP server object."""
    tcpserver.start()
def get_server_status(proc_name):
    """Derive the server status from whether a process name shows up in ``ps``.

    Runs ``ps -e | grep -- <proc_name>`` through the shell, waiting at most
    30 seconds.

    Args:
        proc_name: process name to look for (used as the grep pattern).

    Returns:
        'busy' if the output contains proc_name, 'idle' if the output is
        empty, 'unknow' if the output is anything else or the command
        failed or timed out.
    """
    import shlex  # local import: only this function needs it

    # The space after grep is required; without it the shell looks for a
    # command literally named "grep<proc_name>". The name is quoted because it
    # comes from configuration and is spliced into a shell command line.
    command = 'ps -e | grep -- %s' % shlex.quote(proc_name)

    with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines=True) as proc:
        try:
            outs, errs = proc.communicate(timeout=30)
            outs = outs.strip()
            if outs.find(proc_name) != -1:
                server_status = 'busy'
            elif outs == '':
                server_status = 'idle'
            else:
                logger.error('获取负载机状态失败:%s' % errs)
                server_status = 'unknow'
        except Exception as e:
            # Covers the timeout as well: stop the child before giving up.
            proc.kill()
            logger.error('获取负载机状态失败:%s' % e)
            server_status = 'unknow'
    return server_status
def update_server_status(interval, proc_name):
    """Periodically detect the server status and publish it to ZooKeeper.

    The status comes from get_server_status(proc_name): 'busy' when the
    process exists, 'idle' when it does not, 'unknow' when the check failed.
    Each round writes a JSON object with host, port and status to this
    agent's node. This function loops forever and never returns.

    Reads the module-level ``node_parent_path``, ``host``, ``port``,
    ``zk_client``, ``zk_conn_stat`` and ``registry_status``.

    Args:
        interval: minimum number of seconds between two update rounds.
        proc_name: process name passed to get_server_status().
    """
    not_ready_delay = 0.5  # seconds to wait before re-checking readiness

    while True:
        # Nothing can be published until the client is connected
        # (zk_conn_stat == 3) and the node is registered (registry_status == 1).
        # Sleep before re-checking so the wait does not busy-spin.
        if zk_conn_stat != 3 or registry_status != 1:
            time.sleep(not_ready_delay)
            continue

        round_started = time.monotonic()

        server_status = get_server_status(proc_name)
        loader_agent_info = '{"host":"%s","port":%s,"status":"%s"}' % (host, port, server_status)
        node_path = '%s/loaderAgent%s' % (node_parent_path, host.replace('.', ''))

        # Why the existence check (per the original author's note): ZooKeeper
        # removes ephemeral nodes with some delay. If the client is closed and
        # then quickly restarts and re-registers, ZooKeeper may not yet have
        # deleted the ephemeral node created before the restart. The new node
        # has exactly the same path, so the pending delete also removes the
        # node registered after the restart. Register again when it is gone.
        if zk_client.exists(node_path):
            zk_client.set(node_path, loader_agent_info.encode('utf-8'))
        else:
            registry_zookeeper()

        # Sleep only for what is left of the interval after this round's work.
        elapsed = time.monotonic() - round_started
        if elapsed < interval:
            time.sleep(interval - elapsed)
if __name__ == '__main__':
    # Agent entry point: start the ZooKeeper client, start the TCP server on a
    # background thread, register this host, then publish its status forever.
    logger.info('正在启动代理...')

    try:
        logger.info('正在读取 zookeeper 配置...')
        config_parser = configparser.ConfigParser()
        config_parser.read('./conf/zookeeper.conf', encoding='utf-8-sig')
        # Normalise full-width commas (U+FF0C) to ASCII commas. The config
        # README asks for ASCII commas between hosts, and the previous
        # replace(',', ',') changed nothing.
        zk_hosts = config_parser.get('ZOOKEEPER', 'hosts').replace('\uff0c', ',').strip()
        node_parent_path = config_parser.get('ZOOKEEPER', 'nodeParentPath').replace('\uff0c', ',').strip()

        logger.info('正在构建并启动 zookeeper 客户端...')
        zk_client = KazooClient(hosts=zk_hosts)
        zk_client.add_listener(zk_conn_listener)
        zk_client.start()
    except Exception as e:
        logger.error('初始化 zookeeper 客户端失败: %s'% e)
        raise SystemExit(1)

    try:
        config_parser.clear()
        config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')
        host = config_parser.get('TCPSERVER', 'host')
        port = int(config_parser.get('TCPSERVER', 'port'))

        tcp_server = MyTCPServer(host, port)
        thread = threading.Thread(target=start_tcpserver, args=(tcp_server,))
        # Daemon thread: if the main thread exits with an error below, the
        # serving thread must not keep the process alive.
        thread.daemon = True
        thread.start()
    except Exception as e:
        logger.error('TCPServer 启动失败:%s, 请检查配置 / conf/tcpserver.conf 是否正确'% e)
        raise SystemExit(1)

    try:
        # Register this host in ZooKeeper.
        registry_zookeeper()

        config_parser.clear()
        config_parser.read('./conf/agent.conf', encoding='utf-8-sig')
        interval = int(config_parser.get('AGENT', 'interval'))
        proc = config_parser.get('AGENT', 'proc').strip()

        # Periodically refresh the node's busy/idle status (loops forever).
        update_server_status(interval, proc)
    except Exception as e:
        logger.error('zk_client 运行失败:%s, 请检查配置 / conf/agent.conf 是否正确'% e)
        raise SystemExit(1)
运行效果
来源: https://www.cnblogs.com/shouke/p/10582572.html