如果使用非阻塞 I/O,它就不会傻傻地等在那里 (比如等连接、等读取),而是会返回一个错误信息,虽然说是说错误信息,它其实就是叫你过一会再来的意思,编程的时候都不把它当错误看。
非阻塞 I/O 代码如下:
- sock = socket.socket()
- sock.setblocking(False)
- try:
- sock.connect(('xkcd.com', 80))
- except BlockingIOError:
- pass
这里抛出的异常无视掉就可以了。
有了非阻塞 I/O 这个特性,我们就能够实现单线程上多个
的处理了,学过 C 语言网络编程的同学应该都认识
- sockets
这个函数吧?不认识也不要紧,
- select
函数如果你不设置它的超时时间它就是默认一直阻塞的,只有当有 I/O 事件发生时它才会被激活,然后告诉你哪个
- select
上发生了什么事件(读 | 写 | 异常),在
- socket
中也有
- Python
,还有跟
- select
功能相同但是更高效的
- select
,它们都是底层 C 函数的
- poll
实现。
- Python
不过这里我们不使用
,而是用更简单好用的
- select
,是
- DefaultSelector
后才出现的一个模块里的类,你只需要在非阻塞
- Python 3.4
和事件上绑定回调函数就可以了。
- socket
代码如下:
- from selectors import DefaultSelector, EVENT_WRITE
- selector = DefaultSelector()
- sock = socket.socket()
- sock.setblocking(False)
- try:
- sock.connect(('localhost', 3000))
- except BlockingIOError:
- pass
- def connected():
- selector.unregister(sock.fileno())
- print('connected!')
- selector.register(sock.fileno(), EVENT_WRITE, connected)
这里看一下
的原型
- selector.register
- register(fileobj, events, data=None)
其中
可以是文件描述符也可以是文件对象(通过
- fileobj
得到),
- fileno
是位掩码,指明发生的是什么事件,
- events
则是与指定文件(也就是我们的 socket)与指定事件绑定在一起的数据。
- data
如代码所示,
在该
- selector.register
的写事件上绑定了回调函数
- socket
(这里作为数据绑定)。在该
- connected
上第一次发生的写事件意味着连接的建立,
- socket
函数在连接建立成功后再解除了该
- connected
上所有绑定的数据。
- socket
看了以上
的使用方式,我想你会发现它很适合写成事件驱动的形式。
- selector
我们可以创建一个事件循环,在循环中不断获得 I/O 事件:
- def loop():
- while True:
- events = selector.select()
- #遍历事件并调用相应的处理
- for event_key, event_mask in events:
- callback = event_key.data
- callback()
其中
是一个
- events_key
,它的结构大致如下 (fileobj,fd,events,data),我们从 data 得到之前绑定的回调函数并调用。
- namedtuple
则是事件的位掩码。
- event_mask
关于
的更多内容,可参考官方文档:
- selectors
现在我们已经明白了基于回调函数实现事件驱动是怎么一回事了,接着来完成我们的爬虫吧。
首先创建两个 set,一个是待处理 url 的集合,一个是已抓取 url 的集合,同时初始化为根 url '/'
- urls_todo = set(['/'])
- seen_urls = set(['/'])
抓取一个页面会需要许多回调函数。比如
,它会在连接建立成功后向服务器发送一个
- connected
请求请求页面。当然它不会干等着服务器响应(那就阻塞了),而是再绑定另一个接收响应的回调函数
- GET
。如果
- read_response
在事件触发时无法一次性读取完整的响应,那么就会等下次事件触发时继续读取,直到读取到了完整的响应才解除绑定。
- read_response
我们将这些回调函数封装在
类中。它有三个成员变量:抓取的
- Fetcher
、
- url
对象与得到的服务器响应
- socket
。
- response
- class Fetcher:
- def __init__(self, url):
- self.response = b''
- self.url = url
- self.sock = None
实现
函数,绑定
- fetch
:
- connected
- # 在Fetcher类中实现
- def fetch(self):
- self.sock = socket.socket()
- self.sock.setblocking(False)
- try:
- self.sock.connect(('xkcd.com', 80))
- except BlockingIOError:
- pass
- selector.register(self.sock.fileno(),
- EVENT_WRITE,
- self.connected)
注意到
函数在内部调用
- fetch
尝试建立
- connect
连接并绑定回调函数,I/O 的处理则都是交给事件循环控制的。
- socket
与事件循环的关系如下:
- Fetcher
- # Begin fetching http://xkcd.com/353/
- fetcher = Fetcher('/353/')
- fetcher.fetch()
- # 事件循环
- while True:
- events = selector.select()
- for event_key, event_mask in events:
- callback = event_key.data
- callback(event_key, event_mask)
的实现:
- connected
- def connected(self, key, mask):
- print('connected!')
- #解除该socket上的所有绑定
- selector.unregister(key.fd)
- request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
- self.sock.send(request.encode('ascii'))
- # 连接建立后绑定读取响应的回调函数
- selector.register(key.fd,
- EVENT_READ,
- self.read_response)
的实现:
- read_response
- def read_response(self, key, mask):
- global stopped
- chunk = self.sock.recv(4096) # 每次接收最多4K的信息
- if chunk:
- self.response += chunk
- else:
- selector.unregister(key.fd) # 完成接收则解除绑定
- links = self.parse_links()
- # Python set-logic:
- for link in links.difference(seen_urls):
- urls_todo.add(link)
- Fetcher(link).fetch() # 抓取新的url
- seen_urls.update(links)
- urls_todo.remove(self.url)
- if not urls_todo:
- stopped = True # 当抓取队列为空时结束事件循环
如上一节课,它的作用是返回抓取到的页面中的所有发现的 url 的集合。
- parse_links
之后,遍历了每一个没抓取过的 url 并为其创建一个新的
- parse_links
对象并调用
- Fetcher
函数开始抓取。
- fetch
等其它函数的实现:
- parse_links
- def body(self):
- body = self.response.split(b'\r\n\r\n', 1)[1]
- return body.decode('utf-8')
- def parse_links(self):
- if not self.response:
- print('error: {}'.format(self.url))
- return set()
- if not self._is_html():
- return set()
- urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''',
- self.body()))
- links = set()
- for url in urls:
- normalized = urllib.parse.urljoin(self.url, url)
- parts = urllib.parse.urlparse(normalized)
- if parts.scheme not in ('', 'http', 'https'):
- continue
- host, port = urllib.parse.splitport(parts.netloc)
- if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'):
- continue
- defragmented, frag = urllib.parse.urldefrag(parts.path)
- links.add(defragmented)
- return links
- def _is_html(self):
- head, body = self.response.split(b'\r\n\r\n', 1)
- headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
- return headers.get('Content-Type', '').startswith('text/html')
将事件循环改为 stopped 时停止:
- start = time.time()
- stopped = False
- def loop():
- while not stopped:
- events = selector.select()
- for event_key, event_mask in events:
- callback = event_key.data
- callback()
- print('{} URLs fetched in {:.1f} seconds'.format(
- len(seen_urls), time.time() - start))
这里先奉上完整代码:
- from selectors import *
- import socket
- import re
- import urllib.parse
- import time
- urls_todo = set(['/'])
- seen_urls = set(['/'])
- #追加了一个可以看最高并发数的变量
- concurrency_achieved = 0
- selector = DefaultSelector()
- stopped = False
- class Fetcher:
- def __init__(self, url):
- self.response = b''
- self.url = url
- self.sock = None
- def fetch(self):
- global concurrency_achieved
- concurrency_achieved = max(concurrency_achieved, len(urls_todo))
- self.sock = socket.socket()
- self.sock.setblocking(False)
- try:
- self.sock.connect(('localhost', 3000))
- except BlockingIOError:
- pass
- selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
- def connected(self, key, mask):
- selector.unregister(key.fd)
- get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url)
- self.sock.send(get.encode('ascii'))
- selector.register(key.fd, EVENT_READ, self.read_response)
- def read_response(self, key, mask):
- global stopped
- chunk = self.sock.recv(4096) # 4k chunk size.
- if chunk:
- self.response += chunk
- else:
- selector.unregister(key.fd) # Done reading.
- links = self.parse_links()
- for link in links.difference(seen_urls):
- urls_todo.add(link)
- Fetcher(link).fetch()
- seen_urls.update(links)
- urls_todo.remove(self.url)
- if not urls_todo:
- stopped = True
- print(self.url)
- def body(self):
- body = self.response.split(b'\r\n\r\n', 1)[1]
- return body.decode('utf-8')
- def parse_links(self):
- if not self.response:
- print('error: {}'.format(self.url))
- return set()
- if not self._is_html():
- return set()
- urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''',
- self.body()))
- links = set()
- for url in urls:
- normalized = urllib.parse.urljoin(self.url, url)
- parts = urllib.parse.urlparse(normalized)
- if parts.scheme not in ('', 'http', 'https'):
- continue
- host, port = urllib.parse.splitport(parts.netloc)
- if host and host.lower() not in ('localhost'):
- continue
- defragmented, frag = urllib.parse.urldefrag(parts.path)
- links.add(defragmented)
- return links
- def _is_html(self):
- head, body = self.response.split(b'\r\n\r\n', 1)
- headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
- return headers.get('Content-Type', '').startswith('text/html')
- start = time.time()
- fetcher = Fetcher('/')
- fetcher.fetch()
- while not stopped:
- events = selector.select()
- for event_key, event_mask in events:
- callback = event_key.data
- callback(event_key, event_mask)
- print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format(
- len(seen_urls), time.time() - start, concurrency_achieved))
输入
命令查看效果。不要忘了先开网站的服务器哦。
- python3 callback.py
想想之前从建立连接到读取响应到解析新的 url 到工作队列中,这一切都能够在一个函数中完成,就像下面这样:
- def fetch(url):
- sock = socket.socket()
- sock.connect(('localhost', 3000))
- request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)
- sock.send(request.encode('ascii'))
- response = b''
- chunk = sock.recv(4096)
- while chunk:
- response += chunk
- chunk = sock.recv(4096)
- links = parse_links(response)
- q.add(links)
而用回调函数实现,整个整体就支离破碎了,哪里阻塞就又不得不在那里把函数一切为二,代码会显得非常乱,维护也变的很麻烦。更麻烦的是如果在回调函数中抛出了异常,你根本得不到什么有用的信息:
- Traceback (most recent call last):
- File "loop-with-callbacks.py", line 111, in <module>
- loop()
- File "loop-with-callbacks.py", line 106, in loop
- callback(event_key, event_mask)
- File "loop-with-callbacks.py", line 51, in read_response
- links = self.parse_links()
- File "loop-with-callbacks.py", line 67, in parse_links
- raise Exception('parse error')
- Exception: parse error
你看不到这个回调函数的上下文是什么,你只知道它在事件循环里。你想在这个函数外抓取它的异常都没地方下手。但是这又是回调实现无法避免的缺陷,那我们想实现并发异步应该怎么办咧?
来源: http://www.bubuko.com/infodetail-1967785.html