- # coding=utf-8
- import requests, JSON
- import hashlib, time
- import base64
- from locust import HttpLocust,TaskSet,task
- IP_server = "192.168.1.25"
- def md5_int(vale):
- md5_int = vale
- str_md5 = hashlib.md5(md5_int.encode()).hexdigest()
- return str_md5
- def getsession():
- url = "https://{}/login".format(IP_server)
- token = {}
- data = JSON.dumps({
- "name": "admin",
- "password": md5_int('123456'),
- })
- url = requests.post(url=url, data=data)
- token["session_id"] = url.JSON()["session_id"]
- token["cluster_id"] = token["session_id"].split('@')[1]
- return token
- token = getsession()
- class MyBlogs(TaskSet):
- data = JSON.dumps({
- "cameid": "3",
- "limit": 20,
- })
- @task(1)
- def post_test(self):
- # 定义请求
- with self.client.post("/cick/weibocke/", headers=token, data=self.data, catch_response=True) as response:
- if response.status_code == 200:
- print("success")
- else:
- print("fails")
- class websitUser(HttpLocust):
- task_set = MyBlogs
- min_wait = 3000 # 单位为毫秒
- max_wait = 6000 # 单位为毫秒
- if __name__ == "__main__":
- import os
- os.system("locust -f test.py")
来源: http://www.bubuko.com/infodetail-3382016.html