异步 IO
所谓「异步 IO」, 就是你发起一个 IO 阻塞 操作, 却不用等它结束, 你可以继续做其他事情, 当它结束时, 你会得到通知.
实现异步 IO 的方式
单线程 + 异步协程实现异步 IO 操作
异步协程用法
从 Python 3.4 开始, Python 中加入了协程的概念, 但这个版本的协程还是以生成器对象为基础的, 在 Python 3.5 则增加了 async/await, 使得协程的实现更加方便. 首先我们需要了解下面几个概念:
event_loop: 事件循环, 相当于一个无限循环, 我们可以把一些函数注册到这个事件循环上, 当满足某些条件的时候, 函数就会被循环执行. 程序是按照设定的顺序从头执行到尾, 运行的次数也是完全按照设定. 当在编写异步程序时, 必然其中有部分程序的运行耗时是比较久的, 需要先让出当前程序的控制权, 让其在背后运行, 让另一部分的程序先运行起来. 当背后运行的程序完成后, 也需要及时通知主程序已经完成任务可以进行下一步操作, 但这个过程所需的时间是不确定的, 需要主程序不断的监听状态, 一旦收到了任务完成的消息, 就开始进行下一步. loop 就是这个持续不断的监视器.
coroutine: 中文翻译叫协程, 在 Python 中常指代为协程对象类型, 我们可以将协程对象注册到事件循环中, 它会被事件循环调用. 我们可以使用 async 关键字来定义一个方法, 这个方法在调用时不会立即被执行, 而是返回一个协程对象.
task: 任务, 它是对协程对象的进一步封装, 包含了任务的各个状态.
future: 代表将来执行或还没有执行的任务, 实际上和 task 没有本质区别.
我们还需要了解 async/await 关键字, 它是从 Python 3.5 才出现的, 专门用于定义协程. 其中, async 定义一个协程, await 用来挂起阻塞方法的执行.
定义一个协程
- import asyncio
- async def execute(x):
- print('Number:', x)
- coroutine = execute(1)
- print('Coroutine:', coroutine)
- print('After calling execute')
- loop = asyncio.get_event_loop()
- loop.run_until_complete(coroutine)
- print('After calling loop')
- -----------------
输出结果
- Coroutine:
- After calling execute
- Number: 1
- After calling loop
我们引入了 asyncio 这个包, 这样我们才可以使用 async 和 await, 然后我们使用 async 定义了一个 execute() 方法, 方法接收一个数字参数, 方法执行之后会打印这个数字.
随后我们直接调用了这个方法, 然而这个方法并没有执行, 而是返回了一个 coroutine 协程对象. 随后我们使用 get_event_loop() 方法创建了一个事件循环 loop, 并调用了 loop 对象的 run_until_complete() 方法将协程注册到事件循环 loop 中, 然后启动. 最后我们才看到了 execute() 方法打印了输出结果.
可见, async 定义的方法就会变成一个无法直接执行的 coroutine 对象, 必须将其注册到事件循环中才可以执行.
task 的使用:
task, 它是对 coroutine 对象的进一步封装, 它里面相比 coroutine 对象多了运行状态, 比如 running,finished 等, 我们可以用这些状态来获取协程对象的执行情况.
在上面的例子中, 当我们将 coroutine 对象传递给 run_until_complete() 方法的时候, 实际上它进行了一个操作就是将 coroutine 封装成了 task 对象, 我们也可以显式地进行声明
- import asyncio
- async def execute(x):
- print('Number:', x)
- return x
- coroutine = execute(1)
- print('Coroutine:', coroutine)
- print('After calling execute')
- loop = asyncio.get_event_loop()
- task = loop.create_task(coroutine)
- print('Task:', task)
- loop.run_until_complete(task)
- print('Task:', task)
- print('After calling loop')
- ------------------
输出结果
- Coroutine:
- After calling execute
- Task:>
- Number: 1
- Task: result=1>
- After calling loop
ensure_future() 的使用
定义 task 对象还有一种方式, 就是直接通过 asyncio 的 ensure_future() 方法, 返回结果也是 task 对象, 这样的话我们就可以不借助于 loop 来定义, 即使我们还没有声明 loop 也可以提前定义好 task 对象, 写法如下:
- import asyncio
- async def execute(x):
- print('Number:', x)
- return x
- coroutine = execute(1)
- print('Coroutine:', coroutine)
- print('After calling execute')
- task = asyncio.ensure_future(coroutine)
- print('Task:', task)
- loop = asyncio.get_event_loop()
- loop.run_until_complete(task)
- print('Task:', task)
- print('After calling loop')
绑定回调
也可以为某个 task 绑定一个回调方法, 来看下面的例子:
- import asyncio
- import requests
- async def request():
- url = 'https://www.baidu.com'
- status = requests.get(url).status_code
- return status
- def callback(task):
- print('Status:', task.result())
- coroutine = request()
- task = asyncio.ensure_future(coroutine)
- task.add_done_callback(callback)
- print('Task:', task)
- loop = asyncio.get_event_loop()
- loop.run_until_complete(task)
- print('Task:', task)
- ---------------------
输出
- Task: cb=[callback() at demo.py:11]>
- Status:
- Task: result=>
我们定义了一个 request() 方法, 请求了百度, 返回状态码, 但是这个方法里面我们没有任何 print() 语句. 随后我们定义了一个 callback() 方法, 这个方法接收一个参数, 是 task 对象,
然后调用 print() 方法打印了 task 对象的结果. 这样我们就定义好了一个 coroutine 对象和一个回调方法, 我们现在希望的效果是, 当 coroutine 对象执行完毕之后, 就去执行声明的 callback() 方法.
那么它们二者怎样关联起来呢? 很简单, 只需要调用 add_done_callback() 方法即可, 我们将 callback() 方法传递给了封装好的 task 对象, 这样当 task 执行完毕之后就可以调用 callback() 方法了,
同时 task 对象还会作为参数传递给 callback() 方法, 调用 task 对象的 result() 方法就可以获取返回结果了.
多任务协程
想执行多次请求应该怎么办呢? 我们可以定义一个 task 列表, 然后使用 asyncio 的 wait() 方法即可执行.
- import asyncio
- import time
- async def request(url):
- print('正在下载', url)
- # 在异步协程中如果出现了同步模块相关的代码, 那么就无法实现异步.
- # time.sleep(2)
- # 当在 asyncio 中遇到阻塞操作必须进行手动挂起
- await asyncio.sleep(2)
- print('下载完毕', url)
- return url
- # 回调函数
- def callback_func(task):
- print('callback ->' + task.result()) # callback 对应函数的返回值
- start = time.time()
- urls = [
- 'www.baidu.com',
- 'www.sogou.com',
- 'www.goubanjia.com'
- ]
- # 任务列表: 存放多个任务对象
- stasks = []
- for url in urls:
- c = request(url)
- task = asyncio.ensure_future(c)
- # 将回调函数绑定到对象中
- task.add_done_callback(callback_func)
- stasks.append(task)
- # 创建事件循环对象
- loop = asyncio.get_event_loop()
- # 需要将任务列表封装到 wait 中
- loop.run_until_complete(asyncio.wait(stasks))
- print(time.time() - start)
- -----
输出:
正在下载 www.baidu.com
正在下载 www.sogou.com
正在下载 www.goubanjia.com
下载完毕 www.baidu.com
下载完毕 www.sogou.com
下载完毕 www.goubanjia.com
- callback ->www.baidu.com
- callback ->www.sogou.com
- callback ->www.goubanjia.com
- 2.003114700317383
高性能异步爬虫 ---aiohttp
aiohttp 可以实现单线程并发 IO 操作,
安装 pip install aiohttp
aiohttp 使用
发起请求
- async def fetch():
- async with aiohttp.ClientSession() as session:
- async with session.get('https://www.baidu.com') as resposne:
- print(await resposne.text())
- loop = asyncio.get_event_loop()
- tasks = [fetch(),]
- loop.run_until_complete(asyncio.wait(tasks))
添加请求参数
- params = {'key': 'value', 'page': 10}
- async def fetch():
- async with aiohttp.ClientSession() as session:
- async with session.get('https://www.baidu.com/s',params=params) as resposne:
- print(await resposne.url)
- loop = asyncio.get_event_loop()
- tasks = [fetch(),]
- loop.run_until_complete(asyncio.wait(tasks))
UA 伪装
- url = 'http://httpbin.org/user-agent'
- headers = {'User-Agent': 'test_user_agent'}
- async def fetch():
- async with aiohttp.ClientSession() as session:
- async with session.get(url,headers=headers) as resposne:
- print(await resposne.text())
自定义 cookies
- url = 'http://httpbin.org/cookies'
- cookies = {'cookies_name': 'test_cookies'}
- async def fetch():
- async with aiohttp.ClientSession() as session:
- async with session.get(url,cookies=cookies) as resposne:
- print(await resposne.text())
post 请求参数
- url = 'http://httpbin.org'
- payload = {'username': 'zhang', 'password': '123456'}
- async def fetch():
- async with aiohttp.ClientSession() as session:
- async with session.post(url, data=payload) as resposne:
- print(await resposne.text())
设置代理
- url = "http://python.org"
- async def fetch():
- async with aiohttp.ClientSession() as session:
- async with session.get(url, proxy="http://some.proxy.com") as resposne:
- print(resposne.status)
异步 IO 处理
- # 环境安装: pip install aiohttp
- # 使用该模块中的 ClientSession
- import requests
- import asyncio
- import time
- import aiohttp
- async def get_page(url):
- async with aiohttp.ClientSession() as session:
- # get(),post():
- # headers,params/data,proxy='http://ip:port'
- async with await session.get(url) as response:
- # text() 返回字符串形式的响应数据
- # read() 返回的二进制形式的响应数据
- # JSON() 返回的就是 JSON 对象
- # 注意: 获取响应数据操作之前一定要使用 await 进行手动挂起
- page_text = await response.text()
- # print(page_text)
- return page_text
- # 回调函数
- def callback_func(task):
- print('callback ->' + task.result()[0:50]) # callback 对应函数的返回值
- start = time.time()
- urls = [
- 'https://www.baidu.com/',
- 'https://www.sogou.com/',
- 'https://www.douban.com/'
- ]
- # 任务列表: 存放多个任务对象
- stasks = []
- for url in urls:
- c = get_page(url)
- task = asyncio.ensure_future(c)
- # 将回调函数绑定到对象中
- task.add_done_callback(callback_func)
- stasks.append(task)
- loop = asyncio.get_event_loop()
- # 需要将任务列表封装到 wait 中
- loop.run_until_complete(asyncio.wait(stasks))
- print(time.time() - start)
来源: http://www.bubuko.com/infodetail-3398146.html