- class Scheduler(object):
- """调度器"""
- def __init__(self, engine):
- """
- :param engine: 引擎
- """
- self.q = queue.Queue()
- self.engine = engine
- def put(self, request):
- self.q.put(request)
- def get(self):
- try:
- req = self.q.get(block=False)
- except Exception as e:
- req = None
- return req
- def size(self):
- return self.q.qsize()
利用队列实现调度器功能
2. 引擎
- class Engine(object):
- """引擎"""
- def __init__(self, max=5):
- """
- :param max: 最大并发数
- """
- self._close = None #用于 defered 对象的关闭
- self.max = max
- self.crawling = [] #存储正在进行处理的 request
- self.scheduler = Scheduler(self) #实例化调度器
- def get_response_callback(self, content, request):
- """
- 1. 将此 request 从 self.crawing 中删除
- 2. 执行爬虫对象中的回调函数
- 3. 若返回的结果是一个生成器, 则将其遍历, 并且添加到调度器中
- """
- self.crawling.remove(request)
- result = request.callback(content)
- if isinstance(result, types.GeneratorType):
- for req in result:
- self.scheduler.put(req)
- def _next_request(self):
- """
- 1. 判断调度器中队列的大小是否为 0 和正在处理的 request 个数, 若都为 0, 则表明爬虫执行完毕, self._close.callback(None) 关闭 Deferred 对象
- 2. 当处理中的 request 数大于或等于最大并发数时, 让后续等待
- 3. 当处理中的 request 数小于最大并发数时, 从调度器中调取任务, 并添加到 self.crawling 中, 利用 getPage 下载页面, 并为此对象添加回调函数 get_response_callback
- 4. 处理完成后添加回调函数_next_request, 继续下一个任务的处理
- """
- if self.scheduler.size() == 0 and len(self.crawling) == 0:
- self._close.callback(None)
- return
- if len(self.crawling)>= self.max:
- return
- while len(self.crawling) < self.max:
- req = self.scheduler.get()
- if req is None:
- return
- self.crawling.append(req)
- d = getPage(req.url.encode('utf-8'))
- d.addCallback(self.get_response_callback, req)
- d.addCallback(lambda _: reactor.callLater(0, self._next_request))
3. 爬虫对象
- class Crawler(object):
- """爬虫对象"""
- def __init__(self, spidercls):
- """
- :param spidercls: 爬虫类
- """
- self.spidercls = spidercls
- self.spider = None #爬虫
- self.engine = None #引擎
- @defer.inlineCallbacks
- def crawl(self):
- """
- 1.start_request 爬取的网页地址生成器
- 2. 获取 url, 并添加到调度器中
- 3. 执行 self,engine._next_request() 来处理 request
- 4.self.engine._close 赋值为一个 Deferred 对象
- """
- self.engine = Engine()
- self.spider = self.spidercls()
- start_request = iter(self.spider.start_requests())
- while 1:
- try:
- request = next(start_request)
- self.engine.scheduler.put(request)
- except StopIteration as e:
- break
- self.engine._next_request()
- self.engine._close = defer.Deferred()
- yield self.engine._close
@defer.inlineCallbacks 装饰器, 被装饰函数必须是一个生成器
Deferred 对象是一个类似于 Socket 对象的一个无限循环的对象, 应用程序将一连串函数添加到 Deferred 对象中, 当异步请求的结果准备就绪时, 这一连串函数将被按顺序调用 (这一连串函数被称为一个 callback 序列, 或是一
条 callback 链), 一起添加的还有另外一连串函数, 当异步请求出现错误的时候, 他们将被调用 (称作一个 errback 序列, 或是一条 errback 链). 异步库代码会在结果准备就绪时, 调用第一个 callback, 或是在出现错误时,
调用第一个 errback, 然后 Deferred 对象就会将 callback 或 errback 的返回结果传递给链中的下一个函数.
4. 爬虫进程
- class CrawlProcess(object):
- """爬虫进程"""
- def __init__(self):
- self._active = [] #存储每一个 Deferred 对象
- self.crawlers = [] #要执行的爬虫的集合
- def crawl(self, spidercls):
- """
- 实例化 Crawler, 获得爬虫对象, 每一个都执行 crawl 函数, 添加到_active, 实现后续并发
- """
- crawler = Crawler(spidercls,)
- self.crawlers.append(crawler)
- d = crawler.crawl()
- self._active.append(d)
- return d
- def start(self):
- """
- 开始处理 Deferred 对象, 当所有的 Deferred 对象的回调函数都被触发, 利用 callback(None) 结束之后, 执行匿名函数关闭 reactor
- """
- dd = defer.DeferredList(self._active)
- dd.addBoth(lambda _: reactor.stop())
- reactor.run()
爬虫的实例化, 和爬取工作的开启
- 5.Request
- class Request(object):
- def __init__(self, url, callback):
- """
- :param url: 网址
- :param callback: 回调函数
- """
- self.url = url
- self.callback = callback
用于存储爬虫每一个 url 与其对应的处理函数.
- 6.main
- if __name__ == '__main__':
- spider_cls_list = [] #添加爬虫类名
- crawler_process = CrawlProcess()
- for spider_cls in spider_cls_list:
- crawler_process.crawl(spider_cls)
- crawler_process.start()
- 7.spider
- class XXXSpider(object):
- name = XXX'
- def start_requests(self):
- start_url = []
- for url in start_url:
- yield Request(url, self.parse)
- def parse(self, response):
- print(response)
爬虫类格式, parse 为回调函数, 后续还能继续添加回调函数
来源: http://www.bubuko.com/infodetail-3039007.html