多年一直从事数据相关工作. 对数据开发存在的各种问题深有体会. 数据处理工作主要有: 爬虫, ETL, 机器学习. 开发过程是构建数据处理的管道 Pipeline 的过程. 将各种模块拼接起来. 总结步骤有: 获取数据, 转化, 合并, 存储, 发送. 数据研发工作和业务系统研发有着很多的差别. 数据项目更多是铺管道过程, 各模块通过数据依赖, 而业务系统开发是建大楼过程. 很多情况爬虫工程师, 算法工程师, 写出来的数据处理代码, 非常混乱. 因为在看到真实数据前, 无法做准确的设计, 更不用说性能上的要求. 前段时间花了大量时间对 Asyncio 库深入研究. 决定开发了数据驱动框架, 从模块化, 灵活度, 性能方面来解决数据处理工作的问题. 这就我创立 Databot 开源框架的初衷.
花大半个月时间框架基本完成, 能够解决处理数据处理工作, 爬虫, ETL, 量化交易. 并有非常好的性能表现. 欢迎大家使用和提意见.
项目地址: https://github.com/kkyon/databot
安装方法: pip3 install -U databot
代码案例: https://github.com/kkyon/databot/tree/master/examples
多线程 VS 异步协程:
总的来说高并发的数据 IO 使用异步协程更具有优势. 因为线程占用资源多, 线程切换时候代价很大, 所以建议的线程数都是 cpu*2. Python 由于 GIL 限制, 通过多线程很难提升性能.
而通过 asyncio 可以达到非常的吞吐量. 并发数几乎没有限制.
具体可以参考这篇文章:
https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html
在普通笔记本上 python asyncio 在 9 分钟 完成 100 万个网页请求.
Databot 性能测试结果:
使用百度爬虫案例来作出:
有一批关键词, 需要在百度搜索引擎. 记录前十页的文章标题. 在 SEO, 舆情等场景经常要做类似事情. 测试中使用了 100 个关键字 (需要抓取 1000 个网页) 大概三分钟就能完成. 测试环境结果如下:
# ---run result----
HTTP 返回在 1 秒左右
#post man test result for a page requrest ;1100ms
ping 的是时间 42ms
- # PING www.a.shifen.com (180.97.33.108): 56 data bytes
- # 64 bytes from 180.97.33.108: icmp_seq=0 ttl=55 time=41.159 ms
Databot 测试结果: 每秒能抓取 50 个条目, 每秒能处理 6 个网页.
- # got len item 9274 speed:52.994286 per second,total cost: 175s
- # got len item 9543 speed:53.016667 per second,total cost: 180s
- # got len item 9614 speed:51.967568 per second,total cost: 185s
Python Asyncio 的问题:
asyncio 本身, 比如概念复杂, futrue,task, 区别, ensure futer,crate_task.
协程编写要求对工程师高, 特别在数据项目中.
asyncio 支持的三方库有限, 需要结合多线程和多进程来开发.
Databot 理念和
数据工程师只关注核心逻辑, 编写模块化函数, 不需要考虑 asyncio 的特性. Databot 将处理外部 IO, 并发, 调度问题.
Databot 基本概念:
Databot 设计非常简洁, 一共只有三个概念: Pipe,Route,Node
Pipe 是主流程, 一个程序可以有多个 Pipe, 相互联系或独立. Route,Node, 都是包含在 pipe 内部.
Route 是路由器, 主要起数据路由, 汇总合并作用. 有 Branch, Return,Fork,Join,BlockedJoin. 其中 Branch,Fork, 不会改变主流程数据. Return,Join, 会将处理后的数据放回到主流程中. 可以通过嵌套 Route, 组合出复杂的数据网络.
Node 是数据驱动节点. 处理数据逻辑节点, 一些 HTTP,Mysql,AioFile , 客户自定义函数, Timer,Loop 都是属于 Node.
如何安装 Databot:
pip3 install -U databot
github 地址: https://github.com/kkyon/databot
爬虫代码解析:
更多例子参照: https://github.com/kkyon/databot/tree/master/examples
针对百度爬虫例子, 主流程代码如下:
get_all_items, 是客户编写函数用于解析网页上的条目.
get_all_page_url 是自定义编写函数用于获取网页上的翻页链接.
Loop 通过循环列表把, 链接发送到 pipe 中.
HTTPLoader 将读入 URL, 下载 HTML. 生成 HTTP response 对象放入 Pipe 中
Branch 会拷贝一份数据 (Httpresponse) 导入分支中, 然后 get_all_items 会解析成最终结果, 存入文件中. 此时主流程数据不受影响. 仍然有一份 HTTP response
Branch 拷贝 pipe 中的 Httpresponse 到分支, 然后通过 get_all_page_url 解析全部翻页链接. 然后通过 HTTPloader 下载相应的网页, 解析保持.
以上每个步骤都会通过 Databot 框架调用和并发.
BotFrame.render('baiduspider')函数可以用于生产 pipe 的结构图. 需要安装 https://www.graphviz.org/download/
主函数代码:
- def main():
- words = ['贸易战', '世界杯']
- baidu_url = 'https://www.baidu.com/s?wd=%s'
- urls = [baidu_url % (word) for word in words]
- outputfile=aiofile('baidu.txt')
- Pipe(
- Loop(urls),
- HttpLoader(),
- Branch(get_all_items,outputfile),
- Branch(get_all_page_url, HttpLoader(), get_all_items, outputfile),
- )
- #生成流程图
- BotFrame.render('baiduspider')
- BotFrame.run()
- main()
下列是生成的流程图
全部代码:
- from databot.flow import Pipe, Branch, Loop
- from databot.botframe import BotFrame
- from bs4 import BeautifulSoup
- from databot.http.http import HttpLoader
- from databot.db.aiofile import aiofile
- import logging
- logging.basicConfig(level=logging.DEBUG)
- #定义解析结构
- class ResultItem:
- def __init__(self):
- self.id: str = '' self.name: str =''
- self.url: str = ' '
- self.page_rank: int = 0
- self.page_no: int = 0
- def __repr__(self):
- return '%s,%s,%d,%d'%(str(self.id),self.name,self.page_no,self.page_rank)
- # 解析具体条目
- def get_all_items(response):
- soup = BeautifulSoup(response.text, "lxml")
- items = soup.select('div.result.c-container')
- result = []
- for rank, item in enumerate(items):
- import uuid
- id = uuid.uuid4()
- r = ResultItem()
- r.id = id
- r.page_rank = rank
- r.name = item.h3.get_text()
- result.append(r)
- return result
- # 解析分页链接
- def get_all_page_url(response):
- itemList = []
- soup = BeautifulSoup(response.text, "lxml")
- page = soup.select('div#page')
- for item in page[0].find_all('a'):
- href = item.get('href')
- no = item.get_text()
- if '下一页' in no:
- break
- itemList.append('https://www.baidu.com' + href)
- return itemList
- def main():
- words = ['贸易战', '世界杯']
- baidu_url = 'https://www.baidu.com/s?wd=%s'
- urls = [baidu_url % (word) for word in words]
- outputfile=aiofile('baidu.txt')
- Pipe(
- Loop(urls),
- HttpLoader(),
- Branch(get_all_items,outputfile),
- Branch(get_all_page_url, HttpLoader(), get_all_items, outputfile),
- )
- #生成流程图
- BotFrame.render('baiduspider')
- BotFrame.run()
- main()
来源: https://www.cnblogs.com/codemind/p/9535491.html