0*01 需求背景
日常扫描行为是一个常见的需求, 同时我们希望, 可以更方便的进行定制自动化扫描任务制定与执行. 我们不具体要求实用的扫描工具系统是什么, 开源与商业看具体自己的实际需求情况, 我们只是用 AWVS 举一个例子.
AWVS 本身提供了方便的 REST API 对外服务, 如何通过设计封装, 让 AWVS 执行的高效简单, 这篇要完成的一个任务.
如果把 AWVS 换成其它的安装扫描工具, 可否按同样的思路降低工具使用的流程复杂度, 让安全工具的使用更自动化遍历, 最初构建这个项目时考虑的, 这次我们通过 AWVS 这个例子, 来实践这种可能性. 有一个这个基础的设计可以延伸扩展到其它工具, 按本案方法进行扩展驱动其功能.
下面是整体的设计, 将 REST API 与 RPC 结合方式, 对整个扫描工具进行封装自动化.
现存在一个大家喜欢讨论的问题是 RPC 和 REST 那个好, 在我们这里不讨论那个好, 按应用场景同时使用了两个技术, REST 做业务逻辑和数据合法性检查, PRC 做功能封装驱动. 在做规模的横向扩展的时候, 我们可以通过负载的形式, 扩大 REST 和 RPC 服务的并性数和可用性. 将混合的业务逻辑用 REST 和 RPC 分层的方式时行简化, 当然除了好处一定也有基于这种设计产生的其它问题.
本次代码层底核心是, 封装了 AWVS 的 auth 认证和指定扫描特定域名的处理过程, 两个主要的「mocker」就是 auth 和 scan, 时序图很显示的就是这些.
0*02 功能实现
具体的实现部分, 将 Django Command,Django RPC,Django REST API,PyTEST,FSWatch 的部分进行介绍, 会基于整套技术方案, 产生其它的驱动方法, 本案就是基于 AWVS 展开. 最后达到的目地, 就将 AWVS 对目标域名的操作扫描任务指定, 简化成了一条命令. 如果之前还是说部署环境, 现在就是具体的业务动作.
1. 功能使用
AWVS 本身提供了 REST API 的接口, 通过进一步的抽象, 简化和隐藏了复杂的调用过程. 为了便于简单实现对 AWVS 的操作, 最后就变成了简单的一条命令调用.
python manage.py DSL -d lua.ren
Django Command 的功能实现, 是整个调用时序的入口, 假设扫描的需求和设置很简答, 只有一个扫描域名的设定.
2. 功能函数
扫描功能实现, 是靠整个时序链调用来完成的, 如果直接从 Django Command 调用 Django RPC, 参于的调用数据总体会比再加入一层 REST API 调用更简单, 而整个调用层级的构建, 让一个复杂的 API 调用, 分层解耦简单化.
对于 AWVS 最核心的驱动函数: 一个是授权 auth, 另一个就是添加测试任务.
2.1 授权
meta 数据结构中存放的是基本的授权用户信息, email 和 password.
- def auth(self, meta):
- import urllib2
- import ssl
- import JSON
- ssl._create_default_https_context = ssl._create_unverified_context
- url_login="https://localhost:3443/api/v1/me/login"
- send_headers_login={
- 'Host': 'localhost:3443',
- 'Accept': 'application/json, text/plain, */*',
- 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
- 'Accept-Encoding': 'gzip, deflate, br',
- 'Content-Type': 'application/json;charset=utf-8'
- }
- data_login='{"email":"' +meta['email'] + '",' + '"password":"'+ meta['password']+'","remember_me":false}'
- req_login = urllib2.Request(url_login,headers=send_headers_login)
- response_login = urllib2.urlopen(req_login,data_login)
- xauth = response_login.headers['X-Auth']
- COOOOOOOOkie = response_login.headers['Set-Cookie']
- print COOOOOOOOkie,xauth
- return True
2.2 添加扫描任务
用 Auth 取回的 Cookie 信息, 再进行 API 的调用, 来完玘任务注册.
- def addTarget(self, formaturl):
- url="https://localhost:3443/api/v1/targets"
- send_headers2={
- 'Host':'servers:3443',
- 'Accept': 'application/json, text/plain, */*',
- 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
- 'Content-Type':'application/json;charset=utf-8',
- 'X-Auth':xauth,
- 'Cookie':COOOOOOOOkie,
- }
- try:
- for i in formaturl:
- target_url='http://'+i.strip()
- data='{"description":"222","address":"'+target_url+'","criticality":"10"}'
- req = urllib2.Request(url,headers=send_headers2)
- response = urllib2.urlopen(req,data)
- jo=eval(response.read())
- target_id=jo['target_id']
- url_scan="https://localhost:3443/api/v1/scans"
- headers_scan={
- 'Host': 'localhost:3443',
- 'Accept': 'application/json, text/plain, */*',
- 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
- 'Accept-Encoding': 'gzip, deflate, br',
- 'Content-Type': 'application/json;charset=utf-8',
- 'X-Auth':xauth,
- 'Cookie':COOOOOOOOkie,
- }
- data_scan='{"target_id":'+'\"'+target_id+'\"'+',"profile_id":"11111111-1111-1111-1111-111111111111","schedule":{"disable":false,"start_date":null,"time_sensitive":false},"ui_session_id":"66666666666666666666666666666666"}'
- req_scan=urllib2.Request(url_scan,headers=headers_scan)
- response_scan=urllib2.urlopen(req_scan,data_scan)
- print response_scan.read()
- except Exception,e:
- print e
- return True
这两个函数是最底层的函数, 关于 AWVS 的 API 封装 DEMO 网上有, 大家可自行参考.
3. 测试用例
如果直接联调, 调试成本其实也不低, 如果单体程序问题, 联调效率会有重复工作的问题. 为了更好的理解这套 AWVS 的函数, 是如何在当前设计结构中被调用的. 我们用 PYTSET 把重点函数做了单体测试.
后续可能会加入其它模块的封装调度, 单体测试就变的必须起来.
3.1 测试认证过程
- @pytest.mark.scan
- def test_5(setup_module):
- import awvs
- ins = awvs.AWVS()
- ins.auth({"email":"name", "password":"pwd"})
- assert True == ret
3.2 测试添加扫描任务过程
- @pytest.mark.scan
- def test_6(setup_module):
- import awvs
- ins = awvs.AWVS()
- ret = ins.addTarget(['lua.ren\n','candylab.net\n'])
- assert True == ret
3.3 添加认证并扫描的过程
- @pytest.mark.scan
- def test_7(setup_module):
- import awvs
- ins = awvs.AWVS()
- ins.auth({"email":"name", "password":"pwd"})
- ret = ins.addTarget(['lua.ren\n','candylab.net\n'])
- assert True == ret
其实认证和扫描的过程, 前期是拆开测试的, 如果不先认证, 基本上就异常了, 无法添加扫描任务. 单测试用例是为了提供单体质量, 提高结合测试的成功效率.
整体测试的还是 auth 函数用户信息字典入参的测试, 与 addTarget 函数域名列表的测试. RPC 就更像一个代理人服务程序.
3.4 自动化测试
这个工程使用的测试工具是 pytest. 我们想通过自动监听 test.py 的 python 单体测试程序源码的变更, 自动调用 pytest 去扫行单体测试脚本.
如果在 Linux 平台一下可以使用 tup, 是一个很好用的工具. 因我们在 Mac 环境下扫行单体测试程序, 我们使用 fswatch 完成这个功能.
3.4.1 安装 fswatch
brew intall fswatch
如何在 Linux 平台用 tup 其实也很好.
3.4.2 监听脚本
- #!/bin/bash
- DIR=$1
- if [ ! -n "$DIR" ] ;then
- echo "you have not choice Application directory !"
- exit
- fi
- fswatch $DIR | while read file
- do
- #echo "${file} was modify">> unittest.log 2>&1
- echo "${file} was modify"
- pytest -v -s -m"scan" ${file}
- done
3.4.3 驱动脚本
#!/bin/bash
sh autotest.sh test.py
4. RPC 接口功能
当单体功能达到我们设想的要求时, 需要封装一个 RPC 服务对外提供服务. 程序越复杂单体测试用例的量就同比量大.
- @jsonrpc_method('myapp.autoscanner')
- def auto_scanner(request, domain='lua.ren'):
- import awvs
- ins = awvs.AWVS()
- ins.auth({"email":"name", "password":"pwd"})
- ins.addTask(['lua.ren\n','candylab.net\n'])
- return True
RPC 功能相当于把单体调用集成到一个接口, 正常一个完整的单体要做入参的检查工作, 过滤掉非法入参.
因为我们最开始是考虑用新加的 REST API 作与外部调用者进行通信, 在 REST API 做入参检查, 并且 REST API 不需求外部调用者调用时, 要依赖安全 RPC 客户端.
5. Django Command 功能实现
实现了单体对 AWVS 的封装, 并实现 RPC 服务, 先不考虑 REST 和前端的控制, 实际上我们想当于把 AWVS 的 REST 功能命令行化.
- from django.core.management.base import BaseCommand, CommandError
- import traceback
- class Command(BaseCommand):
- def add_arguments(self, parser):
- parser.add_argument(
- '-d',
- '--domain',
- action='store',
- dest='domain',
- default='lua.ren',
- help='domain.',
- )
- def handle(self, *args, **options):
- try:
- if options['domain']:
- print 'scan domain, %s' % options['domain']
- from jsonrpc.proxy import ServiceProxy
- s = ServiceProxy('http://localhost:5000/json/')
- s.myapp.autoscanner(options['domain'])
- self.stdout.write(self.style.SUCCESS(u'命令 %s 执行成功, 参数为 %s' % (__file__, options['domain'])))
- except Exception, ex:
- traceback.print_exc()
- self.stdout.write(self.style.ERROR(u'命令执行出错'))
6. REST API 实现
将功能性的内容用 RPC 实现, 将 check 业务划分和检查放到了 REST API 层, 这样后端服务调用依赖 RPC Server 和 RPC Client, 而 REST API 调用层不用考虑这个问题.
- @csrf_exempt
- def addItem(request):
- if request.method == 'GET':
- return JSONResponse("GET")
- if request.method == 'POST':
- data = JSONParser().parse(request)
- flg_key = data.has_key('key')
- if not flg_key:
- return JSONResponse('key is empty!')
- access_key = data['key']
- if cmp(access_key, "test"):
- return JSONResponse("access key error.")
- flg_domain = data.has_key('domain')
- if not flg_domain:
- result = {"error":"-1","errmsg":"domain is empty"}
- return HttpResponse(JSON.dumps(result,ensure_ascii=False),content_type="application/json,charset=utf-8")
- from jsonrpc.proxy import ServiceProxy
- s = ServiceProxy('http://localhost:5000/json/')
- import awvs
- ins = awvs.AWVS()
- ins.auth({"email":"name", "password":"pwd"})
- ins.addTask(['lua.ren\n','candylab.net\n'])
- result = {"error":"0","errmsg":"none"}
- return HttpResponse(JSON.dumps(result,ensure_ascii=False),content_type="application/json,charset=utf-8")
Django REST 让 REST 的实现更便利, 这样可以把重点放到业务逻辑检查对接, 相对单层的测试更有重点.
REST API 路由可以快速建立.
- urlpatterns = [
- url(r'scanner/$', views.addItem),
- ]
用 CURL 客户端测试 REST API.
curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}' 127.0.0.1:8080/scanner/
7. 命令行
最终我们实现了 AWVS 的 REST API 的 RPC 和 REST 封装, 然后命令行化, 当然的其中 RPC 和 REST API 可以其它的地方复用.
- 7.1 Django Command
- python manage.py DSL -d lua.ren
- 7.2 CURL & REST API
- curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}' 127.0.0.1:8080/scanner/
0*03 后记
本篇是听取了 Freebuf 上老师和朋友们的建议反馈, 然后产生了这个工程. 这些老师朋友都是 SDL 的专家. 特别是李老师给个工程起了一个名字叫 semaphore, 并 PR. 在这个工程的说明中引用了他们的对需求更精准的描述, 还有以软件本身的考虑. 将 Semaphore 工程中有关 AWVS 的部分, 抽出一个演示插件化 RPC 项目: https://github.com/shengnoah/semaphore-awvs-driver , 仅供参考.
来源: http://www.tuicool.com/articles/fy6Njum