w9scan 是一款全能型的网站漏洞扫描器, 借鉴了各位前辈的优秀代码内置 1200 + 插件可对网站进行一次规模的检测, 功能包括但不限于 web 指纹检测端口指纹检测网站结构分析各种流行的漏洞检测爬虫以及 SQL 注入检测 XSS 检测等等, w9scan 会自动生成精美 html 格式结果报告
在开发 w9scan 之前笔者已经开发过了 w8scan, 了解的朋友可能知道, w8scan 的扫描功能并不强, 所以, 笔者想通过开发 w9scan, 这款在本地运行的扫描器来为 w8scan 的扫描器代码探路~
俗话说工欲善其事必先利其器, 笔者用过很多扫描器, 无不为他们强大的扫描功能所折服而笔者作为一名脚本小子, 本想开车驶向远方却造起了轮子乐此不疲仅以此文纪念 w9scan 的开发过程, 纪念那逝去的青春
额,,, 将视线拉回到本渣作 w9scan 的初期代码是模仿 bugscan 而写的, 因为 w9scan 在编写的初期就是为了兼容 bugscan 的插件, 因此做了大量兼容工作来兼容 w9scan 的代码而做兼容工作必不可少要了解下 bugscan 的工作原理, 所以笔者用自己的渣渣理解力简述下 bugscan 的功能流程:
首先加载服务类型为 www 的插件, www 插件扫描的过程中同时解析服务, 得到 ip, 加载服务类型为 ip 的插件继续扫描, 得到端口信息加载各种端口插件对目标分析, 得到 CMS 信息, 加载对应 cms 的插件等等
不得不说, 这是一个很棒的结构
于是笔者在代码结构层次模仿 w9scan, 功能结构层次模仿 bugscan, 凭着脑中若干的想像!@#@# 制造了 w9scan 第一版本..
对于 bugscan 插件的兼容: 可以在
lib/code/exploit.py
Exploit_run 类中找到, 主要通过下面几个阶段完成
1. 获取 plugins 目录下所有的 py 文件,(__init__.py 除外), 将内容存入一个字典中
2. 插件代码加载函数, 通过调用 imp 模块将字典存储的代码加载进来, 返回模块对象
- def_load_module(self,chunk,name='<w9scan>'):
- try:
- pluginObj = imp.new_module(str(name))
- exec chunk in pluginObj.__dict__
- except Exception as err_info:
- raise LoadModuleException
- return pluginObj
3. 为返回的模块类加上内置的 API(就是一些 bugscan 常用的内置 API, 如 curlhackhttp 之类)
4. 一切就绪, 然后根据 bugscan API 说明, bugscan 插件需定义两个函数 assign 为验证 audit 为执行函数所有接下来来调用这两个函数了
- pluginObj_tuple = pluginObj.assign(service,url)
- if notisinstance(pluginObj_tuple, tuple): # 判断是否是元组
- continue
- bool_value, agrs =pluginObj_tuple[0], pluginObj_tuple[1]
- if bool_value:
- pluginObj.audit(agrs)
CMS 识别
目前 CMS 大多数都是依靠指纹, 看了很多依靠机器识别来验证 webshell 的列子, 笔者也在学习如何用机器学习来识别 CMS
W9scan 的 CMS 识别的指纹库以及代码都是 bugscan 的, 里面有一些有趣的技巧, 分享一下 CMS 指纹文件在 lib/utils/cmsdata.py
可以看到主要是根据 MD5 以及关键字来识别的笔者之前写的 CMS 识别都会用个字段来表示识别方式, 这个似乎没有, 来看看有什么奥秘?
来到 plugins/www/whatcms.py
- import re,urlparse
- from lib.utils.cmsdata import cms_dict
- import hashlib
- def getMD5(password):
- m= hashlib.md5()
- m.update(password)
- return m.hexdigest()
- def makeurl(url):
- prox = "http://"
- if(url.startswith("https://")):
- prox = "https://"
- url_info = urlparse.urlparse(url)
- url = prox + url_info.netloc + "/"
- return url
- def isMatching(f_path, cms_name, sign, res,code, host, head):
- isMatch = False
- if f_path.endswith(".gif"):
- if sign:
- isMatch = getMD5(res) == sign
- else:
- isMatch = res.startswith("GIF89a")
- elif f_path.endswith(".png"):
- if sign:
- isMatch = getMD5(res) == sign
- else:
- isMatch = res.startswith("\x89PNG\x0d\x0a\x1a\x0a")
- elif f_path.endswith(".jpg"):
- if sign:
- isMatch = getMD5(res) == sign
- else:
- isMatch = res.startswith("\xff\xd8\xff\xe0\x00\x10JFIF")
- elif f_path.endswith(".ico"):
- if sign:
- isMatch = getMD5(res) == sign
- else:
- isMatch = res.startswith("\x00\x00\x00")
- elif code == 200:
- if sign and res.find(sign) != -1 or head.find(sign) != -1:
- isMatch = True
- elif sign and head.find(sign) != -1:
- isMatch = True
- if isMatch:
- task_push(cms_name, host, target=util.get_url_host(host))
- security_note(cms_name,'whatcms')
- #print "%s %s" % (cms_name, host)
- return True
- return False
- def assign(service, arg):
- if service == "www":
- return True,makeurl(arg)
- def audit(arg):
- cms_cache = {}
- cache = {}
- def _cache(url):
- if url in cache:
- return cache[url]
- else:
- status_code, header, html_body, error, error = curl.curl2(url)
- if status_code != 200 or not html_body:
- html_body = ""
- cache[url] = (status_code, header, html_body)
- return status_code, header, html_body
- for cmsname in cms_dict:
- cms_hash_list = cms_dict[cmsname]
- for cms_hash in cms_hash_list:
- if isinstance(cms_hash, tuple):
- f_path, sign = cms_hash
- else:
- f_path, sign = cms_hash, None
- if not isinstance(f_path, list):
- f_path = [f_path]
- for file_path in f_path:
- if file_path not in cms_cache:
- cms_cache[file_path] = []
- cms_cache[file_path].append((cmsname, sign))
- cms_key = cms_cache.keys()
- cms_key.sort(key=len)
- isMatch = False
- for f_path in cms_key:
- if isMatch:
- break
- for cms_name, sign in cms_cache[f_path]:
- code, head, res = _cache(arg + f_path)
- isMatch =isMatching(f_path, cms_name, sign, res, code, arg, head)
- if isMatch:
- break
从 audit 函数看起, 第一个 for 循环是给指纹排序
- for cmsname in cms_dict:
- cms_hash_list = cms_dict[cmsname]
- for cms_hash in cms_hash_list:
- if isinstance(cms_hash, tuple):
- f_path, sign = cms_hash
- else:
- f_path, sign = cms_hash, None
- if not isinstance(f_path, list):
- f_path = [f_path]
- for file_path in f_path:
- if file_path not in cms_cache:
- cms_cache[file_path] = []
- cms_cache[file_path].append((cmsname, sign))
为了减少访问网页的次数, cms 识别即最好访问一次路径后, 同时找出相同路径下的 cms 的特征, 例如访问一次 robots.txt 会有很多 cms 的指纹路径是这个
最后在 isMatch() 函数下找到我们问题的答案~
- def isMatching(f_path, cms_name, sign, res,code, host, head):
- isMatch = False
- if f_path.endswith(".gif"):
- if sign:
- isMatch = getMD5(res) == sign
- else:
- isMatch = res.startswith("GIF89a")
- elif f_path.endswith(".png"):
- if sign:
- isMatch = getMD5(res) == sign
- else:
- isMatch = res.startswith("\x89PNG\x0d\x0a\x1a\x0a")
- elif f_path.endswith(".jpg"):
- if sign:
- isMatch = getMD5(res) == sign
- else:
- isMatch = res.startswith("\xff\xd8\xff\xe0\x00\x10JFIF")
- elif f_path.endswith(".ico"):
- if sign:
- isMatch = getMD5(res) == sign
- else:
- isMatch = res.startswith("\x00\x00\x00")
- elif code == 200:
- if sign and res.find(sign) != -1 or head.find(sign) != -1:
- isMatch = True
- elif sign and head.find(sign) != -1:
- isMatch = True
- if isMatch:
- task_push(cms_name, host, target=util.get_url_host(host))
- security_note(cms_name,'whatcms')
- #print "%s %s" % (cms_name, host)
- return True
- return False
可以看到, 是通过对访问路径的后缀做判断, 并且对读取的图片前几位做校验这种判断模式比一些暴力用指纹枚举的判断模式在效率方面应该是更好的
最后, 如果匹配成功, 会像我们上面说到的一样, 调用相应 CMS 的相应插件
- if isMatch:
- task_push(cms_name, host, target=util.get_url_host(host))
- security_note(cms_name,'whatcms')
- #print "%s %s" % (cms_name, host)
- return True
在我们 exploit_run 类的内部实现 task_push 的功能就好啦~
整理插件目录
刚开始笔者接触到的插件是这样的
这种对查找插件和添加都不太方便, 于是笔者一个一个的将它们整理到了目录, 变成了这个样子这里面的辛苦!
其实整理也不辛苦拉, 像笔者这么懒的人怎么会一个个手动干呢, 一个正则一个移动文件 一个自动创建文件夹就把这些搞定了
为什么要放出来说呢? 因为笔者认为这是笔者用程序干过的最省力的事情了! QAQ
. 子域名查询
子域名查询是比较头疼的东西, 因为对于 w9scan, 不想太依赖第三方庞大的库支持, 重写 DNS 协议一个觉得麻烦而且重复造轮子没有意义, 参考 github 上别人的子域名工具都太庞大了, 短小精悍的不知道有木有最后用下面这十行的代码搞定了
- try:
- l =socket.gethostbyname_ex(hostnames)
- security_info(str(l),'subdomain')
- except socket.error:
- pass
O(_)O 哈哈~, 其实通过 socket.gethostname 就可以完成子域名爆破的工作, 为什么要用自己重写 dns 协议呢
集成 Buildwith
这是一个识别网站框架, 网站信息的脚本, 当初认识它是在一本爬虫的书上, 后来发现原来调用的是 Wappalyzer 的脚本, 但是 Buildwith 更短小精悍, 一个 py 文件就写好了全部的功能, 简直深得我意于是集成之~
大家可以自行 pipinstall buildwith 安装这个库试试, 非常强大!
当然如果大家下载 w9scan 是不需要安装这个的, 因为我已经将他集成上去了!!
原理其实也非常简单, 就是这么多的指纹!!!
其实, 一款好用的扫描器无外乎需要在速度和精准度之间权衡, 精准度需要大量的插件做支撑, 而这些插件, 也都是一些特定的指纹访问特定的路径, 但此时速度方面就得不到提高似乎陷入一个二者不可得兼的状况
线程问题
对于像扫描器这样的 IO 密集型作业, 用线程似乎可以大大提升速度当然, 用多进程联动的方式也行, 这应该是 w9scan 后面研究的对象, 现在说下 w9scan 怎样控制线程的
起初, 用了网上常用的 ThreadPool 模块, 但是不知怎的, 经常会出现 python 运行已停止的提示(Windows 上), 而且夹杂一大堆不知道有什么用的模块上回说到, 笔者并不洗用那种一大堆文件的模块, 不利于集成就算了, 还 ctrl+c 暂停的时候卡住了! 是可忍孰不可忍, 于是笔者立马操起菜刀, 小心翼翼的在 Google 上搜索答案
以笔者的渣渣水平, 十个答案里面只能看懂 3 个, 转而想想看代码好了之前笔者用 POC-T 这款软件的时候觉得挺稳, 于是乎拉它开刀, 还真开对了!
笔者模仿 [POC-T]( https://github.com/Xyntax/POC-T/blob/2.0/lib/controller/engine.py ) 的线程引擎不仅解决了程序异常, 也解决了 ctrl+c 阻塞的问题请听我慢慢道来
程序运行异常可能在于 ThreadPool 模块中有不少自动调节线程的程序, 经过笔者多次的测试, 每次出现异常时加载的插件并不相同, 可以排除是笔者插件代码问题导致实际上笔者的程序不需要这么高端的线程池, 仿照 POC-T, 笔者写了一个简陋, 但是好用的线程池
- # coding:utf-8
- # 模拟一个 线程池, 可以向里面添加任务,
- import threading
- import time
- import traceback
- import Queue
- import random
- class w8_threadpool:
- def__init__(self,threadnum,func_scan):
- self.thread_count = self.thread_nums = threadnum
- self.scan_count_lock = threading.Lock()
- self.thread_count_lock = threading.Lock()
- self.load_lock = threading.Lock()
- self.scan_count = 0
- self.isContinue = True
- self.func_scan = func_scan
- self.queue = Queue.Queue()
- def push(self,payload):
- self.queue.put(payload)
- defchangeScanCount(self,num):
- self.scan_count_lock.acquire()
- self.scan_count += num
- self.scan_count_lock.release()
- defchangeThreadCount(self,num):
- self.thread_count_lock.acquire()
- self.thread_count += num
- self.thread_count_lock.release()
- defrun(self):
- for iin range(self.thread_nums):
- t= threading.Thread(target=self.scan, name=str(i))
- t.setDaemon(True)
- t.start()
- #It can quit with Ctrl-C
- while1:
- ifself.thread_count > 0 and self.isContinue:
- time.sleep(0.01)
- else:
- break
- defstop(self):
- self.load_lock.acquire()
- self.isContinue = False
- self.load_lock.release()
- defscan(self):
- while1:
- self.load_lock.acquire()
- ifself.queue.qsize() > 0 and self.isContinue:
- payload = self.queue.get()
- self.load_lock.release()
- else:
- self.load_lock.release()
- break
- try:
- # POC 在执行时报错如果不被处理, 线程框架会停止并退出
- self.func_scan(payload)
- except KeyboardInterrupt:
- self.isContinue = False
- raise KeyboardInterrupt
- except Exception:
- errmsg = traceback.format_exc()
- self.isContinue = False
- self.changeScanCount(-1)
- self.changeThreadCount(-1)
- if __name__ == '__main__':
- defcalucator(num):
- i =random.randint(1, 100)
- u =num
- a = i* u
- if (a% 6 == 0):
- for x in range(5):
- print "new thread"
- p.push(x)
- p =w8_threadpool(3, calucator)
- for i inrange(100000):
- p.push(i)
- p.run()
这么一个简陋的线程池对于笔者的 w9scan 来说足够了(其实笔者的要求也不高是不是)
Ctrl+c 阻塞主线程可能是 join()这个方法阻塞了, 解决办法就是不用这个方法, 用 while 循环和 time.sleep 功能达到一个模拟 join()的作用
后记
我觉得, 一些成熟的扫描器在一些常规扫描方式达到登峰造极的地步, 以至于在想用这些常规方法扫描漏洞, 效果可能不太好想要有效挖掘一个网站的漏洞, 应该从那些不起眼的地方着力, 比如 w9scan 扫描器中有 git svm 泄露路径寻找, 会基于域名的字典来查找备份文件, 以及根据爬虫寻找出的文件逐个进行一句话密码破解等等
为什么写此文呢? 因为笔者觉得开发过程很无聊, 每次发布的版本都是更新了什么修复了什么, 如果能加些佐料, 加以记录像武侠小说一样, 又能分享经验, 何乐而不为呀!
来源: http://www.tuicool.com/articles/JfAveiQ