本文描述一个 python 实现的多进程压测工具,这个压测工具的特点如下:
多进程
在大多数情况下,压测一般适用于 IO 密集型场景(如访问接口并等待返回),在这种场景下多线程多进程的区分并不明显 (详情请参见 GIL 相关).不过一旦出现词表参数加密,返回内容校验等事情的话,多进程对发送效率的提升还是很明显的.
可以指定发送 QPS
可以指定发压的 QPS,根据并行度和请求相应时间,可以估算出可发送 QPS 峰值.例如并行度是 10,响应时间是 100ms,那么 QPS 峰值应该是(1s/100ms * 10)=100,此工具可以将 QPS 稳定的维持在小于峰值的一个量上.
便于扩展
为什么要 DIY 压测工具了?一般的服务端压测工具,例如 http_load 和 jmeter,不是 http 协议的,就是需要通过代码进行扩展.例如在压测 thrift 接口的时候,即使通过 jmeter 扩展 java 程序也很麻烦.但是当涉及到场景化压测,或者是奇怪的 SDK,例如本文要压测的接口是通过 java 代码自动生成的 python 消息类 SDK,并且涉及到场景化的压测,很难通过一般的服务端压测工具搞定.
1,发压代码解耦
下面是压测代码的实现,可以看到,我这里使用 abc 包,做了一个抽象类.
业务测试代码,例如自动化 case,只要继承了这个抽象类,就获得压测的能力,做到压测和自动化测试的解耦.
这里有两个抽象方法
vocab() - 构造词表
press() - 发压逻辑
是被 @abc.abstractmethod 装饰器装饰,在子类中,是一定要被实现的.
run() 方法是压测执行的方法,实现子类的词表方法和发压逻辑之后,直接调用 run() 方法就可以压测了.
固定 QPS
固定 QPS 是通过管理进程实现的.可以看到有两种进程:
一种是 worker_process 进程,调用了 press() 发压逻辑函数,并且这个进程可以指定并发度 concurrent,是实际的发压进程,值得注意的是在 worker_process 中使用了 time.sleep(),是为了控制发送速度.
另一种是 manager_process 进程,这个进程每隔一段时间计算实际的 qps,并和设置的 qps 比较,然后调整 worker_process 中的 sleep 时间,例如实际 qps 小于设定 qps,那么就少睡一会儿.
这里不得不提到的是,多进程如何共享变量?
这里使用的是 multiprocessing 中的 Manager 包,这个包提供了多进程共享变量的能力,我这里用到的是 Namespace 数据结构来存储多进程的计数.在使用过程中我怀疑 Manager Namespace 是通过读写文件的形式进行进程间共享变量的,这个我没有深入的研究.
2,实际压测
# -*- coding:utf-8 -*-
import abc
import time
from multiprocessing import Lock, Process, Manager
class Press(object):
__metaclass__ = abc.ABCMeta
def __init__(self, qps=100, concurrent=10):
self.qps = qps
self.concurrent = concurrent
self.mutex = Lock()
self.local = Manager().Namespace()
self.local.count = 0
self.local.sleep = 0.1
self.manager_gap = 0.5
self.precision = 0.1
self.vocab_list = list()
self.vocab()
def manager_process(self):
while True:
with self.mutex:
current_qps = self.local.count / self.manager_gap
self.local.count = 0
print self.local.sleep, current_qps
if current_qps < self.qps:
self.local.sleep = self.local.sleep * (1.0 - self.precision)
else:
self.local.sleep = self.local.sleep * (1.0 + self.precision)
time.sleep(self.manager_gap)
def worker_process(self):
while True:
with self.mutex:
self.local.count += 1
time.sleep(self.local.sleep)
self.press()
@abc.abstractmethod
def vocab(self):
return
@abc.abstractmethod
def press(self):
return
def run(self):
processes = [Process(target=self.worker_process) for index in range(self.concurrent)]
processes.append(Process(target=self.manager_process))
for process in processes:
process.start()
for process in processes:
process.join()
给出一个发压的例子.分三步~
QueryVmPress 继承了 Press 类,获得了发压能力.
然后实现了 vocab 方法,构造了词表.
实现了 press 方法,这里是发压逻辑,可以看到 QueryVmScenario.press_vm(vocab),QueryVmScenario 放的是自动化 case.发压只是调用了其中的一个接口.这个接口的编写很复杂,也是为什么要自己做一个压测工具的原因.
QueryVmPress(qps=100, concurrent=10).run(),就按照 100QPS 进行压测了.
# -*- coding:utf-8 -*-
import random
from query.query_vm_scenario import QueryVmScenario
from db.vm_dao import Dao as vm_dao
from db.account_dao import Dao as account_dao
from press import Press
from lib import common
from vocab import Vocab
class QueryVmVocab(Vocab):
def __init__(self):
Vocab.__init__(self)
class QueryVmPress(Press):
def __init__(self, qps=100, concurrent=10):
Press.__init__(self, qps, concurrent)
def vocab(self):
for account in account_dao.query_all_account(limit=10):
account_name = account[1]
account_password = account[2]
res = common.login_by_account(account_name, account_password)
for item in vm_dao.query_vm_by_account(account_name, limit=100):
vm_uuid = item[1]
vocab = QueryVmVocab()
vocab.add('session_uuid', res.inventory.uuid)
vocab.add('vm_uuid', vm_uuid)
self.vocab_list.append(vocab)
return self.vocab_list
def press(self):
vocab = self.vocab_list[random.randint(0, len(self.vocab_list)-1)]
QueryVmScenario.press_vm(vocab)
if __name__ == '__main__':
QueryVmPress(qps=100, concurrent=10).run()
第一列是 sleep 时间,第二列是实际 QPS,可以看到,qps 会被动态的稳定在设置的值上.
0.1 20.0
0.09 40.0
0.081 60.0
0.0729 80.0
0.06561 60.0
0.059049 80.0
0.0531441 60.0
0.04782969 80.0
0.043046721 80.0
0.0387420489 80.0
0.03486784401 80.0
0.031381059609 100.0
0.0345191655699 80.0
0.0310672490129 88.0
0.0279605241116 92.0
0.0251644717005 100.0
0.0276809188705 80.0
0.0249128269835 100.0
0.0274041096818 100.0
0.03014452065 80.0
0.027130068585 100.0
0.0298430754435 80.0
0.0268587678991 100.0
0.029544644689 92.0
3,混压
当要做多个接口混压的时候,可以这样做.
先写好单压的 python 类,在单压的代码里,可以看到我实现了 QueryVmVocab 类,表名了词表的类型,这个类集成自 Vocab,Vocab 就是一个字典的封装.
混压的时候,先将词表汇总,并且 shuffle,然后弹出词表的时候,使用 isinstance 判断词表的类型,调用不同的发压函数进行压测.
vocab 的实现
混压的实现
# -*- coding:utf-8 -*-
import abc
class Vocab(object):
__metaclass__ = abc.ABCMeta
def __init__(self):
self.vocab = dict()
def add(self, key, value):
self.vocab[key] = value
def get(self, key):
return self.vocab.get(key)
def remove(self, key):
del self.vocab[key]
后记
# -*- coding:utf-8 -*-
import random
from press import Press
from query_eip_press import QueryEipPress, QueryEipVocab
from query_image_press import QueryImagePress, QueryImageVocab
from query_snapshot_press import QuerySnapshotPress, QuerySnapshotVocab
from query_vm_press import QueryVmPress, QueryVmVocab
from query.query_eip_scenario import QueryEipScenario
from query.query_image_scenario import QueryImageScenario
from query.query_snapshot_scenario import QuerySnapshotScenario
from query.query_vm_scenario import QueryVmScenario
class MixedPress(Press):
def __init__(self, qps=100, concurrent=10):
Press.__init__(self, qps, concurrent)
def vocab(self):
self.vocab_list.extend(QueryEipPress().vocab())
self.vocab_list.extend(QueryImagePress().vocab())
self.vocab_list.extend(QuerySnapshotPress().vocab())
self.vocab_list.extend(QueryVmPress().vocab())
def press(self):
vocab = self.vocab_list[random.randint(0, len(self.vocab_list)-1)]
if isinstance(vocab, QueryEipVocab):
QueryEipScenario.press_eip(vocab)
elif isinstance(vocab, QueryImageVocab):
QueryImageScenario.press_image(vocab)
elif isinstance(vocab, QuerySnapshotVocab):
QuerySnapshotScenario.press_snapshot(vocab)
elif isinstance(vocab, QueryVmVocab):
QueryVmScenario.press_vm(vocab)
if __name__ == '__main__':
MixedPress(200, 50).run()
这只是一个很小的功能实现,提供给大家参考.如果有不对的地方,希望得到大家指正.
来源: https://www.cnblogs.com/kangoroo/p/8377465.html