2 年前的实验室项目需要对 exploit-db 进行爬虫, 这里回顾一下知识.
基本思路, 使用 urllib.request 访问 exploit-db, 使用 BeautifulSoup 对 Response 进行解析, 然后将提取出的内容存储至 MySQL 中.
urllib
写这个 demo 的时候 Python2 还没有废弃, 这里将代码移植至 Python3 中.
由于 exploit-db 中漏洞页面的 url 是 https://www.exploit-db.com/exploits/ + eid 的方式构成的, 因此遍历 eid 即可爬取所有的漏洞.
构造 Request 与网页访问
Request 文档
class urllib.request.Request(url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None)
headers 可以在构造函数中指定, 也可以通过 add_header 方法进行添加
urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
urlopen()函数既可以接收 url, 也可以接收 Request.urlopen()将返回一个字节对象, 需要我们自行处理编码.
- def spider(spider_url):
- # 构造 request
- user_agent = random.choice(ua_list)
- spider_request = request.Request(spider_url)
- spider_request.add_header('User-Agent', user_agent)
- spider_response = request.urlopen(spider_request, timeout=30)
- html = spider_response.read().decode('utf-8')
异常处理
urllib.request 中的异常
爬取过程中遇到的一些异常
- URLError // 页面不存在
- socket.timeout //read()超时
- UnicodeDecodeError // 目标页面是 PDF,decode('utf-8')错误
- def spider(spider_url):
- # 构造 request
- user_agent = random.choice(ua_list)
- spider_request = request.Request(spider_url)
- spider_request.add_header('User-Agent', user_agent)
- try:
- spider_response = request.urlopen(spider_request, timeout=30)
- except error.URLError as e:
- return 'error, URLError'
- # noinspection PyBroadException
- try:
- HTML = spider_response.read().decode('utf-8')
- except socket.timeout as e:
- return 'error, socket.timeout'
- except UnicodeDecodeError as e:
- return 'error, UnicodeDecodeError'
- except Exception as e:
- return 'error, Exception: %s' % e
- return HTML
- BeautifulSoup
exploit-db 在这段时间也更新了页面, 之前写的解析函数已经无法运行.
的安装和详细使用方法可以参考官方文档, 这里对使用的函数进行说明:
BeautifulSoup 通过将 HTML/xml 文件转变成一个 BeautifulSoup 对象, 然后根据该对象提供的一些方法对 HTML/xml 进行查找和修改.
BeautifulSoup 可以通过. 访问标签, 通过 [] 访问属性, 通过 find()和 find_all()选择需要的标签, 然后提取其中的信息.
Chrome 提供的检查工具可以很容易确定元素的位置, 分析 HTML 中需要的标签的位置, 然后选择合适的过滤器.
- def bs4html(HTML):
- # 实现对 HTML 的解析
- soup = BeautifulSoup(HTML, 'html.parser')
- for div in soup.find_all('div', class_='col-sm-12 col-md-6 col-lg-3 d-flex align-items-stretch'):
- for h in div.find_all('div', class_='col-6 text-center'):
- print(h.h4.get_text().strip() + h.h6.get_text().strip())
- for s in div.find_all('div', class_='stats h5 text-center'):
- if s.strong.string.strip() == 'EDB Verified:':
- if s.i['class'] == ['mdi', 'mdi-24px', 'mdi-check']:
- print('EDB Verified: Yes')
- else:
- print('EDB Verified: No')
- elif s.strong.string.strip() == 'Exploit:':
- print(s.strong.string.strip() + s.a['href'])
- else:
- if s.find('a') is None:
- print(s.strong.string.strip())
- else:
- print(s.strong.string.strip() + s.a['href'])
数据库存储
ORM 也就将数据库映射成对象, 然后使用对象的方式操作 SQL 语句, 这里使用 SQLalchemy 框架.
需要实现两个类, 一个类用于和数据库通信, 完成增删改查等操作, 另一个类是映射类, 将数据库中的表与之形成映射.
数据库中的表
- class DBPoc(Base):
- __tablename__ = 'exp_poc_info'
- id = Column(Integer, primary_key=True)
- eid = Column(Integer)
- cve = Column(String)
- title = Column(String)
- author = Column(String)
- published_time = Column(String)
- verified = Column(String)
- platform = Column(String)
- exploit_type = Column(String)
- exploit_url = Column(String)
- exploit_app = Column(String)
- def __init__(self, eid, cve,
- title, author, published_time, verified,
- platform, exploit_type, exploit_url, exploit_app):
- self.eid = eid
- self.cve = cve
- self.title = title
- self.author = author
- self.published_time = published_time
- self.verified = verified
- self.platform = platform
- self.exploit_type = exploit_type
- self.exploit_url = exploit_url
- self.exploit_app = exploit_app
与数据库的通信
create_engine()与数据库进行连接, 而具体的增删改查需要使用 session 进行操作
- class DBEngine(object):
- def __init__(self):
- #
- self.engine = create_engine('sqlite:///exploit_db.sqlite', echo=False)
- db_session = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
- self.session = db_session()
- def close_db(self):
- #
- self.session.close()
- # interface
- # lower words connected with '_'
- def add_poc(self, Poc):
- # 添加 poc
- self.session.add(Poc)
- self.session.commit()
- def del_poc(self, eid):
- # 删除 poc
- poc = self.session.query(DBPoc).filter(DBPoc.eid == eid).first()
- try:
- self.session.delete(poc)
- self.session.commit()
- except Exception as e:
- print(e)
- def is_eid_exist(self, eid):
- # exist True
- # not exist False
- if self.session.query(DBPoc).filter(DBPoc.eid == eid).first() is None:
- return False
- else:
- return True
- def view_all_poc(self):
- print('DBPoc:')
- all_poc = self.session.query(DBPoc)
- for poc in all_poc:
- print(poc)
完整的代码见 https://github.com/wugedz/exploitdbspider
来源: http://www.bubuko.com/infodetail-3456568.html